Use system topology to pin threads across clusters.

PiperOrigin-RevId: 640151974
This commit is contained in:
Zelalem Aweke 2024-06-04 07:49:53 -07:00 committed by Copybara-Service
parent 4f9155d8c6
commit 9e213b3d96
6 changed files with 66 additions and 37 deletions

View File

@ -96,6 +96,8 @@ cc_library(
":gemma_lib",
"//compression:io",
"@hwy//:hwy",
"@hwy//:thread_pool",
"@hwy//:topology",
],
)

View File

@ -109,13 +109,9 @@ int main(int argc, char** argv) {
log_layers_output ? &json_logger.layers_output_log_f : nullptr;
hwy::ThreadPool pool(app.num_threads);
// For many-core, pinning threads to cores helps.
// For many-core, pinning workers to cores helps.
if (app.num_threads > 10) {
gcpp::PinThreadToCore(app.num_threads - 1); // Main thread
pool.Run(0, pool.NumThreads(), [](uint64_t /*task*/, size_t thread) {
gcpp::PinThreadToCore(thread);
});
gcpp::PinWorkersToCores(pool);
}
gcpp::Gemma model(loader.tokenizer, loader.weights, loader.ModelType(), pool);

View File

@ -271,13 +271,9 @@ int main(int argc, char** argv) {
}
hwy::ThreadPool pool(app.num_threads);
// For many-core, pinning threads to cores helps.
// For many-core, pinning workers to cores helps.
if (app.num_threads > 10) {
gcpp::PinThreadToCore(app.num_threads - 1); // Main thread
pool.Run(0, pool.NumThreads(), [](uint64_t /*task*/, size_t thread) {
gcpp::PinThreadToCore(thread);
});
gcpp::PinWorkersToCores(pool);
}
gcpp::Gemma model(loader.tokenizer, loader.weights, loader.ModelType(), pool);

View File

@ -244,12 +244,9 @@ void Run(LoaderArgs& loader, InferenceArgs& inference, AppArgs& app) {
PROFILER_ZONE("Run.misc");
hwy::ThreadPool pool(app.num_threads);
// For many-core, pinning threads to cores helps.
// For many-core, pinning workers to cores helps.
if (app.num_threads > 10) {
PinThreadToCore(app.num_threads - 1); // Main thread
pool.Run(0, pool.NumThreads(),
[](uint64_t /*task*/, size_t thread) { PinThreadToCore(thread); });
PinWorkersToCores(pool);
}
gcpp::Gemma model(loader.tokenizer, loader.weights, loader.ModelType(), pool);

View File

@ -154,12 +154,9 @@ void Run(LoaderArgs& loader, InferenceArgs& inference, AppArgs& app) {
PROFILER_ZONE("Run.misc");
hwy::ThreadPool pool(app.num_threads);
// For many-core, pinning threads to cores helps.
// For many-core, pinning workers to cores helps.
if (app.num_threads > 10) {
PinThreadToCore(app.num_threads - 1); // Main thread
pool.Run(0, pool.NumThreads(),
[](uint64_t /*task*/, size_t thread) { PinThreadToCore(thread); });
PinWorkersToCores(pool);
}
gcpp::Gemma model(loader.tokenizer, loader.weights, loader.ModelType(), pool);

View File

@ -18,6 +18,7 @@
#ifndef THIRD_PARTY_GEMMA_CPP_UTIL_APP_H_
#define THIRD_PARTY_GEMMA_CPP_UTIL_APP_H_
#include "hwy/contrib/thread_pool/thread_pool.h"
#if HWY_OS_LINUX
#include <sched.h>
@ -30,12 +31,14 @@
#include <algorithm> // std::clamp
#include <thread> // NOLINT>
#include <vector>
#include "compression/io.h" // Path
#include "gemma/configs.h"
#include "gemma/gemma.h"
#include "util/args.h"
#include "hwy/base.h" // HWY_ASSERT
#include "hwy/contrib/thread_pool/topology.h"
namespace gcpp {
@ -61,22 +64,60 @@ static inline const char* CompiledConfig() {
}
}
static inline void PinThreadToCore(size_t cpu_index) {
#if HWY_OS_LINUX
// Forces the thread to run on the logical processor with the same number.
cpu_set_t cset; // bit array
CPU_ZERO(&cset); // clear all
CPU_SET(cpu_index, &cset); // set bit indicating which processor to run on.
const int err = sched_setaffinity(0, sizeof(cset), &cset);
if (err != 0) {
fprintf(stderr,
"sched_setaffinity returned %d, errno %d. Can happen if running in "
"a container; this warning is safe to ignore.\n",
err, errno);
// Expands a set of logical processors into a plain list of their indices.
// The list holds one entry per member of `lps`, in the order in which
// `lps.Foreach` visits the members.
static inline std::vector<size_t> LpsToCpus(
    const hwy::LogicalProcessorSet& lps) {
  std::vector<size_t> indices;
  // The final size is known up front, so allocate once.
  indices.reserve(lps.Count());
  const auto append = [&indices](size_t lp) { indices.push_back(lp); };
  lps.Foreach(append);
  return indices;
}
// Chooses, for each worker in [0, num_workers), the CPU (logical processor)
// it should be pinned to. Entry `i` of the result is the CPU for worker `i`.
//
// All assignments are made from Package 0 of `topology`. Workers are dealt
// round-robin across clusters first, then across the CPUs inside each
// cluster. For example, if we have 4 clusters, the assignments will be:
//   Worker 0 -> Cluster 0, Core 0
//   Worker 1 -> Cluster 1, Core 0
//   Worker 2 -> Cluster 2, Core 0
//   Worker 3 -> Cluster 3, Core 0
//   Worker 4 -> Cluster 0, Core 1
//   Worker 5 -> Cluster 1, Core 1
//   ... and so on.
// This is intended to result in the least amount of sharing of the
// last-level cache slices.
//
// The per-cluster core index wraps around (modulo the cluster's CPU count),
// so a CPU is reused once a cluster has been exhausted.
//
// Clusters that contain no logical processors are ignored. If `topology` has
// no packages, or Package 0 has no usable cluster, the result is the identity
// mapping (worker i -> CPU i) rather than a division by zero.
static inline std::vector<size_t> AssignCpusFromTopology(
    const hwy::Topology& topology, const size_t num_workers) {
  std::vector<std::vector<size_t>> clusters;
  if (!topology.packages.empty()) {
    const auto& package_clusters = topology.packages[0].clusters;
    clusters.reserve(package_clusters.size());
    for (const auto& cluster : package_clusters) {
      clusters.push_back(LpsToCpus(cluster.lps));
      // An empty cluster would make the modulo below divide by zero.
      if (clusters.back().empty()) clusters.pop_back();
    }
  }

  std::vector<size_t> assigned_cpus;
  assigned_cpus.reserve(num_workers);

  if (clusters.empty()) {
    // Nothing usable in the topology: pin worker i to CPU i.
    for (size_t i = 0; i < num_workers; ++i) {
      assigned_cpus.push_back(i);
    }
    return assigned_cpus;
  }

  const size_t num_clusters = clusters.size();
  for (size_t i = 0; i < num_workers; ++i) {
    const std::vector<size_t>& cpus = clusters[i % num_clusters];
    // i / num_clusters counts completed passes over all clusters, i.e. which
    // core within the cluster this worker should get.
    const size_t cpu_index = (i / num_clusters) % cpus.size();
    assigned_cpus.push_back(cpus[cpu_index]);
  }
  return assigned_cpus;
}
// Pins the workers of `pool` to logical processors.
//
// A default-constructed hwy::Topology is queried. If it reports at least one
// package, each worker index is mapped to a CPU via AssignCpusFromTopology;
// otherwise worker index i is pinned to logical processor i.
//
// NOTE(review): this relies on pool.Run invoking the callback once per worker
// with a distinct `thread` index when given NumWorkers() tasks - confirm
// against the ThreadPool scheduling guarantees.
static inline void PinWorkersToCores(hwy::ThreadPool& pool) {
// Use topology to pin workers to cores if available.
hwy::Topology topology;
if (!topology.packages.empty()) {
// Topology is available: compute the worker -> CPU table once, up front.
std::vector<size_t> assigned_cpus =
AssignCpusFromTopology(topology, pool.NumWorkers());
// The callback's `thread` argument is used as the index into the table.
pool.Run(0, pool.NumWorkers(),
[&assigned_cpus](uint64_t /*task*/, size_t thread) {
hwy::PinThreadToLogicalProcessor(assigned_cpus[thread]);
});
} else {
// No topology information: use the `thread` index itself as the logical
// processor number.
pool.Run(0, pool.NumWorkers(), [](uint64_t /*task*/, size_t thread) {
hwy::PinThreadToLogicalProcessor(thread);
});
}
#else
(void)cpu_index;
#endif
}
class AppArgs : public ArgsBase<AppArgs> {