From 282f73ec2f6d545a93f961fd537ad31d03a69326 Mon Sep 17 00:00:00 2001
From: Jan Wassenberg
Date: Fri, 9 Aug 2024 13:44:42 -0700
Subject: [PATCH] Add pin flag to disable pinning. Refs #338

PiperOrigin-RevId: 661389171
---
 examples/hello_world/run.cc |  2 +-
 gemma/run.cc                |  2 +-
 util/app.h                  |  7 +++++-
 util/threading.h            | 50 ++++++++++++++++++++++++++++++---------------
 4 files changed, 38 insertions(+), 23 deletions(-)

diff --git a/examples/hello_world/run.cc b/examples/hello_world/run.cc
index 00425a3..ea7856c 100644
--- a/examples/hello_world/run.cc
+++ b/examples/hello_world/run.cc
@@ -43,7 +43,7 @@ int main(int argc, char** argv) {
   }
 
   // Instantiate model and KV Cache
-  gcpp::PerClusterPools pools(app.max_clusters, app.num_threads);
+  gcpp::PerClusterPools pools(app.max_clusters, app.num_threads, app.pin);
   gcpp::Gemma model = gcpp::CreateGemma(loader, pools);
   gcpp::KVCache kv_cache =
       gcpp::KVCache::Create(loader.Info().model, inference.prefill_tbatch_size);
diff --git a/gemma/run.cc b/gemma/run.cc
index b519593..b93b4d4 100644
--- a/gemma/run.cc
+++ b/gemma/run.cc
@@ -166,7 +166,7 @@ void Run(LoaderArgs& loader, InferenceArgs& inference, AppArgs& app) {
 
   // Note that num_threads is an upper bound; we also limit to the number of
   // detected and enabled cores.
-  PerClusterPools pools(app.max_clusters, app.num_threads);
+  PerClusterPools pools(app.max_clusters, app.num_threads, app.pin);
   Gemma model = CreateGemma(loader, pools);
   KVCache kv_cache =
       KVCache::Create(loader.Info().model, inference.prefill_tbatch_size);
diff --git a/util/app.h b/util/app.h
index ef4efb8..bbec5d7 100644
--- a/util/app.h
+++ b/util/app.h
@@ -54,10 +54,12 @@ class AppArgs : public ArgsBase<AppArgs> {
  public:
   AppArgs(int argc, char* argv[]) { InitAndParse(argc, argv); }
 
-  Path log;  // output
   int verbosity;
+
   size_t num_threads;  // divided among the detected clusters
   size_t max_clusters;
+  int pin;  // -1 = auto, 0 = no, 1 = yes
+
   std::string eot_line;
 
   template <class Visitor>
@@ -67,10 +69,13 @@
             "output\n    1 = standard user-facing terminal ui\n    2 = show "
             "developer/debug info).\n    Default = 1.",
             2);
+
     visitor(num_threads, "num_threads", size_t{0},
             "Maximum number of threads to use; default 0 = unlimited.", 2);
     visitor(max_clusters, "max_clusters", size_t{0},
             "Maximum number of sockets/CCXs to use; default 0 = unlimited.", 2);
+    visitor(pin, "pin", -1, "Pin threads? -1 = auto, 0 = no, 1 = yes.", 2);
+
     visitor(
         eot_line, "eot_line", std::string(""),
         "End of turn line. "
diff --git a/util/threading.h b/util/threading.h
index ec2a575..4a995ff 100644
--- a/util/threading.h
+++ b/util/threading.h
@@ -120,7 +120,9 @@ class PerClusterPools {
   // result in threads not running on their own core, we only allow for
   // *upper bounds* on the number of clusters and threads. The actual number of
   // clusters and threads are still limited by the detected topology.
-  PerClusterPools(size_t max_clusters, size_t max_threads)
+  //
+  // `pin` is 0 or 1 to force enable/disable, or -1 to choose automatically.
+  PerClusterPools(size_t max_clusters, size_t max_threads, int pin = -1)
       : have_threading_support_(hwy::HaveThreadingSupport()),
         cores_per_cluster_(DetectCoresPerCluster()),
         outer_pool_(CapIfNonzero(cores_per_cluster_.size(), max_clusters)) {
@@ -131,9 +133,11 @@
     // the first N processors, which are typically on the first socket.
     const size_t num_threads =
         CapIfNonzero(hwy::TotalLogicalProcessors() / 2, max_threads);
-    fprintf(stderr, "CPU topology unknown, using %zu threads\n", num_threads);
+    if (pin == -1) pin = num_threads > 8;
+    fprintf(stderr, "CPU topology unknown, using %zu threads, pin %d\n",
+            num_threads, pin);
     inner_pools_.push_back(std::make_unique<hwy::ThreadPool>(num_threads));
-    if (num_threads > 1) {
+    if (num_threads > 1 && pin) {
       inner_pools_.back()->Run(0, num_threads,
                                [](uint64_t /*task*/, size_t thread) {
                                  hwy::PinThreadToLogicalProcessor(thread);
@@ -149,25 +153,31 @@
       inner_pools_.push_back(std::make_unique<hwy::ThreadPool>(num_threads));
     }
 
-    // For each inner pool, pin their threads AND the associated outer thread
-    // (the one calling inner.Run()) to the enabled cores in the cluster.
-    outer_pool_.Run(
-        0, outer_pool_.NumWorkers(),
-        [this](uint64_t outer, size_t outer_thread) {
-          HWY_ASSERT(outer == outer_thread);  // each outer has one task
-          hwy::ThreadPool& inner = *inner_pools_[outer];
+    if (pin == -1) {
+      pin = (outer_pool_.NumWorkers() * inner_pools_[0]->NumWorkers()) >= 12;
+    }
 
-          const std::vector<size_t> cores =
-              CoresInLPS(cores_per_cluster_[outer]);
-          // May have been capped by max_threads.
-          HWY_ASSERT(inner.NumWorkers() <= cores.size());
+    if (pin) {
+      // For each inner pool, pin their threads AND the associated outer thread
+      // (the one calling inner.Run()) to the enabled cores in the cluster.
+      outer_pool_.Run(
+          0, outer_pool_.NumWorkers(),
+          [this](uint64_t outer, size_t outer_thread) {
+            HWY_ASSERT(outer == outer_thread);  // each outer has one task
+            hwy::ThreadPool& inner = *inner_pools_[outer];
 
-          inner.Run(0, inner.NumWorkers(),
-                    [&cores](uint64_t task, size_t thread) {
-                      HWY_ASSERT(task == thread);  // each inner has one task
-                      hwy::PinThreadToLogicalProcessor(cores[task]);
-                    });
-        });
+            const std::vector<size_t> cores =
+                CoresInLPS(cores_per_cluster_[outer]);
+            // May have been capped by max_threads.
+            HWY_ASSERT(inner.NumWorkers() <= cores.size());
+
+            inner.Run(0, inner.NumWorkers(),
+                      [&cores](uint64_t task, size_t thread) {
+                        HWY_ASSERT(task == thread);  // each inner has one task
+                        hwy::PinThreadToLogicalProcessor(cores[task]);
+                      });
+          });
+    }
   }
 
   // Spinning reduces the latency of barrier synchronization, but wastes lots of
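
Usage note (not part of the patch): the sketch below shows how a caller might forward the new flag, in the spirit of the call sites patched above. The `main` here is hypothetical; the headers, the `gcpp` names, and the -1/0/1 semantics are taken from the diff.

  // Hypothetical caller, condensed from the call sites patched above.
  #include "util/app.h"        // gcpp::AppArgs, parses --pin
  #include "util/threading.h"  // gcpp::PerClusterPools

  int main(int argc, char** argv) {
    gcpp::AppArgs app(argc, argv);
    // max_clusters/num_threads are upper bounds; the pools are still capped by
    // the detected topology. pin = 0 skips pinning entirely, 1 forces it, and
    // -1 lets the constructor decide from the worker count.
    gcpp::PerClusterPools pools(app.max_clusters, app.num_threads, app.pin);
    return 0;
  }

On the command line this surfaces as `--pin 0` (useful when sharing the machine with other workloads) or `--pin 1`; the default of -1 pins automatically once there are enough workers (>= 12 when the topology is known, > 8 threads otherwise).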