// gemma.cpp/io/blob_store.cc
// Copyright 2024 Google LLC
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "io/blob_store.h"
#include <stddef.h>
#include <stdint.h>
#include <memory>
#include <string>
#include <unordered_set>
#include <utility> // std::move
#include <vector>
#include "io/io.h"
#include "util/threading_context.h"
#include "hwy/aligned_allocator.h" // Span
#include "hwy/base.h"
#include "hwy/contrib/thread_pool/thread_pool.h"
#include "hwy/detect_compiler_arch.h"
namespace gcpp {
static_assert(HWY_IS_LITTLE_ENDIAN, "Assumes little endian");
// Each blob offset is a multiple of this, an upper bound on the SVE vector
// length and usually also larger than an L2 cache line. This is useful when
// memory mapping the entire file, because offset alignment then determines the
// alignment of each blob in memory. Aligning each blob to the (largest) page
// size would be too wasteful, see `kEndAlign`.
constexpr size_t kBlobAlign = 256; // test also hard-codes this value
// Linux mmap requires the file to be a multiple of the (base) page size, which
// can be up to 64 KiB on Arm. Apple uses 16 KiB, most others use 4 KiB.
constexpr size_t kEndAlign = 64 * 1024;
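// Illustrative arithmetic (hypothetical sizes, not from the format spec): a
// 1000-byte blob occupies RoundUpTo(1000, kBlobAlign) = 1024 payload bytes,
// and a file whose header, directory and payloads total 70'000 bytes is padded
// to RoundUpTo(70'000, kEndAlign) = 131'072 bytes (two 64 KiB pages).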
constexpr size_t kU128Bytes = sizeof(hwy::uint128_t);
// Conversion between strings (<= `kU128Bytes` chars) and the fixed-size u128
// used to store them on disk.
static hwy::uint128_t KeyFromString(const char* string) {
size_t length = 0;
for (size_t i = 0; string[i] != '\0'; ++i) {
++length;
}
if (length > kU128Bytes) {
HWY_ABORT("Key %s is too long, please truncate to 16 chars.", string);
}
HWY_ASSERT(length != 0);
hwy::uint128_t ret;
hwy::ZeroBytes<sizeof(ret)>(&ret);
hwy::CopyBytes(string, &ret, length);
return ret;
}
static std::string StringFromKey(hwy::uint128_t key) {
std::string name(sizeof(key) + 1, '\0');
hwy::CopyBytes(&key, name.data(), sizeof(key));
name.resize(name.find('\0'));
return name;
}
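// Round-trip example with a hypothetical key: KeyFromString("embedding") fills
// the low 9 bytes of the u128 and zeroes the rest; StringFromKey() copies all
// 16 bytes into a 17-char buffer and truncates at the first '\0', recovering
// "embedding". The extra 17th char guarantees find('\0') succeeds even for
// keys of exactly 16 chars.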
namespace {
#pragma pack(push, 1)
struct Header { // standard layout class
uint32_t magic = 0; // kMagic
uint32_t num_blobs = 0; // never zero in a valid file
uint64_t file_bytes = 0; // must match actual size of file
};
#pragma pack(pop)
static_assert(sizeof(Header) == 16);
} // namespace
// A write I/O request, each serviced by one thread in a pool.
struct BlobIO {
BlobIO(BlobRange range, void* data) : range(range), data(data) {}
BlobRange range;
void* data; // Source buffer; not modified, only read when writing to the file.
};
// Little-endian on-disk representation: a fixed-size `Header`, then a padded
// variable-length 'directory' of blob keys and their offset/sizes, then the
// 'payload' of each blob's data with padding in between, followed by padding to
// `kEndAlign`. Keys are unique, opaque 128-bit keys.
//
// The file format deliberately omits a version number because it is unchanging.
// Additional data may be added only inside new blobs. Changes to the blob
// contents or type should be handled by renaming keys.
//
// This class is for internal use by `BlobReader` and `BlobWriter`. Its
// interface is more low-level: fixed-size keys instead of strings.
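//
// Example layout (illustrative, not normative) for two blobs of 300 and 512
// bytes:
//   [0,    16)    Header {magic, num_blobs=2, file_bytes=65536}
//   [16,   80)    directory: 2 keys + 2 offset/size pairs (4 * 16 bytes)
//   [80,   256)   zero padding up to `kBlobAlign`
//   [256,  556)   blob 0 payload (300 bytes), then padding to 768
//   [768,  1280)  blob 1 payload (512 bytes)
//   [1280, 65536) zero padding up to `kEndAlign`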
class BlobStore {
static constexpr uint32_t kMagic = 0x0A534253; // SBS\n
// Arbitrary upper limit to avoid allocating a huge vector.
static constexpr size_t kMaxBlobs = 64 * 1024;
// Returns the end of the directory, including padding, which is also the
// start of the first payload. `num_blobs` is `NumBlobs()` if the header is
// already available, otherwise the number of blobs to be written.
static size_t PaddedDirEnd(size_t num_blobs) {
HWY_ASSERT(num_blobs < kMaxBlobs);
// Per blob, a key and offset/size.
return RoundUpToAlign(sizeof(Header) + 2 * kU128Bytes * num_blobs);
}
static uint64_t PaddedPayloadBytes(size_t num_blobs,
const hwy::Span<const uint8_t> blobs[]) {
uint64_t total_payload_bytes = 0;
for (size_t i = 0; i < num_blobs; ++i) {
total_payload_bytes += RoundUpToAlign(blobs[i].size());
}
// Do not round up to `kEndAlign` because the padding also depends on the
// directory size. Here we only count the payload.
return total_payload_bytes;
}
static void EnsureUnique(hwy::Span<const hwy::uint128_t> keys) {
std::unordered_set<std::string> key_set;
for (const hwy::uint128_t key : keys) {
HWY_ASSERT(key_set.insert(StringFromKey(key)).second); // ensure inserted
}
}
public:
template <typename T>
static T RoundUpToAlign(T size_or_offset) {
return hwy::RoundUpTo(size_or_offset, kBlobAlign);
}
// Reads header/directory from file.
explicit BlobStore(const File& file) {
if (!file.Read(0, sizeof(header_), &header_)) {
HWY_WARN("Failed to read BlobStore header.");
return;
}
// Avoid allocating a huge vector.
if (header_.num_blobs >= kMaxBlobs) {
HWY_WARN("Too many blobs, likely corrupt file.");
return;
}
const size_t padded_dir_end = PaddedDirEnd(NumBlobs());
const size_t padded_dir_bytes = padded_dir_end - sizeof(header_);
HWY_ASSERT(padded_dir_bytes % kU128Bytes == 0);
directory_.resize(padded_dir_bytes / kU128Bytes);
if (!file.Read(sizeof(header_), padded_dir_bytes, directory_.data())) {
HWY_WARN("Failed to read BlobStore directory.");
return;
}
}
// Initializes header/directory for writing to disk.
BlobStore(size_t num_blobs, const hwy::uint128_t keys[],
const hwy::Span<const uint8_t> blobs[]) {
HWY_ASSERT(num_blobs < kMaxBlobs); // Ensures safe to cast to u32.
HWY_ASSERT(keys && blobs);
EnsureUnique(hwy::Span<const hwy::uint128_t>(keys, num_blobs));
uint64_t offset = PaddedDirEnd(num_blobs);
const size_t padded_dir_bytes =
static_cast<size_t>(offset) - sizeof(header_);
header_.magic = kMagic;
header_.num_blobs = static_cast<uint32_t>(num_blobs);
header_.file_bytes = hwy::RoundUpTo(
offset + PaddedPayloadBytes(num_blobs, blobs), kEndAlign);
HWY_ASSERT(padded_dir_bytes % kU128Bytes == 0);
directory_.resize(padded_dir_bytes / kU128Bytes);
hwy::CopyBytes(keys, directory_.data(), num_blobs * kU128Bytes);
EnsureUnique(Keys());
// `SetRange` below will fill `directory_[num_blobs, 2 * num_blobs)`.
hwy::ZeroBytes(directory_.data() + 2 * num_blobs,
padded_dir_bytes - 2 * num_blobs * kU128Bytes);
// We already zero-initialized the directory padding;
// `BlobWriter::WriteAll` takes care of padding after each blob via an
// additional I/O.
for (size_t i = 0; i < num_blobs; ++i) {
HWY_ASSERT(blobs[i].data() != nullptr);
SetRange(i, offset, blobs[i].size());
offset = RoundUpToAlign(offset + blobs[i].size());
}
// When writing new files, we always pad to `kEndAlign`.
HWY_ASSERT(hwy::RoundUpTo(offset, kEndAlign) == header_.file_bytes);
}
// Must be checked by readers before other methods.
bool IsValid(const uint64_t file_size) const {
// Ctor failed and already printed a warning.
if (directory_.empty()) return false;
if (header_.magic != kMagic) {
HWY_WARN("Given file is not a BlobStore (magic %08x).", header_.magic);
return false;
}
if (header_.num_blobs == 0) {
HWY_WARN("Invalid BlobStore (empty), likely corrupt file.");
return false;
}
if (header_.file_bytes != file_size) {
HWY_WARN("File length %zu does not match header %zu (truncated?).",
static_cast<size_t>(file_size),
static_cast<size_t>(header_.file_bytes));
return false;
}
// Ensure blobs are back to back.
uint64_t expected_offset = PaddedDirEnd(NumBlobs());
for (size_t key_idx = 0; key_idx < NumBlobs(); ++key_idx) {
uint64_t actual_offset;
size_t bytes;
GetRange(key_idx, actual_offset, bytes);
if (expected_offset != actual_offset) {
HWY_WARN("Invalid BlobStore: blob %zu at offset %zu but expected %zu.",
key_idx, static_cast<size_t>(actual_offset),
static_cast<size_t>(expected_offset));
return false;
}
expected_offset = RoundUpToAlign(expected_offset + bytes);
}
// Previously files were not padded to `kEndAlign`, so also allow that.
if (expected_offset != header_.file_bytes &&
hwy::RoundUpTo(expected_offset, kEndAlign) != header_.file_bytes) {
HWY_WARN("Invalid BlobStore: end of blobs %zu but file size %zu.",
static_cast<size_t>(expected_offset),
static_cast<size_t>(header_.file_bytes));
return false;
}
return true; // all OK
}
void EnqueueWriteForHeaderAndDirectory(std::vector<BlobIO>& writes) const {
const size_t key_idx = 0; // not actually associated with a key/blob
writes.emplace_back(
BlobRange{.offset = 0, .bytes = sizeof(header_), .key_idx = key_idx},
// This is a const member function, but BlobIO requires non-const
// pointers; the data is only read, not modified, by file writes.
const_cast<Header*>(&header_));
writes.emplace_back(
BlobRange{.offset = sizeof(header_),
.bytes = PaddedDirEnd(NumBlobs()) - sizeof(header_),
.key_idx = key_idx},
const_cast<hwy::uint128_t*>(directory_.data()));
}
size_t NumBlobs() const { return static_cast<size_t>(header_.num_blobs); }
// Not the entirety of `directory_`! The second half is offset/size.
hwy::Span<const hwy::uint128_t> Keys() const {
return hwy::Span<const hwy::uint128_t>(directory_.data(), NumBlobs());
}
// Retrieves blob's offset and size, not including padding.
void GetRange(size_t key_idx, uint64_t& offset, size_t& bytes) const {
HWY_ASSERT(key_idx < NumBlobs());
const hwy::uint128_t val = directory_[NumBlobs() + key_idx];
offset = val.lo;
bytes = val.hi;
HWY_ASSERT(offset % kBlobAlign == 0);
HWY_ASSERT(bytes != 0);
HWY_ASSERT(offset + bytes <= header_.file_bytes);
}
private:
// Stores offset and size into the u128 following the keys, so the directory
// can be a single array of one type, read/written together with the keys.
void SetRange(size_t key_idx, uint64_t offset, size_t bytes) {
HWY_ASSERT(key_idx < NumBlobs());
HWY_ASSERT(offset % kBlobAlign == 0);
HWY_ASSERT(bytes != 0);
HWY_ASSERT(offset + bytes <= header_.file_bytes);
hwy::uint128_t& val = directory_[NumBlobs() + key_idx];
val.lo = offset;
val.hi = bytes;
}
Header header_;
std::vector<hwy::uint128_t> directory_; // two per blob, see `SetRange`.
}; // BlobStore
BlobReader::BlobReader(const Path& blob_path)
: file_(OpenFileOrAbort(blob_path, "r")), file_bytes_(file_->FileSize()) {
if (file_bytes_ == 0) HWY_ABORT("Zero-sized file %s", blob_path.path.c_str());
BlobStore bs(*file_);
HWY_ASSERT(bs.IsValid(file_bytes_)); // IsValid already printed a warning
keys_.reserve(bs.NumBlobs());
for (const hwy::uint128_t key : bs.Keys()) {
keys_.push_back(StringFromKey(key));
}
ranges_.reserve(bs.NumBlobs());
// Populate ranges and the hash map for O(1) key lookups.
for (size_t key_idx = 0; key_idx < keys_.size(); ++key_idx) {
uint64_t offset;
size_t bytes;
bs.GetRange(key_idx, offset, bytes);
ranges_.emplace_back(
BlobRange{.offset = offset, .bytes = bytes, .key_idx = key_idx});
key_idx_for_key_[keys_[key_idx]] = key_idx;
}
}
// Split into chunks for load-balancing even if blob sizes vary.
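// For example (illustrative): a 10 MiB blob becomes two 4 MiB chunk writes
// plus one 2 MiB remainder write, so several pool threads can service it.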
static void EnqueueChunks(size_t key_idx, uint64_t offset, uint64_t bytes,
uint8_t* data, std::vector<BlobIO>& writes) {
constexpr size_t kChunkBytes = 4 * 1024 * 1024;
const uint64_t end = offset + bytes;
// Split into whole chunks and possibly one remainder.
if (end >= kChunkBytes) {
for (; offset <= end - kChunkBytes;
offset += kChunkBytes, data += kChunkBytes) {
writes.emplace_back(
BlobRange{.offset = offset, .bytes = kChunkBytes, .key_idx = key_idx},
data);
}
}
if (offset != end) {
writes.emplace_back(
BlobRange{.offset = offset, .bytes = end - offset, .key_idx = key_idx},
data);
}
}
static void EnqueueWritesForBlobs(const BlobStore& bs,
const hwy::Span<const uint8_t> blobs[],
std::vector<uint8_t>& zeros,
std::vector<BlobIO>& writes) {
// All-zero buffer used to write padding to the file without copying the
// input blobs.
static constexpr uint8_t kZeros[kBlobAlign] = {0};
uint64_t file_end = 0; // for padding
for (size_t key_idx = 0; key_idx < bs.NumBlobs(); ++key_idx) {
// We know the size, but `BlobStore` tells us the offset to write each blob.
uint64_t offset;
size_t bytes;
bs.GetRange(key_idx, offset, bytes);
HWY_ASSERT(offset != 0);
HWY_ASSERT(bytes == blobs[key_idx].size());
const uint64_t new_file_end = offset + bytes;
HWY_ASSERT(new_file_end >= file_end); // blobs are ordered by offset
file_end = new_file_end;
EnqueueChunks(key_idx, offset, bytes,
const_cast<uint8_t*>(blobs[key_idx].data()), writes);
const size_t padding = BlobStore::RoundUpToAlign(bytes) - bytes;
if (padding != 0) {
HWY_ASSERT(padding <= kBlobAlign);
writes.emplace_back(
BlobRange{
.offset = offset + bytes, .bytes = padding, .key_idx = key_idx},
const_cast<uint8_t*>(kZeros));
}
}
const size_t padding = hwy::RoundUpTo(file_end, kEndAlign) - file_end;
if (padding != 0) {
// May be larger than `kZeros`, so allocate a buffer rather than issuing
// multiple I/Os. Must remain alive until the last I/O is done.
zeros.resize(padding);
writes.emplace_back(
BlobRange{.offset = file_end, .bytes = padding, .key_idx = 0},
zeros.data());
}
}
void BlobWriter::Add(const std::string& key, const void* data, size_t bytes) {
HWY_ASSERT(data != nullptr);
HWY_ASSERT(bytes != 0);
keys_.push_back(KeyFromString(key.c_str()));
blobs_.emplace_back(static_cast<const uint8_t*>(data), bytes);
}
void BlobWriter::WriteAll(hwy::ThreadPool& pool, const Path& filename) {
const size_t num_blobs = keys_.size();
HWY_ASSERT(num_blobs != 0);
HWY_ASSERT(num_blobs == blobs_.size());
std::vector<BlobIO> writes;
writes.reserve(16384);
const BlobStore bs(num_blobs, keys_.data(), blobs_.data());
bs.EnqueueWriteForHeaderAndDirectory(writes);
std::vector<uint8_t> zeros;
EnqueueWritesForBlobs(bs, blobs_.data(), zeros, writes);
// Create/replace existing file.
std::unique_ptr<File> file = OpenFileOrNull(filename, "w+");
if (!file) HWY_ABORT("Failed to open for writing %s", filename.path.c_str());
pool.Run(0, writes.size(),
[this, &file, &writes](uint64_t i, size_t /*thread*/) {
const BlobRange& range = writes[i].range;
if (!file->Write(writes[i].data, range.bytes, range.offset)) {
const std::string& key = StringFromKey(keys_[range.key_idx]);
HWY_ABORT("Write failed for %s from %zu, %zu bytes to %p.",
key.c_str(), static_cast<size_t>(range.offset),
range.bytes, writes[i].data);
}
});
}
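// Usage sketch (illustrative; the exact declarations live in blob_store.h and
// io.h, and the key name and path below are hypothetical):
//   BlobWriter writer;
//   writer.Add("my_blob", data, bytes);          // data/bytes: caller-owned
//   writer.WriteAll(pool, Path("weights.sbs"));  // pool: hwy::ThreadPool
//   BlobReader reader(Path("weights.sbs"));      // aborts if file is invalid
//   // The reader's key/range accessors can then be used to locate blobs.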
} // namespace gcpp