diff --git a/util/allocator.cc b/util/allocator.cc
index 4beedca..20d65ad 100644
--- a/util/allocator.cc
+++ b/util/allocator.cc
@@ -15,12 +15,12 @@
 #include "util/allocator.h"
 
+#include
 #include
 
 #include "util/basics.h"  // MaybeCheckInitialized
 #include "hwy/aligned_allocator.h"
 #include "hwy/base.h"
-#include "hwy/contrib/thread_pool/futex.h"
 #include "hwy/contrib/thread_pool/topology.h"
 #include "hwy/per_target.h"  // VectorBytes
@@ -46,13 +46,32 @@ #endif  // GEMMA_BIND
 
 #if GEMMA_BIND && HWY_OS_LINUX
+#include
+
+#include "hwy/contrib/thread_pool/futex.h"
+#endif
+
+#if HWY_OS_LINUX
+#include <unistd.h>  // sysconf
+#if GEMMA_BIND
 // `move_pages` requires anonymous/private mappings, hence mmap.
 #include
 #include
 #include
 #include
-#endif  // GEMMA_BIND && HWY_OS_LINUX
+#endif  // GEMMA_BIND
+#elif HWY_OS_WIN
+#ifndef WIN32_LEAN_AND_MEAN
+#define WIN32_LEAN_AND_MEAN
+#endif
+#ifndef VC_EXTRALEAN
+#define VC_EXTRALEAN
+#endif
+#include <windows.h>
+#elif HWY_OS_APPLE
+#include <sys/sysctl.h>
+#endif  // HWY_OS_LINUX
 
 namespace gcpp {
 namespace {
@@ -68,14 +87,47 @@ size_t DetectLineBytes() {
 
 size_t DetectPageSize() {
 #if HWY_OS_LINUX
-  size_t page_bytes = static_cast<size_t>(sysconf(_SC_PAGESIZE));
+  const long ret = sysconf(_SC_PAGESIZE);  // NOLINT(runtime/int)
+  HWY_ASSERT(ret != -1);
+  const size_t page_bytes = static_cast<size_t>(ret);
   HWY_ASSERT(page_bytes <= (4 << 20));
   return page_bytes;
+#elif HWY_OS_WIN
+  SYSTEM_INFO info;
+  GetSystemInfo(&info);
+  return info.dwPageSize;
+#elif HWY_OS_APPLE
+  uint64_t data = 0;
+  size_t len = sizeof(data);
+  HWY_ASSERT(sysctlbyname("vm.pagesize", &data, &len, nullptr, 0) == 0);
+  return data;
 #else
   return 0;
 #endif
 }
 
+size_t DetectTotalMiB(size_t page_bytes) {
+  (void)page_bytes;
+#if HWY_OS_LINUX
+  const long ret = sysconf(_SC_PHYS_PAGES);  // NOLINT(runtime/int)
+  HWY_ASSERT(ret != -1);
+  return static_cast<size_t>(ret) * page_bytes >> 20;
+#elif HWY_OS_WIN
+  MEMORYSTATUSEX ms = {sizeof(MEMORYSTATUSEX)};
+  HWY_ASSERT(GlobalMemoryStatusEx(&ms) != 0);
+  return ms.ullTotalPhys >> 20;
+#elif HWY_OS_APPLE
+  int mib[2] = {CTL_HW, HW_MEMSIZE};
+  uint64_t data = 0;
+  size_t len = sizeof(data);
+  HWY_ASSERT(sysctl(mib, sizeof(mib) / sizeof(*mib), &data, &len, nullptr, 0) ==
+             0);
+  return data >> 20;
+#else
+#error "Port"
+#endif
+}
+
 }  // namespace
 
 static size_t line_bytes_;
@@ -305,4 +357,123 @@ bool Allocator::BindMemory(void* ptr, size_t bytes, size_t node) {
 bool Allocator::BindMemory(void*, size_t, size_t) { return false; }
 #endif  // GEMMA_BIND && HWY_OS_LINUX
 
+Allocator2::Allocator2(const BoundedTopology& topology, bool enable_bind) {
+  line_bytes_ = DetectLineBytes();
+  vector_bytes_ = hwy::VectorBytes();
+  step_bytes_ = HWY_MAX(line_bytes_, vector_bytes_);
+  base_page_bytes_ = DetectPageSize();
+  quantum_bytes_ = step_bytes_;  // may be overwritten below
+
+  const BoundedTopology::Cluster& cluster = topology.GetCluster(0, 0);
+  if (const hwy::Cache* caches = hwy::DataCaches()) {
+    l1_bytes_ = caches[1].size_kib << 10;
+    l2_bytes_ = caches[2].size_kib << 10;
+    l3_bytes_ = (caches[3].size_kib << 10) * caches[3].cores_sharing;
+  } else {  // Unknown, make reasonable assumptions.
+    l1_bytes_ = 32 << 10;
+    l2_bytes_ = (cluster.PrivateKiB() ? cluster.PrivateKiB() : 256) << 10;
+  }
+  if (l3_bytes_ == 0) {
+    l3_bytes_ = (cluster.SharedKiB() ? cluster.SharedKiB() : 1024) << 10;
+  }
+
+  total_mib_ = DetectTotalMiB(base_page_bytes_);
+
+  // Prerequisites for binding:
+  // - supported by the OS (currently Linux only);
+  // - the page size is known and 'reasonably small', preferably a small
+  //   fraction of MatMul row/col sizes, which for 27B are up to 144 KiB;
+  // - we successfully detected topology and there are multiple nodes;
+  // - there are multiple packages, because we shard by package_idx.
+  if constexpr (GEMMA_BIND) {
+    if ((base_page_bytes_ != 0 && base_page_bytes_ <= 16 * 1024) &&
+        topology.NumNodes() > 1 && topology.NumPackages() > 1) {
+      if (enable_bind) {
+        // Ensure pages meet the alignment requirements of `AllocBytes`.
+        HWY_ASSERT(base_page_bytes_ >= quantum_bytes_);
+        quantum_bytes_ = base_page_bytes_;
+        // Ensure MaxQuantum() is an upper bound.
+        HWY_ASSERT(MaxQuantum<uint8_t>() >= Quantum<uint8_t>());
+        should_bind_ = true;
+      } else {
+        HWY_WARN(
+            "Multiple sockets but binding disabled. This reduces speed; "
+            "set or remove enable_bind to avoid this warning.");
+      }
+    }
+  }
+
+  HWY_DASSERT(quantum_bytes_ % step_bytes_ == 0);
+  quantum_step_mask_ = quantum_bytes_ / step_bytes_ - 1;
+}
+
+size_t Allocator2::FreeMiB() const {
+#if HWY_OS_LINUX
+  const long ret = sysconf(_SC_AVPHYS_PAGES);  // NOLINT(runtime/int)
+  HWY_ASSERT(ret != -1);
+  return static_cast<size_t>(ret) * base_page_bytes_ >> 20;
+#elif HWY_OS_WIN
+  MEMORYSTATUSEX ms = {sizeof(MEMORYSTATUSEX)};
+  HWY_ASSERT(GlobalMemoryStatusEx(&ms) != 0);
+  return ms.ullAvailVirtual >> 20;
+#elif HWY_OS_APPLE
+  uint64_t free = 0, inactive = 0, speculative = 0;
+  size_t len = sizeof(free);
+  sysctlbyname("vm.page_free_count", &free, &len, nullptr, 0);
+  sysctlbyname("vm.page_inactive_count", &inactive, &len, nullptr, 0);
+  sysctlbyname("vm.page_speculative_count", &speculative, &len, nullptr, 0);
+  return (free + inactive + speculative) * base_page_bytes_ >> 20;
+#else
+#error "Port"
+#endif
+}
+
+Allocator2::PtrAndDeleter Allocator2::AllocBytes(size_t bytes) const {
+  // If we are not binding, the Highway allocator is cheaper than `mmap`, and
+  // defends against 2K aliasing.
+  if (!should_bind_) {
+    // Perf warning if Highway's alignment is less than we want.
+    if (HWY_ALIGNMENT < QuantumBytes()) {
+      HWY_WARN(
+          "HWY_ALIGNMENT %d < QuantumBytes %zu: either vector or cache lines "
+          "are huge, enable GEMMA_BIND to avoid this warning.",
+          HWY_ALIGNMENT, QuantumBytes());
+    }
+    auto p = hwy::AllocateAligned<uint8_t>(bytes);
+    // The `hwy::AlignedFreeUniquePtr` deleter is unfortunately specific to the
+    // alignment scheme in aligned_allocator.cc and does not work for
+    // already-aligned pointers as returned by `mmap`, hence we wrap the
+    // Highway pointer in our own deleter.
+    return PtrAndDeleter{p.release(), DeleterFunc2([](void* ptr) {
+                           hwy::FreeAlignedBytes(ptr, nullptr, nullptr);
+                         })};
+  }
+
+  // Binding, or large vector/cache line size: use platform-specific allocator.
+
+#if HWY_OS_LINUX && !defined(__ANDROID_API__)
+  // `move_pages` is documented to require an anonymous/private mapping or
+  // `MAP_SHARED`. A normal allocation might not suffice, so we use `mmap`.
+  // The constructor verified that the page size is a multiple of
+  // `QuantumBytes()`.
+  const int prot = PROT_READ | PROT_WRITE;
+  const int flags = MAP_ANONYMOUS | MAP_PRIVATE;
+  const int fd = -1;
+  void* p = mmap(0, bytes, prot, flags, fd, off_t{0});
+  if (p == MAP_FAILED) p = nullptr;
+  return PtrAndDeleter{p, DeleterFunc2([bytes](void* ptr) {
+                         HWY_ASSERT(munmap(ptr, bytes) == 0);
+                       })};
+#elif HWY_OS_WIN
+  const size_t alignment = HWY_MAX(vector_bytes_, line_bytes_);
+  return PtrAndDeleter{_aligned_malloc(bytes, alignment),
+                       DeleterFunc2([](void* ptr) { _aligned_free(ptr); })};
+#else
+  return PtrAndDeleter{nullptr, DeleterFunc2()};
+#endif
+}
+
+bool Allocator2::BindMemory(void* ptr, size_t bytes, size_t node) const {
+  return Allocator::BindMemory(ptr, bytes, node);
+}
+
 }  // namespace gcpp
diff --git a/util/allocator.h b/util/allocator.h
index e54fdc7..b5d59bb 100644
--- a/util/allocator.h
+++ b/util/allocator.h
@@ -21,6 +21,7 @@
 #include
 #include
+#include <functional>
 
 // IWYU pragma: begin_exports
 #include <memory>  // std::unique_ptr
@@ -330,6 +331,156 @@ RowPtr<T> RowPtrFromBatch(RowVectorBatch<T>& row_vectors) {
   return RowPtr<T>(row_vectors.All(), row_vectors.Cols(), row_vectors.Stride());
 }
 
+// Custom deleter for types without a dtor, but where the deallocation requires
+// state, e.g. a lambda with *by-value* capture.
+class DeleterFunc2 {
+ public:
+  // `MatOwnerT` requires this to be default-constructible.
+  DeleterFunc2() = default;
+
+  template <class Closure>
+  DeleterFunc2(const Closure& free_closure) : free_func_(free_closure) {}
+
+  template <typename T>
+  void operator()(T* p) const {
+    free_func_(const_cast<hwy::RemoveConst<T>*>(p));
+  }
+
+ private:
+  std::function<void(void*)> free_func_;
+};
+
+// Wrapper that also calls the destructor for each element being deallocated.
+class DeleterDtor2 {
+ public:
+  DeleterDtor2() {}
+  DeleterDtor2(size_t num, DeleterFunc2 free) : num_(num), free_(free) {}
+
+  template <typename T>
+  void operator()(T* p) const {
+    for (size_t i = 0; i < num_; ++i) {
+      p[i].~T();
+    }
+    free_(p);
+  }
+
+ private:
+  size_t num_;
+  DeleterFunc2 free_;
+};
+
+// Unique (move-only) pointer to aligned POD T, which can be an array or class.
+template <typename T>
+using AlignedPtr2 = std::unique_ptr<T[], DeleterFunc2>;
+// Unique (move-only) pointer to an aligned array of non-POD T.
+template <typename T>
+using AlignedClassPtr2 = std::unique_ptr<T[], DeleterDtor2>;
+
+// Allocation, binding, and row accessors all depend on the sizes of memory
+// pages and cache lines. To avoid having to pass `Allocator2&` everywhere, we
+// wrap this in a singleton. A monostate requires explicit initialization,
+// which we prefer to avoid because there are many main() functions.
+class Allocator2 {
+ public:
+  // Must be called at least once before any other function. Not thread-safe,
+  // hence only call this from the main thread.
+  // TODO: remove enable_bind once Gemma tensors support binding.
+  Allocator2(const BoundedTopology& topology, bool enable_bind);
+
+  // Bytes per cache line, or a reasonable guess if unknown. Used to choose
+  // ranges such that there will be no false sharing.
+  size_t LineBytes() const { return line_bytes_; }
+  // Bytes per full vector. Used to compute loop steps.
+  size_t VectorBytes() const { return vector_bytes_; }
+  // Work granularity that avoids false sharing and partial vectors.
+  // = HWY_MAX(LineBytes(), VectorBytes())
+  size_t StepBytes() const { return step_bytes_; }
+  // File size multiple required for memory mapping.
+  size_t BasePageBytes() const { return base_page_bytes_; }
+  // Either StepBytes, or BasePageBytes if NUMA binding is enabled.
+  size_t QuantumBytes() const { return quantum_bytes_; }
+  template <typename T>
+  size_t Quantum() const {
+    return QuantumBytes() / sizeof(T);
+  }
+  // Upper bound on `Quantum()`, for stack allocations.
+  template <typename T>
+  static constexpr size_t MaxQuantum() {
+    return 4096 / sizeof(T);
+  }
+  // = QuantumBytes() / StepBytes() - 1
+  size_t QuantumStepMask() const { return quantum_step_mask_; }
+
+  // L1 and L2 are typically per core.
+  size_t L1Bytes() const { return l1_bytes_; }
+  size_t L2Bytes() const { return l2_bytes_; }
+  // Clusters often share an L3. We return the total size per package.
+  size_t L3Bytes() const { return l3_bytes_; }
+
+  size_t TotalMiB() const { return total_mib_; }
+  size_t FreeMiB() const;
+
+  // Returns pointer aligned to `QuantumBytes()`.
+  template <typename T>
+  AlignedPtr2<T> Alloc(size_t num) const {
+    const size_t bytes = num * sizeof(T);
+    // Fail if the `bytes = num * sizeof(T)` computation overflowed.
+    HWY_ASSERT(bytes / sizeof(T) == num);
+
+    PtrAndDeleter pd = AllocBytes(bytes);
+    return AlignedPtr2<T>(static_cast<T*>(pd.p), pd.deleter);
+  }
+
+  // Same as Alloc, but calls constructor(s) with `args` and the deleter will
+  // call destructor(s).
+  template <typename T, class... Args>
+  AlignedClassPtr2<T> AllocClasses(size_t num, Args&&... args) const {
+    const size_t bytes = num * sizeof(T);
+    // Fail if the `bytes = num * sizeof(T)` computation overflowed.
+    HWY_ASSERT(bytes / sizeof(T) == num);
+
+    PtrAndDeleter pd = AllocBytes(bytes);
+    T* p = static_cast<T*>(pd.p);
+    for (size_t i = 0; i < num; ++i) {
+      new (p + i) T(std::forward<Args>(args)...);
+    }
+    return AlignedClassPtr2<T>(p, DeleterDtor2(num, pd.deleter));
+  }
+
+  // Returns whether `BindMemory` can/should be called, i.e. we have page-level
+  // control over memory placement and multiple packages and NUMA nodes.
+  bool ShouldBind() const { return should_bind_; }
+
+  // Attempts to move(!) `[p, p + bytes)` to the given NUMA node, which is
+  // typically `BoundedTopology::GetCluster(package_idx, cluster_idx).node`.
+  // Writes zeros to SOME of the memory. Only call if `ShouldBind()`.
+  // `p` and `bytes` must be multiples of `QuantumBytes()`.
+  bool BindMemory(void* p, size_t bytes, size_t node) const;
+
+ private:
+  // Type-erased so this can be implemented in allocator.cc.
+  struct PtrAndDeleter {
+    void* p;
+    DeleterFunc2 deleter;
+  };
+  PtrAndDeleter AllocBytes(size_t bytes) const;
+
+  size_t line_bytes_;
+  size_t vector_bytes_;
+  size_t step_bytes_;
+  size_t base_page_bytes_;
+  size_t quantum_bytes_;
+  size_t quantum_step_mask_;
+
+  size_t l1_bytes_ = 0;
+  size_t l2_bytes_ = 0;
+  size_t l3_bytes_ = 0;
+
+  size_t total_mib_;
+
+  bool should_bind_ = false;
+};
+
 }  // namespace gcpp
 
 #endif  // THIRD_PARTY_GEMMA_CPP_UTIL_ALLOCATOR_H_
diff --git a/util/threading_context.cc b/util/threading_context.cc
new file mode 100644
index 0000000..9065335
--- /dev/null
+++ b/util/threading_context.cc
@@ -0,0 +1,63 @@
+// Copyright 2025 Google LLC
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "util/threading_context.h"
+
+#include <memory>
+#include <mutex>  // NOLINT
+
+namespace gcpp {
+
+static ThreadingArgs s_args;
+// Cannot use a magic static because that does not support
+// `ThreadHostileInvalidate`, hence allocate manually.
+static std::unique_ptr<ThreadingContext2> s_ctx;
+static std::mutex s_ctx_mutex;
+
+/*static*/ void ThreadingContext2::SetArgs(const ThreadingArgs& args) {
+  s_ctx_mutex.lock();
+  // Ensure not already initialized, else this is too late.
+  HWY_ASSERT(!s_ctx);
+  s_args = args;
+  s_ctx_mutex.unlock();
+}
+
+/*static*/ ThreadingContext2& ThreadingContext2::Get() {
+  // We do not bother with double-checked locking because it requires an
+  // atomic pointer, whereas we prefer to use unique_ptr for simplicity. Also,
+  // callers can cache the result and call less often.
+  s_ctx_mutex.lock();
+  if (HWY_UNLIKELY(!s_ctx)) {
+    s_ctx = std::make_unique<ThreadingContext2>(PrivateToken());
+  }
+  s_ctx_mutex.unlock();
+  return *s_ctx;
+}
+
+/*static*/ void ThreadingContext2::ThreadHostileInvalidate() {
+  // Deliberately avoid taking the lock so that tsan can warn if this is
+  // called concurrently with other calls to `Get`.
+  s_ctx.reset();
+}
+
+// WARNING: called with `s_ctx_mutex` held. Calling `SetArgs` or `Get` would
+// deadlock.
+ThreadingContext2::ThreadingContext2(ThreadingContext2::PrivateToken)
+    : topology(BoundedSlice(s_args.skip_packages, s_args.max_packages),
+               BoundedSlice(s_args.skip_clusters, s_args.max_clusters),
+               BoundedSlice(s_args.skip_lps, s_args.max_lps)),
+      allocator(topology, s_args.bind != Tristate::kFalse),
+      pools(topology, allocator, s_args.max_threads, s_args.pin) {}
+
+}  // namespace gcpp
diff --git a/util/threading_context.h b/util/threading_context.h
new file mode 100644
index 0000000..7430f16
--- /dev/null
+++ b/util/threading_context.h
@@ -0,0 +1,128 @@
+// Copyright 2025 Google LLC
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef THIRD_PARTY_GEMMA_CPP_UTIL_THREADING_CONTEXT_H_
+#define THIRD_PARTY_GEMMA_CPP_UTIL_THREADING_CONTEXT_H_
+
+// Separate component to ensure `threading.cc` does not have access to
+// `ThreadingContext`, because that could deadlock.
+
+#include
+#include
+
+// IWYU pragma: begin_exports
+#include "util/allocator.h"
+#include "util/args.h"
+#include "util/basics.h"  // Tristate
+#include "util/threading.h"
+#include "util/topology.h"
+// IWYU pragma: end_exports
+
+namespace gcpp {
+
+// Optional arguments for `ThreadingContext` from the command line.
+class ThreadingArgs : public ArgsBase<ThreadingArgs> {
+ public:
+  ThreadingArgs(int argc, char* argv[]) { InitAndParse(argc, argv); }
+  ThreadingArgs() { Init(); }
+
+  // For BoundedTopology:
+  size_t skip_packages;
+  size_t max_packages;
+  size_t skip_clusters;
+  size_t max_clusters;
+  size_t skip_lps;
+  size_t max_lps;
+
+  Tristate bind;
+
+  // For NestedPools:
+  size_t max_threads;  // divided among the detected clusters
+  Tristate pin;        // pin threads?
+  Tristate spin;       // use spin waits?
+
+  template <class Visitor>
+  void ForEach(const Visitor& visitor) {
+    // These can be used to partition CPU sockets/packages and their
+    // clusters/CCXs across several program instances. The default is to use
+    // all available resources.
+    visitor(skip_packages, "skip_packages", size_t{0},
+            "Index of the first socket to use; default 0 = no skipping.", 2);
+    visitor(max_packages, "max_packages", size_t{0},
+            "Maximum number of sockets to use; default 0 = unlimited.", 2);
+    visitor(skip_clusters, "skip_clusters", size_t{0},
+            "Index of the first CCX to use; default 0 = no skipping.", 2);
+    visitor(max_clusters, "max_clusters", size_t{0},
+            "Maximum number of CCXs to use; default 0 = unlimited.", 2);
+    // These are only used when CPU topology is unknown.
+    visitor(skip_lps, "skip_lps", size_t{0},
+            "Index of the first LP to use; default 0 = no skipping.", 2);
+    visitor(max_lps, "max_lps", size_t{0},
+            "Maximum number of LPs to use; default 0 = unlimited.", 2);
+
+    // The exact meaning is more subtle: see the comment at NestedPools ctor.
+    visitor(max_threads, "num_threads", size_t{0},
+            "Maximum number of threads to use; default 0 = unlimited.", 2);
+    visitor(pin, "pin", Tristate::kDefault,
+            "Pin threads? -1 = auto, 0 = no, 1 = yes.", 2);
+    visitor(spin, "spin", Tristate::kDefault,
+            "Use spin waits? -1 = auto, 0 = no, 1 = yes.", 2);
+
+    visitor(bind, "bind", Tristate::kDefault,
+            "Bind memory to sockets? -1 = auto, 0 = no, 1 = yes.", 2);
+  }
+};
+
+// Lazily-initialized singleton with support for passing in arguments from
+// `ThreadingArgs` and re-initializing with different arguments.
+class ThreadingContext2 {
+  struct PrivateToken {};  // avoids constructing directly
+
+ public:
+  // If not called, default arguments are used when `Get` initializes the
+  // singleton. Must not be called after `Get`, unless after a call to
+  // `ThreadHostileInvalidate`, because otherwise initialization already
+  // happened and the arguments would have no effect. Thread-safe, though this
+  // is expected to be called early in the program, before threading starts.
+  static void SetArgs(const ThreadingArgs& args);
+
+  // Returns a reference to the singleton after initializing it if necessary.
+  // When initializing, uses the args passed to `SetArgs`, or defaults.
+  //
+  // It is safe to call this concurrently with other `Get` calls, but not with
+  // `SetArgs`, because that asserts if called after this, nor with
+  // `ThreadHostileInvalidate`, because that invalidates the reference which
+  // callers of this may still be using. Such usage only occurs in tests,
+  // hence we prefer not to pull `std::shared_ptr` into the interface.
+  //
+  // To reduce overhead, callers should cache the result and call less often.
+  static ThreadingContext2& Get();
+
+  // Invalidates the singleton before or after a call to `Get`. This allows
+  // changing the arguments between tests. Callers must again call `Get`
+  // afterwards to obtain an instance. WARNING: must not be called concurrently
+  // with other calls to `Get` and usages of its return value.
+  static void ThreadHostileInvalidate();
+
+  explicit ThreadingContext2(PrivateToken);  // only called via `Get`.
+
+  BoundedTopology topology;
+  Allocator2 allocator;
+  NestedPools pools;
+};
+
+}  // namespace gcpp
+
+#endif  // THIRD_PARTY_GEMMA_CPP_UTIL_THREADING_CONTEXT_H_
diff --git a/util/topology.cc b/util/topology.cc
index 239be4d..4eb8b33 100644
--- a/util/topology.cc
+++ b/util/topology.cc
@@ -138,13 +138,13 @@ BoundedTopology::Cluster::Cluster(const LPS& enabled_lps,
     }
     if (HWY_UNLIKELY(private_kib_ != tcluster.private_kib)) {
       warned = true;
-      HWY_WARN("lp %zu private_kib %zu != cluster %zu.", lp, private_kib_,
-               tcluster.private_kib);
+      HWY_WARN("lp %zu private_kib %zu != cluster %u.", lp, private_kib_,
+               static_cast<unsigned>(tcluster.private_kib));
     }
     if (HWY_UNLIKELY(shared_kib_ != tcluster.shared_kib)) {
      warned = true;
-      HWY_WARN("lp %zu shared_kib %zu != cluster %zu.", lp, shared_kib_,
-               tcluster.shared_kib);
+      HWY_WARN("lp %zu shared_kib %zu != cluster %u.", lp, shared_kib_,
+               static_cast<unsigned>(tcluster.shared_kib));
    }
  }  // !warned
}
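
Illustrative usage (not part of the patch): a minimal sketch of how a caller might combine the new `ThreadingContext2` singleton with `Allocator2`, based only on the declarations above. The function name `ExampleAllocate`, the 1 MiB buffer size, and the hard-coded NUMA node are assumptions for illustration.

  #include "util/threading_context.h"

  namespace gcpp {

  // Hypothetical example; not part of the patch.
  void ExampleAllocate() {
    // Optionally override defaults; must happen before the first Get().
    ThreadingArgs args;
    args.max_threads = 8;
    ThreadingContext2::SetArgs(args);

    ThreadingContext2& ctx = ThreadingContext2::Get();
    const Allocator2& allocator = ctx.allocator;

    // 1 MiB of floats, aligned to QuantumBytes(). 1 MiB is a multiple of the
    // page-sized QuantumBytes() that BindMemory requires.
    const size_t num = (size_t{1} << 20) / sizeof(float);
    AlignedPtr2<float> floats = allocator.Alloc<float>(num);

    if (allocator.ShouldBind()) {
      // The node would normally come from ctx.topology (see the BindMemory
      // comment); 0 is used here only for illustration.
      allocator.BindMemory(floats.get(), num * sizeof(float), /*node=*/0);
    }
  }

  }  // namespace gcpp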