Refactor Gemma ctor and improve pool NUMA support

Gemma receives a MatMulEnv arg, with a comment on its required lifetime
Split threading into topology so the latter can be used by the allocator
Add AllocClasses() for non-POD types (ThreadPool)
Support binding pool memory to a NUMA node
Update threading_test with latency measurements
Also update Highway version.
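
A minimal sketch of the resulting call order, based on the updated run.cc and
hello_world call sites below (`app` and `loader` stand for the usual AppArgs and
LoaderArgs parsed from argv):

    #include "gemma/gemma.h"     // Gemma
    #include "ops/matmul.h"      // MatMulEnv
    #include "util/app.h"        // CreateTopology, CreatePools, CreateGemma
    #include "util/threading.h"  // NestedPools
    #include "util/topology.h"   // BoundedTopology

    const gcpp::BoundedTopology topology = gcpp::CreateTopology(app);
    // CreatePools calls Allocator::Init(topology) before building the pools.
    gcpp::NestedPools pools = gcpp::CreatePools(topology, app);
    gcpp::MatMulEnv env(topology, pools);
    // `env` must remain valid for the lifetime of the Gemma instance.
    gcpp::Gemma model = gcpp::CreateGemma(loader, env);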

PiperOrigin-RevId: 736904748
Jan Wassenberg, 2025-03-14 10:18:11 -07:00, committed by Copybara-Service
parent 1b1b63d560
commit 1b72c22345
31 changed files with 920 additions and 697 deletions

View File

@ -29,15 +29,13 @@ cc_library(
],
)
# Split from :threading to break a circular dependency with :allocator.
cc_library(
name = "threading",
srcs = ["util/threading.cc"],
hdrs = ["util/threading.h"],
name = "topology",
srcs = ["util/topology.cc"],
hdrs = ["util/topology.h"],
deps = [
":basics",
# Placeholder for container detection, do not remove
"@highway//:hwy",
"@highway//:thread_pool",
"@highway//:topology",
],
)
@ -48,9 +46,44 @@ cc_library(
hdrs = ["util/allocator.h"],
deps = [
":basics",
":threading",
":topology",
"@highway//:hwy",
"@highway//:thread_pool",
"@highway//:topology",
],
)
cc_library(
name = "threading",
srcs = ["util/threading.cc"],
hdrs = ["util/threading.h"],
deps = [
":allocator",
":basics",
":topology",
# Placeholder for container detection, do not remove
"@highway//:hwy",
"@highway//:thread_pool",
"@highway//:topology",
],
)
cc_test(
name = "threading_test",
srcs = ["util/threading_test.cc"],
deps = [
":allocator",
":basics",
":threading",
"@googletest//:gtest_main",
"@highway//:auto_tune",
"@highway//:hwy",
"@highway//:hwy_test_util",
"@highway//:nanobenchmark",
"@highway//:robust_statistics",
"@highway//:stats",
"@highway//:thread_pool",
"@highway//:timer",
],
)
@ -64,19 +97,6 @@ cc_library(
],
)
cc_test(
name = "threading_test",
srcs = ["util/threading_test.cc"],
deps = [
":threading",
"@googletest//:gtest_main",
"@highway//:hwy",
"@highway//:hwy_test_util",
"@highway//:nanobenchmark",
"@highway//:thread_pool",
],
)
# For building all tests in one command, so we can test several.
test_suite(
name = "ops_tests",
@ -104,6 +124,7 @@ cc_library(
":allocator",
":basics",
":threading",
":topology",
"//compression:compress",
"@highway//:algo",
"@highway//:bit_set",
@ -113,7 +134,6 @@ cc_library(
"@highway//:nanobenchmark",
"@highway//:profiler",
"@highway//:thread_pool",
"@highway//:topology",
"@highway//hwy/contrib/sort:vqsort",
],
)
@ -128,11 +148,11 @@ cc_test(
tags = ["ops_tests"],
deps = [
":allocator",
":app",
":ops",
":test_util",
":threading",
"@googletest//:gtest_main", # buildcleaner: keep
"//:app",
"//compression:compress",
"//compression:test_util",
"@highway//:hwy",
@ -154,11 +174,12 @@ cc_test(
tags = ["ops_tests"],
deps = [
":allocator",
":app",
":common",
":ops",
":test_util",
":threading",
"@googletest//:gtest_main", # buildcleaner: keep
"//:app",
"//compression:compress",
"@highway//:hwy",
"@highway//:hwy_test_util",
@ -405,6 +426,7 @@ cc_library(
":cross_entropy",
":gemma_lib",
":kv_cache",
":ops",
":threading",
# Placeholder for internal dep, do not remove.,
"@google_benchmark//:benchmark",
@ -464,13 +486,13 @@ cc_binary(
":benchmark_helper",
":common",
":gemma_lib",
":ops",
":threading",
# Placeholder for internal dep, do not remove.,
"//compression:sfp",
"//paligemma:image",
"@highway//:hwy",
"@highway//:profiler",
"@highway//:thread_pool",
],
)
@ -634,13 +656,12 @@ cc_test(
":backprop",
":backprop_scalar",
":common",
":gemma_lib",
":ops",
":prompt",
":sampler",
":threading",
":weights",
"@googletest//:gtest_main",
"//:threading",
"//compression:compress",
"@highway//:hwy",
"@highway//:hwy_test_util",

View File

@ -22,7 +22,7 @@ set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
FetchContent_Declare(highway GIT_REPOSITORY https://github.com/google/highway.git GIT_TAG f2209b911c74019e85d0b7a7a2833c9a2e1b7995 EXCLUDE_FROM_ALL)
FetchContent_Declare(highway GIT_REPOSITORY https://github.com/google/highway.git GIT_TAG c5bebf84ad01edec97e336f5c97ca4e0df6b4d06 EXCLUDE_FROM_ALL)
FetchContent_MakeAvailable(highway)
## Note: absl needs to be installed by sentencepiece. This will only happen if
@ -108,6 +108,8 @@ set(SOURCES
util/test_util.h
util/threading.cc
util/threading.h
util/topology.cc
util/topology.h
)
if(NOT CMAKE_BUILD_TYPE)

View File

@ -18,7 +18,7 @@ bazel_dep(name = "google_benchmark", version = "1.8.5")
# Require a more recent version.
git_override(
module_name = "highway",
commit = "f2209b911c74019e85d0b7a7a2833c9a2e1b7995",
commit = "c5bebf84ad01edec97e336f5c97ca4e0df6b4d06",
remote = "https://github.com/google/highway",
)

View File

@ -35,7 +35,6 @@
#include "ops/ops.h"
#include "util/threading.h"
#include "hwy/base.h"
#include "hwy/contrib/thread_pool/thread_pool.h"
// clang-format off
#undef HWY_TARGET_INCLUDE
@ -59,9 +58,9 @@ void TestMatMulVJP() {
static const size_t kRows = 8;
static const size_t kCols = 64;
static const size_t kTokens = 5;
gcpp::NestedPools pools(1, /*pin=*/Tristate::kFalse, BoundedSlice(0, 1),
BoundedSlice(0, 8));
Allocator::Init(pools.Topology());
const BoundedTopology topology(BoundedSlice(0, 1), BoundedSlice(0, 8));
Allocator::Init(topology);
gcpp::NestedPools pools(topology, 1, /*pin=*/Tristate::kFalse);
std::mt19937 gen(42);
MatStorageT<float> weights("weights", kRows, kCols);
MatStorageT<float> x("x", kTokens, kCols);
@ -105,9 +104,9 @@ void TestMultiHeadMatMulVJP() {
static const size_t kCols = 16;
static const size_t kHeads = 4;
static const size_t kTokens = 3;
gcpp::NestedPools pools(1, /*pin=*/Tristate::kFalse, BoundedSlice(0, 1),
BoundedSlice(0, 8));
Allocator::Init(pools.Topology());
const BoundedTopology topology(BoundedSlice(0, 1), BoundedSlice(0, 8));
Allocator::Init(topology);
gcpp::NestedPools pools(topology, 1, /*pin=*/Tristate::kFalse);
std::mt19937 gen(42);
MatStorageT<float> weights("weights", kRows, kCols * kHeads);
MatStorageT<float> x("x", kTokens, kCols * kHeads);
@ -150,9 +149,9 @@ void TestMultiHeadMatMulVJP() {
void TestRMSNormVJP() {
static const size_t K = 2;
static const size_t N = 64;
gcpp::NestedPools pools(1, /*pin=*/Tristate::kFalse, BoundedSlice(0, 1),
BoundedSlice(0, 8));
Allocator::Init(pools.Topology());
const BoundedTopology topology(BoundedSlice(0, 1), BoundedSlice(0, 8));
Allocator::Init(topology);
gcpp::NestedPools pools(topology, 1, /*pin=*/Tristate::kFalse);
std::mt19937 gen(42);
MatStorageT<float> weights("weights", N, 1);
MatStorageT<float> x("x", K, N);
@ -216,9 +215,9 @@ static ModelConfig TestConfig() {
void TestEndToEnd() {
std::mt19937 gen(42);
gcpp::NestedPools pools(1, /*pin=*/Tristate::kFalse, BoundedSlice(0, 1),
BoundedSlice(0, 1));
Allocator::Init(pools.Topology());
const BoundedTopology topology(BoundedSlice(0, 1), BoundedSlice(0, 1));
Allocator::Init(topology);
gcpp::NestedPools pools(topology, 1, /*pin=*/Tristate::kFalse);
ModelConfig config = TestConfig();
WeightsWrapper<float> weights(config);
WeightsWrapper<float> grad(config);

View File

@ -41,9 +41,10 @@
namespace gcpp {
TEST(OptimizeTest, GradientDescent) {
NestedPools pools(1, /*pin=*/Tristate::kFalse, BoundedSlice(0, 1),
BoundedSlice(0, 1));
Allocator::Init(pools.Topology());
const BoundedTopology topology(BoundedSlice(0, 1), BoundedSlice(0, 1));
Allocator::Init(topology);
NestedPools pools(topology, 1, /*pin=*/Tristate::kFalse);
MatMulEnv env(topology, pools);
hwy::ThreadPool& pool = pools.Pool();
std::mt19937 gen(42);
@ -66,7 +67,7 @@ TEST(OptimizeTest, GradientDescent) {
config.layer_configs[0].qkv_dim,
config.layer_configs[0].post_qk == PostQKType::HalfRope);
Gemma gemma(GemmaTokenizer(), info, pools);
Gemma gemma(GemmaTokenizer(), info, env);
const auto generate = [&](const std::vector<int>& prompt) {
std::vector<int> reply;

View File

@ -202,8 +202,9 @@ void ReadAndCompareBlobs(const char* path1, const char* path2) {
if (!CompareKeys(reader1, reader2)) return;
// Single allocation, avoid initializing the memory.
NestedPools pools(0);
Allocator::Init(pools.Topology());
BoundedTopology topology;
Allocator::Init(topology);
NestedPools pools(topology);
const size_t total_bytes = TotalBytes(reader1) + TotalBytes(reader2);
BytePtr all_blobs = hwy::AllocateAligned<uint8_t>(total_bytes);
size_t pos = 0;

View File

@ -56,8 +56,9 @@ void InitGenerator(const InferenceArgs& inference, std::mt19937& gen) {
GemmaEnv::GemmaEnv(const LoaderArgs& loader, const InferenceArgs& inference,
const AppArgs& app)
: pools_(CreatePools(app)) {
Allocator::Init(pools_.Topology());
: topology_(CreateTopology(app)),
pools_(CreatePools(topology_, app)),
env_(topology_, pools_) {
InferenceArgs mutable_inference = inference;
AbortIfInvalidArgs(mutable_inference);
LoaderArgs mutable_loader = loader;
@ -66,7 +67,7 @@ GemmaEnv::GemmaEnv(const LoaderArgs& loader, const InferenceArgs& inference,
fprintf(stderr, "Skipping model load because: %s\n", err);
} else {
fprintf(stderr, "Loading model...\n");
model_ = AllocateGemma(mutable_loader, pools_);
model_ = AllocateGemma(mutable_loader, env_);
// Only allocate one for starters because GenerateBatch might not be called.
kv_caches_.resize(1);
kv_caches_[0] = KVCache::Create(model_->GetModelConfig(),
@ -236,7 +237,7 @@ std::string CacheString() {
}
void ShowConfig(LoaderArgs& loader, InferenceArgs& inference, AppArgs& app,
NestedPools& pools) {
const BoundedTopology& topology, NestedPools& pools) {
loader.Print(app.verbosity);
inference.Print(app.verbosity);
app.Print(app.verbosity);
@ -255,7 +256,7 @@ void ShowConfig(LoaderArgs& loader, InferenceArgs& inference, AppArgs& app,
"Compiled config : %s\n"
"Weight Type : %s\n"
"EmbedderInput Type : %s\n",
dt, cpu100, pools.TopologyString(), pools.PinString(),
dt, cpu100, topology.TopologyString(), pools.PinString(),
CacheString().c_str(), hwy::TargetName(hwy::DispatchedTarget()),
hwy::VectorBytes() * 8, CompiledConfig(),
StringFromType(loader.Info().weight), TypeName<EmbedderInputT>());

View File

@ -24,6 +24,7 @@
#include <vector>
#include "gemma/gemma.h"
#include "ops/matmul.h"
#include "util/app.h"
#include "util/threading.h"
#include "hwy/base.h"
@ -105,15 +106,12 @@ class GemmaEnv {
KVCache& MutableKVCache() { return kv_caches_[0]; }
private:
// Thread pool for running inference.
NestedPools pools_;
// Random number generator.
std::mt19937 gen_;
// The model to run inference on.
BoundedTopology topology_;
NestedPools pools_; // Thread pool.
MatMulEnv env_;
std::mt19937 gen_; // Random number generator.
std::unique_ptr<Gemma> model_;
// KV caches, same number as query batch.
std::vector<KVCache> kv_caches_;
// Runtime config for inference.
std::vector<KVCache> kv_caches_; // Same number as query batch.
RuntimeConfig runtime_config_;
};
@ -121,7 +119,7 @@ class GemmaEnv {
void LogSpeedStats(double time_start, size_t total_tokens);
void ShowConfig(LoaderArgs& loader, InferenceArgs& inference, AppArgs& app,
NestedPools& pools);
const BoundedTopology& topology, NestedPools& pools);
void ShowHelp(LoaderArgs& loader, InferenceArgs& inference, AppArgs& app);
} // namespace gcpp

View File

@ -13,7 +13,6 @@ cc_binary(
# Placeholder for internal dep, do not remove.,
"//:app",
"//:args",
"//:common",
"//:gemma_lib",
"//:threading",
"//:tokenizer",

View File

@ -17,7 +17,7 @@ project(hello_world)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
include(FetchContent)
FetchContent_Declare(highway GIT_REPOSITORY https://github.com/google/highway.git GIT_TAG f2209b911c74019e85d0b7a7a2833c9a2e1b7995)
FetchContent_Declare(highway GIT_REPOSITORY https://github.com/google/highway.git GIT_TAG c5bebf84ad01edec97e336f5c97ca4e0df6b4d06)
FetchContent_MakeAvailable(highway)
FetchContent_Declare(sentencepiece GIT_REPOSITORY https://github.com/google/sentencepiece GIT_TAG 53de76561cfc149d3c01037f0595669ad32a5e7c)
FetchContent_MakeAvailable(sentencepiece)

View File

@ -58,9 +58,10 @@ int main(int argc, char** argv) {
}
// Instantiate model and KV Cache
gcpp::NestedPools pools = gcpp::CreatePools(app);
gcpp::Allocator::Init(pools.Topology());
gcpp::Gemma model = gcpp::CreateGemma(loader, pools);
gcpp::BoundedTopology topology(gcpp::CreateTopology(app));
gcpp::NestedPools pools = gcpp::CreatePools(topology, app);
gcpp::MatMulEnv env(topology, pools);
gcpp::Gemma model = gcpp::CreateGemma(loader, env);
gcpp::KVCache kv_cache =
gcpp::KVCache::Create(model.GetModelConfig(),
inference.prefill_tbatch_size);

View File

@ -11,13 +11,11 @@ cc_library(
hdrs = ["gemma.hpp"],
deps = [
"//:app",
"//:args",
"//:common",
"//:gemma_lib",
"//:ops",
"//:threading",
"//:tokenizer",
"@highway//:hwy",
"@highway//:thread_pool",
],
)
@ -31,6 +29,7 @@ cc_binary(
"//:args",
"//:common",
"//:gemma_lib",
"//:ops",
"//:threading",
"//:tokenizer",
"@highway//:hwy",

View File

@ -17,7 +17,7 @@ project(simplified_gemma)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
include(FetchContent)
FetchContent_Declare(highway GIT_REPOSITORY https://github.com/google/highway.git GIT_TAG f2209b911c74019e85d0b7a7a2833c9a2e1b7995)
FetchContent_Declare(highway GIT_REPOSITORY https://github.com/google/highway.git GIT_TAG c5bebf84ad01edec97e336f5c97ca4e0df6b4d06)
FetchContent_MakeAvailable(highway)
FetchContent_Declare(sentencepiece GIT_REPOSITORY https://github.com/google/sentencepiece GIT_TAG 53de76561cfc149d3c01037f0595669ad32a5e7c)
FetchContent_MakeAvailable(sentencepiece)

View File

@ -25,10 +25,10 @@
#include "third_party/gemma_cpp/gemma/gemma.h"
#include "third_party/gemma_cpp/gemma/tokenizer.h"
#include "third_party/gemma_cpp/ops/matmul.h"
#include "third_party/gemma_cpp/util/app.h" // LoaderArgs
#include "third_party/gemma_cpp/util/threading.h"
#include "third_party/highway/hwy/base.h"
#include "third_party/highway/hwy/contrib/thread_pool/thread_pool.h"
class SimplifiedGemma {
public:
@ -38,8 +38,10 @@ class SimplifiedGemma {
: loader_(loader),
inference_(inference),
app_(app),
pools_(gcpp::CreatePools(app_)),
model_(gcpp::CreateGemma(loader_, pools_)) {
topology_(gcpp::CreateTopology(app_)),
pools_(gcpp::CreatePools(topology_, app_)),
env_(topology_, pools_),
model_(gcpp::CreateGemma(loader_, env_)) {
Init();
}
@ -47,14 +49,14 @@ class SimplifiedGemma {
: loader_(argc, argv, /*validate=*/true),
inference_(argc, argv),
app_(argc, argv),
pools_(gcpp::CreatePools(app_)),
model_(gcpp::CreateGemma(loader_, pools_)) {
topology_(gcpp::CreateTopology(app_)),
pools_(gcpp::CreatePools(topology_, app_)),
env_(topology_, pools_),
model_(gcpp::CreateGemma(loader_, env_)) {
Init();
}
void Init() {
gcpp::Allocator::Init(pools_.Topology());
// Instantiate model and KV Cache
kv_cache_ = gcpp::KVCache::Create(model_.GetModelConfig(),
inference_.prefill_tbatch_size);
@ -106,7 +108,9 @@ class SimplifiedGemma {
gcpp::LoaderArgs loader_;
gcpp::InferenceArgs inference_;
gcpp::AppArgs app_;
gcpp::BoundedTopology topology_;
gcpp::NestedPools pools_;
gcpp::MatMulEnv env_;
gcpp::Gemma model_;
gcpp::KVCache kv_cache_;
std::mt19937 gen_;

View File

@ -34,29 +34,27 @@
#include "ops/ops-inl.h"
#include "paligemma/image.h"
#include "util/threading.h"
#include "hwy/contrib/thread_pool/thread_pool.h"
#include "hwy/highway.h"
namespace gcpp {
Gemma::Gemma(const Path& tokenizer_path, const Path& weights,
const ModelInfo& info, NestedPools& pools)
: env_(pools), tokenizer_(tokenizer_path) {
const ModelInfo& info, MatMulEnv& env)
: env_(env), tokenizer_(tokenizer_path) {
model_.Load(weights, info.model, info.weight, info.wrapping,
env_.parallel.Pools().Pool(0),
/*tokenizer_proto=*/nullptr);
}
Gemma::Gemma(const Path& weights, NestedPools& pools) : env_(pools) {
Gemma::Gemma(const Path& weights, MatMulEnv& env) : env_(env) {
std::string tokenizer_proto;
model_.Load(weights, Model::UNKNOWN, Type::kUnknown, PromptWrapping::GEMMA_IT,
env_.parallel.Pools().Pool(0), &tokenizer_proto);
tokenizer_.Deserialize(tokenizer_proto);
}
Gemma::Gemma(GemmaTokenizer&& tokenizer, const ModelInfo& info,
NestedPools& pools)
: env_(pools), tokenizer_(std::move(tokenizer)) {
Gemma::Gemma(GemmaTokenizer&& tokenizer, const ModelInfo& info, MatMulEnv& env)
: env_(env), tokenizer_(std::move(tokenizer)) {
HWY_ASSERT(info.weight == Type::kF32);
model_.Allocate(info.model, info.weight, env_.parallel.Pools().Pool(0));
}

View File

@ -33,8 +33,6 @@
#include "paligemma/image.h"
#include "util/allocator.h" // RowVectorBatch
#include "util/basics.h" // TokenAndProb
#include "util/threading.h"
#include "hwy/contrib/thread_pool/thread_pool.h"
#include "hwy/timer.h"
// IWYU pragma: end_exports
#include "hwy/aligned_allocator.h" // Span
@ -198,12 +196,14 @@ struct TimingInfo {
class Gemma {
public:
// Reads old format weights file and tokenizer file.
// `env` must remain valid for the lifetime of this Gemma.
Gemma(const Path& tokenizer_path, const Path& weights, const ModelInfo& info,
NestedPools& pools);
MatMulEnv& env);
// Reads new format weights file that contains everything in a single file.
Gemma(const Path& weights, NestedPools& pools);
// `env` must remain valid for the lifetime of this Gemma.
Gemma(const Path& weights, MatMulEnv& env);
// Allocates weights, caller is responsible for filling them.
Gemma(GemmaTokenizer&& tokenizer, const ModelInfo& info, NestedPools& pools);
Gemma(GemmaTokenizer&& tokenizer, const ModelInfo& info, MatMulEnv& env);
~Gemma();
const ModelConfig& GetModelConfig() const { return model_.Config(); }
@ -252,12 +252,8 @@ class Gemma {
void GenerateImageTokens(const RuntimeConfig& runtime_config,
const Image& image, ImageTokens& image_tokens);
void SetMatMulVerbosity(int verbosity) {
if (verbosity >= 2) env_.print_best = true;
}
private:
MatMulEnv env_;
MatMulEnv& env_;
GemmaTokenizer tokenizer_;
// Type-erased so that this can be defined in the header.

View File

@ -26,12 +26,12 @@
#include "evals/benchmark_helper.h"
#include "gemma/common.h"
#include "gemma/gemma.h" // Gemma
#include "ops/matmul.h" // MatMulEnv
#include "paligemma/image.h"
#include "util/app.h"
#include "util/args.h" // HasHelp
#include "util/threading.h"
#include "hwy/base.h"
#include "hwy/contrib/thread_pool/thread_pool.h"
#include "hwy/highway.h"
#include "hwy/profiler.h"
@ -248,11 +248,11 @@ void Run(LoaderArgs& loader, InferenceArgs& inference, AppArgs& app) {
// Note that num_threads is an upper bound; we also limit to the number of
// detected and enabled cores.
NestedPools pools = CreatePools(app);
Allocator::Init(pools.Topology());
Gemma model = CreateGemma(loader, pools);
model.SetMatMulVerbosity(app.verbosity);
const BoundedTopology topology = CreateTopology(app);
NestedPools pools = CreatePools(topology, app);
MatMulEnv env(topology, pools);
if (app.verbosity >= 2) env.print_best = true;
Gemma model = CreateGemma(loader, env);
KVCache kv_cache =
KVCache::Create(model.GetModelConfig(), inference.prefill_tbatch_size);
@ -279,7 +279,7 @@ void Run(LoaderArgs& loader, InferenceArgs& inference, AppArgs& app) {
std::cout << "\033[2J\033[1;1H" // clear screen
<< kAsciiArtBanner << "\n\n";
ShowConfig(loader, inference, app, pools);
ShowConfig(loader, inference, app, topology, pools);
std::cout << "\n" << instructions << "\n";
}

View File

@ -212,7 +212,7 @@ void BenchAllMatMul() {
// Skip EMU128 (10x slower than SSE4 for SFP) and older x86.
if (HWY_TARGET == HWY_EMU128 || HWY_TARGET == HWY_SSSE3 ||
HWY_TARGET == HWY_SSE2) {
HWY_TARGET == HWY_SSE2 || HWY_TARGET == HWY_NEON_WITHOUT_AES) {
return;
}
@ -220,13 +220,13 @@ void BenchAllMatMul() {
const BoundedSlice package_slice; // all packages/sockets
const BoundedSlice cluster_slice; // all clusters/CCX
const BoundedSlice lp_slice; // default to all cores (per package).
NestedPools pools(max_threads, Tristate::kDefault, package_slice,
cluster_slice, lp_slice);
fprintf(stderr, "BenchAllMatMul %s %s\n", pools.TopologyString(),
const BoundedTopology topology(package_slice, cluster_slice, lp_slice);
Allocator::Init(topology, /*enable_bind=*/true);
NestedPools pools(topology, max_threads, Tristate::kDefault);
fprintf(stderr, "BenchAllMatMul %s %s\n", topology.TopologyString(),
pools.PinString());
Allocator::Init(pools.Topology(), /*enable_bind=*/true);
MatMulEnv env(pools);
MatMulEnv env(topology, pools);
for (size_t batch_size : {1, 4, 128, 512}) {
constexpr bool kAdd = false;

View File

@ -1000,8 +1000,9 @@ struct TestShortDotsT {
const size_t N = hn::Lanes(d);
const hn::ScalableTag<float> df; // for CallDot
NestedPools pools = CreatePools(AppArgs());
Allocator::Init(pools.Topology());
const AppArgs app;
BoundedTopology topology(CreateTopology(app));
NestedPools pools = CreatePools(topology, app);
CompressWorkingSet work;
std::mt19937 rng;
rng.seed(12345);
@ -1109,9 +1110,9 @@ void TestAllDot() {
constexpr size_t kReps = hn::AdjustedReps(40);
const size_t num = 24 * 1024;
NestedPools pools(kMaxWorkers - 1, /*pin=*/Tristate::kDefault,
BoundedSlice(0, 1), BoundedSlice(0, 1));
Allocator::Init(pools.Topology());
const BoundedTopology topology(BoundedSlice(0, 1), BoundedSlice(0, 1),
BoundedSlice());
NestedPools pools(topology, kMaxWorkers - 1, /*pin=*/Tristate::kDefault);
RowVectorBatch<float> a(Extents2D(kMaxWorkers, num));
RowVectorBatch<float> b(Extents2D(kMaxWorkers, num));
RowVectorBatch<double> bufs(Extents2D(kMaxWorkers, num));

View File

@ -411,7 +411,8 @@ IndexRangePartition MMParallel::RangesOfNP(size_t max_packages, size_t N,
NPMultiple(N, sizeof_TC, nr, num_packages));
}
MatMulEnv::MatMulEnv(NestedPools& pools) : parallel(pools), storage(parallel) {
MatMulEnv::MatMulEnv(const BoundedTopology& topology, NestedPools& pools)
: parallel(topology, pools), storage(parallel) {
// Ensure Allocator::Init was called.
HWY_ASSERT(Allocator::LineBytes() != 0 && Allocator::VectorBytes() != 0);

View File

@ -28,10 +28,10 @@
#include "util/allocator.h"
#include "util/basics.h"
#include "util/threading.h"
#include "util/topology.h"
#include "hwy/aligned_allocator.h" // Span
#include "hwy/base.h"
#include "hwy/bit_set.h"
#include "hwy/contrib/thread_pool/thread_pool.h"
#include "hwy/profiler.h"
// IWYU pragma: end_exports
@ -51,7 +51,9 @@ class MMParallel {
public:
static constexpr size_t kMaxPackages = 4;
MMParallel(NestedPools& pools) : pools_(pools) {
// Both references must outlive this object.
MMParallel(const BoundedTopology& topology, NestedPools& pools)
: topology_(topology), pools_(pools) {
HWY_DASSERT(pools_.NumPackages() <= kMaxPackages);
}
@ -64,7 +66,7 @@ class MMParallel {
// For `BindB` and `BindC`.
size_t Node(size_t pkg_idx) const {
return pools_.Topology().GetCluster(pkg_idx, 0).Node();
return topology_.GetCluster(pkg_idx, 0).Node();
}
// Calls `func(pkg_idx)` for each package in parallel.
@ -167,6 +169,7 @@ class MMParallel {
}
private:
const BoundedTopology& topology_;
NestedPools& pools_;
};
@ -603,7 +606,7 @@ struct MMPerKey {
// Stores state shared across MatMul calls. Non-copyable.
struct MatMulEnv {
explicit MatMulEnv(NestedPools& pools);
explicit MatMulEnv(const BoundedTopology& topology, NestedPools& pools);
bool have_timer_stop = false;

View File

@ -312,22 +312,22 @@ void TestTiny() {
if (HWY_TARGET != first_target) return;
for (size_t max_packages : {1, 2}) {
const BoundedTopology topology(BoundedSlice(0, max_packages));
Allocator::Init(topology, /*enable_bind=*/true);
const size_t max_threads = 0; // no limit
NestedPools pools(max_threads, Tristate::kDefault,
BoundedSlice(0, max_packages));
NestedPools pools(topology, max_threads, Tristate::kDefault);
#if GEMMA_DISABLE_TOPOLOGY
if (max_packages == 2) break; // we only have one package
#else
// If less than the limit, we have already tested all num_packages.
if (pools.Topology().FullTopology().packages.size() < max_packages) break;
if (topology.FullTopology().packages.size() < max_packages) break;
#endif
fprintf(stderr, "TestTiny %zu: %s %s\n", max_packages,
pools.TopologyString(), pools.PinString());
topology.TopologyString(), pools.PinString());
Tristate use_spinning = Tristate::kDefault;
pools.MaybeStartSpinning(use_spinning);
Allocator::Init(pools.Topology(), /*enable_bind=*/true);
MatMulEnv env(pools);
MatMulEnv env(topology, pools);
for (size_t M = 1; M <= 12; ++M) {
for (size_t K = 1; K <= 64; K *= 2) {
@ -347,11 +347,12 @@ void TestAllMatMul() {
return;
}
NestedPools pools(0); // no limits
const BoundedTopology topology;
Allocator::Init(topology, /*enable_bind=*/true);
NestedPools pools(topology);
Tristate use_spinning = Tristate::kDefault;
pools.MaybeStartSpinning(use_spinning);
Allocator::Init(pools.Topology(), /*enable_bind=*/true);
MatMulEnv env(pools);
MatMulEnv env(topology, pools);
// Sizes seen in gemma_test 2B. Too slow for CI, enable on-demand.
TestMatMul<F32>(1, 2048, 512, /*add=*/false, env, __LINE__);

View File

@ -37,6 +37,7 @@
#include "util/allocator.h"
#include "util/app.h"
#include "util/test_util.h"
#include "util/threading.h"
#include "hwy/base.h"
#include "hwy/tests/hwy_gtest.h"
@ -387,8 +388,9 @@ static HWY_NOINLINE HWY_MAYBE_UNUSED void ScalarRopeAndMulBy(
}
void TestRopeAndMulBy() {
NestedPools pools = CreatePools(AppArgs());
Allocator::Init(pools.Topology());
AppArgs app;
BoundedTopology topology = CreateTopology(app);
NestedPools pools = CreatePools(topology, app);
ModelConfig config = ConfigFromModel(Model::GEMMA2_9B);
int dim_qkv = config.layer_configs[0].qkv_dim;

View File

@ -17,12 +17,11 @@
#include <stdio.h>
#include <atomic>
#include <vector>
#include "util/basics.h" // MaybeCheckInitialized
#include "hwy/aligned_allocator.h"
#include "hwy/base.h"
#include "hwy/contrib/thread_pool/futex.h"
#include "hwy/contrib/thread_pool/topology.h"
#include "hwy/per_target.h" // VectorBytes
// To avoid a dependency on libnuma, use syscalls directly. We require six
@ -52,6 +51,7 @@
#include <sys/syscall.h>
#include <cerrno>
#include <vector>
#endif // GEMMA_BIND && HWY_OS_LINUX
namespace gcpp {
@ -166,7 +166,7 @@ Allocator::PtrAndDeleter Allocator::AllocBytes(size_t bytes) {
auto call_free = [](void* ptr, size_t /*bytes*/) {
hwy::FreeAlignedBytes(ptr, nullptr, nullptr);
};
return PtrAndDeleter{p.release(), Deleter(call_free, bytes)};
return PtrAndDeleter{p.release(), DeleterFree(call_free, bytes)};
}
// Binding, or large vector/cache line size: use platform-specific allocator.
@ -186,14 +186,14 @@ Allocator::PtrAndDeleter Allocator::AllocBytes(size_t bytes) {
const int ret = munmap(ptr, bytes);
HWY_ASSERT(ret == 0);
};
return PtrAndDeleter{p, Deleter(call_munmap, bytes)};
return PtrAndDeleter{p, DeleterFree(call_munmap, bytes)};
#elif HWY_OS_WIN
const auto call_free = [](void* ptr, size_t) { _aligned_free(ptr); };
const size_t alignment = HWY_MAX(vector_bytes_, line_bytes_);
return PtrAndDeleter{_aligned_malloc(bytes, alignment),
Deleter(call_free, bytes)};
DeleterFree(call_free, bytes)};
#else
return PtrAndDeleter{nullptr, Deleter(nullptr, 0)};
return PtrAndDeleter{nullptr, DeleterFree(nullptr, 0)};
#endif
}
@ -288,7 +288,7 @@ bool Allocator::BindMemory(void* ptr, size_t bytes, size_t node) {
CountBusyPages(num_pages, node, pages.data(), status.data());
if (HWY_UNLIKELY(num_busy != 0)) {
// Trying again is usually enough to succeed.
usleep(5); // NOLINT(runtime/sleep)
hwy::NanoSleep(5000);
(void)SyscallWrappers::move_pages(
/*pid=*/0, num_pages, pages.data(), nodes.data(), status.data(), flags);
const size_t still_busy =

View File

@ -22,12 +22,11 @@
#include <stdint.h>
// IWYU pragma: begin_exports
#include <memory>
#include <memory> // std::unique_ptr
#include "util/basics.h"
#include "util/threading.h"
#include "util/topology.h"
#include "hwy/base.h"
#include "hwy/contrib/thread_pool/thread_pool.h"
// IWYU pragma: end_exports
#include "hwy/aligned_allocator.h"
@ -38,12 +37,12 @@ namespace gcpp {
// `bytes` argument is required for the latter.
using FreeFunc = void (*)(void* mem, size_t bytes);
// Custom deleter for std::unique_ptr that calls `FreeFunc`.
class Deleter {
// Custom deleter for std::unique_ptr that calls `FreeFunc`. T is POD.
class DeleterFree {
public:
// `MatStorageT` requires this to be default-constructible.
Deleter() : free_func_(nullptr), bytes_(0) {}
Deleter(FreeFunc free_func, size_t bytes)
DeleterFree() : free_func_(nullptr), bytes_(0) {}
DeleterFree(FreeFunc free_func, size_t bytes)
: free_func_(free_func), bytes_(bytes) {}
template <typename T>
@ -56,9 +55,31 @@ class Deleter {
size_t bytes_;
};
// Wrapper that also calls the destructor for non-POD T.
class DeleterDtor {
public:
DeleterDtor() {}
DeleterDtor(size_t num, DeleterFree free) : num_(num), free_(free) {}
template <typename T>
void operator()(T* p) const {
for (size_t i = 0; i < num_; ++i) {
p[i].~T();
}
free_(p);
}
private:
size_t num_; // not the same as free_.bytes_ / sizeof(T)!
DeleterFree free_;
};
// Unique (move-only) pointer to an aligned array of POD T.
template <typename T>
using AlignedPtr = std::unique_ptr<T[], Deleter>;
using AlignedPtr = std::unique_ptr<T[], DeleterFree>;
// Unique (move-only) pointer to an aligned array of non-POD T.
template <typename T>
using AlignedClassPtr = std::unique_ptr<T[], DeleterDtor>;
// Both allocation, binding, and row accessors depend on the sizes of memory
// pages and cache lines. To avoid having to pass `Allocator&` everywhere, we
@ -105,6 +126,26 @@ class Allocator {
return AlignedPtr<T>(static_cast<T*>(pd.p), pd.deleter);
}
// Same as Alloc, but calls constructor(s) with `args`.
template <typename T, class... Args>
static AlignedClassPtr<T> AllocClasses(size_t num, Args&&... args) {
constexpr size_t kSize = sizeof(T);
constexpr bool kIsPow2 = (kSize & (kSize - 1)) == 0;
constexpr size_t kBits = hwy::detail::ShiftCount(kSize);
static_assert(!kIsPow2 || (1ull << kBits) == kSize, "ShiftCount has a bug");
const size_t bytes = kIsPow2 ? num << kBits : num * kSize;
// Fail if the `bytes = num * kSize` computation overflowed.
const size_t check = kIsPow2 ? bytes >> kBits : bytes / kSize;
if (check != num) return AlignedClassPtr<T>();
PtrAndDeleter pd = AllocBytes(bytes);
T* p = static_cast<T*>(pd.p);
for (size_t i = 0; i < num; ++i) {
new (p + i) T(std::forward<Args>(args)...);
}
return AlignedClassPtr<T>(p, DeleterDtor(num, pd.deleter));
}
// Returns whether `BindMemory` can/should be called, i.e. we have page-level
// control over memory placement and multiple packages and NUMA nodes.
static bool ShouldBind();
@ -119,7 +160,7 @@ class Allocator {
// Type-erased so this can be implemented in allocator.cc.
struct PtrAndDeleter {
void* p;
Deleter deleter;
DeleterFree deleter;
};
static PtrAndDeleter AllocBytes(size_t bytes);
};
View File

@ -117,12 +117,15 @@ class AppArgs : public ArgsBase<AppArgs> {
}
};
// Callers must call Allocator::Init(pools.Topology()) after this.
static inline NestedPools CreatePools(const AppArgs& app) {
return NestedPools(app.max_threads, app.pin,
BoundedSlice(app.skip_packages, app.max_packages),
BoundedSlice(app.skip_clusters, app.max_clusters),
BoundedSlice(app.skip_lps, app.max_lps));
static inline BoundedTopology CreateTopology(const AppArgs& app) {
return BoundedTopology(BoundedSlice(app.skip_packages, app.max_packages),
BoundedSlice(app.skip_clusters, app.max_clusters),
BoundedSlice(app.skip_lps, app.max_lps));
}
static inline NestedPools CreatePools(const BoundedTopology& topology,
const AppArgs& app) {
Allocator::Init(topology);
return NestedPools(topology, app.max_threads, app.pin);
}
struct LoaderArgs : public ArgsBase<LoaderArgs> {
@ -224,24 +227,26 @@ struct LoaderArgs : public ArgsBase<LoaderArgs> {
ModelInfo info_;
};
static inline Gemma CreateGemma(const LoaderArgs& loader, NestedPools& pools) {
// `env` must remain valid for the lifetime of the Gemma.
static inline Gemma CreateGemma(const LoaderArgs& loader, MatMulEnv& env) {
if (Type::kUnknown == loader.Info().weight ||
Model::UNKNOWN == loader.Info().model || loader.tokenizer.path.empty()) {
// New weights file format doesn't need tokenizer path or model/weight info.
return Gemma(loader.weights, pools);
return Gemma(loader.weights, env);
}
return Gemma(loader.tokenizer, loader.weights, loader.Info(), pools);
return Gemma(loader.tokenizer, loader.weights, loader.Info(), env);
}
// `env` must remain valid for the lifetime of the Gemma.
static inline std::unique_ptr<Gemma> AllocateGemma(const LoaderArgs& loader,
NestedPools& pools) {
MatMulEnv& env) {
if (Type::kUnknown == loader.Info().weight ||
Model::UNKNOWN == loader.Info().model || loader.tokenizer.path.empty()) {
// New weights file format doesn't need tokenizer path or model/weight info.
return std::make_unique<Gemma>(loader.weights, pools);
return std::make_unique<Gemma>(loader.weights, env);
}
return std::make_unique<Gemma>(loader.tokenizer, loader.weights,
loader.Info(), pools);
loader.Info(), env);
}
struct InferenceArgs : public ArgsBase<InferenceArgs> {

View File

@ -19,8 +19,7 @@
#include <algorithm> // std::sort
#include <atomic>
#include <memory> // std::make_unique
#include <utility> // std::move
#include <optional>
#include <vector>
// Placeholder for container detection, do not remove
@ -45,50 +44,6 @@ class Pinning {
return false; }
public:
// Returns set of LPs available for use. Cached during the first call
// because subsequent pinning overwrites the main thread's affinity.
// Thread-hostile, not called concurrently.
LPS EnabledLPs(const BoundedSlice& lp_slice) {
if (enabled_lps_.Any()) return enabled_lps_;
LPS affinity;
if (HWY_LIKELY(GetThreadAffinity(affinity))) {
// To honor taskset/numactl *and* the user's `lp_slice`, we interpret
// the latter as a slice of the 1-bits of `enabled_lps`. Note that this
// can be used to exclude hyperthreads because Linux groups LPs by
// sibling index. For example, the first `num_cores` are not siblings.
const size_t detected = affinity.Count();
size_t enabled_idx = 0;
affinity.Foreach([&](size_t lp) {
if (lp_slice.Contains(detected, enabled_idx)) {
enabled_lps_.Set(lp);
}
++enabled_idx;
});
} else {
const size_t num_lps = hwy::TotalLogicalProcessors();
HWY_WARN("unknown OS affinity, max %zu LPs and slice %zu.", num_lps,
lp_slice.Num(num_lps));
for (size_t lp = 0; lp < num_lps; ++lp) {
if (lp_slice.Contains(num_lps, lp)) {
enabled_lps_.Set(lp);
}
}
}
// Without threading support, only keep the first enabled LP; it might still
// make sense to pin the main thread to avoid migrations.
if (HWY_UNLIKELY(!hwy::HaveThreadingSupport())) {
HWY_ASSERT(enabled_lps_.Any());
const size_t lp = enabled_lps_.First();
enabled_lps_ = LPS();
enabled_lps_.Set(lp);
HWY_WARN("Warning, threads not supported, using only the main thread.");
}
return enabled_lps_;
}
void SetPolicy(Tristate pin) {
if (pin == Tristate::kDefault) {
// Pinning is unreliable inside containers because the hypervisor might
@ -103,23 +58,25 @@ class Pinning {
// If want_pin_, tries to pin each worker in `pool` to an LP in `cluster`,
// and sets `any_error_` if any fails.
void MaybePin(size_t pkg_idx, size_t cluster_idx,
const BoundedTopology::Cluster& cluster, PoolPtr& pool) {
const BoundedTopology::Cluster& cluster,
hwy::ThreadPool& pool) {
const std::vector<size_t> lps = cluster.LPVector();
HWY_ASSERT(pool->NumWorkers() <= lps.size());
pool->Run(0, pool->NumWorkers(), [&](uint64_t task, size_t thread) {
HWY_ASSERT(pool.NumWorkers() <= lps.size());
pool.Run(0, pool.NumWorkers(), [&](uint64_t task, size_t thread) {
HWY_ASSERT(task == thread); // each worker has one task
char buf[16]; // Linux limitation
const int bytes_written = snprintf(buf, sizeof(buf), "P%zu X%02zu C%03zu",
pkg_idx, cluster_idx, task);
const int bytes_written =
snprintf(buf, sizeof(buf), "P%zu X%02zu C%03d", pkg_idx, cluster_idx,
static_cast<int>(task));
HWY_ASSERT(bytes_written < sizeof(buf));
hwy::SetThreadName(buf, 0); // does not support varargs
if (HWY_LIKELY(want_pin_)) {
if (HWY_UNLIKELY(!hwy::PinThreadToLogicalProcessor(lps[task]))) {
fprintf(stderr,
"Pinning failed for task %zu of %zu to %zu (size %zu)\n",
task, pool->NumWorkers(), lps[task], lps.size());
fprintf(
stderr, "Pinning failed for task %d of %zu to %zu (size %zu)\n",
static_cast<int>(task), pool.NumWorkers(), lps[task], lps.size());
(void)any_error_.test_and_set();
}
}
@ -141,7 +98,6 @@ class Pinning {
private:
std::atomic_flag any_error_ = ATOMIC_FLAG_INIT;
bool want_pin_; // set in SetPolicy
LPS enabled_lps_;
}; // Pinning
// Singleton saves global affinity across all BoundedTopology instances because
@ -151,263 +107,18 @@ static Pinning& GetPinning() {
return pinning;
}
BoundedTopology::BoundedTopology(BoundedSlice package_slice,
BoundedSlice cluster_slice,
BoundedSlice lp_slice) {
const LPS enabled_lps = GetPinning().EnabledLPs(lp_slice);
#if !GEMMA_DISABLE_TOPOLOGY
if (HWY_LIKELY(!topology_.packages.empty())) {
InitFromTopology(enabled_lps, package_slice, cluster_slice);
}
#endif
// Topology unknown or no packages with enabled LPs: create a single
// package with one cluster, and one node.
if (HWY_UNLIKELY(NumPackages() == 0)) {
InitFromLPs(enabled_lps);
}
HWY_ASSERT(NumPackages() != 0 && NumClusters(0) != 0 && NumNodes() != 0);
}
// Topology is unknown, take the given set of LPs.
BoundedTopology::Cluster::Cluster(const LPS& lps) {
lps_ = lps;
num_workers_ = lps.Count();
}
BoundedTopology::Cluster::Cluster(const LPS& enabled_lps,
const std::vector<hwy::Topology::LP>& all_lps,
const hwy::Topology::Cluster& tcluster) {
bool is_first_lp = true;
tcluster.lps.Foreach([&](size_t lp) {
// Skip if not first-hyperthread or disabled.
if (all_lps[lp].smt != 0 || !enabled_lps.Get(lp)) return;
HWY_ASSERT(!lps_.Get(lp)); // Foreach ensures uniqueness
lps_.Set(lp);
++num_workers_;
// Set fields once, and ensure subsequent LPs match - we assume there
// is only one NUMA node per cluster, with the same L2/L3 size.
const size_t lp_node = static_cast<size_t>(all_lps[lp].node);
if (is_first_lp) {
is_first_lp = false;
node_ = lp_node;
private_kib_ = tcluster.private_kib;
shared_kib_ = tcluster.shared_kib;
} else {
static bool warned = false;
if (HWY_LIKELY(!warned)) {
if (HWY_UNLIKELY(lp_node != node_)) {
warned = true;
HWY_WARN("lp %zu on node %zu != cluster node %zu.", lp, lp_node,
node_);
}
if (HWY_UNLIKELY(private_kib_ != tcluster.private_kib)) {
warned = true;
HWY_WARN("lp %zu private_kib %zu != cluster %zu.", lp, private_kib_,
tcluster.private_kib);
}
if (HWY_UNLIKELY(shared_kib_ != tcluster.shared_kib)) {
warned = true;
HWY_WARN("lp %zu shared_kib %zu != cluster %zu.", lp, shared_kib_,
tcluster.shared_kib);
}
} // !warned
}
});
}
// CPUs without clusters are rarely more than dozens of cores, and 6 is a
// decent number of threads in a per-cluster pool.
constexpr bool kSplitLargeClusters = false;
constexpr size_t kMaxClusters = 8;
constexpr size_t kMaxLPsPerCluster = 6;
// Topology is unknown, use only the given LPs which derive from OS affinity
// and `lp_slice`.
BoundedTopology::Package::Package(const LPS& enabled_lps) {
LPS clusters_lps[kMaxClusters];
const size_t num_clusters =
kSplitLargeClusters
? HWY_MIN(kMaxClusters,
hwy::DivCeil(enabled_lps.Count(), kMaxLPsPerCluster))
: 1;
size_t enabled_idx = 0;
enabled_lps.Foreach([&](size_t lp) {
clusters_lps[enabled_idx % num_clusters].Set(lp);
++enabled_idx;
});
for (size_t cluster_idx = 0; cluster_idx < num_clusters; ++cluster_idx) {
clusters.push_back(Cluster(clusters_lps[cluster_idx]));
}
}
// NOTE: caller is responsible for checking whether `clusters` is empty.
BoundedTopology::Package::Package(const LPS& enabled_lps,
const hwy::Topology& topology, size_t pkg_idx,
BoundedSlice cluster_slice) {
const hwy::Topology::Package& tpackage = topology.packages[pkg_idx];
// Populate `clusters` with the subset of clusters in `cluster_slice` that
// have any enabled LPs. If `clusters` remains empty, the caller will
// skip this `Package`.
clusters.reserve(cluster_slice.Num(tpackage.clusters.size()));
cluster_slice.Foreach(
"cluster", tpackage.clusters.size(), [&](size_t cluster_idx) {
const hwy::Topology::Cluster& tcluster = tpackage.clusters[cluster_idx];
Cluster cluster(enabled_lps, topology.lps, tcluster);
// Skip if empty, i.e. too few `enabled_lps`.
if (HWY_LIKELY(cluster.Size() != 0)) {
clusters.push_back(cluster);
}
});
SortByDescendingSize(clusters);
// If there is only one large cluster, split it into smaller ones.
if (kSplitLargeClusters && clusters.size() == 1 &&
enabled_lps.Count() >= 16) {
const LPS lps = clusters[0].LPSet(); // copy so we can clear
clusters.clear();
// Split `lps` into several clusters.
LPS clusters_lps[kMaxClusters];
const size_t num_clusters =
HWY_MIN(kMaxClusters, hwy::DivCeil(lps.Count(), kMaxLPsPerCluster));
size_t num_lps = 0;
lps.Foreach(
[&](size_t lp) { clusters_lps[num_lps++ % num_clusters].Set(lp); });
HWY_DASSERT(num_lps == lps.Count());
// Create new clusters, just inserting the new LPS.
hwy::Topology::Cluster tcluster = tpackage.clusters[0]; // modifiable copy
for (size_t cluster_idx = 0; cluster_idx < num_clusters; ++cluster_idx) {
tcluster.lps = clusters_lps[cluster_idx];
// Keep same `private_kib` and `shared_kib`.
clusters.push_back(Cluster(enabled_lps, topology.lps, tcluster));
}
}
}
#if !GEMMA_DISABLE_TOPOLOGY
static size_t CoresFromLPs(const LPS& lps, const hwy::Topology& topology) {
LPS cores;
lps.Foreach([&](size_t lp) {
if (topology.lps[lp].smt == 0) cores.Set(lp);
});
return cores.Count();
}
// Scans hwy::Topology for clusters and their size, for use by topology_string_.
static void ScanTClusters(hwy::Topology& topology_, size_t& max_tclusters,
size_t& max_tcluster_cores,
size_t& max_tcluster_lps) {
max_tclusters = 0;
max_tcluster_cores = 0;
max_tcluster_lps = 0;
for (size_t pkg_idx = 0; pkg_idx < topology_.packages.size(); ++pkg_idx) {
const std::vector<hwy::Topology::Cluster>& tclusters =
topology_.packages[pkg_idx].clusters;
max_tclusters = HWY_MAX(max_tclusters, tclusters.size());
size_t tcluster_cores = 0;
size_t tcluster_lps = 0;
for (size_t cluster_idx = 0; cluster_idx < tclusters.size();
++cluster_idx) {
const size_t cores = CoresFromLPs(tclusters[cluster_idx].lps, topology_);
const size_t lps = tclusters[cluster_idx].lps.Count();
tcluster_cores = HWY_MAX(tcluster_cores, cores);
tcluster_lps = HWY_MAX(tcluster_lps, lps);
}
if (tclusters.size() > 1 && tcluster_cores > 8) {
HWY_WARN(
"Package %zu: multiple clusters with max size %zu, whereas CCX "
"only have 8, may indicate a bug in hwy::Topology.",
pkg_idx, tcluster_cores);
}
max_tcluster_cores = HWY_MAX(max_tcluster_cores, tcluster_cores);
max_tcluster_lps = HWY_MAX(max_tcluster_lps, tcluster_lps);
}
HWY_ASSERT(max_tclusters != 0);
HWY_ASSERT(max_tcluster_cores != 0);
HWY_ASSERT(max_tcluster_lps >= max_tcluster_cores);
}
// Main part of ctor, called when topology is known.
void BoundedTopology::InitFromTopology(const LPS& enabled_lps,
BoundedSlice package_slice,
BoundedSlice cluster_slice) {
size_t max_tclusters, max_tcluster_cores, max_tcluster_lps;
ScanTClusters(topology_, max_tclusters, max_tcluster_cores, max_tcluster_lps);
// (Possibly empty) subset of `Topology` packages that have `enabled_lps`.
package_slice.Foreach(
"package", topology_.packages.size(), [&](size_t pkg_idx) {
Package package(enabled_lps, topology_, pkg_idx, cluster_slice);
// Skip if empty, i.e. too few `enabled_lps`.
if (HWY_LIKELY(!package.clusters.empty())) {
packages_.push_back(std::move(package));
}
});
if (NumPackages() == 0) return;
SortByDescendingSize(packages_);
// Remember NUMA nodes that we are actually using (not just enabled).
for (const Package& p : packages_) {
for (const Cluster& c : p.clusters) {
nodes_.Set(c.Node());
}
}
// Scan for max BoundedTopology clusters and their size, for topology_string_.
size_t all_max_cluster_size = 0;
for (size_t pkg_idx = 0; pkg_idx < NumPackages(); ++pkg_idx) {
size_t max_cluster_size = 0;
for (size_t cluster_idx = 0; cluster_idx < NumClusters(pkg_idx);
++cluster_idx) {
max_cluster_size =
HWY_MAX(max_cluster_size, GetCluster(pkg_idx, cluster_idx).Size());
}
if (NumClusters(pkg_idx) > 1 && max_cluster_size > 8) {
HWY_WARN(
"Package %zu: multiple clusters with max size %zu, whereas CCX "
"only have 8, may indicate a bug in BoundedTopology.",
pkg_idx, max_cluster_size);
}
all_max_cluster_size = HWY_MAX(all_max_cluster_size, max_cluster_size);
}
snprintf(topology_string_, sizeof(topology_string_),
"%zuS %zuX %zuC %zuH, using %zuS %zuX %zuC (nodes=%zu)",
topology_.packages.size(), max_tclusters, max_tcluster_cores,
max_tcluster_lps / max_tcluster_cores, packages_.size(),
NumClusters(0), all_max_cluster_size, nodes_.Count());
}
#endif // !GEMMA_DISABLE_TOPOLOGY
void BoundedTopology::InitFromLPs(const LPS& enabled_lps) {
packages_.push_back(Package(enabled_lps));
snprintf(topology_string_, sizeof(topology_string_), "LPs=%zu",
GetCluster(0, 0).Size());
// Assume a single NUMA node.
nodes_.Set(0);
HWY_ASSERT(NumNodes() == 1);
}
static PoolPtr MakePool(size_t num_workers) {
static PoolPtr MakePool(size_t num_workers,
std::optional<size_t> node = std::nullopt) {
// `ThreadPool` expects the number of threads to create, which is one less
// than the number of workers, but avoid underflow if zero.
const size_t num_threads = num_workers == 0 ? 0 : num_workers - 1;
return std::make_unique<hwy::ThreadPool>(num_threads);
PoolPtr ptr = Allocator::AllocClasses<hwy::ThreadPool>(1, num_threads);
const size_t bytes =
hwy::RoundUpTo(sizeof(hwy::ThreadPool), Allocator::QuantumBytes());
if (node.has_value() && Allocator::ShouldBind()) {
Allocator::BindMemory(ptr.get(), bytes, node.value());
}
return ptr;
}
// Used to divide max_threads and max_workers_per_package across packages and
@ -422,23 +133,21 @@ static size_t DivideMaxAcross(const size_t max, const size_t instances) {
return max;
}
NestedPools::NestedPools(size_t max_threads, Tristate pin,
BoundedSlice package_slice, BoundedSlice cluster_slice,
BoundedSlice lp_slice)
: topology_(package_slice, cluster_slice, lp_slice) {
NestedPools::NestedPools(const BoundedTopology& topology, size_t max_threads,
Tristate pin) {
GetPinning().SetPolicy(pin);
packages_.resize(topology_.NumPackages());
packages_.resize(topology.NumPackages());
all_packages_ = MakePool(packages_.size());
const size_t max_workers_per_package =
DivideMaxAcross(max_threads, packages_.size());
// Each worker in all_packages_, including the main thread, will be the
// calling thread of an all_clusters->Run, and hence pinned to one of the
// calling thread of an all_clusters[0].Run, and hence pinned to one of the
// `cluster.lps` if `pin`.
all_packages_->Run(
0, all_packages_->NumWorkers(), [&](uint64_t pkg_idx, size_t thread) {
all_packages_[0].Run(
0, packages_.size(), [&](uint64_t pkg_idx, size_t thread) {
HWY_ASSERT(pkg_idx == thread); // each thread has one task
packages_[pkg_idx] =
Package(topology_, pkg_idx, max_workers_per_package);
Package(topology, pkg_idx, max_workers_per_package);
});
all_pinned_ = GetPinning().AllPinned(&pin_string_);
@ -458,6 +167,11 @@ NestedPools::NestedPools(size_t max_threads, Tristate pin,
HWY_ASSERT(max_workers_per_cluster_ <= 256);
}
// `max_or_zero` == 0 means no limit.
static inline size_t CapIfNonZero(size_t num, size_t max_or_zero) {
return (max_or_zero == 0) ? num : HWY_MIN(num, max_or_zero);
}
NestedPools::Package::Package(const BoundedTopology& topology, size_t pkg_idx,
size_t max_workers_per_package) {
// Pre-allocate because elements are set concurrently.
@ -465,19 +179,21 @@ NestedPools::Package::Package(const BoundedTopology& topology, size_t pkg_idx,
const size_t max_workers_per_cluster =
DivideMaxAcross(max_workers_per_package, clusters_.size());
all_clusters_ = MakePool(clusters_.size());
all_clusters_ =
MakePool(clusters_.size(), topology.GetCluster(pkg_idx, 0).Node());
// Parallel so we also pin the calling worker in `all_clusters` to
// `cluster.lps`.
all_clusters_->Run(
0, all_clusters_->NumWorkers(), [&](size_t cluster_idx, size_t thread) {
all_clusters_[0].Run(
0, all_clusters_[0].NumWorkers(), [&](size_t cluster_idx, size_t thread) {
HWY_ASSERT(cluster_idx == thread); // each thread has one task
const BoundedTopology::Cluster& cluster =
topology.GetCluster(pkg_idx, cluster_idx);
clusters_[cluster_idx] =
MakePool(CapIfNonZero(cluster.Size(), max_workers_per_cluster));
MakePool(CapIfNonZero(cluster.Size(), max_workers_per_cluster),
cluster.Node());
// Pin workers AND the calling thread from `all_clusters`.
GetPinning().MaybePin(pkg_idx, cluster_idx, cluster,
clusters_[cluster_idx]);
clusters_[cluster_idx][0]);
});
}

View File

@ -19,14 +19,14 @@
#include <stddef.h>
#include <stdint.h>
#include <memory> // std::unique_ptr
#include <vector>
// IWYU pragma: begin_exports
#include "util/allocator.h"
#include "util/basics.h" // Tristate
#include "hwy/base.h" // HWY_ASSERT
#include "util/topology.h"
#include "hwy/base.h" // HWY_ASSERT
#include "hwy/contrib/thread_pool/thread_pool.h"
#include "hwy/contrib/thread_pool/topology.h"
// IWYU pragma: end_exports
#ifndef GEMMA_DISABLE_TOPOLOGY
@ -35,157 +35,9 @@
namespace gcpp {
static inline size_t SaturatingSub(size_t a, size_t b) {
return a - HWY_MIN(a, b);
}
// `max_or_zero` == 0 means no limit.
static inline size_t CapIfNonZero(size_t num, size_t max_or_zero) {
return (max_or_zero == 0) ? num : HWY_MIN(num, max_or_zero);
}
// A slice of a 1D integer range such as the indices of packages or clusters.
// This allows assigning them to multiple instances of our binary.
class BoundedSlice {
public:
// Defaults to "use all detected".
BoundedSlice(size_t skip = 0, size_t max = 0) : skip_(skip), max_(max) {}
size_t Begin() const { return skip_; }
// STL-style one past the end.
size_t End(size_t detected) const {
return (max_ == 0) ? detected : HWY_MIN(detected, skip_ + max_);
}
// Number of elements in the slice.
size_t Num(size_t detected) const { return End(detected) - Begin(); }
bool Contains(size_t detected, size_t idx) const {
return Begin() <= idx && idx < End(detected);
}
template <class Func>
void Foreach(const char* name, size_t detected, const Func& func) {
if (Begin() >= detected) {
HWY_ABORT("Invalid skip=%zu for %s, detected=%zu", skip_, name, detected);
}
for (size_t i = Begin(); i < End(detected); ++i) {
func(i);
}
}
private:
// How many to skip, or equivalently, index of the first to use. It is an
// error if this is >= `detected`, because that would leave none for this
// instance to use.
size_t skip_;
// Upper bound on the number to use, or zero if no limit.
size_t max_;
};
// "LP" is a logical processor, a 0-based index passed to the OS.
using LPS = hwy::LogicalProcessorSet;
// We want vectors of hwy::ThreadPool, which is unfortunately not movable,
// hence we wrap them in unique_ptr.
using PoolPtr = std::unique_ptr<hwy::ThreadPool>;
// Wraps hwy::Topology and only keeps the subset of packages and clusters
// apportioned by BoundedSlice, further limited by the OS affinity mask.
// NOTE: if topology is unknown or the OS affinity is too restrictive, we fall
// back to a single package and cluster.
class BoundedTopology {
public:
// Thread-hostile, typically called from main thread.
BoundedTopology(BoundedSlice package_slice, BoundedSlice cluster_slice,
BoundedSlice lp_slice);
size_t NumPackages() const { return packages_.size(); }
size_t NumNodes() const { return nodes_.Count(); }
const char* TopologyString() const { return topology_string_; }
class Cluster {
public:
Cluster(const LPS& lps);
Cluster(const LPS& enabled_lps,
const std::vector<hwy::Topology::LP>& all_lps,
const hwy::Topology::Cluster& tcluster);
// For SortByDescendingSize.
size_t Size() const { return num_workers_; }
// Returns vector with all enabled LPs, used for pinning.
std::vector<size_t> LPVector() const {
std::vector<size_t> lps;
lps.reserve(lps_.Count());
lps_.Foreach([&lps](size_t lp) { lps.push_back(lp); });
return lps;
}
const LPS& LPSet() const { return lps_; }
size_t Node() const { return node_; }
size_t PrivateKiB() const { return private_kib_; }
size_t SharedKiB() const { return shared_kib_; }
private:
// Enabled LPs; if topology is known, only the ones in this cluster.
LPS lps_;
// How many workers in the per-cluster pool. If 0, this Cluster is removed.
size_t num_workers_ = 0;
// NUMA node, set from hwy::Topology::LP::node.
size_t node_ = 0;
// L2 cache size in KiB, or 0 if unknown.
size_t private_kib_ = 0;
// L3 cache size in KiB, or 0 if unknown.
size_t shared_kib_ = 0;
}; // Cluster
size_t NumClusters(size_t pkg_idx) const {
HWY_ASSERT(pkg_idx < NumPackages());
return packages_[pkg_idx].clusters.size();
}
const Cluster& GetCluster(size_t pkg_idx, size_t cluster_idx) const {
HWY_ASSERT(pkg_idx < NumPackages());
const Package& package = packages_[pkg_idx];
HWY_ASSERT(cluster_idx < package.clusters.size());
return package.clusters[cluster_idx];
}
Cluster& GetCluster(size_t pkg_idx, size_t cluster_idx) {
HWY_ASSERT(pkg_idx < NumPackages());
Package& package = packages_[pkg_idx];
HWY_ASSERT(cluster_idx < package.clusters.size());
return package.clusters[cluster_idx];
}
#if !GEMMA_DISABLE_TOPOLOGY
const hwy::Topology& FullTopology() const { return topology_; }
#endif
private:
struct Package {
explicit Package(const LPS& enabled_lps);
Package(const LPS& enabled_lps, const hwy::Topology& topology,
size_t pkg_idx, BoundedSlice cluster_slice);
// For SortByDescendingSize.
size_t Size() const { return clusters.size(); }
std::vector<Cluster> clusters;
}; // Package
void InitFromTopology(const LPS& enabled_lps, BoundedSlice package_slice,
BoundedSlice cluster_slice);
void InitFromLPs(const LPS& enabled_lps);
#if !GEMMA_DISABLE_TOPOLOGY
hwy::Topology topology_;
#endif
std::vector<Package> packages_;
char topology_string_[96];
LPS nodes_;
};
// Page-aligned on NUMA systems so we can bind to a NUMA node. This also allows
// moving because it is a typedef to `std::unique_ptr`.
using PoolPtr = AlignedClassPtr<hwy::ThreadPool>;
// Creates a hierarchy of thread pools according to `BoundedTopology`: one with
// a thread per enabled package; for each of those, one with a thread per
@ -215,16 +67,16 @@ class NestedPools {
// `max_threads` is the maximum number of threads to divide among all
// clusters. This is more intuitive than a per-cluster limit for users who
// may not be aware of the CPU topology.
// may not be aware of the CPU topology. 0 means no limit.
//
// To ensure we do not create more threads than there are HW cores, which
// would cause huge slowdowns when spinning, the `BoundedSlice` arguments
// only impose upper bounds on the number of detected packages and clusters
// rather than defining the actual number of threads.
NestedPools(size_t max_threads, Tristate pin = Tristate::kDefault,
BoundedSlice package_slice = BoundedSlice(),
BoundedSlice cluster_slice = BoundedSlice(),
BoundedSlice lp_slice = BoundedSlice());
//
// Caller must have called `Allocator::Init` before this.
NestedPools(const BoundedTopology& topology, size_t max_threads = 0,
Tristate pin = Tristate::kDefault);
bool AllPinned() const { return all_pinned_; }
@ -251,7 +103,7 @@ class NestedPools {
}
size_t NumPackages() const { return packages_.size(); }
hwy::ThreadPool& AllPackages() { return *all_packages_; }
hwy::ThreadPool& AllPackages() { return all_packages_[0]; }
hwy::ThreadPool& AllClusters(size_t pkg_idx) {
HWY_DASSERT(pkg_idx < NumPackages());
return packages_[pkg_idx].AllClusters();
@ -261,11 +113,6 @@ class NestedPools {
return packages_[pkg_idx].Cluster(cluster_idx);
}
// For binding to NUMA nodes.
size_t Node(size_t pkg_idx, size_t cluster_idx) const {
return topology_.GetCluster(pkg_idx, cluster_idx).Node();
}
// Reasonably tight upper bounds for allocating thread-local storage (TLS).
size_t MaxWorkersPerCluster() const { return max_workers_per_cluster_; }
size_t MaxWorkersPerPackage() const {
@ -282,10 +129,7 @@ class NestedPools {
return total_workers;
}
// For Allocator
const BoundedTopology& Topology() const { return topology_; }
// For ShowConfig
const char* TopologyString() const { return topology_.TopologyString(); }
const char* PinString() const { return pin_string_; }
// Returns a single pool on the given package: either one thread per cluster
@ -313,28 +157,28 @@ class NestedPools {
size_t max_workers_per_cluster = 0;
for (const PoolPtr& cluster : clusters_) {
max_workers_per_cluster =
HWY_MAX(max_workers_per_cluster, cluster->NumWorkers());
HWY_MAX(max_workers_per_cluster, cluster[0].NumWorkers());
}
return max_workers_per_cluster;
}
size_t TotalWorkers() const {
size_t total_workers = 0;
for (const PoolPtr& cluster : clusters_) {
total_workers += cluster->NumWorkers();
total_workers += cluster[0].NumWorkers();
}
return total_workers;
}
hwy::ThreadPool& AllClusters() { return *all_clusters_; }
hwy::ThreadPool& AllClusters() { return all_clusters_[0]; }
hwy::ThreadPool& Cluster(size_t cluster_idx) {
HWY_DASSERT(cluster_idx < clusters_.size());
return *clusters_[cluster_idx];
return clusters_[cluster_idx][0];
}
void SetWaitMode(hwy::PoolWaitMode wait_mode) {
all_clusters_->SetWaitMode(wait_mode);
all_clusters_[0].SetWaitMode(wait_mode);
for (PoolPtr& cluster : clusters_) {
cluster->SetWaitMode(wait_mode);
cluster[0].SetWaitMode(wait_mode);
}
}
@ -344,13 +188,12 @@ class NestedPools {
}; // Package
void SetWaitMode(hwy::PoolWaitMode wait_mode) {
all_packages_->SetWaitMode(wait_mode);
all_packages_[0].SetWaitMode(wait_mode);
for (Package& package : packages_) {
package.SetWaitMode(wait_mode);
}
}
BoundedTopology topology_;
bool all_pinned_;
const char* pin_string_;

util/threading_test.cc

@ -22,10 +22,17 @@
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "util/allocator.h"
#include "util/basics.h"
#include "hwy/aligned_allocator.h"
#include "hwy/auto_tune.h"
#include "hwy/base.h" // HWY_ASSERT
#include "hwy/contrib/thread_pool/thread_pool.h"
#include "hwy/nanobenchmark.h"
#include "hwy/nanobenchmark.h" // Unpredictable1
#include "hwy/robust_statistics.h"
#include "hwy/stats.h"
#include "hwy/tests/test_util.h" // AdjustedReps
#include "hwy/timer.h"
namespace gcpp {
namespace {
@ -253,80 +260,150 @@ TEST(ThreadingTest, TestParallelizeTwoRanges) {
}
}
// Governs duration of test; avoid timeout in debug builds.
#if HWY_IS_DEBUG_BUILD
constexpr size_t kMaxEvals = 2;
#else
constexpr size_t kMaxEvals = 8;
#endif
static constexpr size_t kU64PerThread = HWY_ALIGNMENT / sizeof(size_t);
static uint64_t outputs[hwy::kMaxLogicalProcessors * kU64PerThread];
hwy::FuncOutput ForkJoin(const void* opaque, hwy::FuncInput in) {
hwy::ThreadPool& pool =
*reinterpret_cast<hwy::ThreadPool*>(const_cast<void*>(opaque));
pool.Run(0, in, [&](uint64_t task, size_t thread) {
outputs[thread * kU64PerThread] = in;
});
return in;
std::vector<uint64_t> MeasureForkJoin(hwy::ThreadPool& pool) {
// Governs duration of test; avoid timeout in debug builds.
const size_t max_reps = hwy::AdjustedReps(50 * 1000);
std::vector<uint64_t> times;
times.reserve(max_reps);
const size_t base = hwy::Unpredictable1();
const double t0 = hwy::platform::Now();
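// Warm-up: run until the pool's auto-tuner reports completion; t0/t1 bound
// the tuning duration.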
for (size_t reps = 0; reps < 1200; ++reps) {
pool.Run(0, pool.NumWorkers(), [&](uint64_t task, size_t thread) {
outputs[thread * kU64PerThread] = base + thread;
});
hwy::PreventElision(outputs[base]);
if (pool.AutoTuneComplete()) break;
}
const double t1 = hwy::platform::Now();
// TODO(janwas): enable after Highway update
#if 0
if (pool.AutoTuneComplete()) {
hwy::Span<hwy::CostDistribution> cd = pool.AutoTuneCosts();
std::vector<double> costs;
costs.reserve(cd.size());
double min_cost = hwy::HighestValue<double>();
for (size_t i = 0; i < cd.size(); ++i) {
costs.push_back(cd[i].EstimateCost());
min_cost = HWY_MIN(min_cost, costs.back());
}
// Harmonic mean = reciprocal of the arithmetic mean of the reciprocals.
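// e.g. for speedups {2, 4}: harmonic mean = 2 / (1/2 + 1/4) = 2.67.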
double sum_recip_speedup = 0.0;
size_t count_sum = 0;
for (size_t i = 0; i < costs.size(); ++i) {
if (costs[i] == min_cost) continue;
const double speedup = costs[i] / min_cost;
HWY_ASSERT(speedup > 1.0);
sum_recip_speedup += 1.0 / speedup;
++count_sum;
}
const double harm_mean_speedup =
static_cast<double>(count_sum / sum_recip_speedup);
fprintf(stderr, "\nAuto-tuning took %f ms; harm. mean speedup: %f.\n",
(t1 - t0) * 1E3, harm_mean_speedup);
} else {
HWY_WARN("Auto-tuning did not complete yet.");
}
#else
(void)t0;
(void)t1;
#endif
char cpu100[100];
static const bool have_stop = hwy::platform::HaveTimerStop(cpu100);
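// Use timer::Stop for the end timestamp when supported (e.g. RDTSCP on
// x86); otherwise fall back to timer::Start for both timestamps.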
if (have_stop) {
for (size_t rep = 0; rep < max_reps; ++rep) {
const uint64_t t0 = hwy::timer::Start();
pool.Run(0, pool.NumWorkers(), [&](uint64_t task, size_t thread) {
outputs[thread * kU64PerThread] = base + thread;
});
const uint64_t t1 = hwy::timer::Stop();
times.push_back(t1 - t0);
}
} else {
for (size_t rep = 0; rep < max_reps; ++rep) {
const uint64_t t0 = hwy::timer::Start();
pool.Run(0, pool.NumWorkers(), [&](uint64_t task, size_t thread) {
outputs[thread * kU64PerThread] = base + thread;
});
const uint64_t t1 = hwy::timer::Start();
times.push_back(t1 - t0);
}
}
return times;
}
TEST(ThreadingTest, BenchJoin) {
constexpr size_t kInputs = 1;
static hwy::FuncInput inputs[kInputs];
const auto measure = [&](hwy::ThreadPool& pool, bool spin,
const char* caption) {
inputs[0] =
static_cast<hwy::FuncInput>(hwy::Unpredictable1() * pool.NumWorkers());
hwy::Result results[kInputs];
hwy::Params params;
params.verbose = false;
params.max_evals = kMaxEvals;
// Only spin for the duration of the benchmark to avoid wasting energy and
// interfering with the other pools.
if (spin) {
pool.SetWaitMode(hwy::PoolWaitMode::kSpin);
}
const size_t num_results =
Measure(&ForkJoin, reinterpret_cast<const uint8_t*>(&pool), inputs,
kInputs, results, params);
std::vector<uint64_t> times = MeasureForkJoin(pool);
// Capture before SetWaitMode changes it.
const hwy::pool::Config config = pool.config();
if (spin) {
pool.SetWaitMode(hwy::PoolWaitMode::kBlock);
}
for (size_t i = 0; i < num_results; ++i) {
printf("%-20s: %5d: %6.2f us; MAD=%4.2f%%\n", caption,
static_cast<int>(results[i].input),
results[i].ticks / hwy::platform::InvariantTicksPerSecond() * 1E6,
results[i].variability * 100.0);
const uint64_t median =
hwy::robust_statistics::Median(times.data(), times.size());
const uint64_t mode =
hwy::robust_statistics::ModeOfSorted(times.data(), times.size());
// Trim upper half and lower quarter to reduce outliers before Stats.
const size_t quarter = times.size() / 4;
times.erase(times.begin() + 2 * quarter, times.end());
times.erase(times.begin(), times.begin() + quarter);
hwy::Stats stats;
for (uint64_t t : times) {
stats.Notify(static_cast<float>(t));
}
fprintf(stderr, "%-20s: %3d: %6.2f %6.2f us %s\n", caption,
static_cast<int>(pool.NumWorkers()),
median / hwy::platform::InvariantTicksPerSecond() * 1E6,
mode / hwy::platform::InvariantTicksPerSecond() * 1E6,
config.ToString().c_str());
fprintf(stderr, "%s\n", stats.ToString().c_str());
// Verify outputs to ensure the measured code is not a no-op.
for (size_t lp = 0; lp < pool.NumWorkers(); ++lp) {
HWY_ASSERT(outputs[lp * kU64PerThread] == pool.NumWorkers());
HWY_ASSERT(outputs[lp * kU64PerThread] >= 1);
HWY_ASSERT(outputs[lp * kU64PerThread] <= 1 + pool.NumWorkers());
for (size_t i = 1; i < kU64PerThread; ++i) {
HWY_ASSERT(outputs[lp * kU64PerThread + i] == 0);
}
}
};
NestedPools pools(0);
BoundedTopology topology;
Allocator::Init(topology, true);
NestedPools pools(topology);
// Use last package because the main thread has been pinned to it.
const size_t pkg_idx = pools.NumPackages() - 1;
measure(pools.AllPackages(), false, "block packages");
if (pools.AllClusters(0).NumWorkers() > 1) {
measure(pools.AllClusters(0), false, "block clusters");
if (pools.AllClusters(pkg_idx).NumWorkers() > 1) {
measure(pools.AllClusters(pkg_idx), false, "block clusters");
}
measure(pools.Cluster(0, 0), false, "block in_cluster");
measure(pools.Cluster(pkg_idx, 0), false, "block in_cluster");
if (pools.AllPinned()) {
const bool kSpin = true;
measure(pools.AllPackages(), kSpin, "spin packages");
if (pools.AllClusters(0).NumWorkers() > 1) {
measure(pools.AllClusters(0), kSpin, "spin clusters");
if (pools.AllClusters(pkg_idx).NumWorkers() > 1) {
measure(pools.AllClusters(pkg_idx), kSpin, "spin clusters");
}
measure(pools.Cluster(0, 0), kSpin, "spin in_cluster");
measure(pools.Cluster(pkg_idx, 0), kSpin, "spin in_cluster");
}
}

util/topology.cc (new file, 336 lines)

@ -0,0 +1,336 @@
// Copyright 2024 Google LLC
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "util/topology.h"
#include <stdio.h>
#include <algorithm> // std::sort
#include <utility> // std::move
#include <vector>
#include "hwy/base.h"
namespace gcpp {
// Sort T := packages/clusters by descending 'size' so that users who only use
// one group get the largest.
template <class T>
static void SortByDescendingSize(std::vector<T>& groups) {
std::sort(groups.begin(), groups.end(),
[](const T& a, const T& b) { return a.Size() > b.Size(); });
}
// Returns set of LPs available for use.
static LPS EnabledLPs(const BoundedSlice& lp_slice) {
LPS enabled_lps;
// Thread-safe caching during the first call because subsequent pinning
// overwrites the main thread's affinity.
static const LPS affinity = []() {
LPS affinity;
if (!GetThreadAffinity(affinity)) affinity = LPS();
return affinity;
}();
if (HWY_LIKELY(affinity.Any())) {
// To honor taskset/numactl *and* the user's `lp_slice`, we interpret
// the latter as a slice of the 1-bits of `enabled_lps`. Note that this
// can be used to exclude hyperthreads because Linux groups LPs by
// sibling index. For example, the first `num_cores` are not siblings.
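// For example (hypothetical values): with affinity {0..7} and
// lp_slice = BoundedSlice(/*skip=*/2, /*max=*/4), the slice covers
// enabled-bit indices [2, 6), so LPs {2, 3, 4, 5} are enabled.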
const size_t detected = affinity.Count();
size_t enabled_idx = 0;
affinity.Foreach([&](size_t lp) {
if (lp_slice.Contains(detected, enabled_idx)) {
enabled_lps.Set(lp);
}
++enabled_idx;
});
} else {
const size_t num_lps = hwy::TotalLogicalProcessors();
HWY_WARN("unknown OS affinity, max %zu LPs and slice %zu.", num_lps,
lp_slice.Num(num_lps));
for (size_t lp = 0; lp < num_lps; ++lp) {
if (lp_slice.Contains(num_lps, lp)) {
enabled_lps.Set(lp);
}
}
}
// Without threading support, only keep the first enabled LP; it might still
// make sense to pin the main thread to avoid migrations.
if (HWY_UNLIKELY(!hwy::HaveThreadingSupport())) {
HWY_ASSERT(enabled_lps.Any());
const size_t lp = enabled_lps.First();
enabled_lps = LPS();
enabled_lps.Set(lp);
HWY_WARN("Warning, threads not supported, using only the main thread.");
}
return enabled_lps;
}
BoundedTopology::BoundedTopology(BoundedSlice package_slice,
BoundedSlice cluster_slice,
BoundedSlice lp_slice) {
const LPS enabled_lps = EnabledLPs(lp_slice);
#if !GEMMA_DISABLE_TOPOLOGY
if (HWY_LIKELY(!topology_.packages.empty())) {
InitFromTopology(enabled_lps, package_slice, cluster_slice);
}
#endif
// Topology unknown or no packages with enabled LPs: create a single
// package with one cluster, and one node.
if (HWY_UNLIKELY(NumPackages() == 0)) {
InitFromLPs(enabled_lps);
}
HWY_ASSERT(NumPackages() != 0 && NumClusters(0) != 0 && NumNodes() != 0);
}
// Topology is unknown, take the given set of LPs.
BoundedTopology::Cluster::Cluster(const LPS& lps) {
lps_ = lps;
num_workers_ = lps.Count();
}
BoundedTopology::Cluster::Cluster(const LPS& enabled_lps,
const std::vector<hwy::Topology::LP>& all_lps,
const hwy::Topology::Cluster& tcluster) {
bool is_first_lp = true;
tcluster.lps.Foreach([&](size_t lp) {
// Skip SMT siblings (smt != 0) and LPs that are not enabled.
if (all_lps[lp].smt != 0 || !enabled_lps.Get(lp)) return;
HWY_ASSERT(!lps_.Get(lp)); // Foreach ensures uniqueness
lps_.Set(lp);
++num_workers_;
// Set fields once, and ensure subsequent LPs match - we assume there
// is only one NUMA node per cluster, with the same L2/L3 size.
const size_t lp_node = static_cast<size_t>(all_lps[lp].node);
if (is_first_lp) {
is_first_lp = false;
node_ = lp_node;
private_kib_ = tcluster.private_kib;
shared_kib_ = tcluster.shared_kib;
} else {
static bool warned = false;
if (HWY_LIKELY(!warned)) {
if (HWY_UNLIKELY(lp_node != node_)) {
warned = true;
HWY_WARN("lp %zu on node %zu != cluster node %zu.", lp, lp_node,
node_);
}
if (HWY_UNLIKELY(private_kib_ != tcluster.private_kib)) {
warned = true;
HWY_WARN("lp %zu private_kib %zu != cluster %zu.", lp, private_kib_,
tcluster.private_kib);
}
if (HWY_UNLIKELY(shared_kib_ != tcluster.shared_kib)) {
warned = true;
HWY_WARN("lp %zu shared_kib %zu != cluster %zu.", lp, shared_kib_,
tcluster.shared_kib);
}
} // !warned
}
});
}
// CPUs without clusters rarely have more than a few dozen cores, and 6 is a
// decent number of threads for a per-cluster pool.
constexpr bool kSplitLargeClusters = false;
constexpr size_t kMaxClusters = 8;
constexpr size_t kMaxLPsPerCluster = 6;
// Topology is unknown, use only the given LPs which derive from OS affinity
// and `lp_slice`.
BoundedTopology::Package::Package(const LPS& enabled_lps) {
LPS clusters_lps[kMaxClusters];
const size_t num_clusters =
kSplitLargeClusters
? HWY_MIN(kMaxClusters,
hwy::DivCeil(enabled_lps.Count(), kMaxLPsPerCluster))
: 1;
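// (With kSplitLargeClusters = false this is always 1; if it were enabled,
// e.g. 14 enabled LPs would give HWY_MIN(8, DivCeil(14, 6)) = 3 clusters,
// filled round-robin below.)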
size_t enabled_idx = 0;
enabled_lps.Foreach([&](size_t lp) {
clusters_lps[enabled_idx % num_clusters].Set(lp);
++enabled_idx;
});
for (size_t cluster_idx = 0; cluster_idx < num_clusters; ++cluster_idx) {
clusters.push_back(Cluster(clusters_lps[cluster_idx]));
}
}
// NOTE: caller is responsible for checking whether `clusters` is empty.
BoundedTopology::Package::Package(const LPS& enabled_lps,
const hwy::Topology& topology, size_t pkg_idx,
BoundedSlice cluster_slice) {
const hwy::Topology::Package& tpackage = topology.packages[pkg_idx];
// Populate `clusters` with the subset of clusters in `cluster_slice` that
// have any enabled LPs. If `clusters` remains empty, the caller will
// skip this `Package`.
clusters.reserve(cluster_slice.Num(tpackage.clusters.size()));
cluster_slice.Foreach(
"cluster", tpackage.clusters.size(), [&](size_t cluster_idx) {
const hwy::Topology::Cluster& tcluster = tpackage.clusters[cluster_idx];
Cluster cluster(enabled_lps, topology.lps, tcluster);
// Skip if empty, i.e. too few `enabled_lps`.
if (HWY_LIKELY(cluster.Size() != 0)) {
clusters.push_back(cluster);
}
});
SortByDescendingSize(clusters);
// If there is only one large cluster, split it into smaller ones.
if (kSplitLargeClusters && clusters.size() == 1 &&
enabled_lps.Count() >= 16) {
const LPS lps = clusters[0].LPSet(); // copy so we can clear
clusters.clear();
// Split `lps` into several clusters.
LPS clusters_lps[kMaxClusters];
const size_t num_clusters =
HWY_MIN(kMaxClusters, hwy::DivCeil(lps.Count(), kMaxLPsPerCluster));
size_t num_lps = 0;
lps.Foreach(
[&](size_t lp) { clusters_lps[num_lps++ % num_clusters].Set(lp); });
HWY_DASSERT(num_lps == lps.Count());
// Create new clusters, just inserting the new LPS.
hwy::Topology::Cluster tcluster = tpackage.clusters[0]; // modifiable copy
for (size_t cluster_idx = 0; cluster_idx < num_clusters; ++cluster_idx) {
tcluster.lps = clusters_lps[cluster_idx];
// Keep same `private_kib` and `shared_kib`.
clusters.push_back(Cluster(enabled_lps, topology.lps, tcluster));
}
}
}
#if !GEMMA_DISABLE_TOPOLOGY
static size_t CoresFromLPs(const LPS& lps, const hwy::Topology& topology) {
LPS cores;
lps.Foreach([&](size_t lp) {
if (topology.lps[lp].smt == 0) cores.Set(lp);
});
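// e.g. a cluster with 8 cores and 2-way SMT (16 LPs) yields 8, because only
// the first sibling of each core has smt == 0.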
return cores.Count();
}
// Scans hwy::Topology for clusters and their size, for use by topology_string_.
static void ScanTClusters(hwy::Topology& topology_, size_t& max_tclusters,
size_t& max_tcluster_cores,
size_t& max_tcluster_lps) {
max_tclusters = 0;
max_tcluster_cores = 0;
max_tcluster_lps = 0;
for (size_t pkg_idx = 0; pkg_idx < topology_.packages.size(); ++pkg_idx) {
const std::vector<hwy::Topology::Cluster>& tclusters =
topology_.packages[pkg_idx].clusters;
max_tclusters = HWY_MAX(max_tclusters, tclusters.size());
size_t tcluster_cores = 0;
size_t tcluster_lps = 0;
for (size_t cluster_idx = 0; cluster_idx < tclusters.size();
++cluster_idx) {
const size_t cores = CoresFromLPs(tclusters[cluster_idx].lps, topology_);
const size_t lps = tclusters[cluster_idx].lps.Count();
tcluster_cores = HWY_MAX(tcluster_cores, cores);
tcluster_lps = HWY_MAX(tcluster_lps, lps);
}
if (tclusters.size() > 1 && tcluster_cores > 8) {
HWY_WARN(
"Package %zu: multiple clusters with max size %zu, whereas CCX "
"only have 8, may indicate a bug in hwy::Topology.",
pkg_idx, tcluster_cores);
}
max_tcluster_cores = HWY_MAX(max_tcluster_cores, tcluster_cores);
max_tcluster_lps = HWY_MAX(max_tcluster_lps, tcluster_lps);
}
HWY_ASSERT(max_tclusters != 0);
HWY_ASSERT(max_tcluster_cores != 0);
HWY_ASSERT(max_tcluster_lps >= max_tcluster_cores);
}
// Main part of ctor, called when topology is known.
void BoundedTopology::InitFromTopology(const LPS& enabled_lps,
BoundedSlice package_slice,
BoundedSlice cluster_slice) {
size_t max_tclusters, max_tcluster_cores, max_tcluster_lps;
ScanTClusters(topology_, max_tclusters, max_tcluster_cores, max_tcluster_lps);
// (Possibly empty) subset of `Topology` packages that have `enabled_lps`.
package_slice.Foreach(
"package", topology_.packages.size(), [&](size_t pkg_idx) {
Package package(enabled_lps, topology_, pkg_idx, cluster_slice);
// Skip if empty, i.e. too few `enabled_lps`.
if (HWY_LIKELY(!package.clusters.empty())) {
packages_.push_back(std::move(package));
}
});
if (NumPackages() == 0) return;
SortByDescendingSize(packages_);
// Remember NUMA nodes that we are actually using (not just enabled).
for (const Package& p : packages_) {
for (const Cluster& c : p.clusters) {
nodes_.Set(c.Node());
}
}
// Scan for max BoundedTopology clusters and their size, for topology_string_.
size_t all_max_cluster_size = 0;
for (size_t pkg_idx = 0; pkg_idx < NumPackages(); ++pkg_idx) {
size_t max_cluster_size = 0;
for (size_t cluster_idx = 0; cluster_idx < NumClusters(pkg_idx);
++cluster_idx) {
max_cluster_size =
HWY_MAX(max_cluster_size, GetCluster(pkg_idx, cluster_idx).Size());
}
if (NumClusters(pkg_idx) > 1 && max_cluster_size > 8) {
HWY_WARN(
"Package %zu: multiple clusters with max size %zu, whereas CCX "
"only have 8, may indicate a bug in BoundedTopology.",
pkg_idx, max_cluster_size);
}
all_max_cluster_size = HWY_MAX(all_max_cluster_size, max_cluster_size);
}
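// Example string (hypothetical): "2S 4X 8C 2H, using 1S 4X 8C (nodes=1)" =
// 2 detected packages (S), up to 4 clusters per package (X) of up to 8 cores
// (C) with 2 SMT threads per core (H); after "using", the subset in use.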
snprintf(topology_string_, sizeof(topology_string_),
"%zuS %zuX %zuC %zuH, using %zuS %zuX %zuC (nodes=%zu)",
topology_.packages.size(), max_tclusters, max_tcluster_cores,
max_tcluster_lps / max_tcluster_cores, packages_.size(),
NumClusters(0), all_max_cluster_size, nodes_.Count());
}
#endif // !GEMMA_DISABLE_TOPOLOGY
void BoundedTopology::InitFromLPs(const LPS& enabled_lps) {
packages_.push_back(Package(enabled_lps));
snprintf(topology_string_, sizeof(topology_string_), "LPs=%zu",
GetCluster(0, 0).Size());
// Assume a single NUMA node.
nodes_.Set(0);
HWY_ASSERT(NumNodes() == 1);
}
} // namespace gcpp

util/topology.h (new file, 177 lines)

@ -0,0 +1,177 @@
// Copyright 2024 Google LLC
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef THIRD_PARTY_GEMMA_CPP_UTIL_TOPOLOGY_H_
#define THIRD_PARTY_GEMMA_CPP_UTIL_TOPOLOGY_H_
#include <stddef.h>
#include <stdint.h>
#include <vector>
// IWYU pragma: begin_exports
#include "hwy/base.h" // HWY_ASSERT
#include "hwy/contrib/thread_pool/topology.h"
// IWYU pragma: end_exports
#ifndef GEMMA_DISABLE_TOPOLOGY
#define GEMMA_DISABLE_TOPOLOGY 0
#endif // !GEMMA_DISABLE_TOPOLOGY
namespace gcpp {
// A slice of a 1D integer range such as the indices of packages or clusters.
// This allows assigning them to multiple instances of our binary.
class BoundedSlice {
public:
// Defaults to "use all detected".
BoundedSlice(size_t skip = 0, size_t max = 0) : skip_(skip), max_(max) {}
size_t Begin() const { return skip_; }
// STL-style one past the end.
size_t End(size_t detected) const {
return (max_ == 0) ? detected : HWY_MIN(detected, skip_ + max_);
}
// Number of elements in the slice.
size_t Num(size_t detected) const { return End(detected) - Begin(); }
bool Contains(size_t detected, size_t idx) const {
return Begin() <= idx && idx < End(detected);
}
template <class Func>
void Foreach(const char* name, size_t detected, const Func& func) {
if (Begin() >= detected) {
HWY_ABORT("Invalid skip=%zu for %s, detected=%zu", skip_, name, detected);
}
for (size_t i = Begin(); i < End(detected); ++i) {
func(i);
}
}
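// Example (hypothetical): BoundedSlice(/*skip=*/1, /*max=*/2) with
// detected = 4 covers indices [1, 3): Begin() = 1, End(4) = 3, Num(4) = 2.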
private:
// How many to skip, or equivalently, index of the first to use. It is an
// error if this is >= `detected`, because that would leave none for this
// instance to use.
size_t skip_;
// Upper bound on the number to use, or zero if no limit.
size_t max_;
};
// "LP" is a logical processor, a 0-based index passed to the OS.
using LPS = hwy::LogicalProcessorSet;
// Wraps hwy::Topology and only keeps the subset of packages and clusters
// apportioned by BoundedSlice, further limited by the OS affinity mask.
// NOTE: if topology is unknown or the OS affinity is too restrictive, we fall
// back to a single package and cluster.
class BoundedTopology {
public:
// Defaults to "use all detected".
BoundedTopology(BoundedSlice package_slice = BoundedSlice(),
BoundedSlice cluster_slice = BoundedSlice(),
BoundedSlice lp_slice = BoundedSlice());
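// Note: `lp_slice` selects among the LPs enabled by OS affinity; e.g. a
// hypothetical BoundedSlice(0, num_cores) keeps only the first `num_cores`
// of them, which on Linux excludes SMT siblings (see EnabledLPs).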
size_t NumPackages() const { return packages_.size(); }
size_t NumNodes() const { return nodes_.Count(); }
const char* TopologyString() const { return topology_string_; }
class Cluster {
public:
Cluster(const LPS& lps);
Cluster(const LPS& enabled_lps,
const std::vector<hwy::Topology::LP>& all_lps,
const hwy::Topology::Cluster& tcluster);
// For SortByDescendingSize.
size_t Size() const { return num_workers_; }
// Returns vector with all enabled LPs, used for pinning.
std::vector<size_t> LPVector() const {
std::vector<size_t> lps;
lps.reserve(lps_.Count());
lps_.Foreach([&lps](size_t lp) { lps.push_back(lp); });
return lps;
}
const LPS& LPSet() const { return lps_; }
size_t Node() const { return node_; }
size_t PrivateKiB() const { return private_kib_; }
size_t SharedKiB() const { return shared_kib_; }
private:
// Enabled LPs; if topology is known, only the ones in this cluster.
LPS lps_;
// How many workers in the per-cluster pool. If 0, this Cluster is removed.
size_t num_workers_ = 0;
// NUMA node, set from hwy::Topology::LP::node.
size_t node_ = 0;
// L2 cache size in KiB, or 0 if unknown.
size_t private_kib_ = 0;
// L3 cache size in KiB, or 0 if unknown.
size_t shared_kib_ = 0;
}; // Cluster
size_t NumClusters(size_t pkg_idx) const {
HWY_ASSERT(pkg_idx < NumPackages());
return packages_[pkg_idx].clusters.size();
}
const Cluster& GetCluster(size_t pkg_idx, size_t cluster_idx) const {
HWY_ASSERT(pkg_idx < NumPackages());
const Package& package = packages_[pkg_idx];
HWY_ASSERT(cluster_idx < package.clusters.size());
return package.clusters[cluster_idx];
}
Cluster& GetCluster(size_t pkg_idx, size_t cluster_idx) {
HWY_ASSERT(pkg_idx < NumPackages());
Package& package = packages_[pkg_idx];
HWY_ASSERT(cluster_idx < package.clusters.size());
return package.clusters[cluster_idx];
}
#if !GEMMA_DISABLE_TOPOLOGY
const hwy::Topology& FullTopology() const { return topology_; }
#endif
private:
struct Package {
explicit Package(const LPS& enabled_lps);
Package(const LPS& enabled_lps, const hwy::Topology& topology,
size_t pkg_idx, BoundedSlice cluster_slice);
// For SortByDescendingSize.
size_t Size() const { return clusters.size(); }
std::vector<Cluster> clusters;
}; // Package
void InitFromTopology(const LPS& enabled_lps, BoundedSlice package_slice,
BoundedSlice cluster_slice);
void InitFromLPs(const LPS& enabled_lps);
#if !GEMMA_DISABLE_TOPOLOGY
hwy::Topology topology_;
#endif
std::vector<Package> packages_;
char topology_string_[96];
LPS nodes_;
};
} // namespace gcpp
#endif // THIRD_PARTY_GEMMA_CPP_UTIL_TOPOLOGY_H_