Fix mismatch between blob_store and compress interfaces (bytes)

PiperOrigin-RevId: 673027268
Author: Jan Wassenberg, 2024-09-10 10:58:27 -07:00 (committed by Copybara-Service)
parent 8c0a8834c1
commit 13a9f76f64
5 changed files with 61 additions and 52 deletions
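
The underlying mismatch: the blob store (`BlobReader::Enqueue`, `BlobWriter::Add`) measures sizes in bytes, while `Compress` and `PackedSpan` count packed elements. A minimal sketch of the conversion this commit standardizes on at the blob-store boundary (the names `compressed` and `writer` are placeholders, not taken from the diff):

  // Convert packed-element count to bytes before handing off to the blob store.
  const PackedSpan<SfpStream> span = compressed.GetSpan();
  const size_t num_bytes = span.num * sizeof(SfpStream);
  writer.Add(CacheKey<SfpStream>("tensor_name"), span.ptr, num_bytes);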

View File

@@ -49,7 +49,7 @@ namespace {
 void EnqueueChunkRequests(uint64_t offset, uint64_t size, uint8_t* data,
                           std::vector<BlobIO>& requests) {
   // Split into chunks for load-balancing even if blob sizes vary.
-  constexpr size_t kChunkSize = 4 * 1024 * 1024;
+  constexpr size_t kChunkSize = 4 * 1024 * 1024;  // bytes
   // Split into whole chunks and possibly one remainder.
   uint64_t pos = 0;
 
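
Worked example (all values in bytes): with kChunkSize = 4 MiB, a 9 MiB blob is split into two full 4 MiB BlobIO requests plus one 1 MiB remainder, so three entries are appended to `requests`.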

View File

@@ -48,12 +48,13 @@ using BlobError = int;
 // 128), which can help performance.
 static constexpr size_t kBlobAlign = 256;
 
+// One I/O request, serviced by threads in a pool.
 struct BlobIO {
   BlobIO(uint64_t offset, size_t size, void* data, uint64_t padding)
       : offset(offset), size(size), data(data), padding(padding) {}
 
   uint64_t offset;
-  size_t size;
+  size_t size;  // bytes
   void* data;
   uint64_t padding;
 };
@@ -66,7 +67,8 @@ class BlobReader {
   // Opens `filename` and reads its header.
   BlobError Open(const Path& filename);
 
-  // Enqueues read requests if `key` is found and its size matches `size`.
+  // Enqueues read requests if `key` is found and its size matches `size`,
+  // which is in units of bytes.
   BlobError Enqueue(hwy::uint128_t key, void* data, size_t size);
 
   // Reads all enqueued requests.
@@ -80,6 +82,7 @@ class BlobReader {
 
 class BlobWriter {
  public:
+  // `size` is in bytes.
   void Add(hwy::uint128_t key, const void* data, size_t size) {
     keys_.push_back(key);
     blobs_.emplace_back(static_cast<const uint8_t*>(data), size);
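
A hedged usage sketch of the byte-denominated reader interface (assumes a valid `Path path` and that the relevant compression headers are included; `buf` is a placeholder caller-owned destination):

  BlobReader reader;
  BlobError err = reader.Open(path);
  if (err == 0) {
    float buf[4096];
    // The final argument is the expected blob size in bytes.
    err = reader.Enqueue(CacheKey<float>("tensor"), buf, sizeof(buf));
  }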

View File

@@ -624,21 +624,22 @@ class Compressor {
   explicit Compressor(hwy::ThreadPool& pool) : pool_(pool) {}
 
   template <typename Packed, size_t kCapacity>
-  void operator()(const char* name, const float* weights,
+  void operator()(const char* name, const float* HWY_RESTRICT weights,
                   CompressedArray<Packed, kCapacity>& compressed) {
-    Insert(name, weights, kCapacity, work_, compressed.CompressedSize(),
-           compressed.data(), 0, pool_);
+    Insert(name, weights, kCapacity, work_, compressed.GetSpan(),
+           /*packed_ofs=*/0, pool_);
   }
 
   template <typename Packed>
-  void Insert(const char* name, const float* weights, size_t weights_count,
-              CompressWorkingSet& work, size_t out_capacity, Packed* packed,
-              size_t packed_ofs, hwy::ThreadPool& pool) {
-    fprintf(stderr, "Regenerating %s (%zuM), please wait\n", name,
-            weights_count / (1000 * 1000));
-    Compress(weights, weights_count, work_,
-             PackedSpan<Packed>{packed, weights_count}, 0, pool_);
-    writer_.Add(CacheKey<Packed>(name), packed, out_capacity);
+  void Insert(const char* name, const float* HWY_RESTRICT weights,
+              size_t num_weights, CompressWorkingSet& work,
+              const PackedSpan<Packed>& packed, size_t packed_ofs,
+              hwy::ThreadPool& pool) {
+    fprintf(stderr, "Compressing %s (%zuM), please wait\n", name,
+            num_weights / (1000 * 1000));
+    Compress(weights, num_weights, work_, packed, packed_ofs, pool_);
+    const size_t num_bytes = packed.num * sizeof(Packed);
+    writer_.Add(CacheKey<Packed>(name), packed.ptr, num_bytes);
   }
 
   void AddScales(const float* scales, size_t len) {
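
A sketch of calling the reworked `Insert` directly (buffer sizes are arbitrary; assumes the surrounding compression headers are included):

  hwy::ThreadPool pool(0);
  Compressor compressor(pool);
  CompressWorkingSet work;
  std::vector<float> weights(4096, 0.5f);
  const size_t num_packed = CompressedArrayElements<SfpStream>(weights.size());
  std::vector<SfpStream> packed(num_packed);
  compressor.Insert("w", weights.data(), weights.size(), work,
                    MakeSpan(packed.data(), num_packed), /*packed_ofs=*/0,
                    pool);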

View File

@@ -43,20 +43,20 @@ namespace gcpp {
 // Compressed representation of floating-point elements. The array length may
 // differ from the number of elements. Associated operations such as Dot are
 // implemented in SIMD code and are thus non-member functions.
-template <typename MatT, size_t kCapacity>
+template <typename Packed, size_t kCapacity>
 class CompressedArray {
  public:
-  using value_type = MatT;
+  using value_type = Packed;
 
   // Note that whenever you access data(), you have to consider a scale() that
   // may be different from 1.0f.
-  MatT* data() { return data_.data(); }
-  const MatT* data() const { return data_.data(); }
+  Packed* data() { return data_.data(); }
+  const Packed* data() const { return data_.data(); }
   // The const accessor data_scale1() asserts (!) that the scale is 1.0f, so
   // calling it means "I am sure the scale is 1 and therefore ignore the scale".
   // A scale of 0 indicates that the scale has likely never been set, so is
   // "implicitly 1".
-  const MatT* data_scale1() const {
+  const Packed* data_scale1() const {
     HWY_ASSERT(scale() == 1.f || scale() == 0.f);
     return data_.data();
   }
@@ -67,14 +67,17 @@ class CompressedArray {
   float scale() const { return scale_[0]; }
   void set_scale(float scale) { scale_[0] = scale; }
 
-  constexpr size_t size() const { return kCapacity; }
+  constexpr size_t NumElements() const { return kCapacity; }
 
-  constexpr size_t CompressedSize() const {
-    return data_.size() * sizeof(MatT);
-  }
+  // Returns total number of packed elements for `BlobReader::Enqueue` and
+  // `Compress`. This differs from `NumElements` for `Packed=NuqStream`.
+  PackedSpan<Packed> GetSpan() { return MakeSpan(data(), data_.size()); }
+  PackedSpan<const Packed> GetSpan() const {
+    return MakeSpan(data(), data_.size());
+  }
 
  private:
-  std::array<MatT, CompressedArrayElements<MatT>(kCapacity)> data_;
+  std::array<Packed, CompressedArrayElements<Packed>(kCapacity)> data_;
   // Blobs are at least kBlobAlign bytes anyway.
   float scale_[kBlobAlign / sizeof(float)];
 };
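
The rename makes the two counts explicit; a hedged illustration (4096 is an arbitrary capacity):

  CompressedArray<NuqStream, 4096> arr;
  const size_t n = arr.NumElements();  // 4096: logical weight count.
  const size_t p = arr.GetSpan().num;  // packed elements; differs from n for
                                       // NuqStream, per the comment above.
  const size_t bytes = p * sizeof(NuqStream);  // what BlobReader::Enqueue expects.
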
@@ -146,14 +149,14 @@ struct CompressWorkingSet {
 
 // Returns key for the given tensor name. Also encodes the type, so that
 // changing the representation automatically invalidates prior cached files
 // (the new blob name will not be found).
-template <typename MatT>
+template <typename Packed>
 hwy::uint128_t CacheKey(const char* name) {
   // Already used/retired: s, S, n, 1
-  const char prefix = hwy::IsSame<MatT, float>() ? 'F'
-                      : hwy::IsSame<MatT, BF16>() ? 'B'
-                      : hwy::IsSame<MatT, SfpStream>() ? '$'
-                      : hwy::IsSame<MatT, NuqStream>() ? '2'
+  const char prefix = hwy::IsSame<Packed, float>() ? 'F'
+                      : hwy::IsSame<Packed, BF16>() ? 'B'
+                      : hwy::IsSame<Packed, SfpStream>() ? '$'
+                      : hwy::IsSame<Packed, NuqStream>() ? '2'
                                                    : '?';
   return MakeKey((std::string(1, prefix) + name).c_str());
 }
@ -173,17 +176,18 @@ class CacheLoader {
} }
// Called for each tensor, enqueues read requests. // Called for each tensor, enqueues read requests.
template <typename MatT, size_t kCapacity> template <typename Packed, size_t kCapacity>
void operator()(const char* name, const float* null, void operator()(const char* name, const float* null,
CompressedArray<MatT, kCapacity>& compressed) { CompressedArray<Packed, kCapacity>& compressed) {
HWY_DASSERT(null == nullptr); HWY_DASSERT(null == nullptr);
// Skip if reader_ is invalid or any load failed: we will regenerate // Skip if reader_ is invalid or any load failed: we will regenerate
// everything because it's rare to update only a few tensors. // everything because it's rare to update only a few tensors.
if (err_ != 0) return; if (err_ != 0) return;
err_ = reader_.Enqueue(CacheKey<MatT>(name), compressed.data(), const PackedSpan<Packed> span = compressed.GetSpan();
compressed.CompressedSize()); const size_t num_bytes = span.num * sizeof(Packed);
err_ = reader_.Enqueue(CacheKey<Packed>(name), span.ptr, num_bytes);
compressed.set_scale(1.0f); compressed.set_scale(1.0f);
if (err_ != 0) { if (err_ != 0) {
fprintf(stderr, "Failed to read cache %s (error %d)\n", name, err_); fprintf(stderr, "Failed to read cache %s (error %d)\n", name, err_);

View File

@@ -4,6 +4,8 @@
 #include <vector>
 
 #include "compression/compress.h"
+#include "compression/shared.h"
+#include "hwy/aligned_allocator.h"
 
 #undef HWY_TARGET_INCLUDE
 #define HWY_TARGET_INCLUDE \
@@ -18,6 +20,7 @@
 #ifndef GEMMA_ONCE
 #define GEMMA_ONCE
 
+#include "third_party/absl/types/span.h"
 #include "compression/io.h"
 #include "hwy/base.h"
 #include "hwy/contrib/thread_pool/thread_pool.h"
@@ -47,33 +50,31 @@ namespace gcpp {
 namespace HWY_NAMESPACE {
 
 class SbsWriterImpl : public WriterInterface {
+  template <typename Packed>
+  hwy::AlignedFreeUniquePtr<Packed[]> AllocateAndCompress(
+      const std::string& name, absl::Span<const float> weights) {
+    const size_t num_packed = CompressedArrayElements<Packed>(weights.size());
+    auto packed = hwy::AllocateAligned<Packed>(num_packed);
+    PackedSpan<Packed> span = MakeSpan(packed.get(), num_packed);
+    compressor_.Insert(name.c_str(), weights.data(), weights.size(),
+                       working_set_, span, /*packed_ofs=*/0, pool_);
+    return packed;
+  }
+
  public:
   SbsWriterImpl() : pool_(0), compressor_(pool_) {}
 
   void Insert(std::string name, absl::Span<const float> weights) override {
-    const size_t out_size = CompressedArrayElements<SfpStream>(weights.size());
-    sfp_streams_.push_back(std::vector<SfpStream>(out_size));
-    compressor_.Insert<SfpStream>(name.data(), weights.data(), weights.size(),
-                                  working_set_, out_size,
-                                  sfp_streams_.back().data(), 0, pool_);
+    sfp_streams_.push_back(AllocateAndCompress<SfpStream>(name, weights));
   }
 
   void InsertNUQ(std::string name, absl::Span<const float> weights) override {
-    const size_t out_size = CompressedArrayElements<NuqStream>(weights.size());
-    nuq_streams_.push_back(std::vector<NuqStream>(out_size));
-    compressor_.Insert<NuqStream>(name.data(), weights.data(), weights.size(),
-                                  working_set_, out_size,
-                                  nuq_streams_.back().data(), 0, pool_);
+    nuq_streams_.push_back(AllocateAndCompress<NuqStream>(name, weights));
   }
 
   void InsertBfloat16(std::string name,
                       absl::Span<const float> weights) override {
-    const size_t out_size =
-        CompressedArrayElements<hwy::bfloat16_t>(weights.size());
-    bf16_streams_.push_back(std::vector<hwy::bfloat16_t>(out_size));
-    compressor_.Insert<hwy::bfloat16_t>(name.data(), weights.data(),
-                                        weights.size(), working_set_, out_size,
-                                        bf16_streams_.back().data(), 0, pool_);
+    bf16_streams_.push_back(AllocateAndCompress<BF16>(name, weights));
   }
 
   void AddScales(const std::vector<float>& scales) override {
@@ -89,9 +90,9 @@ class SbsWriterImpl : public WriterInterface {
 
   hwy::ThreadPool pool_;
   Compressor compressor_;
   CompressWorkingSet working_set_;
-  std::vector<std::vector<SfpStream>> sfp_streams_;
-  std::vector<std::vector<NuqStream>> nuq_streams_;
-  std::vector<std::vector<hwy::bfloat16_t>> bf16_streams_;
+  std::vector<hwy::AlignedFreeUniquePtr<SfpStream[]>> sfp_streams_;
+  std::vector<hwy::AlignedFreeUniquePtr<NuqStream[]>> nuq_streams_;
+  std::vector<hwy::AlignedFreeUniquePtr<BF16[]>> bf16_streams_;
   std::vector<float> scales_;
 };
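
The container change pairs with the new AllocateAndCompress helper: hwy::AllocateAligned returns an AlignedFreeUniquePtr, presumably chosen because it guarantees SIMD-friendly alignment that std::vector's default allocator does not. A standalone sketch of the allocation pattern, with an assumed element count:

  const size_t num_packed = CompressedArrayElements<SfpStream>(4096);
  hwy::AlignedFreeUniquePtr<SfpStream[]> packed =
      hwy::AllocateAligned<SfpStream>(num_packed);
  const PackedSpan<SfpStream> span = MakeSpan(packed.get(), num_packed);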