diff --git a/compression/blob_store.cc b/compression/blob_store.cc index 47bf00f..13c9563 100644 --- a/compression/blob_store.cc +++ b/compression/blob_store.cc @@ -49,7 +49,7 @@ namespace { void EnqueueChunkRequests(uint64_t offset, uint64_t size, uint8_t* data, std::vector& requests) { // Split into chunks for load-balancing even if blob sizes vary. - constexpr size_t kChunkSize = 4 * 1024 * 1024; + constexpr size_t kChunkSize = 4 * 1024 * 1024; // bytes // Split into whole chunks and possibly one remainder. uint64_t pos = 0; diff --git a/compression/blob_store.h b/compression/blob_store.h index 119f95c..c881564 100644 --- a/compression/blob_store.h +++ b/compression/blob_store.h @@ -48,12 +48,13 @@ using BlobError = int; // 128), which can help performance. static constexpr size_t kBlobAlign = 256; +// One I/O request, serviced by threads in a pool. struct BlobIO { BlobIO(uint64_t offset, size_t size, void* data, uint64_t padding) : offset(offset), size(size), data(data), padding(padding) {} uint64_t offset; - size_t size; + size_t size; // bytes void* data; uint64_t padding; }; @@ -66,7 +67,8 @@ class BlobReader { // Opens `filename` and reads its header. BlobError Open(const Path& filename); - // Enqueues read requests if `key` is found and its size matches `size`. + // Enqueues read requests if `key` is found and its size matches `size`, which + // is in units of bytes. BlobError Enqueue(hwy::uint128_t key, void* data, size_t size); // Reads all enqueued requests. @@ -80,6 +82,7 @@ class BlobReader { class BlobWriter { public: + // `size` is in bytes. void Add(hwy::uint128_t key, const void* data, size_t size) { keys_.push_back(key); blobs_.emplace_back(static_cast(data), size); diff --git a/compression/compress-inl.h b/compression/compress-inl.h index e4ea1a1..1e74292 100644 --- a/compression/compress-inl.h +++ b/compression/compress-inl.h @@ -624,21 +624,22 @@ class Compressor { explicit Compressor(hwy::ThreadPool& pool) : pool_(pool) {} template - void operator()(const char* name, const float* weights, + void operator()(const char* name, const float* HWY_RESTRICT weights, CompressedArray& compressed) { - Insert(name, weights, kCapacity, work_, compressed.CompressedSize(), - compressed.data(), 0, pool_); + Insert(name, weights, kCapacity, work_, compressed.GetSpan(), + /*packed_ofs=*/0, pool_); } template - void Insert(const char* name, const float* weights, size_t weights_count, - CompressWorkingSet& work, size_t out_capacity, Packed* packed, - size_t packed_ofs, hwy::ThreadPool& pool) { - fprintf(stderr, "Regenerating %s (%zuM), please wait\n", name, - weights_count / (1000 * 1000)); - Compress(weights, weights_count, work_, - PackedSpan{packed, weights_count}, 0, pool_); - writer_.Add(CacheKey(name), packed, out_capacity); + void Insert(const char* name, const float* HWY_RESTRICT weights, + size_t num_weights, CompressWorkingSet& work, + const PackedSpan& packed, size_t packed_ofs, + hwy::ThreadPool& pool) { + fprintf(stderr, "Compressing %s (%zuM), please wait\n", name, + num_weights / (1000 * 1000)); + Compress(weights, num_weights, work_, packed, packed_ofs, pool_); + const size_t num_bytes = packed.num * sizeof(Packed); + writer_.Add(CacheKey(name), packed.ptr, num_bytes); } void AddScales(const float* scales, size_t len) { diff --git a/compression/compress.h b/compression/compress.h index cfd512f..275306f 100644 --- a/compression/compress.h +++ b/compression/compress.h @@ -43,20 +43,20 @@ namespace gcpp { // Compressed representation of floating-point elements. The array length may // differ from the number of elements. Associated operations such as Dot are // implemented in SIMD code and are thus non-member functions. -template +template class CompressedArray { public: - using value_type = MatT; + using value_type = Packed; // Note that whenever you access data(), you have to consider a scale() that // may be different from 1.0f. - MatT* data() { return data_.data(); } - const MatT* data() const { return data_.data(); } + Packed* data() { return data_.data(); } + const Packed* data() const { return data_.data(); } // The const accessor data_scale1() asserts (!) that the scale is 1.0f, so // calling it means "I am sure the scale is 1 and therefore ignore the scale". // A scale of 0 indicates that the scale has likely never been set, so is // "implicitly 1". - const MatT* data_scale1() const { + const Packed* data_scale1() const { HWY_ASSERT(scale() == 1.f || scale() == 0.f); return data_.data(); } @@ -67,14 +67,17 @@ class CompressedArray { float scale() const { return scale_[0]; } void set_scale(float scale) { scale_[0] = scale; } - constexpr size_t size() const { return kCapacity; } + constexpr size_t NumElements() const { return kCapacity; } - constexpr size_t CompressedSize() const { - return data_.size() * sizeof(MatT); + // Returns total number of packed elements for `BlobReader::Enqueue` and + // `Compress`. This differs from `NumElements` for `Packed=NuqStream`. + PackedSpan GetSpan() { return MakeSpan(data(), data_.size()); } + PackedSpan GetSpan() const { + return MakeSpan(data(), data_.size()); } private: - std::array(kCapacity)> data_; + std::array(kCapacity)> data_; // Blobs are at least kBlobAlign bytes anyway. float scale_[kBlobAlign / sizeof(float)]; }; @@ -146,14 +149,14 @@ struct CompressWorkingSet { // Returns key for the given tensor name. Also encodes the type, so that // changing the representation automatically invalidates prior cached files // (the new blob name will not be found). -template +template hwy::uint128_t CacheKey(const char* name) { // Already used/retired: s, S, n, 1 - const char prefix = hwy::IsSame() ? 'F' - : hwy::IsSame() ? 'B' - : hwy::IsSame() ? '$' - : hwy::IsSame() ? '2' - : '?'; + const char prefix = hwy::IsSame() ? 'F' + : hwy::IsSame() ? 'B' + : hwy::IsSame() ? '$' + : hwy::IsSame() ? '2' + : '?'; return MakeKey((std::string(1, prefix) + name).c_str()); } @@ -173,17 +176,18 @@ class CacheLoader { } // Called for each tensor, enqueues read requests. - template + template void operator()(const char* name, const float* null, - CompressedArray& compressed) { + CompressedArray& compressed) { HWY_DASSERT(null == nullptr); // Skip if reader_ is invalid or any load failed: we will regenerate // everything because it's rare to update only a few tensors. if (err_ != 0) return; - err_ = reader_.Enqueue(CacheKey(name), compressed.data(), - compressed.CompressedSize()); + const PackedSpan span = compressed.GetSpan(); + const size_t num_bytes = span.num * sizeof(Packed); + err_ = reader_.Enqueue(CacheKey(name), span.ptr, num_bytes); compressed.set_scale(1.0f); if (err_ != 0) { fprintf(stderr, "Failed to read cache %s (error %d)\n", name, err_); diff --git a/compression/python/compression_clif_aux.cc b/compression/python/compression_clif_aux.cc index 4b9e146..1be4ed2 100644 --- a/compression/python/compression_clif_aux.cc +++ b/compression/python/compression_clif_aux.cc @@ -4,6 +4,8 @@ #include #include "compression/compress.h" +#include "compression/shared.h" +#include "hwy/aligned_allocator.h" #undef HWY_TARGET_INCLUDE #define HWY_TARGET_INCLUDE \ @@ -18,6 +20,7 @@ #ifndef GEMMA_ONCE #define GEMMA_ONCE +#include "third_party/absl/types/span.h" #include "compression/io.h" #include "hwy/base.h" #include "hwy/contrib/thread_pool/thread_pool.h" @@ -47,33 +50,31 @@ namespace gcpp { namespace HWY_NAMESPACE { class SbsWriterImpl : public WriterInterface { + template + hwy::AlignedFreeUniquePtr AllocateAndCompress( + const std::string& name, absl::Span weights) { + const size_t num_packed = CompressedArrayElements(weights.size()); + auto packed = hwy::AllocateAligned(num_packed); + PackedSpan span = MakeSpan(packed.get(), num_packed); + compressor_.Insert(name.c_str(), weights.data(), weights.size(), + working_set_, span, /*packed_ofs=*/0, pool_); + return packed; + } + public: SbsWriterImpl() : pool_(0), compressor_(pool_) {} void Insert(std::string name, absl::Span weights) override { - const size_t out_size = CompressedArrayElements(weights.size()); - sfp_streams_.push_back(std::vector(out_size)); - compressor_.Insert(name.data(), weights.data(), weights.size(), - working_set_, out_size, - sfp_streams_.back().data(), 0, pool_); + sfp_streams_.push_back(AllocateAndCompress(name, weights)); } void InsertNUQ(std::string name, absl::Span weights) override { - const size_t out_size = CompressedArrayElements(weights.size()); - nuq_streams_.push_back(std::vector(out_size)); - compressor_.Insert(name.data(), weights.data(), weights.size(), - working_set_, out_size, - nuq_streams_.back().data(), 0, pool_); + nuq_streams_.push_back(AllocateAndCompress(name, weights)); } void InsertBfloat16(std::string name, absl::Span weights) override { - const size_t out_size = - CompressedArrayElements(weights.size()); - bf16_streams_.push_back(std::vector(out_size)); - compressor_.Insert(name.data(), weights.data(), - weights.size(), working_set_, out_size, - bf16_streams_.back().data(), 0, pool_); + bf16_streams_.push_back(AllocateAndCompress(name, weights)); } void AddScales(const std::vector& scales) override { @@ -89,9 +90,9 @@ class SbsWriterImpl : public WriterInterface { hwy::ThreadPool pool_; Compressor compressor_; CompressWorkingSet working_set_; - std::vector> sfp_streams_; - std::vector> nuq_streams_; - std::vector> bf16_streams_; + std::vector> sfp_streams_; + std::vector> nuq_streams_; + std::vector> bf16_streams_; std::vector scales_; };