From 1b72c223450a9e009e2501fd35de46374d894634 Mon Sep 17 00:00:00 2001
From: Jan Wassenberg
Date: Fri, 14 Mar 2025 10:18:11 -0700
Subject: [PATCH] Refactor Gemma ctor and improve pool NUMA support

Gemma receives a MatMulEnv arg, with comment on lifetime
Split threading into topology so the latter can be used in allocator
Add AllocClasses() for non-POD (ThreadPool)
Support binding pool to NUMA node
Update threading_test with latency measurements

Also update Highway version.

PiperOrigin-RevId: 736904748
---
 BUILD.bazel                              |  73 +++--
 CMakeLists.txt                           |   4 +-
 MODULE.bazel                             |   2 +-
 backprop/backward_test.cc                |  25 +-
 backprop/optimize_test.cc                |   9 +-
 compression/blob_compare.cc              |   5 +-
 evals/benchmark_helper.cc                |  11 +-
 evals/benchmark_helper.h                 |  16 +-
 examples/hello_world/BUILD.bazel         |   1 -
 examples/hello_world/CMakeLists.txt      |   2 +-
 examples/hello_world/run.cc              |   7 +-
 examples/simplified_gemma/BUILD.bazel    |   5 +-
 examples/simplified_gemma/CMakeLists.txt |   2 +-
 examples/simplified_gemma/gemma.hpp      |  18 +-
 gemma/gemma.cc                           |  12 +-
 gemma/gemma.h                            |  16 +-
 gemma/run.cc                             |  14 +-
 ops/bench_matmul.cc                      |  12 +-
 ops/dot_test.cc                          |  11 +-
 ops/matmul.cc                            |   3 +-
 ops/matmul.h                             |  11 +-
 ops/matmul_test.cc                       |  19 +-
 ops/ops_test.cc                          |   6 +-
 util/allocator.cc                        |  16 +-
 util/allocator.h                         |  59 +++-
 util/app.h                               |  29 +-
 util/threading.cc                        | 362 +++--------------------
 util/threading.h                         | 195 ++----------
 util/threading_test.cc                   | 159 +++++++---
 util/topology.cc                         | 336 +++++++++++++++++++++
 util/topology.h                          | 177 +++++++++++
 31 files changed, 920 insertions(+), 697 deletions(-)
 create mode 100644 util/topology.cc
 create mode 100644 util/topology.h

diff --git a/BUILD.bazel b/BUILD.bazel
index a3c956e..8c32631 100644
--- a/BUILD.bazel
+++ b/BUILD.bazel
@@ -29,15 +29,13 @@ cc_library(
     ],
 )
 
+# Split from :threading to break a circular dependency with :allocator.
 cc_library(
-    name = "threading",
-    srcs = ["util/threading.cc"],
-    hdrs = ["util/threading.h"],
+    name = "topology",
+    srcs = ["util/topology.cc"],
+    hdrs = ["util/topology.h"],
     deps = [
-        ":basics",
-        # Placeholder for container detection, do not remove
         "@highway//:hwy",
-        "@highway//:thread_pool",
         "@highway//:topology",
     ],
 )
@@ -48,9 +46,44 @@ cc_library(
     hdrs = ["util/allocator.h"],
     deps = [
         ":basics",
-        ":threading",
+        ":topology",
        "@highway//:hwy",
         "@highway//:thread_pool",
+        "@highway//:topology",
     ],
 )
+
+cc_library(
+    name = "threading",
+    srcs = ["util/threading.cc"],
+    hdrs = ["util/threading.h"],
+    deps = [
+        ":allocator",
+        ":basics",
+        ":topology",
+        # Placeholder for container detection, do not remove
+        "@highway//:hwy",
+        "@highway//:thread_pool",
+        "@highway//:topology",
+    ],
+)
+
+cc_test(
+    name = "threading_test",
+    srcs = ["util/threading_test.cc"],
+    deps = [
+        ":allocator",
+        ":basics",
+        ":threading",
+        "@googletest//:gtest_main",
+        "@highway//:auto_tune",
+        "@highway//:hwy",
+        "@highway//:hwy_test_util",
+        "@highway//:nanobenchmark",
+        "@highway//:robust_statistics",
+        "@highway//:stats",
+        "@highway//:thread_pool",
+        "@highway//:timer",
+    ],
+)
 
@@ -64,19 +97,6 @@ cc_library(
     ],
 )
 
-cc_test(
-    name = "threading_test",
-    srcs = ["util/threading_test.cc"],
-    deps = [
-        ":threading",
-        "@googletest//:gtest_main",
-        "@highway//:hwy",
-        "@highway//:hwy_test_util",
-        "@highway//:nanobenchmark",
-        "@highway//:thread_pool",
-    ],
-)
-
 # For building all tests in one command, so we can test several.
test_suite( name = "ops_tests", @@ -104,6 +124,7 @@ cc_library( ":allocator", ":basics", ":threading", + ":topology", "//compression:compress", "@highway//:algo", "@highway//:bit_set", @@ -113,7 +134,6 @@ cc_library( "@highway//:nanobenchmark", "@highway//:profiler", "@highway//:thread_pool", - "@highway//:topology", "@highway//hwy/contrib/sort:vqsort", ], ) @@ -128,11 +148,11 @@ cc_test( tags = ["ops_tests"], deps = [ ":allocator", + ":app", ":ops", ":test_util", ":threading", "@googletest//:gtest_main", # buildcleaner: keep - "//:app", "//compression:compress", "//compression:test_util", "@highway//:hwy", @@ -154,11 +174,12 @@ cc_test( tags = ["ops_tests"], deps = [ ":allocator", + ":app", ":common", ":ops", ":test_util", + ":threading", "@googletest//:gtest_main", # buildcleaner: keep - "//:app", "//compression:compress", "@highway//:hwy", "@highway//:hwy_test_util", @@ -405,6 +426,7 @@ cc_library( ":cross_entropy", ":gemma_lib", ":kv_cache", + ":ops", ":threading", # Placeholder for internal dep, do not remove., "@google_benchmark//:benchmark", @@ -464,13 +486,13 @@ cc_binary( ":benchmark_helper", ":common", ":gemma_lib", + ":ops", ":threading", # Placeholder for internal dep, do not remove., "//compression:sfp", "//paligemma:image", "@highway//:hwy", "@highway//:profiler", - "@highway//:thread_pool", ], ) @@ -634,13 +656,12 @@ cc_test( ":backprop", ":backprop_scalar", ":common", - ":gemma_lib", ":ops", ":prompt", ":sampler", + ":threading", ":weights", "@googletest//:gtest_main", - "//:threading", "//compression:compress", "@highway//:hwy", "@highway//:hwy_test_util", diff --git a/CMakeLists.txt b/CMakeLists.txt index 46bac38..1737c2d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -22,7 +22,7 @@ set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_STANDARD_REQUIRED ON) set(CMAKE_EXPORT_COMPILE_COMMANDS ON) -FetchContent_Declare(highway GIT_REPOSITORY https://github.com/google/highway.git GIT_TAG f2209b911c74019e85d0b7a7a2833c9a2e1b7995 EXCLUDE_FROM_ALL) +FetchContent_Declare(highway GIT_REPOSITORY https://github.com/google/highway.git GIT_TAG c5bebf84ad01edec97e336f5c97ca4e0df6b4d06 EXCLUDE_FROM_ALL) FetchContent_MakeAvailable(highway) ## Note: absl needs to be installed by sentencepiece. This will only happen if @@ -108,6 +108,8 @@ set(SOURCES util/test_util.h util/threading.cc util/threading.h + util/topology.cc + util/topology.h ) if(NOT CMAKE_BUILD_TYPE) diff --git a/MODULE.bazel b/MODULE.bazel index 6bd5a0a..77690fa 100644 --- a/MODULE.bazel +++ b/MODULE.bazel @@ -18,7 +18,7 @@ bazel_dep(name = "google_benchmark", version = "1.8.5") # Require a more recent version. 
git_override( module_name = "highway", - commit = "f2209b911c74019e85d0b7a7a2833c9a2e1b7995", + commit = "c5bebf84ad01edec97e336f5c97ca4e0df6b4d06", remote = "https://github.com/google/highway", ) diff --git a/backprop/backward_test.cc b/backprop/backward_test.cc index 974ea6e..f1c97b2 100644 --- a/backprop/backward_test.cc +++ b/backprop/backward_test.cc @@ -35,7 +35,6 @@ #include "ops/ops.h" #include "util/threading.h" #include "hwy/base.h" -#include "hwy/contrib/thread_pool/thread_pool.h" // clang-format off #undef HWY_TARGET_INCLUDE @@ -59,9 +58,9 @@ void TestMatMulVJP() { static const size_t kRows = 8; static const size_t kCols = 64; static const size_t kTokens = 5; - gcpp::NestedPools pools(1, /*pin=*/Tristate::kFalse, BoundedSlice(0, 1), - BoundedSlice(0, 8)); - Allocator::Init(pools.Topology()); + const BoundedTopology topology(BoundedSlice(0, 1), BoundedSlice(0, 8)); + Allocator::Init(topology); + gcpp::NestedPools pools(topology, 1, /*pin=*/Tristate::kFalse); std::mt19937 gen(42); MatStorageT weights("weights", kRows, kCols); MatStorageT x("x", kTokens, kCols); @@ -105,9 +104,9 @@ void TestMultiHeadMatMulVJP() { static const size_t kCols = 16; static const size_t kHeads = 4; static const size_t kTokens = 3; - gcpp::NestedPools pools(1, /*pin=*/Tristate::kFalse, BoundedSlice(0, 1), - BoundedSlice(0, 8)); - Allocator::Init(pools.Topology()); + const BoundedTopology topology(BoundedSlice(0, 1), BoundedSlice(0, 8)); + Allocator::Init(topology); + gcpp::NestedPools pools(topology, 1, /*pin=*/Tristate::kFalse); std::mt19937 gen(42); MatStorageT weights("weights", kRows, kCols * kHeads); MatStorageT x("x", kTokens, kCols * kHeads); @@ -150,9 +149,9 @@ void TestMultiHeadMatMulVJP() { void TestRMSNormVJP() { static const size_t K = 2; static const size_t N = 64; - gcpp::NestedPools pools(1, /*pin=*/Tristate::kFalse, BoundedSlice(0, 1), - BoundedSlice(0, 8)); - Allocator::Init(pools.Topology()); + const BoundedTopology topology(BoundedSlice(0, 1), BoundedSlice(0, 8)); + Allocator::Init(topology); + gcpp::NestedPools pools(topology, 1, /*pin=*/Tristate::kFalse); std::mt19937 gen(42); MatStorageT weights("weights", N, 1); MatStorageT x("x", K, N); @@ -216,9 +215,9 @@ static ModelConfig TestConfig() { void TestEndToEnd() { std::mt19937 gen(42); - gcpp::NestedPools pools(1, /*pin=*/Tristate::kFalse, BoundedSlice(0, 1), - BoundedSlice(0, 1)); - Allocator::Init(pools.Topology()); + const BoundedTopology topology(BoundedSlice(0, 1), BoundedSlice(0, 1)); + Allocator::Init(topology); + gcpp::NestedPools pools(topology, 1, /*pin=*/Tristate::kFalse); ModelConfig config = TestConfig(); WeightsWrapper weights(config); WeightsWrapper grad(config); diff --git a/backprop/optimize_test.cc b/backprop/optimize_test.cc index 6d3de50..6f08bf0 100644 --- a/backprop/optimize_test.cc +++ b/backprop/optimize_test.cc @@ -41,9 +41,10 @@ namespace gcpp { TEST(OptimizeTest, GradientDescent) { - NestedPools pools(1, /*pin=*/Tristate::kFalse, BoundedSlice(0, 1), - BoundedSlice(0, 1)); - Allocator::Init(pools.Topology()); + const BoundedTopology topology(BoundedSlice(0, 1), BoundedSlice(0, 1)); + Allocator::Init(topology); + NestedPools pools(topology, 1, /*pin=*/Tristate::kFalse); + MatMulEnv env(topology, pools); hwy::ThreadPool& pool = pools.Pool(); std::mt19937 gen(42); @@ -66,7 +67,7 @@ TEST(OptimizeTest, GradientDescent) { config.layer_configs[0].qkv_dim, config.layer_configs[0].post_qk == PostQKType::HalfRope); - Gemma gemma(GemmaTokenizer(), info, pools); + Gemma gemma(GemmaTokenizer(), info, env); const auto 
generate = [&](const std::vector& prompt) { std::vector reply; diff --git a/compression/blob_compare.cc b/compression/blob_compare.cc index 0ff6565..c0fe63c 100644 --- a/compression/blob_compare.cc +++ b/compression/blob_compare.cc @@ -202,8 +202,9 @@ void ReadAndCompareBlobs(const char* path1, const char* path2) { if (!CompareKeys(reader1, reader2)) return; // Single allocation, avoid initializing the memory. - NestedPools pools(0); - Allocator::Init(pools.Topology()); + BoundedTopology topology; + Allocator::Init(topology); + NestedPools pools(topology); const size_t total_bytes = TotalBytes(reader1) + TotalBytes(reader2); BytePtr all_blobs = hwy::AllocateAligned(total_bytes); size_t pos = 0; diff --git a/evals/benchmark_helper.cc b/evals/benchmark_helper.cc index 76251f5..2daebdf 100644 --- a/evals/benchmark_helper.cc +++ b/evals/benchmark_helper.cc @@ -56,8 +56,9 @@ void InitGenerator(const InferenceArgs& inference, std::mt19937& gen) { GemmaEnv::GemmaEnv(const LoaderArgs& loader, const InferenceArgs& inference, const AppArgs& app) - : pools_(CreatePools(app)) { - Allocator::Init(pools_.Topology()); + : topology_(CreateTopology(app)), + pools_(CreatePools(topology_, app)), + env_(topology_, pools_) { InferenceArgs mutable_inference = inference; AbortIfInvalidArgs(mutable_inference); LoaderArgs mutable_loader = loader; @@ -66,7 +67,7 @@ GemmaEnv::GemmaEnv(const LoaderArgs& loader, const InferenceArgs& inference, fprintf(stderr, "Skipping model load because: %s\n", err); } else { fprintf(stderr, "Loading model...\n"); - model_ = AllocateGemma(mutable_loader, pools_); + model_ = AllocateGemma(mutable_loader, env_); // Only allocate one for starters because GenerateBatch might not be called. kv_caches_.resize(1); kv_caches_[0] = KVCache::Create(model_->GetModelConfig(), @@ -236,7 +237,7 @@ std::string CacheString() { } void ShowConfig(LoaderArgs& loader, InferenceArgs& inference, AppArgs& app, - NestedPools& pools) { + const BoundedTopology& topology, NestedPools& pools) { loader.Print(app.verbosity); inference.Print(app.verbosity); app.Print(app.verbosity); @@ -255,7 +256,7 @@ void ShowConfig(LoaderArgs& loader, InferenceArgs& inference, AppArgs& app, "Compiled config : %s\n" "Weight Type : %s\n" "EmbedderInput Type : %s\n", - dt, cpu100, pools.TopologyString(), pools.PinString(), + dt, cpu100, topology.TopologyString(), pools.PinString(), CacheString().c_str(), hwy::TargetName(hwy::DispatchedTarget()), hwy::VectorBytes() * 8, CompiledConfig(), StringFromType(loader.Info().weight), TypeName()); diff --git a/evals/benchmark_helper.h b/evals/benchmark_helper.h index 7e7f1bf..f6e32c0 100644 --- a/evals/benchmark_helper.h +++ b/evals/benchmark_helper.h @@ -24,6 +24,7 @@ #include #include "gemma/gemma.h" +#include "ops/matmul.h" #include "util/app.h" #include "util/threading.h" #include "hwy/base.h" @@ -105,15 +106,12 @@ class GemmaEnv { KVCache& MutableKVCache() { return kv_caches_[0]; } private: - // Thread pool for running inference. - NestedPools pools_; - // Random number generator. - std::mt19937 gen_; - // The model to run inference on. + BoundedTopology topology_; + NestedPools pools_; // Thread pool. + MatMulEnv env_; + std::mt19937 gen_; // Random number generator. std::unique_ptr model_; - // KV caches, same number as query batch. - std::vector kv_caches_; - // Runtime config for inference. + std::vector kv_caches_; // Same number as query batch. 
RuntimeConfig runtime_config_; }; @@ -121,7 +119,7 @@ class GemmaEnv { void LogSpeedStats(double time_start, size_t total_tokens); void ShowConfig(LoaderArgs& loader, InferenceArgs& inference, AppArgs& app, - NestedPools& pools); + const BoundedTopology& topology, NestedPools& pools); void ShowHelp(LoaderArgs& loader, InferenceArgs& inference, AppArgs& app); } // namespace gcpp diff --git a/examples/hello_world/BUILD.bazel b/examples/hello_world/BUILD.bazel index 52af610..3160103 100644 --- a/examples/hello_world/BUILD.bazel +++ b/examples/hello_world/BUILD.bazel @@ -13,7 +13,6 @@ cc_binary( # Placeholder for internal dep, do not remove., "//:app", "//:args", - "//:common", "//:gemma_lib", "//:threading", "//:tokenizer", diff --git a/examples/hello_world/CMakeLists.txt b/examples/hello_world/CMakeLists.txt index 030b2ba..2eb39c6 100644 --- a/examples/hello_world/CMakeLists.txt +++ b/examples/hello_world/CMakeLists.txt @@ -17,7 +17,7 @@ project(hello_world) set(CMAKE_CXX_STANDARD_REQUIRED ON) include(FetchContent) -FetchContent_Declare(highway GIT_REPOSITORY https://github.com/google/highway.git GIT_TAG f2209b911c74019e85d0b7a7a2833c9a2e1b7995) +FetchContent_Declare(highway GIT_REPOSITORY https://github.com/google/highway.git GIT_TAG c5bebf84ad01edec97e336f5c97ca4e0df6b4d06) FetchContent_MakeAvailable(highway) FetchContent_Declare(sentencepiece GIT_REPOSITORY https://github.com/google/sentencepiece GIT_TAG 53de76561cfc149d3c01037f0595669ad32a5e7c) FetchContent_MakeAvailable(sentencepiece) diff --git a/examples/hello_world/run.cc b/examples/hello_world/run.cc index 3951350..9ef58ff 100644 --- a/examples/hello_world/run.cc +++ b/examples/hello_world/run.cc @@ -58,9 +58,10 @@ int main(int argc, char** argv) { } // Instantiate model and KV Cache - gcpp::NestedPools pools = gcpp::CreatePools(app); - gcpp::Allocator::Init(pools.Topology()); - gcpp::Gemma model = gcpp::CreateGemma(loader, pools); + gcpp::BoundedTopology topology(gcpp::CreateTopology(app)); + gcpp::NestedPools pools = gcpp::CreatePools(topology, app); + gcpp::MatMulEnv env(topology, pools); + gcpp::Gemma model = gcpp::CreateGemma(loader, env); gcpp::KVCache kv_cache = gcpp::KVCache::Create(model.GetModelConfig(), inference.prefill_tbatch_size); diff --git a/examples/simplified_gemma/BUILD.bazel b/examples/simplified_gemma/BUILD.bazel index 2ae7861..bedb322 100644 --- a/examples/simplified_gemma/BUILD.bazel +++ b/examples/simplified_gemma/BUILD.bazel @@ -11,13 +11,11 @@ cc_library( hdrs = ["gemma.hpp"], deps = [ "//:app", - "//:args", - "//:common", "//:gemma_lib", + "//:ops", "//:threading", "//:tokenizer", "@highway//:hwy", - "@highway//:thread_pool", ], ) @@ -31,6 +29,7 @@ cc_binary( "//:args", "//:common", "//:gemma_lib", + "//:ops", "//:threading", "//:tokenizer", "@highway//:hwy", diff --git a/examples/simplified_gemma/CMakeLists.txt b/examples/simplified_gemma/CMakeLists.txt index 609459e..e7e6653 100644 --- a/examples/simplified_gemma/CMakeLists.txt +++ b/examples/simplified_gemma/CMakeLists.txt @@ -17,7 +17,7 @@ project(simplified_gemma) set(CMAKE_CXX_STANDARD_REQUIRED ON) include(FetchContent) -FetchContent_Declare(highway GIT_REPOSITORY https://github.com/google/highway.git GIT_TAG f2209b911c74019e85d0b7a7a2833c9a2e1b7995) +FetchContent_Declare(highway GIT_REPOSITORY https://github.com/google/highway.git GIT_TAG c5bebf84ad01edec97e336f5c97ca4e0df6b4d06) FetchContent_MakeAvailable(highway) FetchContent_Declare(sentencepiece GIT_REPOSITORY https://github.com/google/sentencepiece GIT_TAG 
53de76561cfc149d3c01037f0595669ad32a5e7c) FetchContent_MakeAvailable(sentencepiece) diff --git a/examples/simplified_gemma/gemma.hpp b/examples/simplified_gemma/gemma.hpp index 84283a3..bbbb86a 100644 --- a/examples/simplified_gemma/gemma.hpp +++ b/examples/simplified_gemma/gemma.hpp @@ -25,10 +25,10 @@ #include "third_party/gemma_cpp/gemma/gemma.h" #include "third_party/gemma_cpp/gemma/tokenizer.h" +#include "third_party/gemma_cpp/ops/matmul.h" #include "third_party/gemma_cpp/util/app.h" // LoaderArgs #include "third_party/gemma_cpp/util/threading.h" #include "third_party/highway/hwy/base.h" -#include "third_party/highway/hwy/contrib/thread_pool/thread_pool.h" class SimplifiedGemma { public: @@ -38,8 +38,10 @@ class SimplifiedGemma { : loader_(loader), inference_(inference), app_(app), - pools_(gcpp::CreatePools(app_)), - model_(gcpp::CreateGemma(loader_, pools_)) { + topology_(gcpp::CreateTopology(app_)), + pools_(gcpp::CreatePools(topology_, app_)), + env_(topology_, pools_), + model_(gcpp::CreateGemma(loader_, env_)) { Init(); } @@ -47,14 +49,14 @@ class SimplifiedGemma { : loader_(argc, argv, /*validate=*/true), inference_(argc, argv), app_(argc, argv), - pools_(gcpp::CreatePools(app_)), - model_(gcpp::CreateGemma(loader_, pools_)) { + topology_(gcpp::CreateTopology(app_)), + pools_(gcpp::CreatePools(topology_, app_)), + env_(topology_, pools_), + model_(gcpp::CreateGemma(loader_, env_)) { Init(); } void Init() { - gcpp::Allocator::Init(pools_.Topology()); - // Instantiate model and KV Cache kv_cache_ = gcpp::KVCache::Create(model_.GetModelConfig(), inference_.prefill_tbatch_size); @@ -106,7 +108,9 @@ class SimplifiedGemma { gcpp::LoaderArgs loader_; gcpp::InferenceArgs inference_; gcpp::AppArgs app_; + gcpp::BoundedTopology topology_; gcpp::NestedPools pools_; + gcpp::MatMulEnv env_; gcpp::Gemma model_; gcpp::KVCache kv_cache_; std::mt19937 gen_; diff --git a/gemma/gemma.cc b/gemma/gemma.cc index 20b1c75..bfc6534 100644 --- a/gemma/gemma.cc +++ b/gemma/gemma.cc @@ -34,29 +34,27 @@ #include "ops/ops-inl.h" #include "paligemma/image.h" #include "util/threading.h" -#include "hwy/contrib/thread_pool/thread_pool.h" #include "hwy/highway.h" namespace gcpp { Gemma::Gemma(const Path& tokenizer_path, const Path& weights, - const ModelInfo& info, NestedPools& pools) - : env_(pools), tokenizer_(tokenizer_path) { + const ModelInfo& info, MatMulEnv& env) + : env_(env), tokenizer_(tokenizer_path) { model_.Load(weights, info.model, info.weight, info.wrapping, env_.parallel.Pools().Pool(0), /*tokenizer_proto=*/nullptr); } -Gemma::Gemma(const Path& weights, NestedPools& pools) : env_(pools) { +Gemma::Gemma(const Path& weights, MatMulEnv& env) : env_(env) { std::string tokenizer_proto; model_.Load(weights, Model::UNKNOWN, Type::kUnknown, PromptWrapping::GEMMA_IT, env_.parallel.Pools().Pool(0), &tokenizer_proto); tokenizer_.Deserialize(tokenizer_proto); } -Gemma::Gemma(GemmaTokenizer&& tokenizer, const ModelInfo& info, - NestedPools& pools) - : env_(pools), tokenizer_(std::move(tokenizer)) { +Gemma::Gemma(GemmaTokenizer&& tokenizer, const ModelInfo& info, MatMulEnv& env) + : env_(env), tokenizer_(std::move(tokenizer)) { HWY_ASSERT(info.weight == Type::kF32); model_.Allocate(info.model, info.weight, env_.parallel.Pools().Pool(0)); } diff --git a/gemma/gemma.h b/gemma/gemma.h index d7be609..ccda69c 100644 --- a/gemma/gemma.h +++ b/gemma/gemma.h @@ -33,8 +33,6 @@ #include "paligemma/image.h" #include "util/allocator.h" // RowVectorBatch #include "util/basics.h" // TokenAndProb -#include "util/threading.h" 
-#include "hwy/contrib/thread_pool/thread_pool.h" #include "hwy/timer.h" // IWYU pragma: end_exports #include "hwy/aligned_allocator.h" // Span @@ -198,12 +196,14 @@ struct TimingInfo { class Gemma { public: // Reads old format weights file and tokenizer file. + // `env` must remain valid for the lifetime of this Gemma. Gemma(const Path& tokenizer_path, const Path& weights, const ModelInfo& info, - NestedPools& pools); + MatMulEnv& env); // Reads new format weights file that contains everything in a single file. - Gemma(const Path& weights, NestedPools& pools); + // `env` must remain valid for the lifetime of this Gemma. + Gemma(const Path& weights, MatMulEnv& env); // Allocates weights, caller is responsible for filling them. - Gemma(GemmaTokenizer&& tokenizer, const ModelInfo& info, NestedPools& pools); + Gemma(GemmaTokenizer&& tokenizer, const ModelInfo& info, MatMulEnv& env); ~Gemma(); const ModelConfig& GetModelConfig() const { return model_.Config(); } @@ -252,12 +252,8 @@ class Gemma { void GenerateImageTokens(const RuntimeConfig& runtime_config, const Image& image, ImageTokens& image_tokens); - void SetMatMulVerbosity(int verbosity) { - if (verbosity >= 2) env_.print_best = true; - } - private: - MatMulEnv env_; + MatMulEnv& env_; GemmaTokenizer tokenizer_; // Type-erased so that this can be defined in the header. diff --git a/gemma/run.cc b/gemma/run.cc index c0b1eb5..669f614 100644 --- a/gemma/run.cc +++ b/gemma/run.cc @@ -26,12 +26,12 @@ #include "evals/benchmark_helper.h" #include "gemma/common.h" #include "gemma/gemma.h" // Gemma +#include "ops/matmul.h" // MatMulEnv #include "paligemma/image.h" #include "util/app.h" #include "util/args.h" // HasHelp #include "util/threading.h" #include "hwy/base.h" -#include "hwy/contrib/thread_pool/thread_pool.h" #include "hwy/highway.h" #include "hwy/profiler.h" @@ -248,11 +248,11 @@ void Run(LoaderArgs& loader, InferenceArgs& inference, AppArgs& app) { // Note that num_threads is an upper bound; we also limit to the number of // detected and enabled cores. - NestedPools pools = CreatePools(app); - Allocator::Init(pools.Topology()); - - Gemma model = CreateGemma(loader, pools); - model.SetMatMulVerbosity(app.verbosity); + const BoundedTopology topology = CreateTopology(app); + NestedPools pools = CreatePools(topology, app); + MatMulEnv env(topology, pools); + if (app.verbosity >= 2) env.print_best = true; + Gemma model = CreateGemma(loader, env); KVCache kv_cache = KVCache::Create(model.GetModelConfig(), inference.prefill_tbatch_size); @@ -279,7 +279,7 @@ void Run(LoaderArgs& loader, InferenceArgs& inference, AppArgs& app) { std::cout << "\033[2J\033[1;1H" // clear screen << kAsciiArtBanner << "\n\n"; - ShowConfig(loader, inference, app, pools); + ShowConfig(loader, inference, app, topology, pools); std::cout << "\n" << instructions << "\n"; } diff --git a/ops/bench_matmul.cc b/ops/bench_matmul.cc index b32f3dd..30ec634 100644 --- a/ops/bench_matmul.cc +++ b/ops/bench_matmul.cc @@ -212,7 +212,7 @@ void BenchAllMatMul() { // Skip EMU128 (10x slower than SSE4 for SFP) and older x86. if (HWY_TARGET == HWY_EMU128 || HWY_TARGET == HWY_SSSE3 || - HWY_TARGET == HWY_SSE2) { + HWY_TARGET == HWY_SSE2 || HWY_TARGET == HWY_NEON_WITHOUT_AES) { return; } @@ -220,13 +220,13 @@ void BenchAllMatMul() { const BoundedSlice package_slice; // all packages/sockets const BoundedSlice cluster_slice; // all clusters/CCX const BoundedSlice lp_slice; // default to all cores (per package). 
- NestedPools pools(max_threads, Tristate::kDefault, package_slice, - cluster_slice, lp_slice); - fprintf(stderr, "BenchAllMatMul %s %s\n", pools.TopologyString(), + const BoundedTopology topology(package_slice, cluster_slice, lp_slice); + Allocator::Init(topology, /*enable_bind=*/true); + NestedPools pools(topology, max_threads, Tristate::kDefault); + fprintf(stderr, "BenchAllMatMul %s %s\n", topology.TopologyString(), pools.PinString()); - Allocator::Init(pools.Topology(), /*enable_bind=*/true); - MatMulEnv env(pools); + MatMulEnv env(topology, pools); for (size_t batch_size : {1, 4, 128, 512}) { constexpr bool kAdd = false; diff --git a/ops/dot_test.cc b/ops/dot_test.cc index 6533edb..bac78af 100644 --- a/ops/dot_test.cc +++ b/ops/dot_test.cc @@ -1000,8 +1000,9 @@ struct TestShortDotsT { const size_t N = hn::Lanes(d); const hn::ScalableTag df; // for CallDot - NestedPools pools = CreatePools(AppArgs()); - Allocator::Init(pools.Topology()); + const AppArgs app; + BoundedTopology topology(CreateTopology(app)); + NestedPools pools = CreatePools(topology, app); CompressWorkingSet work; std::mt19937 rng; rng.seed(12345); @@ -1109,9 +1110,9 @@ void TestAllDot() { constexpr size_t kReps = hn::AdjustedReps(40); const size_t num = 24 * 1024; - NestedPools pools(kMaxWorkers - 1, /*pin=*/Tristate::kDefault, - BoundedSlice(0, 1), BoundedSlice(0, 1)); - Allocator::Init(pools.Topology()); + const BoundedTopology topology(BoundedSlice(0, 1), BoundedSlice(0, 1), + BoundedSlice()); + NestedPools pools(topology, kMaxWorkers - 1, /*pin=*/Tristate::kDefault); RowVectorBatch a(Extents2D(kMaxWorkers, num)); RowVectorBatch b(Extents2D(kMaxWorkers, num)); RowVectorBatch bufs(Extents2D(kMaxWorkers, num)); diff --git a/ops/matmul.cc b/ops/matmul.cc index 678da56..8e9fc82 100644 --- a/ops/matmul.cc +++ b/ops/matmul.cc @@ -411,7 +411,8 @@ IndexRangePartition MMParallel::RangesOfNP(size_t max_packages, size_t N, NPMultiple(N, sizeof_TC, nr, num_packages)); } -MatMulEnv::MatMulEnv(NestedPools& pools) : parallel(pools), storage(parallel) { +MatMulEnv::MatMulEnv(const BoundedTopology& topology, NestedPools& pools) + : parallel(topology, pools), storage(parallel) { // Ensure Allocator:Init was called. HWY_ASSERT(Allocator::LineBytes() != 0 && Allocator::VectorBytes() != 0); diff --git a/ops/matmul.h b/ops/matmul.h index b7f0ac6..dc375d0 100644 --- a/ops/matmul.h +++ b/ops/matmul.h @@ -28,10 +28,10 @@ #include "util/allocator.h" #include "util/basics.h" #include "util/threading.h" +#include "util/topology.h" #include "hwy/aligned_allocator.h" // Span #include "hwy/base.h" #include "hwy/bit_set.h" -#include "hwy/contrib/thread_pool/thread_pool.h" #include "hwy/profiler.h" // IWYU pragma: end_exports @@ -51,7 +51,9 @@ class MMParallel { public: static constexpr size_t kMaxPackages = 4; - MMParallel(NestedPools& pools) : pools_(pools) { + // Both references must outlive this object. + MMParallel(const BoundedTopology& topology, NestedPools& pools) + : topology_(topology), pools_(pools) { HWY_DASSERT(pools_.NumPackages() <= kMaxPackages); } @@ -64,7 +66,7 @@ class MMParallel { // For `BindB` and `BindC`. size_t Node(size_t pkg_idx) const { - return pools_.Topology().GetCluster(pkg_idx, 0).Node(); + return topology_.GetCluster(pkg_idx, 0).Node(); } // Calls `func(pkg_idx)` for each package in parallel. @@ -167,6 +169,7 @@ class MMParallel { } private: + const BoundedTopology& topology_; NestedPools& pools_; }; @@ -603,7 +606,7 @@ struct MMPerKey { // Stores state shared across MatMul calls. Non-copyable. 
struct MatMulEnv { - explicit MatMulEnv(NestedPools& pools); + explicit MatMulEnv(const BoundedTopology& topology, NestedPools& pools); bool have_timer_stop = false; diff --git a/ops/matmul_test.cc b/ops/matmul_test.cc index 3d34715..aaf3bc1 100644 --- a/ops/matmul_test.cc +++ b/ops/matmul_test.cc @@ -312,22 +312,22 @@ void TestTiny() { if (HWY_TARGET != first_target) return; for (size_t max_packages : {1, 2}) { + const BoundedTopology topology(BoundedSlice(0, max_packages)); + Allocator::Init(topology, /*enable_bind=*/true); const size_t max_threads = 0; // no limit - NestedPools pools(max_threads, Tristate::kDefault, - BoundedSlice(0, max_packages)); + NestedPools pools(topology, max_threads, Tristate::kDefault); #if GEMMA_DISABLE_TOPOLOGY if (max_packages == 2) break; // we only have one package #else // If less than the limit, we have already tested all num_packages. - if (pools.Topology().FullTopology().packages.size() < max_packages) break; + if (topology.FullTopology().packages.size() < max_packages) break; #endif fprintf(stderr, "TestTiny %zu: %s %s\n", max_packages, - pools.TopologyString(), pools.PinString()); + topology.TopologyString(), pools.PinString()); Tristate use_spinning = Tristate::kDefault; pools.MaybeStartSpinning(use_spinning); - Allocator::Init(pools.Topology(), /*enable_bind=*/true); - MatMulEnv env(pools); + MatMulEnv env(topology, pools); for (size_t M = 1; M <= 12; ++M) { for (size_t K = 1; K <= 64; K *= 2) { @@ -347,11 +347,12 @@ void TestAllMatMul() { return; } - NestedPools pools(0); // no limits + const BoundedTopology topology; + Allocator::Init(topology, /*enable_bind=*/true); + NestedPools pools(topology); Tristate use_spinning = Tristate::kDefault; pools.MaybeStartSpinning(use_spinning); - Allocator::Init(pools.Topology(), /*enable_bind=*/true); - MatMulEnv env(pools); + MatMulEnv env(topology, pools); // Sizes seen in gemma_test 2B. Too slow for CI, enable on-demand. TestMatMul(1, 2048, 512, /*add=*/false, env, __LINE__); diff --git a/ops/ops_test.cc b/ops/ops_test.cc index bb5b380..5414138 100644 --- a/ops/ops_test.cc +++ b/ops/ops_test.cc @@ -37,6 +37,7 @@ #include "util/allocator.h" #include "util/app.h" #include "util/test_util.h" +#include "util/threading.h" #include "hwy/base.h" #include "hwy/tests/hwy_gtest.h" @@ -387,8 +388,9 @@ static HWY_NOINLINE HWY_MAYBE_UNUSED void ScalarRopeAndMulBy( } void TestRopeAndMulBy() { - NestedPools pools = CreatePools(AppArgs()); - Allocator::Init(pools.Topology()); + AppArgs app; + BoundedTopology topology = CreateTopology(app); + NestedPools pools = CreatePools(topology, app); ModelConfig config = ConfigFromModel(Model::GEMMA2_9B); int dim_qkv = config.layer_configs[0].qkv_dim; diff --git a/util/allocator.cc b/util/allocator.cc index c3db82f..4beedca 100644 --- a/util/allocator.cc +++ b/util/allocator.cc @@ -17,12 +17,11 @@ #include -#include -#include - #include "util/basics.h" // MaybeCheckInitialized #include "hwy/aligned_allocator.h" #include "hwy/base.h" +#include "hwy/contrib/thread_pool/futex.h" +#include "hwy/contrib/thread_pool/topology.h" #include "hwy/per_target.h" // VectorBytes // To avoid a dependency on libnuma, use syscalls directly. 
We require six @@ -52,6 +51,7 @@ #include #include +#include #endif // GEMMA_BIND && HWY_OS_LINUX namespace gcpp { @@ -166,7 +166,7 @@ Allocator::PtrAndDeleter Allocator::AllocBytes(size_t bytes) { auto call_free = [](void* ptr, size_t /*bytes*/) { hwy::FreeAlignedBytes(ptr, nullptr, nullptr); }; - return PtrAndDeleter{p.release(), Deleter(call_free, bytes)}; + return PtrAndDeleter{p.release(), DeleterFree(call_free, bytes)}; } // Binding, or large vector/cache line size: use platform-specific allocator. @@ -186,14 +186,14 @@ Allocator::PtrAndDeleter Allocator::AllocBytes(size_t bytes) { const int ret = munmap(ptr, bytes); HWY_ASSERT(ret == 0); }; - return PtrAndDeleter{p, Deleter(call_munmap, bytes)}; + return PtrAndDeleter{p, DeleterFree(call_munmap, bytes)}; #elif HWY_OS_WIN const auto call_free = [](void* ptr, size_t) { _aligned_free(ptr); }; const size_t alignment = HWY_MAX(vector_bytes_, line_bytes_); return PtrAndDeleter{_aligned_malloc(bytes, alignment), - Deleter(call_free, bytes)}; + DeleterFree(call_free, bytes)}; #else - return PtrAndDeleter{nullptr, Deleter(nullptr, 0)}; + return PtrAndDeleter{nullptr, DeleterFree(nullptr, 0)}; #endif } @@ -288,7 +288,7 @@ bool Allocator::BindMemory(void* ptr, size_t bytes, size_t node) { CountBusyPages(num_pages, node, pages.data(), status.data()); if (HWY_UNLIKELY(num_busy != 0)) { // Trying again is usually enough to succeed. - usleep(5); // NOLINT(runtime/sleep) + hwy::NanoSleep(5000); (void)SyscallWrappers::move_pages( /*pid=*/0, num_pages, pages.data(), nodes.data(), status.data(), flags); const size_t still_busy = diff --git a/util/allocator.h b/util/allocator.h index 55c26af..e54fdc7 100644 --- a/util/allocator.h +++ b/util/allocator.h @@ -22,12 +22,11 @@ #include // IWYU pragma: begin_exports -#include +#include // std::unique_ptr #include "util/basics.h" -#include "util/threading.h" +#include "util/topology.h" #include "hwy/base.h" -#include "hwy/contrib/thread_pool/thread_pool.h" // IWYU pragma: end_exports #include "hwy/aligned_allocator.h" @@ -38,12 +37,12 @@ namespace gcpp { // `bytes` argument is required for the latter. using FreeFunc = void (*)(void* mem, size_t bytes); -// Custom deleter for std::unique_ptr that calls `FreeFunc`. -class Deleter { +// Custom deleter for std::unique_ptr that calls `FreeFunc`. T is POD. +class DeleterFree { public: // `MatStorageT` requires this to be default-constructible. - Deleter() : free_func_(nullptr), bytes_(0) {} - Deleter(FreeFunc free_func, size_t bytes) + DeleterFree() : free_func_(nullptr), bytes_(0) {} + DeleterFree(FreeFunc free_func, size_t bytes) : free_func_(free_func), bytes_(bytes) {} template @@ -56,9 +55,31 @@ class Deleter { size_t bytes_; }; +// Wrapper that also calls the destructor for non-POD T. +class DeleterDtor { + public: + DeleterDtor() {} + DeleterDtor(size_t num, DeleterFree free) : num_(num), free_(free) {} + + template + void operator()(T* p) const { + for (size_t i = 0; i < num_; ++i) { + p[i].~T(); + } + free_(p); + } + + private: + size_t num_; // not the same as free_.bytes_ / sizeof(T)! + DeleterFree free_; +}; + // Unique (move-only) pointer to an aligned array of POD T. template -using AlignedPtr = std::unique_ptr; +using AlignedPtr = std::unique_ptr; +// Unique (move-only) pointer to an aligned array of non-POD T. +template +using AlignedClassPtr = std::unique_ptr; // Both allocation, binding, and row accessors depend on the sizes of memory // pages and cache lines. 
To avoid having to pass `Allocator&` everywhere, we @@ -105,6 +126,26 @@ class Allocator { return AlignedPtr(static_cast(pd.p), pd.deleter); } + // Same as Alloc, but calls constructor(s) with `args`. + template + static AlignedClassPtr AllocClasses(size_t num, Args&&... args) { + constexpr size_t kSize = sizeof(T); + constexpr bool kIsPow2 = (kSize & (kSize - 1)) == 0; + constexpr size_t kBits = hwy::detail::ShiftCount(kSize); + static_assert(!kIsPow2 || (1ull << kBits) == kSize, "ShiftCount has a bug"); + const size_t bytes = kIsPow2 ? num << kBits : num * kSize; + // Fail if the `bytes = num * kSize` computation overflowed. + const size_t check = kIsPow2 ? bytes >> kBits : bytes / kSize; + if (check != num) return AlignedClassPtr(); + + PtrAndDeleter pd = AllocBytes(bytes); + T* p = static_cast(pd.p); + for (size_t i = 0; i < num; ++i) { + new (p + i) T(std::forward(args)...); + } + return AlignedClassPtr(p, DeleterDtor(num, pd.deleter)); + } + // Returns whether `BindMemory` can/should be called, i.e. we have page-level // control over memory placement and multiple packages and NUMA nodes. static bool ShouldBind(); @@ -119,7 +160,7 @@ class Allocator { // Type-erased so this can be implemented in allocator.cc. struct PtrAndDeleter { void* p; - Deleter deleter; + DeleterFree deleter; }; static PtrAndDeleter AllocBytes(size_t bytes); }; diff --git a/util/app.h b/util/app.h index 5c0698f..35c8c90 100644 --- a/util/app.h +++ b/util/app.h @@ -117,12 +117,15 @@ class AppArgs : public ArgsBase { } }; -// Callers must call Allocator::Init(pools.Topology()) after this. -static inline NestedPools CreatePools(const AppArgs& app) { - return NestedPools(app.max_threads, app.pin, - BoundedSlice(app.skip_packages, app.max_packages), - BoundedSlice(app.skip_clusters, app.max_clusters), - BoundedSlice(app.skip_lps, app.max_lps)); +static inline BoundedTopology CreateTopology(const AppArgs& app) { + return BoundedTopology(BoundedSlice(app.skip_packages, app.max_packages), + BoundedSlice(app.skip_clusters, app.max_clusters), + BoundedSlice(app.skip_lps, app.max_lps)); +} +static inline NestedPools CreatePools(const BoundedTopology& topology, + const AppArgs& app) { + Allocator::Init(topology); + return NestedPools(topology, app.max_threads, app.pin); } struct LoaderArgs : public ArgsBase { @@ -224,24 +227,26 @@ struct LoaderArgs : public ArgsBase { ModelInfo info_; }; -static inline Gemma CreateGemma(const LoaderArgs& loader, NestedPools& pools) { +// `env` must remain valid for the lifetime of the Gemma. +static inline Gemma CreateGemma(const LoaderArgs& loader, MatMulEnv& env) { if (Type::kUnknown == loader.Info().weight || Model::UNKNOWN == loader.Info().model || loader.tokenizer.path.empty()) { // New weights file format doesn't need tokenizer path or model/weightinfo. - return Gemma(loader.weights, pools); + return Gemma(loader.weights, env); } - return Gemma(loader.tokenizer, loader.weights, loader.Info(), pools); + return Gemma(loader.tokenizer, loader.weights, loader.Info(), env); } +// `env` must remain valid for the lifetime of the Gemma. static inline std::unique_ptr AllocateGemma(const LoaderArgs& loader, - NestedPools& pools) { + MatMulEnv& env) { if (Type::kUnknown == loader.Info().weight || Model::UNKNOWN == loader.Info().model || loader.tokenizer.path.empty()) { // New weights file format doesn't need tokenizer path or model/weight info. 
- return std::make_unique(loader.weights, pools); + return std::make_unique(loader.weights, env); } return std::make_unique(loader.tokenizer, loader.weights, - loader.Info(), pools); + loader.Info(), env); } struct InferenceArgs : public ArgsBase { diff --git a/util/threading.cc b/util/threading.cc index 26a45fd..c2f8bb7 100644 --- a/util/threading.cc +++ b/util/threading.cc @@ -19,8 +19,7 @@ #include // std::sort #include -#include // std::make_unique -#include // std::move +#include #include // Placeholder for container detection, do not remove @@ -45,50 +44,6 @@ class Pinning { return false; } public: - // Returns set of LPs available for use. Cached during the first call - // because subsequent pinning overwrites the main thread's affinity. - // Thread-hostile, not called concurrently. - LPS EnabledLPs(const BoundedSlice& lp_slice) { - if (enabled_lps_.Any()) return enabled_lps_; - - LPS affinity; - if (HWY_LIKELY(GetThreadAffinity(affinity))) { - // To honor taskset/numactl *and* the users's `lp_slice`, we interpret - // the latter as a slice of the 1-bits of `enabled_lps`. Note that this - // can be used to exclude hyperthreads because Linux groups LPs by - // sibling index. For example, the first `num_cores` are not siblings. - const size_t detected = affinity.Count(); - size_t enabled_idx = 0; - affinity.Foreach([&](size_t lp) { - if (lp_slice.Contains(detected, enabled_idx)) { - enabled_lps_.Set(lp); - } - ++enabled_idx; - }); - } else { - const size_t num_lps = hwy::TotalLogicalProcessors(); - HWY_WARN("unknown OS affinity, max %zu LPs and slice %zu.", num_lps, - lp_slice.Num(num_lps)); - for (size_t lp = 0; lp < num_lps; ++lp) { - if (lp_slice.Contains(num_lps, lp)) { - enabled_lps_.Set(lp); - } - } - } - - // Without threading support, only keep the first enabled LP; it might still - // make sense to pin the main thread to avoid migrations. - if (HWY_UNLIKELY(!hwy::HaveThreadingSupport())) { - HWY_ASSERT(enabled_lps_.Any()); - const size_t lp = enabled_lps_.First(); - enabled_lps_ = LPS(); - enabled_lps_.Set(lp); - HWY_WARN("Warning, threads not supported, using only the main thread."); - } - - return enabled_lps_; - } - void SetPolicy(Tristate pin) { if (pin == Tristate::kDefault) { // Pinning is unreliable inside containers because the hypervisor might @@ -103,23 +58,25 @@ class Pinning { // If want_pin_, tries to pin each worker in `pool` to an LP in `cluster`, // and sets `any_error_` if any fails. 
void MaybePin(size_t pkg_idx, size_t cluster_idx, - const BoundedTopology::Cluster& cluster, PoolPtr& pool) { + const BoundedTopology::Cluster& cluster, + hwy::ThreadPool& pool) { const std::vector lps = cluster.LPVector(); - HWY_ASSERT(pool->NumWorkers() <= lps.size()); - pool->Run(0, pool->NumWorkers(), [&](uint64_t task, size_t thread) { + HWY_ASSERT(pool.NumWorkers() <= lps.size()); + pool.Run(0, pool.NumWorkers(), [&](uint64_t task, size_t thread) { HWY_ASSERT(task == thread); // each worker has one task char buf[16]; // Linux limitation - const int bytes_written = snprintf(buf, sizeof(buf), "P%zu X%02zu C%03zu", - pkg_idx, cluster_idx, task); + const int bytes_written = + snprintf(buf, sizeof(buf), "P%zu X%02zu C%03d", pkg_idx, cluster_idx, + static_cast(task)); HWY_ASSERT(bytes_written < sizeof(buf)); hwy::SetThreadName(buf, 0); // does not support varargs if (HWY_LIKELY(want_pin_)) { if (HWY_UNLIKELY(!hwy::PinThreadToLogicalProcessor(lps[task]))) { - fprintf(stderr, - "Pinning failed for task %zu of %zu to %zu (size %zu)\n", - task, pool->NumWorkers(), lps[task], lps.size()); + fprintf( + stderr, "Pinning failed for task %d of %zu to %zu (size %zu)\n", + static_cast(task), pool.NumWorkers(), lps[task], lps.size()); (void)any_error_.test_and_set(); } } @@ -141,7 +98,6 @@ class Pinning { private: std::atomic_flag any_error_ = ATOMIC_FLAG_INIT; bool want_pin_; // set in SetPolicy - LPS enabled_lps_; }; // Pinning // Singleton saves global affinity across all BoundedTopology instances because @@ -151,263 +107,18 @@ static Pinning& GetPinning() { return pinning; } -BoundedTopology::BoundedTopology(BoundedSlice package_slice, - BoundedSlice cluster_slice, - BoundedSlice lp_slice) { - const LPS enabled_lps = GetPinning().EnabledLPs(lp_slice); - -#if !GEMMA_DISABLE_TOPOLOGY - if (HWY_LIKELY(!topology_.packages.empty())) { - InitFromTopology(enabled_lps, package_slice, cluster_slice); - } -#endif - - // Topology unknown or no packages with enabled LPs: create a single - // package with one cluster, and one node. - if (HWY_UNLIKELY(NumPackages() == 0)) { - InitFromLPs(enabled_lps); - } - - HWY_ASSERT(NumPackages() != 0 && NumClusters(0) != 0 && NumNodes() != 0); -} - -// Topology is unknown, take the given set of LPs. -BoundedTopology::Cluster::Cluster(const LPS& lps) { - lps_ = lps; - num_workers_ = lps.Count(); -} - -BoundedTopology::Cluster::Cluster(const LPS& enabled_lps, - const std::vector& all_lps, - const hwy::Topology::Cluster& tcluster) { - bool is_first_lp = true; - - tcluster.lps.Foreach([&](size_t lp) { - // Skip if not first-hyperthread or disabled. - if (all_lps[lp].smt != 0 || !enabled_lps.Get(lp)) return; - - HWY_ASSERT(!lps_.Get(lp)); // Foreach ensures uniqueness - lps_.Set(lp); - ++num_workers_; - - // Set fields once, and ensure subsequent LPs match - we assume there - // is only one NUMA node per cluster, with the same L2/L3 size. 
- const size_t lp_node = static_cast(all_lps[lp].node); - if (is_first_lp) { - is_first_lp = false; - node_ = lp_node; - private_kib_ = tcluster.private_kib; - shared_kib_ = tcluster.shared_kib; - } else { - static bool warned = false; - if (HWY_LIKELY(!warned)) { - if (HWY_UNLIKELY(lp_node != node_)) { - warned = true; - HWY_WARN("lp %zu on node %zu != cluster node %zu.", lp, lp_node, - node_); - } - if (HWY_UNLIKELY(private_kib_ != tcluster.private_kib)) { - warned = true; - HWY_WARN("lp %zu private_kib %zu != cluster %zu.", lp, private_kib_, - tcluster.private_kib); - } - if (HWY_UNLIKELY(shared_kib_ != tcluster.shared_kib)) { - warned = true; - HWY_WARN("lp %zu shared_kib %zu != cluster %zu.", lp, shared_kib_, - tcluster.shared_kib); - } - } // !warned - } - }); -} - -// CPUs without clusters are rarely more than dozens of cores, and 6 is a -// decent number of threads in a per-cluster pool. -constexpr bool kSplitLargeClusters = false; -constexpr size_t kMaxClusters = 8; -constexpr size_t kMaxLPsPerCluster = 6; - -// Topology is unknown, use only the given LPs which derive from OS affinity -// and `lp_slice`. -BoundedTopology::Package::Package(const LPS& enabled_lps) { - LPS clusters_lps[kMaxClusters]; - const size_t num_clusters = - kSplitLargeClusters - ? HWY_MIN(kMaxClusters, - hwy::DivCeil(enabled_lps.Count(), kMaxLPsPerCluster)) - : 1; - - size_t enabled_idx = 0; - enabled_lps.Foreach([&](size_t lp) { - clusters_lps[enabled_idx % num_clusters].Set(lp); - ++enabled_idx; - }); - - for (size_t cluster_idx = 0; cluster_idx < num_clusters; ++cluster_idx) { - clusters.push_back(Cluster(clusters_lps[cluster_idx])); - } -} - -// NOTE: caller is responsible for checking whether `clusters` is empty. -BoundedTopology::Package::Package(const LPS& enabled_lps, - const hwy::Topology& topology, size_t pkg_idx, - BoundedSlice cluster_slice) { - const hwy::Topology::Package& tpackage = topology.packages[pkg_idx]; - // Populate `clusters` with the subset of clusters in `cluster_slice` that - // have any enabled LPs. If `clusters` remains empty, the caller will - // skip this `Package`. - clusters.reserve(cluster_slice.Num(tpackage.clusters.size())); - cluster_slice.Foreach( - "cluster", tpackage.clusters.size(), [&](size_t cluster_idx) { - const hwy::Topology::Cluster& tcluster = tpackage.clusters[cluster_idx]; - Cluster cluster(enabled_lps, topology.lps, tcluster); - - // Skip if empty, i.e. too few `enabled_lps`. - if (HWY_LIKELY(cluster.Size() != 0)) { - clusters.push_back(cluster); - } - }); - SortByDescendingSize(clusters); - - // If there is only one large cluster, split it into smaller ones. - if (kSplitLargeClusters && clusters.size() == 1 && - enabled_lps.Count() >= 16) { - const LPS lps = clusters[0].LPSet(); // copy so we can clear - clusters.clear(); - - // Split `lps` into several clusters. - LPS clusters_lps[kMaxClusters]; - const size_t num_clusters = - HWY_MIN(kMaxClusters, hwy::DivCeil(lps.Count(), kMaxLPsPerCluster)); - size_t num_lps = 0; - lps.Foreach( - [&](size_t lp) { clusters_lps[num_lps++ % num_clusters].Set(lp); }); - HWY_DASSERT(num_lps == lps.Count()); - - // Create new clusters, just inserting the new LPS. - hwy::Topology::Cluster tcluster = tpackage.clusters[0]; // modifiable copy - for (size_t cluster_idx = 0; cluster_idx < num_clusters; ++cluster_idx) { - tcluster.lps = clusters_lps[cluster_idx]; - // Keep same `private_kib` and `shared_kib`. 
- clusters.push_back(Cluster(enabled_lps, topology.lps, tcluster)); - } - } -} - -#if !GEMMA_DISABLE_TOPOLOGY - -static size_t CoresFromLPs(const LPS& lps, const hwy::Topology& topology) { - LPS cores; - lps.Foreach([&](size_t lp) { - if (topology.lps[lp].smt == 0) cores.Set(lp); - }); - return cores.Count(); -} - -// Scans hwy::Topology for clusters and their size, for use by topology_string_. -static void ScanTClusters(hwy::Topology& topology_, size_t& max_tclusters, - size_t& max_tcluster_cores, - size_t& max_tcluster_lps) { - max_tclusters = 0; - max_tcluster_cores = 0; - max_tcluster_lps = 0; - for (size_t pkg_idx = 0; pkg_idx < topology_.packages.size(); ++pkg_idx) { - const std::vector& tclusters = - topology_.packages[pkg_idx].clusters; - max_tclusters = HWY_MAX(max_tclusters, tclusters.size()); - size_t tcluster_cores = 0; - size_t tcluster_lps = 0; - for (size_t cluster_idx = 0; cluster_idx < tclusters.size(); - ++cluster_idx) { - const size_t cores = CoresFromLPs(tclusters[cluster_idx].lps, topology_); - const size_t lps = tclusters[cluster_idx].lps.Count(); - tcluster_cores = HWY_MAX(tcluster_cores, cores); - tcluster_lps = HWY_MAX(tcluster_lps, lps); - } - - if (tclusters.size() > 1 && tcluster_cores > 8) { - HWY_WARN( - "Package %zu: multiple clusters with max size %zu, whereas CCX " - "only have 8, may indicate a bug in hwy::Topology.", - pkg_idx, tcluster_cores); - } - max_tcluster_cores = HWY_MAX(max_tcluster_cores, tcluster_cores); - max_tcluster_lps = HWY_MAX(max_tcluster_lps, tcluster_lps); - } - HWY_ASSERT(max_tclusters != 0); - HWY_ASSERT(max_tcluster_cores != 0); - HWY_ASSERT(max_tcluster_lps >= max_tcluster_cores); -} - -// Main part of ctor, called when topology is known. -void BoundedTopology::InitFromTopology(const LPS& enabled_lps, - BoundedSlice package_slice, - BoundedSlice cluster_slice) { - size_t max_tclusters, max_tcluster_cores, max_tcluster_lps; - ScanTClusters(topology_, max_tclusters, max_tcluster_cores, max_tcluster_lps); - - // (Possibly empty) subset of `Topology` packages that have `enabled_lps`. - package_slice.Foreach( - "package", topology_.packages.size(), [&](size_t pkg_idx) { - Package package(enabled_lps, topology_, pkg_idx, cluster_slice); - // Skip if empty, i.e. too few `enabled_lps`. - if (HWY_LIKELY(!package.clusters.empty())) { - packages_.push_back(std::move(package)); - } - }); - if (NumPackages() == 0) return; - SortByDescendingSize(packages_); - - // Remember NUMA nodes that we are actually using (not just enabled). - for (const Package& p : packages_) { - for (const Cluster& c : p.clusters) { - nodes_.Set(c.Node()); - } - } - - // Scan for max BoundedTopology clusters and their size, for topology_string_. 
- size_t all_max_cluster_size = 0; - for (size_t pkg_idx = 0; pkg_idx < NumPackages(); ++pkg_idx) { - size_t max_cluster_size = 0; - for (size_t cluster_idx = 0; cluster_idx < NumClusters(pkg_idx); - ++cluster_idx) { - max_cluster_size = - HWY_MAX(max_cluster_size, GetCluster(pkg_idx, cluster_idx).Size()); - } - if (NumClusters(pkg_idx) > 1 && max_cluster_size > 8) { - HWY_WARN( - "Package %zu: multiple clusters with max size %zu, whereas CCX " - "only have 8, may indicate a bug in BoundedTopology.", - pkg_idx, max_cluster_size); - } - all_max_cluster_size = HWY_MAX(all_max_cluster_size, max_cluster_size); - } - - snprintf(topology_string_, sizeof(topology_string_), - "%zuS %zuX %zuC %zuH, using %zuS %zuX %zuC (nodes=%zu)", - topology_.packages.size(), max_tclusters, max_tcluster_cores, - max_tcluster_lps / max_tcluster_cores, packages_.size(), - NumClusters(0), all_max_cluster_size, nodes_.Count()); -} - -#endif // !GEMMA_DISABLE_TOPOLOGY - -void BoundedTopology::InitFromLPs(const LPS& enabled_lps) { - packages_.push_back(Package(enabled_lps)); - - snprintf(topology_string_, sizeof(topology_string_), "LPs=%zu", - GetCluster(0, 0).Size()); - - // Assume a single NUMA node. - nodes_.Set(0); - HWY_ASSERT(NumNodes() == 1); -} - -static PoolPtr MakePool(size_t num_workers) { +static PoolPtr MakePool(size_t num_workers, + std::optional node = std::nullopt) { // `ThreadPool` expects the number of threads to create, which is one less // than the number of workers, but avoid underflow if zero. const size_t num_threads = num_workers == 0 ? 0 : num_workers - 1; - return std::make_unique(num_threads); + PoolPtr ptr = Allocator::AllocClasses(1, num_threads); + const size_t bytes = + hwy::RoundUpTo(sizeof(hwy::ThreadPool), Allocator::QuantumBytes()); + if (node.has_value() && Allocator::ShouldBind()) { + Allocator::BindMemory(ptr.get(), bytes, node.value()); + } + return ptr; } // Used to divide max_threads and max_workers_per_package across packages and @@ -422,23 +133,21 @@ static size_t DivideMaxAcross(const size_t max, const size_t instances) { return max; } -NestedPools::NestedPools(size_t max_threads, Tristate pin, - BoundedSlice package_slice, BoundedSlice cluster_slice, - BoundedSlice lp_slice) - : topology_(package_slice, cluster_slice, lp_slice) { +NestedPools::NestedPools(const BoundedTopology& topology, size_t max_threads, + Tristate pin) { GetPinning().SetPolicy(pin); - packages_.resize(topology_.NumPackages()); + packages_.resize(topology.NumPackages()); all_packages_ = MakePool(packages_.size()); const size_t max_workers_per_package = DivideMaxAcross(max_threads, packages_.size()); // Each worker in all_packages_, including the main thread, will be the - // calling thread of an all_clusters->Run, and hence pinned to one of the + // calling thread of an all_clusters[0].Run, and hence pinned to one of the // `cluster.lps` if `pin`. - all_packages_->Run( - 0, all_packages_->NumWorkers(), [&](uint64_t pkg_idx, size_t thread) { + all_packages_[0].Run( + 0, packages_.size(), [&](uint64_t pkg_idx, size_t thread) { HWY_ASSERT(pkg_idx == thread); // each thread has one task packages_[pkg_idx] = - Package(topology_, pkg_idx, max_workers_per_package); + Package(topology, pkg_idx, max_workers_per_package); }); all_pinned_ = GetPinning().AllPinned(&pin_string_); @@ -458,6 +167,11 @@ NestedPools::NestedPools(size_t max_threads, Tristate pin, HWY_ASSERT(max_workers_per_cluster_ <= 256); } +// `max_or_zero` == 0 means no limit. 
+static inline size_t CapIfNonZero(size_t num, size_t max_or_zero) { + return (max_or_zero == 0) ? num : HWY_MIN(num, max_or_zero); +} + NestedPools::Package::Package(const BoundedTopology& topology, size_t pkg_idx, size_t max_workers_per_package) { // Pre-allocate because elements are set concurrently. @@ -465,19 +179,21 @@ NestedPools::Package::Package(const BoundedTopology& topology, size_t pkg_idx, const size_t max_workers_per_cluster = DivideMaxAcross(max_workers_per_package, clusters_.size()); - all_clusters_ = MakePool(clusters_.size()); + all_clusters_ = + MakePool(clusters_.size(), topology.GetCluster(pkg_idx, 0).Node()); // Parallel so we also pin the calling worker in `all_clusters` to // `cluster.lps`. - all_clusters_->Run( - 0, all_clusters_->NumWorkers(), [&](size_t cluster_idx, size_t thread) { + all_clusters_[0].Run( + 0, all_clusters_[0].NumWorkers(), [&](size_t cluster_idx, size_t thread) { HWY_ASSERT(cluster_idx == thread); // each thread has one task const BoundedTopology::Cluster& cluster = topology.GetCluster(pkg_idx, cluster_idx); clusters_[cluster_idx] = - MakePool(CapIfNonZero(cluster.Size(), max_workers_per_cluster)); + MakePool(CapIfNonZero(cluster.Size(), max_workers_per_cluster), + cluster.Node()); // Pin workers AND the calling thread from `all_clusters`. GetPinning().MaybePin(pkg_idx, cluster_idx, cluster, - clusters_[cluster_idx]); + clusters_[cluster_idx][0]); }); } diff --git a/util/threading.h b/util/threading.h index 14676b4..d7de410 100644 --- a/util/threading.h +++ b/util/threading.h @@ -19,14 +19,14 @@ #include #include -#include // std::unique_ptr #include // IWYU pragma: begin_exports +#include "util/allocator.h" #include "util/basics.h" // Tristate -#include "hwy/base.h" // HWY_ASSERT +#include "util/topology.h" +#include "hwy/base.h" // HWY_ASSERT #include "hwy/contrib/thread_pool/thread_pool.h" -#include "hwy/contrib/thread_pool/topology.h" // IWYU pragma: end_exports #ifndef GEMMA_DISABLE_TOPOLOGY @@ -35,157 +35,9 @@ namespace gcpp { -static inline size_t SaturatingSub(size_t a, size_t b) { - return a - HWY_MIN(a, b); -} - -// `max_or_zero` == 0 means no limit. -static inline size_t CapIfNonZero(size_t num, size_t max_or_zero) { - return (max_or_zero == 0) ? num : HWY_MIN(num, max_or_zero); -} - -// A slice of a 1D integer range such as the indices of packages or clusters. -// This allows assigning them to multiple instances of our binary. -class BoundedSlice { - public: - // Defaults to "use all detected". - BoundedSlice(size_t skip = 0, size_t max = 0) : skip_(skip), max_(max) {} - - size_t Begin() const { return skip_; } - - // STL-style one past the end. - size_t End(size_t detected) const { - return (max_ == 0) ? detected : HWY_MIN(detected, skip_ + max_); - } - - // Number of elements in the slice. - size_t Num(size_t detected) const { return End(detected) - Begin(); } - - bool Contains(size_t detected, size_t idx) const { - return Begin() <= idx && idx < End(detected); - } - - template - void Foreach(const char* name, size_t detected, const Func& func) { - if (Begin() >= detected) { - HWY_ABORT("Invalid skip=%zu for %s, detected=%zu", skip_, name, detected); - } - for (size_t i = Begin(); i < End(detected); ++i) { - func(i); - } - } - - private: - // How many to skip, or equivalently, index of the first to use. It is an - // error if this is >= `detected`, because that would leave none for this - // instance to use. - size_t skip_; - - // Upper bound on the number to use, or zero if no limit. 
- size_t max_; -}; - -// "LP" is a logical processor, a 0-based index passed to the OS. -using LPS = hwy::LogicalProcessorSet; - -// We want vectors of hwy::ThreadPool, which is unfortunately not movable, -// hence we wrap them in unique_ptr. -using PoolPtr = std::unique_ptr; - -// Wraps hwy::Topology and only keeps the subset of packages and clusters -// apportioned by BoundedSlice, further limited by the OS affinity mask. -// NOTE: if topology is unknown or the OS affinity is too restrictive, we fall -// back to a single package and cluster. -class BoundedTopology { - public: - // Thread-hostile, typically called from main thread. - BoundedTopology(BoundedSlice package_slice, BoundedSlice cluster_slice, - BoundedSlice lp_slice); - - size_t NumPackages() const { return packages_.size(); } - size_t NumNodes() const { return nodes_.Count(); } - const char* TopologyString() const { return topology_string_; } - - class Cluster { - public: - Cluster(const LPS& lps); - Cluster(const LPS& enabled_lps, - const std::vector& all_lps, - const hwy::Topology::Cluster& tcluster); - - // For SortByDescendingSize. - size_t Size() const { return num_workers_; } - - // Returns vector with all enabled LPs, used for pinning. - std::vector LPVector() const { - std::vector lps; - lps.reserve(lps_.Count()); - lps_.Foreach([&lps](size_t lp) { lps.push_back(lp); }); - return lps; - } - - const LPS& LPSet() const { return lps_; } - size_t Node() const { return node_; } - size_t PrivateKiB() const { return private_kib_; } - size_t SharedKiB() const { return shared_kib_; } - - private: - // Enabled LPs; if topology is known, only the ones in this cluster. - LPS lps_; - // How many workers in the per-cluster pool. If 0, this Cluster is removed. - size_t num_workers_ = 0; - // NUMA node, set from hwy::Topology::LP::node. - size_t node_ = 0; - // L2 cache size in KiB, or 0 if unknown. - size_t private_kib_ = 0; - // L3 cache size in KiB, or 0 if unknown. - size_t shared_kib_ = 0; - }; // Cluster - - size_t NumClusters(size_t pkg_idx) const { - HWY_ASSERT(pkg_idx < NumPackages()); - return packages_[pkg_idx].clusters.size(); - } - const Cluster& GetCluster(size_t pkg_idx, size_t cluster_idx) const { - HWY_ASSERT(pkg_idx < NumPackages()); - const Package& package = packages_[pkg_idx]; - HWY_ASSERT(cluster_idx < package.clusters.size()); - return package.clusters[cluster_idx]; - } - Cluster& GetCluster(size_t pkg_idx, size_t cluster_idx) { - HWY_ASSERT(pkg_idx < NumPackages()); - Package& package = packages_[pkg_idx]; - HWY_ASSERT(cluster_idx < package.clusters.size()); - return package.clusters[cluster_idx]; - } - -#if !GEMMA_DISABLE_TOPOLOGY - const hwy::Topology& FullTopology() const { return topology_; } -#endif - - private: - struct Package { - explicit Package(const LPS& enabled_lps); - Package(const LPS& enabled_lps, const hwy::Topology& topology, - size_t pkg_idx, BoundedSlice cluster_slice); - - // For SortByDescendingSize. - size_t Size() const { return clusters.size(); } - - std::vector clusters; - }; // Package - - void InitFromTopology(const LPS& enabled_lps, BoundedSlice package_slice, - BoundedSlice cluster_slice); - void InitFromLPs(const LPS& enabled_lps); - -#if !GEMMA_DISABLE_TOPOLOGY - hwy::Topology topology_; -#endif - std::vector packages_; - char topology_string_[96]; - LPS nodes_; -}; +// Page-aligned on NUMA systems so we can bind to a NUMA node. This also allows +// moving because it is a typedef to `std::unique_ptr`. 
+using PoolPtr = AlignedClassPtr<hwy::ThreadPool>;

 // Creates a hierarchy of thread pools according to `BoundedTopology`: one with
 // a thread per enabled package; for each of those, one with a thread per
@@ -215,16 +67,16 @@ class NestedPools {
   // `max_threads` is the maximum number of threads to divide among all
   // clusters. This is more intuitive than a per-cluster limit for users who
-  // may not be aware of the CPU topology.
+  // may not be aware of the CPU topology. 0 means no limit.
   //
   // To ensure we do not create more threads than there are HW cores, which
   // would cause huge slowdowns when spinning, the `BoundedSlice` arguments
   // only impose upper bounds on the number of detected packages and clusters
   // rather than defining the actual number of threads.
-  NestedPools(size_t max_threads, Tristate pin = Tristate::kDefault,
-              BoundedSlice package_slice = BoundedSlice(),
-              BoundedSlice cluster_slice = BoundedSlice(),
-              BoundedSlice lp_slice = BoundedSlice());
+  //
+  // Caller must have called `Allocator::Init` before this.
+  NestedPools(const BoundedTopology& topology, size_t max_threads = 0,
+              Tristate pin = Tristate::kDefault);

   bool AllPinned() const { return all_pinned_; }

@@ -251,7 +103,7 @@
   }

   size_t NumPackages() const { return packages_.size(); }
-  hwy::ThreadPool& AllPackages() { return *all_packages_; }
+  hwy::ThreadPool& AllPackages() { return all_packages_[0]; }
   hwy::ThreadPool& AllClusters(size_t pkg_idx) {
     HWY_DASSERT(pkg_idx < NumPackages());
     return packages_[pkg_idx].AllClusters();
@@ -261,11 +113,6 @@
     return packages_[pkg_idx].Cluster(cluster_idx);
   }

-  // For binding to NUMA nodes.
-  size_t Node(size_t pkg_idx, size_t cluster_idx) const {
-    return topology_.GetCluster(pkg_idx, cluster_idx).Node();
-  }
-
   // Reasonably tight upper bounds for allocating thread-local storage (TLS).
   size_t MaxWorkersPerCluster() const { return max_workers_per_cluster_; }
   size_t MaxWorkersPerPackage() const {
@@ -282,10 +129,7 @@
     return total_workers;
   }

-  // For Allocator
-  const BoundedTopology& Topology() const { return topology_; }
   // For ShowConfig
-  const char* TopologyString() const { return topology_.TopologyString(); }
   const char* PinString() const { return pin_string_; }

   // Returns a single pool on the given package: either one thread per cluster
@@ -313,28 +157,28 @@
     size_t max_workers_per_cluster = 0;
     for (const PoolPtr& cluster : clusters_) {
       max_workers_per_cluster =
-          HWY_MAX(max_workers_per_cluster, cluster->NumWorkers());
+          HWY_MAX(max_workers_per_cluster, cluster[0].NumWorkers());
     }
     return max_workers_per_cluster;
   }
   size_t TotalWorkers() const {
     size_t total_workers = 0;
     for (const PoolPtr& cluster : clusters_) {
-      total_workers += cluster->NumWorkers();
+      total_workers += cluster[0].NumWorkers();
     }
     return total_workers;
   }

-  hwy::ThreadPool& AllClusters() { return *all_clusters_; }
+  hwy::ThreadPool& AllClusters() { return all_clusters_[0]; }
   hwy::ThreadPool& Cluster(size_t cluster_idx) {
     HWY_DASSERT(cluster_idx < clusters_.size());
-    return *clusters_[cluster_idx];
+    return clusters_[cluster_idx][0];
   }

   void SetWaitMode(hwy::PoolWaitMode wait_mode) {
-    all_clusters_->SetWaitMode(wait_mode);
+    all_clusters_[0].SetWaitMode(wait_mode);
     for (PoolPtr& cluster : clusters_) {
-      cluster->SetWaitMode(wait_mode);
+      cluster[0].SetWaitMode(wait_mode);
     }
   }

@@ -344,13 +188,12 @@
   };  // Package

   void SetWaitMode(hwy::PoolWaitMode wait_mode) {
-    all_packages_->SetWaitMode(wait_mode);
+    all_packages_[0].SetWaitMode(wait_mode);
     for (Package& package : packages_) {
       package.SetWaitMode(wait_mode);
     }
   }

-  BoundedTopology topology_;
   bool all_pinned_;
   const char* pin_string_;

diff --git a/util/threading_test.cc b/util/threading_test.cc
index 4ca69ff..e7fe021 100644
--- a/util/threading_test.cc
+++ b/util/threading_test.cc
@@ -22,10 +22,17 @@

 #include "gmock/gmock.h"
 #include "gtest/gtest.h"
+#include "util/allocator.h"
+#include "util/basics.h"
 #include "hwy/aligned_allocator.h"
+#include "hwy/auto_tune.h"
 #include "hwy/base.h"  // HWY_ASSERT
 #include "hwy/contrib/thread_pool/thread_pool.h"
-#include "hwy/nanobenchmark.h"
+#include "hwy/nanobenchmark.h"  // Unpredictable1
+#include "hwy/robust_statistics.h"
+#include "hwy/stats.h"
+#include "hwy/tests/test_util.h"  // AdjustedReps
+#include "hwy/timer.h"

 namespace gcpp {
 namespace {
@@ -253,80 +260,150 @@ TEST(ThreadingTest, TestParallelizeTwoRanges) {
   }
 }

-// Governs duration of test; avoid timeout in debug builds.
-#if HWY_IS_DEBUG_BUILD
-constexpr size_t kMaxEvals = 2;
-#else
-constexpr size_t kMaxEvals = 8;
-#endif
-
 static constexpr size_t kU64PerThread = HWY_ALIGNMENT / sizeof(size_t);
 static uint64_t outputs[hwy::kMaxLogicalProcessors * kU64PerThread];

-hwy::FuncOutput ForkJoin(const void* opaque, hwy::FuncInput in) {
-  hwy::ThreadPool& pool =
-      *reinterpret_cast<hwy::ThreadPool*>(const_cast<void*>(opaque));
-  pool.Run(0, in, [&](uint64_t task, size_t thread) {
-    outputs[thread * kU64PerThread] = in;
-  });
-  return in;
+std::vector<uint64_t> MeasureForkJoin(hwy::ThreadPool& pool) {
+  // Governs duration of test; avoid timeout in debug builds.
+  const size_t max_reps = hwy::AdjustedReps(50 * 1000);
+
+  std::vector<uint64_t> times;
+  times.reserve(max_reps);
+
+  const size_t base = hwy::Unpredictable1();
+
+  const double t0 = hwy::platform::Now();
+  for (size_t reps = 0; reps < 1200; ++reps) {
+    pool.Run(0, pool.NumWorkers(), [&](uint64_t task, size_t thread) {
+      outputs[thread * kU64PerThread] = base + thread;
+    });
+    hwy::PreventElision(outputs[base]);
+    if (pool.AutoTuneComplete()) break;
+  }
+  const double t1 = hwy::platform::Now();
+
+// TODO(janwas): enable after Highway update
+#if 0
+  if (pool.AutoTuneComplete()) {
+    hwy::Span cd = pool.AutoTuneCosts();
+    std::vector<double> costs;
+    costs.reserve(cd.size());
+    double min_cost = hwy::HighestValue<double>();
+    for (size_t i = 0; i < cd.size(); ++i) {
+      costs.push_back(cd[i].EstimateCost());
+      min_cost = HWY_MIN(min_cost, costs.back());
+    }
+    // Harmonic mean = reciprocal of the arithmetic mean of the reciprocals.
+    double sum_recip_speedup = 0.0;
+    size_t count_sum = 0;
+    for (size_t i = 0; i < costs.size(); ++i) {
+      if (costs[i] == min_cost) continue;
+      const double speedup = costs[i] / min_cost;
+      HWY_ASSERT(speedup > 1.0);
+      sum_recip_speedup += 1.0 / speedup;
+      ++count_sum;
+    }
+    const double harm_mean_speedup =
+        static_cast<double>(count_sum / sum_recip_speedup);
+    fprintf(stderr, "\nAuto-tuning took %f ms; harm. mean speedup: %f.\n",
+            (t1 - t0) * 1E3, harm_mean_speedup);
+  } else {
+    HWY_WARN("Auto-tuning did not complete yet.");
+  }
+#else
+  (void)t0;
+  (void)t1;
+#endif
+
+  char cpu100[100];
+  static const bool have_stop = hwy::platform::HaveTimerStop(cpu100);
+  if (have_stop) {
+    for (size_t rep = 0; rep < max_reps; ++rep) {
+      const uint64_t t0 = hwy::timer::Start();
+      pool.Run(0, pool.NumWorkers(), [&](uint64_t task, size_t thread) {
+        outputs[thread * kU64PerThread] = base + thread;
+      });
+      const uint64_t t1 = hwy::timer::Stop();
+      times.push_back(t1 - t0);
+    }
+  } else {
+    for (size_t rep = 0; rep < max_reps; ++rep) {
+      const uint64_t t0 = hwy::timer::Start();
+      pool.Run(0, pool.NumWorkers(), [&](uint64_t task, size_t thread) {
+        outputs[thread * kU64PerThread] = base + thread;
+      });
+      const uint64_t t1 = hwy::timer::Start();
+      times.push_back(t1 - t0);
+    }
+  }
+  return times;
 }

 TEST(ThreadingTest, BenchJoin) {
-  constexpr size_t kInputs = 1;
-  static hwy::FuncInput inputs[kInputs];
-
   const auto measure = [&](hwy::ThreadPool& pool, bool spin,
                            const char* caption) {
-    inputs[0] =
-        static_cast<hwy::FuncInput>(hwy::Unpredictable1() * pool.NumWorkers());
-    hwy::Result results[kInputs];
-    hwy::Params params;
-    params.verbose = false;
-    params.max_evals = kMaxEvals;
-
     // Only spin for the duration of the benchmark to avoid wasting energy and
     // interfering with the other pools.
     if (spin) {
       pool.SetWaitMode(hwy::PoolWaitMode::kSpin);
     }
-    const size_t num_results =
-        Measure(&ForkJoin, reinterpret_cast(&pool), inputs,
-                kInputs, results, params);
+    std::vector<uint64_t> times = MeasureForkJoin(pool);
+    // Capture before SetWaitMode changes it.
+    const hwy::pool::Config config = pool.config();

     if (spin) {
       pool.SetWaitMode(hwy::PoolWaitMode::kBlock);
     }

-    for (size_t i = 0; i < num_results; ++i) {
-      printf("%-20s: %5d: %6.2f us; MAD=%4.2f%%\n", caption,
-             static_cast<int>(results[i].input),
-             results[i].ticks / hwy::platform::InvariantTicksPerSecond() * 1E6,
-             results[i].variability * 100.0);
+    const uint64_t median =
+        hwy::robust_statistics::Median(times.data(), times.size());
+    const uint64_t mode =
+        hwy::robust_statistics::ModeOfSorted(times.data(), times.size());
+    // Trim upper half and lower quarter to reduce outliers before Stats.
+    const size_t quarter = times.size() / 4;
+    times.erase(times.begin() + 2 * quarter, times.end());
+    times.erase(times.begin(), times.begin() + quarter);
+
+    hwy::Stats stats;
+    for (uint64_t t : times) {
+      stats.Notify(static_cast<float>(t));
     }
+    fprintf(stderr, "%-20s: %3d: %6.2f %6.2f us %s\n", caption,
+            static_cast<int>(pool.NumWorkers()),
+            median / hwy::platform::InvariantTicksPerSecond() * 1E6,
+            mode / hwy::platform::InvariantTicksPerSecond() * 1E6,
+            config.ToString().c_str());
+    fprintf(stderr, "%s\n", stats.ToString().c_str());
+
     // Verify outputs to ensure the measured code is not a no-op.
     for (size_t lp = 0; lp < pool.NumWorkers(); ++lp) {
-      HWY_ASSERT(outputs[lp * kU64PerThread] == pool.NumWorkers());
+      HWY_ASSERT(outputs[lp * kU64PerThread] >= 1);
+      HWY_ASSERT(outputs[lp * kU64PerThread] <= 1 + pool.NumWorkers());
       for (size_t i = 1; i < kU64PerThread; ++i) {
        HWY_ASSERT(outputs[lp * kU64PerThread + i] == 0);
       }
     }
   };

-  NestedPools pools(0);
+  BoundedTopology topology;
+  Allocator::Init(topology, true);
+  NestedPools pools(topology);
+  // Use last package because the main thread has been pinned to it.
+  const size_t pkg_idx = pools.NumPackages() - 1;
+
   measure(pools.AllPackages(), false, "block packages");
-  if (pools.AllClusters(0).NumWorkers() > 1) {
-    measure(pools.AllClusters(0), false, "block clusters");
+  if (pools.AllClusters(pkg_idx).NumWorkers() > 1) {
+    measure(pools.AllClusters(pkg_idx), false, "block clusters");
   }
-  measure(pools.Cluster(0, 0), false, "block in_cluster");
+  measure(pools.Cluster(pkg_idx, 0), false, "block in_cluster");

   if (pools.AllPinned()) {
     const bool kSpin = true;
     measure(pools.AllPackages(), kSpin, "spin packages");
-    if (pools.AllClusters(0).NumWorkers() > 1) {
-      measure(pools.AllClusters(0), kSpin, "spin clusters");
+    if (pools.AllClusters(pkg_idx).NumWorkers() > 1) {
+      measure(pools.AllClusters(pkg_idx), kSpin, "spin clusters");
     }
-    measure(pools.Cluster(0, 0), kSpin, "spin in_cluster");
+    measure(pools.Cluster(pkg_idx, 0), kSpin, "spin in_cluster");
   }
 }
diff --git a/util/topology.cc b/util/topology.cc
new file mode 100644
index 0000000..239be4d
--- /dev/null
+++ b/util/topology.cc
@@ -0,0 +1,336 @@
+// Copyright 2024 Google LLC
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
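+
+// Implementation of `BoundedTopology` (see util/topology.h): detects the CPU
+// topology via hwy::Topology and restricts it to the packages, clusters and
+// LPs allowed by `BoundedSlice` and the OS affinity mask.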
+
+#include "util/topology.h"
+
+#include <stdio.h>
+
+#include <algorithm>  // std::sort
+#include <utility>    // std::move
+#include <vector>
+
+#include "hwy/base.h"
+
+namespace gcpp {
+
+// Sort T := packages/clusters by descending 'size' so that users who only use
+// one Group get the largest.
+template <class T>
+static void SortByDescendingSize(std::vector<T>& groups) {
+  std::sort(groups.begin(), groups.end(),
+            [](const T& a, const T& b) { return a.Size() > b.Size(); });
+}
+
+// Returns set of LPs available for use.
+static LPS EnabledLPs(const BoundedSlice& lp_slice) {
+  LPS enabled_lps;
+
+  // Thread-safe caching during the first call because subsequent pinning
+  // overwrites the main thread's affinity.
+  static const LPS affinity = []() {
+    LPS affinity;
+    if (!GetThreadAffinity(affinity)) affinity = LPS();
+    return affinity;
+  }();
+  if (HWY_LIKELY(affinity.Any())) {
+    // To honor taskset/numactl *and* the user's `lp_slice`, we interpret
+    // the latter as a slice of the 1-bits of `enabled_lps`. Note that this
+    // can be used to exclude hyperthreads because Linux groups LPs by
+    // sibling index. For example, the first `num_cores` are not siblings.
+    const size_t detected = affinity.Count();
+    size_t enabled_idx = 0;
+    affinity.Foreach([&](size_t lp) {
+      if (lp_slice.Contains(detected, enabled_idx)) {
+        enabled_lps.Set(lp);
+      }
+      ++enabled_idx;
+    });
+  } else {
+    const size_t num_lps = hwy::TotalLogicalProcessors();
+    HWY_WARN("unknown OS affinity, max %zu LPs and slice %zu.", num_lps,
+             lp_slice.Num(num_lps));
+    for (size_t lp = 0; lp < num_lps; ++lp) {
+      if (lp_slice.Contains(num_lps, lp)) {
+        enabled_lps.Set(lp);
+      }
+    }
+  }
+
+  // Without threading support, only keep the first enabled LP; it might still
+  // make sense to pin the main thread to avoid migrations.
+  if (HWY_UNLIKELY(!hwy::HaveThreadingSupport())) {
+    HWY_ASSERT(enabled_lps.Any());
+    const size_t lp = enabled_lps.First();
+    enabled_lps = LPS();
+    enabled_lps.Set(lp);
+    HWY_WARN("Warning, threads not supported, using only the main thread.");
+  }
+
+  return enabled_lps;
+}
+
+BoundedTopology::BoundedTopology(BoundedSlice package_slice,
+                                 BoundedSlice cluster_slice,
+                                 BoundedSlice lp_slice) {
+  const LPS enabled_lps = EnabledLPs(lp_slice);
+
+#if !GEMMA_DISABLE_TOPOLOGY
+  if (HWY_LIKELY(!topology_.packages.empty())) {
+    InitFromTopology(enabled_lps, package_slice, cluster_slice);
+  }
+#endif
+
+  // Topology unknown or no packages with enabled LPs: create a single
+  // package with one cluster, and one node.
+  if (HWY_UNLIKELY(NumPackages() == 0)) {
+    InitFromLPs(enabled_lps);
+  }
+
+  HWY_ASSERT(NumPackages() != 0 && NumClusters(0) != 0 && NumNodes() != 0);
+}
+
+// Topology is unknown, take the given set of LPs.
+BoundedTopology::Cluster::Cluster(const LPS& lps) {
+  lps_ = lps;
+  num_workers_ = lps.Count();
+}
+
+BoundedTopology::Cluster::Cluster(const LPS& enabled_lps,
+                                  const std::vector<hwy::Topology::LP>& all_lps,
+                                  const hwy::Topology::Cluster& tcluster) {
+  bool is_first_lp = true;
+
+  tcluster.lps.Foreach([&](size_t lp) {
+    // Skip if not first-hyperthread or disabled.
+    if (all_lps[lp].smt != 0 || !enabled_lps.Get(lp)) return;
+
+    HWY_ASSERT(!lps_.Get(lp));  // Foreach ensures uniqueness
+    lps_.Set(lp);
+    ++num_workers_;
+
+    // Set fields once, and ensure subsequent LPs match - we assume there
+    // is only one NUMA node per cluster, with the same L2/L3 size.
+    const size_t lp_node = static_cast<size_t>(all_lps[lp].node);
+    if (is_first_lp) {
+      is_first_lp = false;
+      node_ = lp_node;
+      private_kib_ = tcluster.private_kib;
+      shared_kib_ = tcluster.shared_kib;
+    } else {
+      static bool warned = false;
+      if (HWY_LIKELY(!warned)) {
+        if (HWY_UNLIKELY(lp_node != node_)) {
+          warned = true;
+          HWY_WARN("lp %zu on node %zu != cluster node %zu.", lp, lp_node,
+                   node_);
+        }
+        if (HWY_UNLIKELY(private_kib_ != tcluster.private_kib)) {
+          warned = true;
+          HWY_WARN("lp %zu private_kib %zu != cluster %zu.", lp, private_kib_,
+                   tcluster.private_kib);
+        }
+        if (HWY_UNLIKELY(shared_kib_ != tcluster.shared_kib)) {
+          warned = true;
+          HWY_WARN("lp %zu shared_kib %zu != cluster %zu.", lp, shared_kib_,
+                   tcluster.shared_kib);
+        }
+      }  // !warned
+    }
+  });
+}
+
+// CPUs without clusters rarely have more than a few dozen cores, and 6 is a
+// decent number of threads in a per-cluster pool.
+constexpr bool kSplitLargeClusters = false;
+constexpr size_t kMaxClusters = 8;
+constexpr size_t kMaxLPsPerCluster = 6;
+
+// Topology is unknown, use only the given LPs which derive from OS affinity
+// and `lp_slice`.
+BoundedTopology::Package::Package(const LPS& enabled_lps) {
+  LPS clusters_lps[kMaxClusters];
+  const size_t num_clusters =
+      kSplitLargeClusters
+          ? HWY_MIN(kMaxClusters,
+                    hwy::DivCeil(enabled_lps.Count(), kMaxLPsPerCluster))
+          : 1;
+
+  size_t enabled_idx = 0;
+  enabled_lps.Foreach([&](size_t lp) {
+    clusters_lps[enabled_idx % num_clusters].Set(lp);
+    ++enabled_idx;
+  });
+
+  for (size_t cluster_idx = 0; cluster_idx < num_clusters; ++cluster_idx) {
+    clusters.push_back(Cluster(clusters_lps[cluster_idx]));
+  }
+}
+
+// NOTE: caller is responsible for checking whether `clusters` is empty.
+BoundedTopology::Package::Package(const LPS& enabled_lps,
+                                  const hwy::Topology& topology, size_t pkg_idx,
+                                  BoundedSlice cluster_slice) {
+  const hwy::Topology::Package& tpackage = topology.packages[pkg_idx];
+  // Populate `clusters` with the subset of clusters in `cluster_slice` that
+  // have any enabled LPs. If `clusters` remains empty, the caller will
+  // skip this `Package`.
+  clusters.reserve(cluster_slice.Num(tpackage.clusters.size()));
+  cluster_slice.Foreach(
+      "cluster", tpackage.clusters.size(), [&](size_t cluster_idx) {
+        const hwy::Topology::Cluster& tcluster = tpackage.clusters[cluster_idx];
+        Cluster cluster(enabled_lps, topology.lps, tcluster);
+
+        // Skip if empty, i.e. too few `enabled_lps`.
+        if (HWY_LIKELY(cluster.Size() != 0)) {
+          clusters.push_back(cluster);
+        }
+      });
+  SortByDescendingSize(clusters);
+
+  // If there is only one large cluster, split it into smaller ones.
+  if (kSplitLargeClusters && clusters.size() == 1 &&
+      enabled_lps.Count() >= 16) {
+    const LPS lps = clusters[0].LPSet();  // copy so we can clear
+    clusters.clear();
+
+    // Split `lps` into several clusters.
+    LPS clusters_lps[kMaxClusters];
+    const size_t num_clusters =
+        HWY_MIN(kMaxClusters, hwy::DivCeil(lps.Count(), kMaxLPsPerCluster));
+    size_t num_lps = 0;
+    lps.Foreach(
+        [&](size_t lp) { clusters_lps[num_lps++ % num_clusters].Set(lp); });
+    HWY_DASSERT(num_lps == lps.Count());
+
+    // Create new clusters, just inserting the new LPS.
+    hwy::Topology::Cluster tcluster = tpackage.clusters[0];  // modifiable copy
+    for (size_t cluster_idx = 0; cluster_idx < num_clusters; ++cluster_idx) {
+      tcluster.lps = clusters_lps[cluster_idx];
+      // Keep same `private_kib` and `shared_kib`.
+      clusters.push_back(Cluster(enabled_lps, topology.lps, tcluster));
+    }
+  }
+}
+
+#if !GEMMA_DISABLE_TOPOLOGY
+
+static size_t CoresFromLPs(const LPS& lps, const hwy::Topology& topology) {
+  LPS cores;
+  lps.Foreach([&](size_t lp) {
+    if (topology.lps[lp].smt == 0) cores.Set(lp);
+  });
+  return cores.Count();
+}
+
+// Scans hwy::Topology for clusters and their size, for use by topology_string_.
+static void ScanTClusters(hwy::Topology& topology_, size_t& max_tclusters,
+                          size_t& max_tcluster_cores,
+                          size_t& max_tcluster_lps) {
+  max_tclusters = 0;
+  max_tcluster_cores = 0;
+  max_tcluster_lps = 0;
+  for (size_t pkg_idx = 0; pkg_idx < topology_.packages.size(); ++pkg_idx) {
+    const std::vector<hwy::Topology::Cluster>& tclusters =
+        topology_.packages[pkg_idx].clusters;
+    max_tclusters = HWY_MAX(max_tclusters, tclusters.size());
+    size_t tcluster_cores = 0;
+    size_t tcluster_lps = 0;
+    for (size_t cluster_idx = 0; cluster_idx < tclusters.size();
+         ++cluster_idx) {
+      const size_t cores = CoresFromLPs(tclusters[cluster_idx].lps, topology_);
+      const size_t lps = tclusters[cluster_idx].lps.Count();
+      tcluster_cores = HWY_MAX(tcluster_cores, cores);
+      tcluster_lps = HWY_MAX(tcluster_lps, lps);
+    }
+
+    if (tclusters.size() > 1 && tcluster_cores > 8) {
+      HWY_WARN(
+          "Package %zu: multiple clusters with max size %zu, whereas CCXs "
+          "only have 8; may indicate a bug in hwy::Topology.",
+          pkg_idx, tcluster_cores);
+    }
+    max_tcluster_cores = HWY_MAX(max_tcluster_cores, tcluster_cores);
+    max_tcluster_lps = HWY_MAX(max_tcluster_lps, tcluster_lps);
+  }
+  HWY_ASSERT(max_tclusters != 0);
+  HWY_ASSERT(max_tcluster_cores != 0);
+  HWY_ASSERT(max_tcluster_lps >= max_tcluster_cores);
+}
+
+// Main part of ctor, called when topology is known.
+void BoundedTopology::InitFromTopology(const LPS& enabled_lps,
+                                       BoundedSlice package_slice,
+                                       BoundedSlice cluster_slice) {
+  size_t max_tclusters, max_tcluster_cores, max_tcluster_lps;
+  ScanTClusters(topology_, max_tclusters, max_tcluster_cores, max_tcluster_lps);
+
+  // (Possibly empty) subset of `Topology` packages that have `enabled_lps`.
+  package_slice.Foreach(
+      "package", topology_.packages.size(), [&](size_t pkg_idx) {
+        Package package(enabled_lps, topology_, pkg_idx, cluster_slice);
+        // Skip if empty, i.e. too few `enabled_lps`.
+        if (HWY_LIKELY(!package.clusters.empty())) {
+          packages_.push_back(std::move(package));
+        }
+      });
+  if (NumPackages() == 0) return;
+  SortByDescendingSize(packages_);
+
+  // Remember NUMA nodes that we are actually using (not just enabled).
+  for (const Package& p : packages_) {
+    for (const Cluster& c : p.clusters) {
+      nodes_.Set(c.Node());
+    }
+  }
+
+  // Scan for max BoundedTopology clusters and their size, for topology_string_.
+  size_t all_max_cluster_size = 0;
+  for (size_t pkg_idx = 0; pkg_idx < NumPackages(); ++pkg_idx) {
+    size_t max_cluster_size = 0;
+    for (size_t cluster_idx = 0; cluster_idx < NumClusters(pkg_idx);
+         ++cluster_idx) {
+      max_cluster_size =
+          HWY_MAX(max_cluster_size, GetCluster(pkg_idx, cluster_idx).Size());
+    }
+    if (NumClusters(pkg_idx) > 1 && max_cluster_size > 8) {
+      HWY_WARN(
+          "Package %zu: multiple clusters with max size %zu, whereas CCXs "
+          "only have 8; may indicate a bug in BoundedTopology.",
+          pkg_idx, max_cluster_size);
+    }
+    all_max_cluster_size = HWY_MAX(all_max_cluster_size, max_cluster_size);
+  }
+
+  snprintf(topology_string_, sizeof(topology_string_),
+           "%zuS %zuX %zuC %zuH, using %zuS %zuX %zuC (nodes=%zu)",
+           topology_.packages.size(), max_tclusters, max_tcluster_cores,
+           max_tcluster_lps / max_tcluster_cores, packages_.size(),
+           NumClusters(0), all_max_cluster_size, nodes_.Count());
+}
+
+#endif  // !GEMMA_DISABLE_TOPOLOGY
+
+void BoundedTopology::InitFromLPs(const LPS& enabled_lps) {
+  packages_.push_back(Package(enabled_lps));
+
+  snprintf(topology_string_, sizeof(topology_string_), "LPs=%zu",
+           GetCluster(0, 0).Size());
+
+  // Assume a single NUMA node.
+  nodes_.Set(0);
+  HWY_ASSERT(NumNodes() == 1);
+}
+
+}  // namespace gcpp
diff --git a/util/topology.h b/util/topology.h
new file mode 100644
index 0000000..c721fd2
--- /dev/null
+++ b/util/topology.h
@@ -0,0 +1,177 @@
+// Copyright 2024 Google LLC
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef THIRD_PARTY_GEMMA_CPP_UTIL_TOPOLOGY_H_
+#define THIRD_PARTY_GEMMA_CPP_UTIL_TOPOLOGY_H_
+
+#include <stddef.h>
+#include <stdint.h>
+
+#include <vector>
+
+// IWYU pragma: begin_exports
+#include "hwy/base.h"  // HWY_ASSERT
+#include "hwy/contrib/thread_pool/topology.h"
+// IWYU pragma: end_exports
+
+#ifndef GEMMA_DISABLE_TOPOLOGY
+#define GEMMA_DISABLE_TOPOLOGY 0
+#endif  // !GEMMA_DISABLE_TOPOLOGY
+
+namespace gcpp {
+
+// A slice of a 1D integer range such as the indices of packages or clusters.
+// This allows assigning them to multiple instances of our binary.
+class BoundedSlice {
+ public:
+  // Defaults to "use all detected".
+  BoundedSlice(size_t skip = 0, size_t max = 0) : skip_(skip), max_(max) {}
+
+  size_t Begin() const { return skip_; }
+
+  // STL-style one past the end.
+  size_t End(size_t detected) const {
+    return (max_ == 0) ? detected : HWY_MIN(detected, skip_ + max_);
+  }
+
+  // Number of elements in the slice.
+  size_t Num(size_t detected) const { return End(detected) - Begin(); }
+
+  bool Contains(size_t detected, size_t idx) const {
+    return Begin() <= idx && idx < End(detected);
+  }
+
+  template <class Func>
+  void Foreach(const char* name, size_t detected, const Func& func) {
+    if (Begin() >= detected) {
+      HWY_ABORT("Invalid skip=%zu for %s, detected=%zu", skip_, name, detected);
+    }
+    for (size_t i = Begin(); i < End(detected); ++i) {
+      func(i);
+    }
+  }
+
+ private:
+  // How many to skip, or equivalently, index of the first to use. It is an
+  // error if this is >= `detected`, because that would leave none for this
+  // instance to use.
+  size_t skip_;
+
+  // Upper bound on the number to use, or zero if no limit.
+  size_t max_;
+};
+
+// "LP" is a logical processor, a 0-based index passed to the OS.
+using LPS = hwy::LogicalProcessorSet;
+
+// Wraps hwy::Topology and only keeps the subset of packages and clusters
+// apportioned by BoundedSlice, further limited by the OS affinity mask.
+// NOTE: if topology is unknown or the OS affinity is too restrictive, we fall
+// back to a single package and cluster.
+class BoundedTopology {
+ public:
+  // Defaults to "use all detected".
+  BoundedTopology(BoundedSlice package_slice = BoundedSlice(),
+                  BoundedSlice cluster_slice = BoundedSlice(),
+                  BoundedSlice lp_slice = BoundedSlice());
+
+  size_t NumPackages() const { return packages_.size(); }
+  size_t NumNodes() const { return nodes_.Count(); }
+  const char* TopologyString() const { return topology_string_; }
+
+  class Cluster {
+   public:
+    Cluster(const LPS& lps);
+    Cluster(const LPS& enabled_lps,
+            const std::vector<hwy::Topology::LP>& all_lps,
+            const hwy::Topology::Cluster& tcluster);
+
+    // For SortByDescendingSize.
+    size_t Size() const { return num_workers_; }
+
+    // Returns vector with all enabled LPs, used for pinning.
+    std::vector<size_t> LPVector() const {
+      std::vector<size_t> lps;
+      lps.reserve(lps_.Count());
+      lps_.Foreach([&lps](size_t lp) { lps.push_back(lp); });
+      return lps;
+    }
+
+    const LPS& LPSet() const { return lps_; }
+    size_t Node() const { return node_; }
+    size_t PrivateKiB() const { return private_kib_; }
+    size_t SharedKiB() const { return shared_kib_; }
+
+   private:
+    // Enabled LPs; if topology is known, only the ones in this cluster.
+    LPS lps_;
+    // How many workers in the per-cluster pool. If 0, this Cluster is removed.
+    size_t num_workers_ = 0;
+    // NUMA node, set from hwy::Topology::LP::node.
+    size_t node_ = 0;
+    // L2 cache size in KiB, or 0 if unknown.
+    size_t private_kib_ = 0;
+    // L3 cache size in KiB, or 0 if unknown.
+    size_t shared_kib_ = 0;
+  };  // Cluster
+
+  size_t NumClusters(size_t pkg_idx) const {
+    HWY_ASSERT(pkg_idx < NumPackages());
+    return packages_[pkg_idx].clusters.size();
+  }
+  const Cluster& GetCluster(size_t pkg_idx, size_t cluster_idx) const {
+    HWY_ASSERT(pkg_idx < NumPackages());
+    const Package& package = packages_[pkg_idx];
+    HWY_ASSERT(cluster_idx < package.clusters.size());
+    return package.clusters[cluster_idx];
+  }
+  Cluster& GetCluster(size_t pkg_idx, size_t cluster_idx) {
+    HWY_ASSERT(pkg_idx < NumPackages());
+    Package& package = packages_[pkg_idx];
+    HWY_ASSERT(cluster_idx < package.clusters.size());
+    return package.clusters[cluster_idx];
+  }
+
+#if !GEMMA_DISABLE_TOPOLOGY
+  const hwy::Topology& FullTopology() const { return topology_; }
+#endif
+
+ private:
+  struct Package {
+    explicit Package(const LPS& enabled_lps);
+    Package(const LPS& enabled_lps, const hwy::Topology& topology,
+            size_t pkg_idx, BoundedSlice cluster_slice);
+
+    // For SortByDescendingSize.
+    size_t Size() const { return clusters.size(); }
+
+    std::vector<Cluster> clusters;
+  };  // Package
+
+  void InitFromTopology(const LPS& enabled_lps, BoundedSlice package_slice,
+                        BoundedSlice cluster_slice);
+  void InitFromLPs(const LPS& enabled_lps);
+
+#if !GEMMA_DISABLE_TOPOLOGY
+  hwy::Topology topology_;
+#endif
+  std::vector<Package> packages_;
+  char topology_string_[96];
+  LPS nodes_;
+};
+
+}  // namespace gcpp
+
+#endif  // THIRD_PARTY_GEMMA_CPP_UTIL_TOPOLOGY_H_
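
Usage sketch, mirroring the construction order exercised in the updated
util/threading_test.cc (the second `Allocator::Init` argument is passed as
`true` there; the comment text below is illustrative, not from the patch):

  gcpp::BoundedTopology topology;          // detect packages/clusters/LPs
  gcpp::Allocator::Init(topology, true);   // must precede NestedPools
  gcpp::NestedPools pools(topology);       // per-package, per-cluster pools
  hwy::ThreadPool& cluster0 = pools.Cluster(/*pkg_idx=*/0, /*cluster_idx=*/0);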