From 9e213b3d96896053224a259f7ebadf734e57267b Mon Sep 17 00:00:00 2001
From: Zelalem Aweke
Date: Tue, 4 Jun 2024 07:49:53 -0700
Subject: [PATCH] Use system topology to pin threads across clusters.

PiperOrigin-RevId: 640151974
---
 BUILD.bazel        |  2 ++
 debug_prompt.cc    |  8 ++----
 gemma/benchmark.cc |  8 ++----
 gemma/run.cc       |  7 ++---
 gemma/run_mmlu.cc  |  7 ++---
 util/app.h         | 71 ++++++++++++++++++++++++++++++++++++----------
 6 files changed, 66 insertions(+), 37 deletions(-)

diff --git a/BUILD.bazel b/BUILD.bazel
index 96f9c12..621233f 100644
--- a/BUILD.bazel
+++ b/BUILD.bazel
@@ -96,6 +96,8 @@ cc_library(
         ":gemma_lib",
         "//compression:io",
         "@hwy//:hwy",
+        "@hwy//:thread_pool",
+        "@hwy//:topology",
     ],
 )
 
diff --git a/debug_prompt.cc b/debug_prompt.cc
index 8159f83..8954f29 100644
--- a/debug_prompt.cc
+++ b/debug_prompt.cc
@@ -109,13 +109,9 @@ int main(int argc, char** argv) {
       log_layers_output ? &json_logger.layers_output_log_f : nullptr;
 
   hwy::ThreadPool pool(app.num_threads);
-  // For many-core, pinning threads to cores helps.
+  // For many-core, pinning workers to cores helps.
   if (app.num_threads > 10) {
-    gcpp::PinThreadToCore(app.num_threads - 1);  // Main thread
-
-    pool.Run(0, pool.NumThreads(), [](uint64_t /*task*/, size_t thread) {
-      gcpp::PinThreadToCore(thread);
-    });
+    gcpp::PinWorkersToCores(pool);
   }
 
   gcpp::Gemma model(loader.tokenizer, loader.weights, loader.ModelType(), pool);
diff --git a/gemma/benchmark.cc b/gemma/benchmark.cc
index ca06434..2233de9 100644
--- a/gemma/benchmark.cc
+++ b/gemma/benchmark.cc
@@ -271,13 +271,9 @@ int main(int argc, char** argv) {
   }
 
   hwy::ThreadPool pool(app.num_threads);
-  // For many-core, pinning threads to cores helps.
+  // For many-core, pinning workers to cores helps.
   if (app.num_threads > 10) {
-    gcpp::PinThreadToCore(app.num_threads - 1);  // Main thread
-
-    pool.Run(0, pool.NumThreads(), [](uint64_t /*task*/, size_t thread) {
-      gcpp::PinThreadToCore(thread);
-    });
+    gcpp::PinWorkersToCores(pool);
   }
 
   gcpp::Gemma model(loader.tokenizer, loader.weights, loader.ModelType(), pool);
diff --git a/gemma/run.cc b/gemma/run.cc
index bb4f199..2d557af 100644
--- a/gemma/run.cc
+++ b/gemma/run.cc
@@ -244,12 +244,9 @@ void Run(LoaderArgs& loader, InferenceArgs& inference, AppArgs& app) {
   PROFILER_ZONE("Run.misc");
 
   hwy::ThreadPool pool(app.num_threads);
-  // For many-core, pinning threads to cores helps.
+  // For many-core, pinning workers to cores helps.
   if (app.num_threads > 10) {
-    PinThreadToCore(app.num_threads - 1);  // Main thread
-
-    pool.Run(0, pool.NumThreads(),
-             [](uint64_t /*task*/, size_t thread) { PinThreadToCore(thread); });
+    PinWorkersToCores(pool);
   }
 
   gcpp::Gemma model(loader.tokenizer, loader.weights, loader.ModelType(), pool);
diff --git a/gemma/run_mmlu.cc b/gemma/run_mmlu.cc
index a733318..4d1b0be 100644
--- a/gemma/run_mmlu.cc
+++ b/gemma/run_mmlu.cc
@@ -154,12 +154,9 @@ void Run(LoaderArgs& loader, InferenceArgs& inference, AppArgs& app) {
   PROFILER_ZONE("Run.misc");
 
   hwy::ThreadPool pool(app.num_threads);
-  // For many-core, pinning threads to cores helps.
+  // For many-core, pinning workers to cores helps.
   if (app.num_threads > 10) {
-    PinThreadToCore(app.num_threads - 1);  // Main thread
-
-    pool.Run(0, pool.NumThreads(),
-             [](uint64_t /*task*/, size_t thread) { PinThreadToCore(thread); });
+    PinWorkersToCores(pool);
   }
 
   gcpp::Gemma model(loader.tokenizer, loader.weights, loader.ModelType(), pool);
diff --git a/util/app.h b/util/app.h
index 6f789e6..190c9a3 100644
--- a/util/app.h
+++ b/util/app.h
@@ -18,6 +18,7 @@
 #ifndef THIRD_PARTY_GEMMA_CPP_UTIL_APP_H_
 #define THIRD_PARTY_GEMMA_CPP_UTIL_APP_H_
 
+#include "hwy/contrib/thread_pool/thread_pool.h"
 #if HWY_OS_LINUX
 #include <sched.h>
 
@@ -30,12 +31,14 @@
 #include <algorithm>  // std::clamp
 #include <thread>     // NOLINT
+#include <vector>
 
 #include "compression/io.h"  // Path
 #include "gemma/configs.h"
 #include "gemma/gemma.h"
 #include "util/args.h"
 #include "hwy/base.h"  // HWY_ASSERT
+#include "hwy/contrib/thread_pool/topology.h"
 
 namespace gcpp {
 
@@ -61,22 +64,60 @@ static inline const char* CompiledConfig() {
   }
 }
 
-static inline void PinThreadToCore(size_t cpu_index) {
-#if HWY_OS_LINUX
-  // Forces the thread to run on the logical processor with the same number.
-  cpu_set_t cset;             // bit array
-  CPU_ZERO(&cset);            // clear all
-  CPU_SET(cpu_index, &cset);  // set bit indicating which processor to run on.
-  const int err = sched_setaffinity(0, sizeof(cset), &cset);
-  if (err != 0) {
-    fprintf(stderr,
-            "sched_setaffinity returned %d, errno %d. Can happen if running in "
-            "a container; this warning is safe to ignore.\n",
-            err, errno);
+static inline std::vector<size_t> LpsToCpus(
+    const hwy::LogicalProcessorSet& lps) {
+  std::vector<size_t> cpus;
+  cpus.reserve(lps.Count());
+  lps.Foreach([&cpus](size_t lp) { cpus.push_back(lp); });
+  return cpus;
+}
+
+static inline std::vector<size_t> AssignCpusFromTopology(
+    const hwy::Topology& topology, const size_t num_workers) {
+  // Assign CPUs to workers 0 to num_workers - 1 based on the topology.
+  // The assignments are done in a round-robin fashion across all clusters
+  // and cores.
+  // For example, if we have 4 clusters, the assignments will be:
+  // Thread 0 -> Cluster 0, Core 0
+  // Thread 1 -> Cluster 1, Core 0
+  // Thread 2 -> Cluster 2, Core 0
+  // Thread 3 -> Cluster 3, Core 0
+  // Thread 4 -> Cluster 0, Core 1
+  // Thread 5 -> Cluster 1, Core 1
+  // ... and so on.
+  //
+  // This would result in the least amount of sharing of the last-level
+  // cache slices. All assignments are made from Package 0.
+  std::vector<std::vector<size_t>> clusters;
+  clusters.reserve(topology.packages[0].clusters.size());
+  for (auto& cluster : topology.packages[0].clusters) {
+    clusters.push_back(LpsToCpus(cluster.lps));
+  }
+  std::vector<size_t> assigned_cpus;
+  assigned_cpus.reserve(num_workers);
+  for (size_t i = 0; i < num_workers; ++i) {
+    size_t cluster_index = i % clusters.size();
+    size_t cpu_index = (i / clusters.size()) % clusters[cluster_index].size();
+    assigned_cpus.push_back(clusters[cluster_index][cpu_index]);
+  }
+  return assigned_cpus;
+}
+
+static inline void PinWorkersToCores(hwy::ThreadPool& pool) {
+  // Use topology to pin workers to cores if available.
+  hwy::Topology topology;
+  if (!topology.packages.empty()) {
+    std::vector<size_t> assigned_cpus =
+        AssignCpusFromTopology(topology, pool.NumWorkers());
+    pool.Run(0, pool.NumWorkers(),
+             [&assigned_cpus](uint64_t /*task*/, size_t thread) {
+               hwy::PinThreadToLogicalProcessor(assigned_cpus[thread]);
+             });
+  } else {
+    pool.Run(0, pool.NumWorkers(), [](uint64_t /*task*/, size_t thread) {
+      hwy::PinThreadToLogicalProcessor(thread);
+    });
   }
-#else
-  (void)cpu_index;
-#endif
 }
 
 class AppArgs : public ArgsBase<AppArgs> {
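
Reviewer note (illustrative, not part of the patch): the round-robin
interleaving that AssignCpusFromTopology performs can be sanity-checked with
a small standalone C++ sketch. The cluster contents below are hypothetical
stand-ins for what hwy::Topology would report on a real machine; only the
index arithmetic is taken from the patch.

#include <stddef.h>
#include <stdio.h>

#include <vector>

int main() {
  // Hypothetical package 0 layout: 2 clusters of 4 logical processors each.
  std::vector<std::vector<size_t>> clusters = {{0, 1, 2, 3}, {4, 5, 6, 7}};
  const size_t num_workers = 6;
  for (size_t i = 0; i < num_workers; ++i) {
    // Same arithmetic as AssignCpusFromTopology in util/app.h above.
    const size_t cluster_index = i % clusters.size();
    const size_t cpu_index =
        (i / clusters.size()) % clusters[cluster_index].size();
    printf("worker %zu -> cluster %zu, cpu %zu\n", i, cluster_index,
           clusters[cluster_index][cpu_index]);
  }
  return 0;
}

Workers 0..5 map to CPUs 0, 4, 1, 5, 2, 6: consecutive workers alternate
between clusters, so for small worker counts each cluster's last-level cache
is shared by as few workers as possible.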