Remove multi-package support from topology

Also no longer assume equal-sized clusters

PiperOrigin-RevId: 820164125
Jan Wassenberg 2025-10-16 04:00:06 -07:00 committed by Copybara-Service
parent 9b6ed1a58f
commit f59eb2ed72
14 changed files with 305 additions and 493 deletions
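For orientation before the per-file hunks: the gist of the call-site migration, as a minimal sketch using only names that appear in this commit (ParallelFor, ParallelismStrategy, ThreadingContext, NestedPools). Work, OldCallSite and NewCallSite are hypothetical placeholders, and the include path follows the first hunk below.

#include "util/threading_context.h"  // ThreadingContext, ParallelFor (per the first hunk)

namespace gcpp {

// Hypothetical per-task body, only for illustration.
static void Work(size_t task, size_t worker) { (void)task; (void)worker; }

// Before: pools were indexed by package, then by cluster within that package.
static void OldCallSite(ThreadingContext& ctx, size_t num_tasks) {
  ctx.pools.Cluster(/*pkg_idx=*/0, /*cluster_idx=*/0)
      .Run(0, num_tasks, [](uint64_t task, size_t thread) { Work(task, thread); });
}

// After: a single package is assumed, clusters are indexed directly, and most
// call sites go through ParallelFor with an explicit strategy.
static void NewCallSite(ThreadingContext& ctx, size_t num_tasks) {
  ParallelFor(ParallelismStrategy::kWithinCluster, num_tasks, ctx,
              /*cluster_idx=*/0,
              [](size_t task, size_t worker) { Work(task, worker); });
}

}  // namespace gcpp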

View File

@@ -28,7 +28,6 @@
 #include "util/threading_context.h"
 #include "hwy/aligned_allocator.h"  // Span
 #include "hwy/base.h"
-#include "hwy/contrib/thread_pool/thread_pool.h"
 #include "hwy/timer.h"
 
 namespace gcpp {
@@ -104,27 +103,31 @@ BlobVec ReserveMemory(const RangeVec& ranges, BytePtr& all_blobs, size_t& pos) {
 // Reads one set of blobs in parallel (helpful if in disk cache).
 // Aborts on error.
 void ReadBlobs(BlobReader& reader, const RangeVec& ranges, BlobVec& blobs,
-               hwy::ThreadPool& pool) {
+               ThreadingContext& ctx, size_t cluster_idx) {
   HWY_ASSERT(reader.Keys().size() == blobs.size());
   HWY_ASSERT(ranges.size() == blobs.size());
-  pool.Run(0, blobs.size(), [&](size_t i, size_t /*thread*/) {
-    HWY_ASSERT(ranges[i].bytes == blobs[i].size());
-    reader.file().Read(ranges[i].offset, ranges[i].bytes, blobs[i].data());
-  });
+  ParallelFor(ParallelismStrategy::kWithinCluster, blobs.size(), ctx,
+              cluster_idx, [&](size_t i, size_t /*thread*/) {
+                HWY_ASSERT(ranges[i].bytes == blobs[i].size());
+                reader.file().Read(ranges[i].offset, ranges[i].bytes,
+                                   blobs[i].data());
+              });
 }
 
 // Parallelizes ReadBlobs across (two) packages, if available.
 void ReadBothBlobs(BlobReader& reader1, BlobReader& reader2,
                    const RangeVec& ranges1, const RangeVec& ranges2,
                    size_t total_bytes, BlobVec& blobs1, BlobVec& blobs2,
-                   NestedPools& pools) {
+                   ThreadingContext& ctx) {
   const double t0 = hwy::platform::Now();
-  HWY_WARN("Reading %zu GiB, %zux%zu cores: ", total_bytes >> 30,
-           pools.AllPackages().NumWorkers(), pools.Pool().NumWorkers());
-  pools.AllPackages().Run(0, 2, [&](size_t task, size_t pkg_idx) {
-    ReadBlobs(task ? reader2 : reader1, task ? ranges2 : ranges1,
-              task ? blobs2 : blobs1, pools.Pool(pkg_idx));
-  });
+  HWY_WARN("Reading %zu GiB, %zu clusters: ", total_bytes >> 30,
+           ctx.pools.NumClusters());
+  ParallelFor(ParallelismStrategy::kAcrossClusters, 2, ctx, 0,
+              [&](const size_t task, size_t cluster_idx) {
+                ReadBlobs(task ? reader1 : reader2, task ? ranges1 : ranges2,
+                          task ? blobs1 : blobs2, ctx, cluster_idx);
+              });
   const double t1 = hwy::platform::Now();
   HWY_WARN("%.1f GB/s\n", total_bytes / (t1 - t0) * 1E-9);
 }
@@ -181,29 +184,23 @@ size_t BlobDifferences(const ByteSpan data1, const ByteSpan data2,
 }
 
 void CompareBlobs(const KeyVec& keys, BlobVec& blobs1, BlobVec& blobs2,
-                  size_t total_bytes, NestedPools& pools) {
+                  size_t total_bytes, ThreadingContext& ctx) {
   HWY_WARN("Comparing %zu blobs in parallel: ", keys.size());
   const double t0 = hwy::platform::Now();
   std::atomic<size_t> blobs_equal{};
   std::atomic<size_t> blobs_diff{};
-  const IndexRangePartition ranges = StaticPartition(
-      IndexRange(0, keys.size()), pools.AllPackages().NumWorkers(), 1);
-  ParallelizeOneRange(
-      ranges, pools.AllPackages(),
-      [&](const IndexRange& range, size_t pkg_idx) {
-        pools.Pool(pkg_idx).Run(
-            range.begin(), range.end(), [&](size_t i, size_t /*thread*/) {
-              const size_t mismatches =
-                  BlobDifferences(blobs1[i], blobs2[i], keys[i]);
-              if (mismatches != 0) {
-                HWY_WARN("key %s has %zu mismatches in %zu bytes!\n",
-                         keys[i].c_str(), mismatches, blobs1[i].size());
-                blobs_diff.fetch_add(1);
-              } else {
-                blobs_equal.fetch_add(1);
-              }
-            });
-      });
+  ParallelFor(ParallelismStrategy::kHierarchical, keys.size(), ctx, 0,
+              [&](size_t i, size_t /*thread*/) {
+                const size_t mismatches =
+                    BlobDifferences(blobs1[i], blobs2[i], keys[i]);
+                if (mismatches != 0) {
+                  HWY_WARN("key %s has %zu mismatches in %zu bytes!\n",
+                           keys[i].c_str(), mismatches, blobs1[i].size());
+                  blobs_diff.fetch_add(1);
+                } else {
+                  blobs_equal.fetch_add(1);
+                }
+              });
   const double t1 = hwy::platform::Now();
   HWY_WARN("%.1f GB/s; total blob matches=%zu, mismatches=%zu\n",
            total_bytes / (t1 - t0) * 1E-9, blobs_equal.load(),
@@ -230,9 +227,9 @@ void ReadAndCompareBlobs(const Path& path1, const Path& path2) {
   ThreadingArgs args;
   ThreadingContext ctx(args);
   ReadBothBlobs(reader1, reader2, ranges1, ranges2, total_bytes, blobs1, blobs2,
-                ctx.pools);
-  CompareBlobs(reader1.Keys(), blobs1, blobs2, total_bytes, ctx.pools);
+                ctx);
+  CompareBlobs(reader1.Keys(), blobs1, blobs2, total_bytes, ctx);
 }
 
 }  // namespace gcpp
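The two functions above nest the strategies: the outer ParallelFor runs one task per cluster (kAcrossClusters) and receives a cluster_idx, which is then passed to the inner kWithinCluster ParallelFor so each set of blobs is read by the workers of one cluster. A hedged sketch of the same pattern; DoItem and ProcessTwoSets are hypothetical:

// Sketch only; mirrors the pattern in the hunks above.
static void DoItem(size_t set, size_t item, size_t worker) {
  (void)set; (void)item; (void)worker;  // hypothetical leaf work
}

static void ProcessTwoSets(ThreadingContext& ctx, size_t items_per_set) {
  ParallelFor(ParallelismStrategy::kAcrossClusters, /*num_tasks=*/2, ctx,
              /*cluster_idx=*/0, [&](size_t set, size_t cluster_idx) {
                // Each set is handled by the workers of one cluster.
                ParallelFor(ParallelismStrategy::kWithinCluster, items_per_set,
                            ctx, cluster_idx, [&](size_t item, size_t worker) {
                              DoItem(set, item, worker);
                            });
              });
}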

View File

@@ -1124,8 +1124,9 @@ void TestAllDot() {
                                  MatPadding::kOdd);
   std::array<DotStats, kMaxWorkers> all_stats;
-  ctx.pools.Cluster(0, 0).Run(
-      0, kReps, [&](const uint32_t rep, size_t thread) {
+  ParallelFor(
+      ParallelismStrategy::kWithinCluster, kReps, ctx, 0,
+      [&](size_t rep, size_t thread) {
         float* HWY_RESTRICT pa = a.Row(thread);
         float* HWY_RESTRICT pb = b.Row(thread);
         double* HWY_RESTRICT buf = bufs.Row(thread);

View File

@@ -351,7 +351,7 @@ std::vector<MMConfig> MMCandidates(const CacheInfo& cache, size_t M, size_t K,
 MatMulEnv::MatMulEnv(ThreadingContext& ctx)
     : ctx(ctx), A_BF(ctx.allocator), C_tiles(ctx) {
-  const size_t num_clusters = ctx.pools.AllClusters(/*pkg_idx=*/0).NumWorkers();
+  const size_t num_clusters = ctx.pools.NumClusters();
   per_cluster.resize(num_clusters);
   for (size_t cluster_idx = 0; cluster_idx < num_clusters; ++cluster_idx) {
     row_ptrs.push_back(hwy::AllocateAligned<uint8_t*>(kMaxBatchSize));  // C
@@ -368,7 +368,7 @@ void BindB(ThreadingContext& ctx, MatPtr& B, size_t sizeof_TC) {
   PROFILER_ZONE("Startup.BindB");
-  const size_t node = ctx.topology.GetCluster(/*pkg_idx=*/0, 0).Node();
+  const size_t node = ctx.topology.GetCluster(0).Node();
   uintptr_t begin = reinterpret_cast<uintptr_t>(B.RowBytes(0));
   uintptr_t end = begin + B.Rows() * B.Stride() * B.ElementBytes();
   // B row padding is less than the page size, so only bind the subset that
@@ -394,7 +394,7 @@ void BindC(ThreadingContext& ctx, MatPtr& C) {
   const size_t end = hwy::RoundDownTo(cols_c.end() * C.ElementBytes(),
                                       allocator.BasePageBytes());
-  const size_t node = ctx.topology.GetCluster(/*pkg_idx=*/0, 0).Node();
+  const size_t node = ctx.topology.GetCluster(0).Node();
   bool ok = true;
   for (size_t im = 0; im < C.Rows(); ++im) {
     ok &= allocator.BindMemory(C.RowBytes(im) + begin, end - begin, node);

View File

@@ -105,8 +105,7 @@ struct MMParallelWithinCluster {
                 size_t inner_tasks, size_t cluster_idx, const Func& func) const {
     HWY_DASSERT(1 <= inner_tasks && inner_tasks <= 4);
-    const size_t pkg_idx = 0;
-    hwy::ThreadPool& cluster = ctx.pools.Cluster(pkg_idx, cluster_idx);
+    hwy::ThreadPool& cluster = ctx.pools.Cluster(cluster_idx);
     const size_t base = ctx.Worker(cluster_idx);
     const IndexRangePartition ranges_n = StaticPartition(
@@ -122,8 +121,7 @@
                   const IndexRangePartition& ranges_mc,
                   const IndexRangePartition& ranges_nc, size_t cluster_idx,
                   const Func& func) const {
-    const size_t pkg_idx = 0;
-    hwy::ThreadPool& cluster = ctx.pools.Cluster(pkg_idx, cluster_idx);
+    hwy::ThreadPool& cluster = ctx.pools.Cluster(cluster_idx);
     const size_t base = ctx.Worker(cluster_idx);
     // Low-batch: avoid Divide/Remainder.
@@ -143,8 +141,7 @@ struct MMParallelWithinCluster {
   template <class Func>
   void ForRangeMC(ThreadingContext& ctx, const IndexRange& range_mc,
                   size_t cluster_idx, const Func& func) const {
-    const size_t pkg_idx = 0;
-    hwy::ThreadPool& cluster = ctx.pools.Cluster(pkg_idx, cluster_idx);
+    hwy::ThreadPool& cluster = ctx.pools.Cluster(cluster_idx);
     const size_t base = ctx.Worker(cluster_idx);
     cluster.Run(
@@ -164,12 +161,11 @@ struct MMParallelHierarchical {
     HWY_DASSERT(caller_cluster_idx == 0);
     // Single cluster: parallel-for over static partition of `range_n`.
-    const size_t pkg_idx = 0;
-    hwy::ThreadPool& all_clusters = ctx.pools.AllClusters(pkg_idx);
+    hwy::ThreadPool& all_clusters = ctx.pools.AllClusters();
     const size_t num_clusters = all_clusters.NumWorkers();
     if (num_clusters == 1) {
       const size_t cluster_idx = 0;
-      hwy::ThreadPool& cluster = ctx.pools.Cluster(pkg_idx, cluster_idx);
+      hwy::ThreadPool& cluster = ctx.pools.Cluster(cluster_idx);
       const IndexRangePartition ranges_n = StaticPartition(
           range_n, cluster.NumWorkers() * inner_tasks, n_multiple);
       return ParallelizeOneRange(
@@ -185,7 +181,7 @@
     ParallelizeOneRange(
         ranges_n, all_clusters,
         [&](const IndexRange& n_range, const size_t cluster_idx) {
-          hwy::ThreadPool& cluster = ctx.pools.Cluster(pkg_idx, cluster_idx);
+          hwy::ThreadPool& cluster = ctx.pools.Cluster(cluster_idx);
           const size_t cluster_base = ctx.Worker(cluster_idx);
           // Parallel-for over sub-ranges of `cluster_range` within the cluster.
           const IndexRangePartition worker_ranges = StaticPartition(
@@ -206,17 +202,16 @@ struct MMParallelHierarchical {
                   const IndexRangePartition& ranges_nc,
                   HWY_MAYBE_UNUSED size_t caller_cluster_idx,
                   const Func& func) const {
-    const size_t pkg_idx = 0;
     HWY_DASSERT(caller_cluster_idx == 0);
-    hwy::ThreadPool& all_clusters = ctx.pools.AllClusters(pkg_idx);
+    hwy::ThreadPool& all_clusters = ctx.pools.AllClusters();
     // `all_clusters` is a pool with one worker per cluster in a package.
     const size_t num_clusters = all_clusters.NumWorkers();
     // Single (big) cluster: collapse two range indices into one parallel-for
     // to reduce the number of fork-joins.
     if (num_clusters == 1) {
       const size_t cluster_idx = 0;
-      hwy::ThreadPool& cluster = ctx.pools.Cluster(pkg_idx, cluster_idx);
+      hwy::ThreadPool& cluster = ctx.pools.Cluster(cluster_idx);
       // Low-batch: avoid Divide/Remainder.
       if (HWY_UNLIKELY(ranges_mc.NumTasks() == 1)) {
         return ParallelizeOneRange(
@@ -237,7 +232,7 @@ struct MMParallelHierarchical {
         ranges_nc, all_clusters,
         [&](const IndexRange range_nc, size_t cluster_idx) {
          const size_t cluster_base = ctx.Worker(cluster_idx);
-          hwy::ThreadPool& cluster = ctx.pools.Cluster(pkg_idx, cluster_idx);
+          hwy::ThreadPool& cluster = ctx.pools.Cluster(cluster_idx);
           ParallelizeOneRange(ranges_mc, cluster,
                               [&](const IndexRange& range_mc, size_t worker) {
                                 func(range_mc, range_nc, cluster_base + worker);

View File

@@ -191,29 +191,22 @@ HWY_INLINE void MatMulSlow(const MatPtrT<TA> A, const MatPtrT<TB> B,
   const IndexRange all_cols_c(0, C.Cols());
   NestedPools& pools = env.ctx.pools;
-  hwy::ThreadPool& all_packages = pools.AllPackages();
-  const IndexRangePartition get_row_c =
-      StaticPartition(all_rows_c, all_packages.NumWorkers(), 1);
+  hwy::ThreadPool& all_clusters = pools.AllClusters();
+  const size_t multiple = env.ctx.allocator.QuantumBytes() / sizeof(TB);
+  const IndexRangePartition get_col_c =
+      StaticPartition(all_cols_c, all_clusters.NumWorkers(), multiple);
   ParallelizeOneRange(
-      get_row_c, all_packages,
-      [&](const IndexRange& rows_c, size_t package_idx) HWY_ATTR {
-        hwy::ThreadPool& all_clusters = pools.AllClusters(package_idx);
-        const size_t multiple = env.ctx.allocator.QuantumBytes() / sizeof(TB);
-        const IndexRangePartition get_col_c =
-            StaticPartition(all_cols_c, all_clusters.NumWorkers(), multiple);
-        ParallelizeOneRange(
-            get_col_c, all_clusters,
-            [&](const IndexRange& cols_c, size_t cluster_idx) HWY_ATTR {
-              for (size_t r : rows_c) {
-                TC* HWY_RESTRICT C_row = C.Row(r);
-                for (size_t c : cols_c) {
-                  const float add = add_row ? add_row[c] : 0.0f;
-                  const float dot =
-                      Dot(df, b_span, c * B.Stride(), A.Row(r), A.Cols());
-                  C_row[c] = hwy::ConvertScalarTo<TC>(add + scale * dot);
-                }
-              }
-            });
-      });
+      get_col_c, all_clusters,
+      [&](const IndexRange& cols_c, size_t cluster_idx) HWY_ATTR {
+        for (size_t r : all_rows_c) {
+          TC* HWY_RESTRICT C_row = C.Row(r);
+          for (size_t c : cols_c) {
+            const float add = add_row ? add_row[c] : 0.0f;
+            const float dot =
+                Dot(df, b_span, c * B.Stride(), A.Row(r), A.Cols());
+            C_row[c] = hwy::ConvertScalarTo<TC>(add + scale * dot);
+          }
+        }
+      });
 }

View File

@@ -139,7 +139,7 @@ CacheInfo::CacheInfo(const BoundedTopology& topology) {
   step_bytes_ = HWY_MAX(line_bytes_, vector_bytes_);
-  const BoundedTopology::Cluster& cluster = topology.GetCluster(0, 0);
+  const BoundedTopology::Cluster& cluster = topology.GetCluster(0);
   if (const hwy::Cache* caches = hwy::DataCaches()) {
     l1_bytes_ = caches[1].size_kib << 10;
     l2_bytes_ = caches[2].size_kib << 10;

View File

@@ -169,7 +169,7 @@ class Allocator {
   bool ShouldBind() const { return should_bind_; }
 
   // Attempts to move(!) `[p, p + bytes)` to the given NUMA node, which is
-  // typically `BoundedTopology::GetCluster(package_idx, cluster_idx).node`.
+  // typically `BoundedTopology::GetCluster(cluster_idx).node`.
   // Writes zeros to SOME of the memory. Only call if `ShouldBind()`.
   // `p` and `bytes` must be multiples of `QuantumBytes()`.
   bool BindMemory(void* p, size_t bytes, size_t node) const;

View File

@@ -18,7 +18,6 @@
 #include <stdio.h>
 
-#include <algorithm>  // std::sort
 #include <memory>
 #include <optional>
 #include <vector>
@@ -31,14 +30,6 @@
 namespace gcpp {
 
-// Sort T := packages/clusters by descending 'size' so that users who only use
-// one Group get the largest.
-template <class T>
-static void SortByDescendingSize(std::vector<T>& groups) {
-  std::sort(groups.begin(), groups.end(),
-            [](const T& a, const T& b) { return a.Size() > b.Size(); });
-}
-
 static bool InContainer() {
   return false;  // placeholder for container detection, do not remove
 }
@@ -55,19 +46,18 @@ PinningPolicy::PinningPolicy(Tristate pin) {
 // If `pinning.Want()`, tries to pin each worker in `pool` to an LP in
 // `cluster`, and calls `pinning.NotifyFailed()` if any fails.
-void MaybePin(const BoundedTopology& topology, size_t pkg_idx,
-              size_t cluster_idx, const BoundedTopology::Cluster& cluster,
+static void MaybePin(const BoundedTopology& topology, size_t cluster_idx,
+                     const BoundedTopology::Cluster& cluster,
                      PinningPolicy& pinning, hwy::ThreadPool& pool) {
   const std::vector<size_t> lps = cluster.LPVector();
   HWY_ASSERT(pool.NumWorkers() <= lps.size());
   pool.Run(0, pool.NumWorkers(), [&](uint64_t task, size_t thread) {
     HWY_ASSERT(task == thread);  // each worker has one task
     char buf[16];  // Linux limitation
-    const int bytes_written = snprintf(buf, sizeof(buf), "P%zu X%02zu C%03d",
-                                       topology.SkippedPackages() + pkg_idx,
-                                       topology.SkippedClusters() + cluster_idx,
-                                       static_cast<int>(task));
+    const int bytes_written = snprintf(
+        buf, sizeof(buf), "P%zu X%02zu C%03d", topology.SkippedPackages(),
+        topology.SkippedClusters() + cluster_idx, static_cast<int>(task));
     HWY_ASSERT(bytes_written < static_cast<int>(sizeof(buf)));
     hwy::SetThreadName(buf, 0);  // does not support varargs
@@ -113,79 +103,56 @@ static size_t DivideMaxAcross(const size_t max, const size_t instances) {
   return max;
 }
 
-NestedPools::NestedPools(const BoundedTopology& topology,
-                         const Allocator& allocator, size_t max_threads,
-                         Tristate pin)
-    : pinning_(pin) {
-  packages_.resize(topology.NumPackages());
-  all_packages_ =
-      MakePool(allocator, packages_.size(), hwy::PoolWorkerMapping());
-  const size_t max_workers_per_package =
-      DivideMaxAcross(max_threads, packages_.size());
-  // Each worker in all_packages_, including the main thread, will be the
-  // calling thread of an all_clusters->Run, and hence pinned to one of the
-  // `cluster.lps` if `pin`.
-  all_packages_->Run(0, packages_.size(), [&](uint64_t pkg_idx, size_t thread) {
-    HWY_ASSERT(pkg_idx == thread);  // each thread has one task
-    packages_[pkg_idx] = Package(topology, allocator, pinning_, pkg_idx,
-                                 max_workers_per_package);
-  });
-  all_pinned_ = pinning_.AllPinned(&pin_string_);
-
-  // For mapping package/cluster/thread to noncontiguous TLS indices, in case
-  // cluster/thread counts differ.
-  HWY_ASSERT(!packages_.empty() && packages_.size() <= 16);
-  for (const Package& p : packages_) {
-    max_clusters_per_package_ =
-        HWY_MAX(max_clusters_per_package_, p.NumClusters());
-    max_workers_per_cluster_ =
-        HWY_MAX(max_workers_per_cluster_, p.MaxWorkersPerCluster());
-  }
-  HWY_ASSERT(max_clusters_per_package_ >= 1);
-  HWY_ASSERT(max_clusters_per_package_ <= 64);
-  HWY_ASSERT(max_workers_per_cluster_ >= 1);
-  HWY_ASSERT(max_workers_per_cluster_ <= 256);
-}
-
 // `max_or_zero` == 0 means no limit.
 static inline size_t CapIfNonZero(size_t num, size_t max_or_zero) {
   return (max_or_zero == 0) ? num : HWY_MIN(num, max_or_zero);
 }
 
-NestedPools::Package::Package(const BoundedTopology& topology,
-                              const Allocator& allocator,
-                              PinningPolicy& pinning, size_t pkg_idx,
-                              size_t max_workers_per_package) {
-  // Pre-allocate because elements are set concurrently.
-  clusters_.resize(topology.NumClusters(pkg_idx));
-  const size_t max_workers_per_cluster =
-      DivideMaxAcross(max_workers_per_package, clusters_.size());
-  const BoundedTopology::Cluster& cluster0 = topology.GetCluster(pkg_idx, 0);
-  // Core 0 of each cluster. The second argument is the cluster size, not
-  // number of clusters. We ensure that it is the same for all clusters so that
-  // the `GlobalIdx` computation is consistent within and across clusters.
-  const hwy::PoolWorkerMapping all_clusters_mapping(hwy::kAllClusters,
-                                                    cluster0.Size());
-  all_clusters_ = MakePool(allocator, clusters_.size(), all_clusters_mapping,
-                           cluster0.Node());
+NestedPools::NestedPools(const BoundedTopology& topology,
+                         const Allocator& allocator, size_t max_threads,
+                         Tristate pin)
+    : pinning_(pin) {
+  const size_t num_clusters = topology.NumClusters();
+  const size_t cluster_workers_cap = DivideMaxAcross(max_threads, num_clusters);
+
+  // Precompute cluster sizes to ensure we pass the same values to `MakePool`.
+  // The max is also used for `all_clusters_mapping`, see below.
+  size_t workers_per_cluster[hwy::kMaxClusters] = {};
+  size_t all_clusters_node = 0;
+  for (size_t cluster_idx = 0; cluster_idx < num_clusters; ++cluster_idx) {
+    const BoundedTopology::Cluster& tcluster = topology.GetCluster(cluster_idx);
+    workers_per_cluster[cluster_idx] =
+        CapIfNonZero(tcluster.NumWorkers(), cluster_workers_cap);
+    // Cluster sizes can vary because individual LPs may be disabled. Use the
+    // max so that `GlobalIdx` is consistent within and across clusters. It is
+    // OK to have holes or gaps in the worker index space.
+    max_workers_per_cluster_ =
+        HWY_MAX(max_workers_per_cluster_, workers_per_cluster[cluster_idx]);
+    all_clusters_node = tcluster.Node();  // arbitrarily use the last node seen
+  }
+
+  const hwy::PoolWorkerMapping all_clusters_mapping(hwy::kAllClusters,
+                                                    max_workers_per_cluster_);
+  all_clusters_ = MakePool(allocator, num_clusters, all_clusters_mapping,
+                           all_clusters_node);
+  // Pre-allocate because elements are set concurrently.
+  clusters_.resize(num_clusters);
   // Parallel so we also pin the calling worker in `all_clusters` to
   // `cluster.lps`.
-  all_clusters_->Run(
-      0, all_clusters_->NumWorkers(), [&](size_t cluster_idx, size_t thread) {
-        HWY_ASSERT(cluster_idx == thread);  // each thread has one task
-        const BoundedTopology::Cluster& cluster =
-            topology.GetCluster(pkg_idx, cluster_idx);
-        HWY_ASSERT(cluster.Size() == cluster0.Size());
-        clusters_[cluster_idx] = MakePool(
-            allocator, CapIfNonZero(cluster.Size(), max_workers_per_cluster),
-            hwy::PoolWorkerMapping(cluster_idx, cluster.Size()),
-            cluster.Node());
-        // Pin workers AND the calling thread from `all_clusters`.
-        MaybePin(topology, pkg_idx, cluster_idx, cluster, pinning,
-                 *clusters_[cluster_idx]);
-      });
+  all_clusters_->Run(0, num_clusters, [&](size_t cluster_idx, size_t thread) {
+    HWY_ASSERT(cluster_idx == thread);  // each thread has one task
+    const BoundedTopology::Cluster& tcluster = topology.GetCluster(cluster_idx);
+    clusters_[cluster_idx] =
+        MakePool(allocator, workers_per_cluster[cluster_idx],
+                 hwy::PoolWorkerMapping(cluster_idx, max_workers_per_cluster_),
+                 tcluster.Node());
+    // Pin workers AND the calling thread from `all_clusters_`.
+    MaybePin(topology, cluster_idx, tcluster, pinning_,
+             *clusters_[cluster_idx]);
+  });
+  all_pinned_ = pinning_.AllPinned(&pin_string_);
 }
 
 }  // namespace gcpp
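The constructor above caps each cluster's pool at its own enabled-worker count but passes the maximum to PoolWorkerMapping, so global worker indices stay consistent even when clusters are unequal. A small sketch of the assumed mapping (the cluster_idx * pools.MaxWorkersPerCluster() expression in threading.h below uses the same formula; GlobalWorkerIdx is hypothetical):

// Sketch only: assumed global worker index with unequal clusters.
static size_t GlobalWorkerIdx(size_t cluster_idx, size_t local_worker,
                              size_t max_workers_per_cluster) {
  return cluster_idx * max_workers_per_cluster + local_worker;
}
// E.g. clusters with 6 and 4 enabled workers, max = 6:
//   cluster 0 -> indices 0..5, cluster 1 -> indices 6..9; 10 and 11 stay unused,
//   i.e. the index space has holes, which the comment above says is OK.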

View File

@@ -66,17 +66,14 @@ class PinningPolicy {
 };  // PinningPolicy
 
 // Creates a hierarchy of thread pools according to `BoundedTopology`: one with
-// a thread per enabled package; for each of those, one with a thread per
-// enabled cluster (CCX/shared L3), and for each of those, the remaining
-// enabled cores in that cluster.
+// a thread per enabled cluster (CCX/shared L3), and for each of those, the
+// remaining enabled cores in that cluster.
 //
 // Note that we support spin waits, thus it is important for each thread to be
 // responsive, hence we do not create more than one thread per enabled core.
-// For example, when there are two packages with four clusters of 8 cores,
-// `AllPackages` has the main thread plus one extra thread, each `AllClusters`
-// has one of the `AllPackages` threads plus three extras, each `Cluster` runs
-// on one `AllClusters` thread plus seven extra workers, for a total of
-// 1 + 2*3 + 2*(4*7) = 63 extras plus the main thread.
+// For example, when there are four clusters of 8 cores, `AllClusters` has the
+// main thread plus three extras, each `Cluster` runs on one of `AllClusters`
+// plus seven extras, for a total of 3 + (4*7) = 31 extras plus the main thread.
 //
 // Useful when there are tasks which should be parallelized by workers sharing a
 // cache, or on the same NUMA node. In both cases, individual pools have lower
@@ -96,6 +93,10 @@ class NestedPools {
   NestedPools(NestedPools&&) = delete;
   NestedPools& operator=(NestedPools&&) = delete;
 
+  // Because cross-package latency is high, this interface assumes only one
+  // package is used. The `skip_packages` argument to `BoundedTopology` selects
+  // which package that is for this `NestedPools` instance.
+  //
   // `max_threads` is the maximum number of threads to divide among all
   // clusters. This is more intuitive than a per-cluster limit for users who
   // may not be aware of the CPU topology. This should be zero (meaning no
@@ -104,8 +105,8 @@
   //
   // To ensure we do not create more threads than there are HW cores, which
   // would cause huge slowdowns when spinning, the `BoundedSlice` arguments
-  // only impose upper bounds on the number of detected packages and clusters
-  // rather than defining the actual number of threads.
+  // only impose upper bounds on the number of detected clusters rather than
+  // defining the actual number of threads.
   NestedPools(const BoundedTopology& topology, const Allocator& allocator,
               size_t max_threads = 0, Tristate pin = Tristate::kDefault);
@@ -133,98 +134,37 @@
     }
   }
 
-  size_t NumPackages() const { return packages_.size(); }
-  hwy::ThreadPool& AllPackages() { return *all_packages_; }
-  hwy::ThreadPool& AllClusters(size_t pkg_idx) {
-    HWY_DASSERT(pkg_idx < NumPackages());
-    return packages_[pkg_idx].AllClusters();
-  }
-  hwy::ThreadPool& Cluster(size_t pkg_idx, size_t cluster_idx) {
-    HWY_DASSERT(pkg_idx < NumPackages());
-    return packages_[pkg_idx].Cluster(cluster_idx);
+  size_t NumClusters() const { return clusters_.size(); }
+  hwy::ThreadPool& AllClusters() { return *all_clusters_; }
+  hwy::ThreadPool& Cluster(size_t cluster_idx) {
+    HWY_DASSERT(cluster_idx < clusters_.size());
+    return *clusters_[cluster_idx];
   }
 
   // Reasonably tight upper bounds for allocating thread-local storage (TLS).
   size_t MaxWorkersPerCluster() const { return max_workers_per_cluster_; }
-  size_t MaxWorkersPerPackage() const {
-    return max_clusters_per_package_ * MaxWorkersPerCluster();
-  }
-  size_t MaxWorkers() const { return NumPackages() * MaxWorkersPerPackage(); }
-
-  // Actual number of workers.
-  size_t TotalWorkers() const {
-    size_t total_workers = 0;
-    for (size_t pkg_idx = 0; pkg_idx < NumPackages(); ++pkg_idx) {
-      total_workers += packages_[pkg_idx].TotalWorkers();
-    }
-    return total_workers;
-  }
+  size_t MaxWorkers() const { return NumClusters() * MaxWorkersPerCluster(); }
 
   // For ShowConfig
   const char* PinString() const { return pin_string_; }
 
   // Returns a single pool on the given package: either one thread per cluster
   // if there is more than one, which maximizes available memory bandwidth, or
-  // the first cluster, which is typically the whole package. For use by callers
-  // that only have a single parallel-for.
+  // the first cluster, which is typically the whole package. For use by
+  // callers that only have a single parallel-for.
+  // DEPRECATED: use ParallelFor instead.
   hwy::ThreadPool& Pool(size_t pkg_idx = 0) {
     // Only one cluster: use its pool, typically a whole socket.
-    if (AllClusters(pkg_idx).NumWorkers() == 1) {
-      return Cluster(pkg_idx, 0);
-    }
+    if (NumClusters() == 1) return Cluster(0);
     // One worker per cluster to maximize bandwidth availability.
-    return AllClusters(pkg_idx);
+    return AllClusters();
   }
 
  private:
-  class Package {
-   public:
-    Package() = default;  // for vector
-    Package(const BoundedTopology& topology, const Allocator& allocator,
-            PinningPolicy& pinning, size_t pkg_idx,
-            size_t max_workers_per_package);
-
-    size_t NumClusters() const { return clusters_.size(); }
-    size_t MaxWorkersPerCluster() const {
-      size_t max_workers_per_cluster = 0;
-      for (const PoolPtr& cluster : clusters_) {
-        max_workers_per_cluster =
-            HWY_MAX(max_workers_per_cluster, cluster->NumWorkers());
-      }
-      return max_workers_per_cluster;
-    }
-    size_t TotalWorkers() const {
-      size_t total_workers = 0;
-      for (const PoolPtr& cluster : clusters_) {
-        total_workers += cluster->NumWorkers();
-      }
-      return total_workers;
-    }
-    hwy::ThreadPool& AllClusters() { return *all_clusters_; }
-    hwy::ThreadPool& Cluster(size_t cluster_idx) {
-      HWY_DASSERT(cluster_idx < clusters_.size());
-      return *clusters_[cluster_idx];
-    }
-    void SetWaitMode(hwy::PoolWaitMode wait_mode) {
-      all_clusters_->SetWaitMode(wait_mode);
-      for (PoolPtr& cluster : clusters_) {
-        cluster->SetWaitMode(wait_mode);
-      }
-    }
-
-   private:
-    // Must be freed after `clusters_` because it reserves threads which are
-    // the main threads of `clusters_`.
-    PoolPtr all_clusters_;
-    std::vector<PoolPtr> clusters_;
-  };  // Package
-
   void SetWaitMode(hwy::PoolWaitMode wait_mode) {
-    all_packages_->SetWaitMode(wait_mode);
-    for (Package& package : packages_) {
-      package.SetWaitMode(wait_mode);
+    all_clusters_->SetWaitMode(wait_mode);
+    for (PoolPtr& cluster : clusters_) {
+      cluster->SetWaitMode(wait_mode);
     }
   }
@@ -232,12 +172,13 @@ class NestedPools {
   bool all_pinned_;
   const char* pin_string_;
 
-  std::vector<Package> packages_;
-  PoolPtr all_packages_;
+  // Must be freed after `clusters_` because it reserves threads which are
+  // the main threads of `clusters_`.
+  PoolPtr all_clusters_;
+  std::vector<PoolPtr> clusters_;
 
-  // For TLS indices. One might think this belongs in BoundedTopology, but it
-  // depends on max_threads, which is passed to the NestedPools constructor.
-  size_t max_clusters_per_package_ = 0;
+  // Used by `PoolWorkerMapping`. This depends on the `max_threads` argument,
+  // hence we can only compute this here, not in `BoundedTopology`.
   size_t max_workers_per_cluster_ = 0;
 };
@@ -362,14 +303,11 @@ void ParallelizeTwoRanges(const IndexRangePartition& get1,
 template <class Func>
 void HierarchicalParallelFor(size_t num_tasks, NestedPools& pools,
                              const Func& func) {
-  // Even if there are multiple packages, we only use the first.
-  const size_t pkg_idx = 0;
   // If few tasks, run on a single cluster. Also avoids a bit of overhead if
   // there is only one cluster.
-  hwy::ThreadPool& all_clusters = pools.AllClusters(pkg_idx);
+  hwy::ThreadPool& all_clusters = pools.AllClusters();
   const size_t num_clusters = all_clusters.NumWorkers();
-  hwy::ThreadPool& cluster = pools.Cluster(pkg_idx, 0);
+  hwy::ThreadPool& cluster = pools.Cluster(0);
   if (num_clusters == 1 || num_tasks <= cluster.NumWorkers()) {
     return cluster.Run(0, num_tasks, [&](uint64_t task, size_t thread) {
       func(task, thread);
@@ -382,7 +320,7 @@ void HierarchicalParallelFor(size_t num_tasks, NestedPools& pools,
   ParallelizeOneRange(
       ranges, all_clusters,
      [&](const IndexRange& range, const size_t cluster_idx) {
-        hwy::ThreadPool& cluster = pools.Cluster(pkg_idx, cluster_idx);
+        hwy::ThreadPool& cluster = pools.Cluster(cluster_idx);
         const size_t cluster_base = cluster_idx * pools.MaxWorkersPerCluster();
         cluster.Run(range.begin(), range.end(),
                     [&](uint64_t task, size_t thread) {
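Putting the new accessors together: TLS arrays are sized by MaxWorkers() = NumClusters() * MaxWorkersPerCluster() and indexed by the worker argument, which HierarchicalParallelFor keeps unique across clusters. A hedged usage sketch; Compute and SumOverTasks are hypothetical:

#include <vector>

static double Compute(size_t task) { return static_cast<double>(task); }  // hypothetical cost

static double SumOverTasks(NestedPools& pools, size_t num_tasks) {
  // One slot per possible worker index; some slots may stay unused (see the
  // note on holes in the index space above).
  std::vector<double> partial(pools.MaxWorkers(), 0.0);
  HierarchicalParallelFor(num_tasks, pools, [&](size_t task, size_t worker) {
    partial[worker] += Compute(task);  // no locks needed: one slot per worker
  });
  double sum = 0.0;
  for (double p : partial) sum += p;
  return sum;
}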

View File

@@ -79,18 +79,15 @@ static void TunePool(hwy::PoolWaitMode wait_mode, hwy::ThreadPool& pool) {
 }
 
 static void TunePools(hwy::PoolWaitMode wait_mode, NestedPools& pools) {
-  TunePool(wait_mode, pools.AllPackages());
-  for (size_t pkg_idx = 0; pkg_idx < pools.NumPackages(); ++pkg_idx) {
-    hwy::ThreadPool& clusters = pools.AllClusters(pkg_idx);
-    TunePool(wait_mode, clusters);
+  hwy::ThreadPool& clusters = pools.AllClusters();
+  TunePool(wait_mode, clusters);
   // Run in parallel because Turin CPUs have 16, and in real usage, we often
   // run all at the same time.
   clusters.Run(0, clusters.NumWorkers(),
                [&](uint64_t cluster_idx, size_t /*thread*/) {
-                 TunePool(wait_mode, pools.Cluster(pkg_idx, cluster_idx));
+                 TunePool(wait_mode, pools.Cluster(cluster_idx));
                });
-  }
 }
 
 ThreadingContext::ThreadingContext(const ThreadingArgs& args)

View File

@@ -153,10 +153,7 @@ enum class ParallelismStrategy : uint8_t {
 template <class Func>
 void ParallelFor(ParallelismStrategy parallelism, size_t num_tasks,
                  ThreadingContext& ctx, size_t cluster_idx, const Func& func) {
-  HWY_DASSERT(ctx.topology.NumPackages() == 1);
-  const size_t pkg_idx = 0;
-  HWY_DASSERT(cluster_idx < ctx.topology.NumClusters(pkg_idx));
+  HWY_DASSERT(cluster_idx < ctx.topology.NumClusters());
   if (cluster_idx != 0) {
     // If already running across clusters, only use within-cluster modes.
     HWY_DASSERT(parallelism == ParallelismStrategy::kNone ||
@@ -173,7 +170,7 @@ void ParallelFor(ParallelismStrategy parallelism, size_t num_tasks,
     }
 
     case ParallelismStrategy::kAcrossClusters:
-      return ctx.pools.AllClusters(pkg_idx).Run(
+      return ctx.pools.AllClusters().Run(
           0, num_tasks,
           [&](uint64_t task, size_t cluster_idx) { func(task, cluster_idx); });
@@ -181,7 +178,7 @@ void ParallelFor(ParallelismStrategy parallelism, size_t num_tasks,
       // Ensure the worker argument is unique across clusters, because it is
       // used for TLS indexing for example in profiler.h.
       const size_t base = ctx.Worker(cluster_idx);
-      return ctx.pools.Cluster(pkg_idx, cluster_idx)
+      return ctx.pools.Cluster(cluster_idx)
           .Run(0, num_tasks, [&](uint64_t task, size_t worker) {
             func(task, base + worker);
           });
@@ -190,15 +187,15 @@ void ParallelFor(ParallelismStrategy parallelism, size_t num_tasks,
     case ParallelismStrategy::kFlat: {
      // Check for single cluster; if not, we must compute `cluster_base` for
      // consistent and non-overlapping worker indices.
-      hwy::ThreadPool& all_clusters = ctx.pools.AllClusters(pkg_idx);
+      hwy::ThreadPool& all_clusters = ctx.pools.AllClusters();
      const size_t num_clusters = all_clusters.NumWorkers();
      if (num_clusters == 1) {
-        return ctx.pools.Cluster(pkg_idx, cluster_idx)
+        return ctx.pools.Cluster(cluster_idx)
            .Run(0, num_tasks,
                 [&](uint64_t task, size_t worker) { func(task, worker); });
      }
-      return ctx.pools.AllClusters(pkg_idx).Run(
+      return ctx.pools.AllClusters().Run(
          0, num_tasks, [&](uint64_t task, size_t cluster_idx) {
            const size_t worker = ctx.Worker(cluster_idx);
            func(task, worker);

View File

@@ -99,23 +99,16 @@ TEST(ThreadingTest, TestBoundedTopology) {
   const BoundedSlice all;
   const BoundedSlice one(0, 1);
 
-  // All
-  {
-    BoundedTopology topology(all, all, all);
-    fprintf(stderr, "%s\n", topology.TopologyString());
-  }
-  // Max one package
   {
     BoundedTopology topology(one, all, all);
     fprintf(stderr, "%s\n", topology.TopologyString());
-    ASSERT_EQ(1, topology.NumPackages());
   }
   // Max one cluster
   {
-    BoundedTopology topology(all, one, all);
+    BoundedTopology topology(one, one, all);
     fprintf(stderr, "%s\n", topology.TopologyString());
-    ASSERT_EQ(1, topology.NumClusters(0));
+    ASSERT_EQ(1, topology.NumClusters());
   }
 }
@@ -380,24 +373,32 @@ TEST(ThreadingTest, BenchJoin) {
   ThreadingArgs threading_args;
   ThreadingContext ctx(threading_args);
   NestedPools& pools = ctx.pools;
-  // Use last package because the main thread has been pinned to it.
-  const size_t pkg_idx = pools.NumPackages() - 1;
 
-  measure(pools.AllPackages(), false, "block packages");
-  if (pools.AllClusters(pkg_idx).NumWorkers() > 1) {
-    measure(pools.AllClusters(pkg_idx), false, "block clusters");
+  if (pools.NumClusters() > 1) {
+    measure(pools.AllClusters(), false, "block clusters");
   }
-  measure(pools.Cluster(pkg_idx, 0), false, "block in_cluster");
+  measure(pools.Cluster(0), false, "block in_cluster");
 
   if (pools.AllPinned()) {
     const bool kSpin = true;
-    measure(pools.AllPackages(), kSpin, "spin packages");
-    if (pools.AllClusters(pkg_idx).NumWorkers() > 1) {
-      measure(pools.AllClusters(pkg_idx), kSpin, "spin clusters");
+    if (pools.NumClusters() > 1) {
+      measure(pools.AllClusters(), kSpin, "spin clusters");
     }
-    measure(pools.Cluster(pkg_idx, 0), kSpin, "spin in_cluster");
+    measure(pools.Cluster(0), kSpin, "spin in_cluster");
   }
 }
 
+TEST(ThreadingTest, TestUnequalClusters) {
+  ThreadingArgs threading_args;
+  threading_args.max_lps = 13;
+  ThreadingContext ctx(threading_args);
+  const size_t last_workers =
+      ctx.pools.Cluster(ctx.topology.NumClusters() - 1).NumWorkers();
+  const size_t max_workers = ctx.pools.MaxWorkersPerCluster();
+  fprintf(stderr, "%zu clusters, last with %zu (max %zu)\n",
+          ctx.topology.NumClusters(), last_workers, max_workers);
+  HWY_ASSERT(last_workers <= max_workers);
+}
+
 }  // namespace
 }  // namespace gcpp

View File

@@ -18,21 +18,12 @@
 #include <stdio.h>
 
 #include <algorithm>  // std::sort
-#include <utility>    // std::move
 #include <vector>
 
 #include "hwy/base.h"
 
 namespace gcpp {
 
-// Sort T := packages/clusters by descending 'size' so that users who only use
-// one Group get the largest.
-template <class T>
-static void SortByDescendingSize(std::vector<T>& groups) {
-  std::sort(groups.begin(), groups.end(),
-            [](const T& a, const T& b) { return a.Size() > b.Size(); });
-}
-
 // Returns set of LPs available for use.
 static LPS EnabledLPs(const BoundedSlice& lp_slice) {
   LPS enabled_lps;
@@ -88,21 +79,23 @@ BoundedTopology::BoundedTopology(BoundedSlice package_slice,
                                  BoundedSlice cluster_slice,
                                  BoundedSlice lp_slice)
     : package_slice_(package_slice), cluster_slice_(cluster_slice) {
+  HWY_ASSERT(package_slice_.Max() == 1);
   const LPS enabled_lps = EnabledLPs(lp_slice);
+  bool topology_ok = false;
 #if !GEMMA_DISABLE_TOPOLOGY
   if (HWY_LIKELY(!topology_.packages.empty())) {
-    InitFromTopology(enabled_lps);
+    topology_ok = InitFromTopology(enabled_lps);
   }
 #endif
 
   // Topology unknown or no packages with enabled LPs: create a single
   // package with one cluster, and one node.
-  if (HWY_UNLIKELY(NumPackages() == 0)) {
+  if (HWY_UNLIKELY(!topology_ok)) {
     InitFromLPs(enabled_lps);
   }
-  HWY_ASSERT(NumPackages() != 0 && NumClusters(0) != 0 && NumNodes() != 0);
+  HWY_ASSERT(NumClusters() != 0 && NumNodes() != 0);
 }
 
 // Topology is unknown, take the given set of LPs.
@@ -161,9 +154,113 @@ constexpr bool kSplitLargeClusters = false;
 constexpr size_t kMaxClusters = 8;
 constexpr size_t kMaxLPsPerCluster = 6;
 
-// Topology is unknown, use only the given LPs which derive from OS affinity
-// and `lp_slice`.
-BoundedTopology::Package::Package(const LPS& enabled_lps) {
+#if !GEMMA_DISABLE_TOPOLOGY
+
+static size_t CoresFromLPs(const LPS& lps, const hwy::Topology& topology) {
+  LPS cores;
+  lps.Foreach([&](size_t lp) {
+    if (topology.lps[lp].smt == 0) cores.Set(lp);
+  });
+  return cores.Count();
+}
+
+// tcluster is a modifiable copy of the first cluster in the package.
+void BoundedTopology::SplitLargeCluster(const LPS& enabled_lps,
+                                        hwy::Topology::Cluster tcluster) {
+  const LPS lps = clusters_[0].LPSet();  // copy so we can clear
+  clusters_.clear();
+  // Split `lps` into several clusters.
+  LPS clusters_lps[kMaxClusters];
+  const size_t num_clusters =
+      HWY_MIN(kMaxClusters, hwy::DivCeil(lps.Count(), kMaxLPsPerCluster));
+  size_t num_lps = 0;
+  lps.Foreach(
+      [&](size_t lp) { clusters_lps[num_lps++ % num_clusters].Set(lp); });
+  HWY_DASSERT(num_lps == lps.Count());
+  // Create new clusters, just inserting the new LPS.
+  for (size_t cluster_idx = 0; cluster_idx < num_clusters; ++cluster_idx) {
+    tcluster.lps = clusters_lps[cluster_idx];
+    // Keep same `private_kib` and `shared_kib`.
+    clusters_.push_back(Cluster(enabled_lps, topology_.lps, tcluster));
+  }
+}
+
+// Main part of ctor, called when topology is known.
+bool BoundedTopology::InitFromTopology(const LPS& enabled_lps) {
+  const size_t tpkg_idx = package_slice_.Begin();
+  HWY_ASSERT(tpkg_idx < topology_.packages.size());
+  const hwy::Topology::Package& tpackage = topology_.packages[tpkg_idx];
+  const std::vector<hwy::Topology::Cluster>& tclusters = tpackage.clusters;
+  if (HWY_UNLIKELY(tclusters.empty())) {
+    HWY_WARN("Topology: no clusters found in package %zu.", tpkg_idx);
+    return false;
+  }
+
+  size_t max_tcluster_cores = 0;
+  size_t max_tcluster_lps = 0;
+  for (const hwy::Topology::Cluster& tcluster : tclusters) {
+    const size_t cores = CoresFromLPs(tcluster.lps, topology_);
+    const size_t lps = tcluster.lps.Count();
+    max_tcluster_cores = HWY_MAX(max_tcluster_cores, cores);
+    max_tcluster_lps = HWY_MAX(max_tcluster_lps, lps);
+  }
+  HWY_ASSERT(max_tcluster_cores != 0);
+  HWY_ASSERT(max_tcluster_lps >= max_tcluster_cores);
+
+  // Populate `clusters` with the subset of clusters in `cluster_slice` that
+  // have any enabled LPs.
+  clusters_.reserve(cluster_slice_.Num(tclusters.size()));
+  cluster_slice_.Foreach("cluster", tclusters.size(), [&](size_t cluster_idx) {
+    const hwy::Topology::Cluster& tcluster = tpackage.clusters[cluster_idx];
+    Cluster cluster(enabled_lps, topology_.lps, tcluster);
+    // Skip if empty, i.e. too few `enabled_lps`.
+    if (HWY_LIKELY(cluster.NumWorkers() != 0)) {
+      clusters_.push_back(cluster);
+      // Remember NUMA nodes that we are actually using (not just enabled).
+      nodes_.Set(cluster.Node());
+    }
+  });
+  if (HWY_UNLIKELY(clusters_.empty())) {
+    HWY_WARN("Too restrictive cluster_slice or enabled_lps, no clusters left.");
+    return false;
+  }
+
+  if (kSplitLargeClusters && clusters_.size() == 1 &&
+      enabled_lps.Count() >= 16) {
+    SplitLargeCluster(enabled_lps, tpackage.clusters[0]);
+  }
+
+  // Sort by descending 'size' so that users who only use one get the largest.
+  std::sort(clusters_.begin(), clusters_.end(),
+            [](const Cluster& a, const Cluster& b) {
+              return a.NumWorkers() > b.NumWorkers();
+            });
+
+  // Largest number of enabled workers in any cluster, for `topology_string_`.
+  // This may be less than `max_tcluster_cores` if `enabled_lps` excludes some.
+  size_t max_cluster_workers = 0;
+  for (const Cluster& c : clusters_) {
+    max_cluster_workers = HWY_MAX(max_cluster_workers, c.NumWorkers());
+  }
+  HWY_ASSERT(max_cluster_workers <= max_tcluster_cores);
+
+  // Do not warn about large clusters: GNR has 40.
+  snprintf(topology_string_, sizeof(topology_string_),
+           "%zuS %zuX %zuC %zuH, using %zuX %zuC (nodes=%zu)",
+           topology_.packages.size(), tclusters.size(), max_tcluster_cores,
+           max_tcluster_lps / max_tcluster_cores, NumClusters(),
+           max_cluster_workers, nodes_.Count());
+  return true;
+}
+
+#endif  // !GEMMA_DISABLE_TOPOLOGY
+
+// Called when topology is unknown or `GEMMA_DISABLE_TOPOLOGY`. Uses only the
+// given LPs which derive from OS affinity and `lp_slice`.
+void BoundedTopology::InitFromLPs(const LPS& enabled_lps) {
   LPS clusters_lps[kMaxClusters];
   const size_t num_clusters =
       kSplitLargeClusters
@@ -178,157 +275,11 @@ BoundedTopology::Package::Package(const LPS& enabled_lps) {
       });
 
   for (size_t cluster_idx = 0; cluster_idx < num_clusters; ++cluster_idx) {
-    clusters.push_back(Cluster(clusters_lps[cluster_idx]));
+    clusters_.push_back(Cluster(clusters_lps[cluster_idx]));
   }
-}
-
-// NOTE: caller is responsible for checking whether `clusters` is empty.
-BoundedTopology::Package::Package(const LPS& enabled_lps,
-                                  const hwy::Topology& topology, size_t pkg_idx,
-                                  BoundedSlice cluster_slice) {
-  const hwy::Topology::Package& tpackage = topology.packages[pkg_idx];
-  // Populate `clusters` with the subset of clusters in `cluster_slice` that
-  // have any enabled LPs. If `clusters` remains empty, the caller will
-  // skip this `Package`.
-  clusters.reserve(cluster_slice.Num(tpackage.clusters.size()));
-  cluster_slice.Foreach(
-      "cluster", tpackage.clusters.size(), [&](size_t cluster_idx) {
-        const hwy::Topology::Cluster& tcluster = tpackage.clusters[cluster_idx];
-        Cluster cluster(enabled_lps, topology.lps, tcluster);
-        // Skip if empty, i.e. too few `enabled_lps`.
-        if (HWY_LIKELY(cluster.Size() != 0)) {
-          clusters.push_back(cluster);
-        }
-      });
-  SortByDescendingSize(clusters);
-
-  // If there is only one large cluster, split it into smaller ones.
-  if (kSplitLargeClusters && clusters.size() == 1 &&
-      enabled_lps.Count() >= 16) {
-    const LPS lps = clusters[0].LPSet();  // copy so we can clear
-    clusters.clear();
-    // Split `lps` into several clusters.
-    LPS clusters_lps[kMaxClusters];
-    const size_t num_clusters =
-        HWY_MIN(kMaxClusters, hwy::DivCeil(lps.Count(), kMaxLPsPerCluster));
-    size_t num_lps = 0;
-    lps.Foreach(
-        [&](size_t lp) { clusters_lps[num_lps++ % num_clusters].Set(lp); });
-    HWY_DASSERT(num_lps == lps.Count());
-    // Create new clusters, just inserting the new LPS.
-    hwy::Topology::Cluster tcluster = tpackage.clusters[0];  // modifiable copy
-    for (size_t cluster_idx = 0; cluster_idx < num_clusters; ++cluster_idx) {
-      tcluster.lps = clusters_lps[cluster_idx];
-      // Keep same `private_kib` and `shared_kib`.
-      clusters.push_back(Cluster(enabled_lps, topology.lps, tcluster));
-    }
-  }
-}
-
-#if !GEMMA_DISABLE_TOPOLOGY
-
-static size_t CoresFromLPs(const LPS& lps, const hwy::Topology& topology) {
-  LPS cores;
-  lps.Foreach([&](size_t lp) {
-    if (topology.lps[lp].smt == 0) cores.Set(lp);
-  });
-  return cores.Count();
-}
-
-// Scans hwy::Topology for clusters and their size, for use by topology_string_.
-static void ScanTClusters(hwy::Topology& topology_, size_t& max_tclusters,
-                          size_t& max_tcluster_cores,
-                          size_t& max_tcluster_lps) {
-  max_tclusters = 0;
-  max_tcluster_cores = 0;
-  max_tcluster_lps = 0;
-  for (size_t pkg_idx = 0; pkg_idx < topology_.packages.size(); ++pkg_idx) {
-    const std::vector<hwy::Topology::Cluster>& tclusters =
-        topology_.packages[pkg_idx].clusters;
-    max_tclusters = HWY_MAX(max_tclusters, tclusters.size());
-    size_t tcluster_cores = 0;
-    size_t tcluster_lps = 0;
-    for (size_t cluster_idx = 0; cluster_idx < tclusters.size();
-         ++cluster_idx) {
-      const size_t cores = CoresFromLPs(tclusters[cluster_idx].lps, topology_);
-      const size_t lps = tclusters[cluster_idx].lps.Count();
-      tcluster_cores = HWY_MAX(tcluster_cores, cores);
-      tcluster_lps = HWY_MAX(tcluster_lps, lps);
-    }
-    if (tclusters.size() > 1 && tcluster_cores > 8) {
-      HWY_WARN(
-          "Package %zu: multiple clusters with max size %zu, whereas CCX "
-          "only have 8, may indicate a bug in hwy::Topology.",
-          pkg_idx, tcluster_cores);
-    }
-    max_tcluster_cores = HWY_MAX(max_tcluster_cores, tcluster_cores);
-    max_tcluster_lps = HWY_MAX(max_tcluster_lps, tcluster_lps);
-  }
-  HWY_ASSERT(max_tclusters != 0);
-  HWY_ASSERT(max_tcluster_cores != 0);
-  HWY_ASSERT(max_tcluster_lps >= max_tcluster_cores);
-}
-
-// Main part of ctor, called when topology is known.
-void BoundedTopology::InitFromTopology(const LPS& enabled_lps) {
-  size_t max_tclusters, max_tcluster_cores, max_tcluster_lps;
-  ScanTClusters(topology_, max_tclusters, max_tcluster_cores, max_tcluster_lps);
-
-  // (Possibly empty) subset of `Topology` packages that have `enabled_lps`.
-  package_slice_.Foreach(
-      "package", topology_.packages.size(), [&](size_t pkg_idx) {
-        Package package(enabled_lps, topology_, pkg_idx, cluster_slice_);
-        // Skip if empty, i.e. too few `enabled_lps`.
-        if (HWY_LIKELY(!package.clusters.empty())) {
-          packages_.push_back(std::move(package));
-        }
-      });
-  if (NumPackages() == 0) return;
-  SortByDescendingSize(packages_);
-
-  // Remember NUMA nodes that we are actually using (not just enabled).
-  for (const Package& p : packages_) {
-    for (const Cluster& c : p.clusters) {
-      nodes_.Set(c.Node());
-    }
-  }
-
-  // Scan for max BoundedTopology clusters and their size, for topology_string_.
-  size_t all_max_cluster_size = 0;
-  for (size_t pkg_idx = 0; pkg_idx < NumPackages(); ++pkg_idx) {
-    size_t max_cluster_size = 0;
-    for (size_t cluster_idx = 0; cluster_idx < NumClusters(pkg_idx);
-         ++cluster_idx) {
-      max_cluster_size =
-          HWY_MAX(max_cluster_size, GetCluster(pkg_idx, cluster_idx).Size());
-    }
-    if (NumClusters(pkg_idx) > 1 && max_cluster_size > 8) {
-      HWY_WARN(
-          "Package %zu: multiple clusters with max size %zu, whereas CCX "
-          "only have 8, may indicate a bug in BoundedTopology.",
-          pkg_idx, max_cluster_size);
-    }
-    all_max_cluster_size = HWY_MAX(all_max_cluster_size, max_cluster_size);
-  }
-
-  snprintf(topology_string_, sizeof(topology_string_),
-           "%zuS %zuX %zuC %zuH, using %zuS %zuX %zuC (nodes=%zu)",
-           topology_.packages.size(), max_tclusters, max_tcluster_cores,
-           max_tcluster_lps / max_tcluster_cores, packages_.size(),
-           NumClusters(0), all_max_cluster_size, nodes_.Count());
-}
-
-#endif  // !GEMMA_DISABLE_TOPOLOGY
-
-void BoundedTopology::InitFromLPs(const LPS& enabled_lps) {
-  packages_.push_back(Package(enabled_lps));
   snprintf(topology_string_, sizeof(topology_string_), "LPs=%zu",
-           GetCluster(0, 0).Size());
+           GetCluster(0).NumWorkers());
   // Assume a single NUMA node.
   nodes_.Set(0);

View File

@@ -40,6 +40,7 @@ class BoundedSlice {
   BoundedSlice(size_t skip = 0, size_t max = 0) : skip_(skip), max_(max) {}
 
   size_t Begin() const { return skip_; }
+  size_t Max() const { return max_; }
 
   // STL-style one past the end.
   size_t End(size_t detected) const {
@@ -82,12 +83,11 @@ using LPS = hwy::LogicalProcessorSet;
 // back to a single package and cluster.
 class BoundedTopology {
  public:
-  // Defaults to "use all detected".
-  BoundedTopology(BoundedSlice package_slice = BoundedSlice(),
+  // `package_slice` must have `Max() == 1`. Others default to "use all".
+  BoundedTopology(BoundedSlice package_slice,
                   BoundedSlice cluster_slice = BoundedSlice(),
                   BoundedSlice lp_slice = BoundedSlice());
 
-  size_t NumPackages() const { return packages_.size(); }
   size_t NumNodes() const { return nodes_.Count(); }
 
   const char* TopologyString() const { return topology_string_; }
@@ -98,8 +98,7 @@
             const std::vector<hwy::Topology::LP>& all_lps,
             const hwy::Topology::Cluster& tcluster);
 
-    // For SortByDescendingSize.
-    size_t Size() const { return num_workers_; }
+    size_t NumWorkers() const { return num_workers_; }
 
     // Returns vector with all enabled LPs, used for pinning.
     std::vector<size_t> LPVector() const {
@@ -127,26 +126,11 @@
     size_t shared_kib_ = 0;
   };  // Cluster
 
-  size_t NumClusters(size_t pkg_idx) const {
-    HWY_ASSERT(pkg_idx < NumPackages());
-    return packages_[pkg_idx].clusters.size();
-  }
-  const Cluster& GetCluster(size_t pkg_idx, size_t cluster_idx) const {
-    HWY_ASSERT(pkg_idx < NumPackages());
-    const Package& package = packages_[pkg_idx];
-    HWY_ASSERT(cluster_idx < package.clusters.size());
-    return package.clusters[cluster_idx];
-  }
-  Cluster& GetCluster(size_t pkg_idx, size_t cluster_idx) {
-    HWY_ASSERT(pkg_idx < NumPackages());
-    Package& package = packages_[pkg_idx];
-    HWY_ASSERT(cluster_idx < package.clusters.size());
-    return package.clusters[cluster_idx];
-  }
-
-#if !GEMMA_DISABLE_TOPOLOGY
-  const hwy::Topology& FullTopology() const { return topology_; }
-#endif
+  size_t NumClusters() const { return clusters_.size(); }
+  const Cluster& GetCluster(size_t cluster_idx) const {
+    HWY_ASSERT(cluster_idx < clusters_.size());
+    return clusters_[cluster_idx];
+  }
 
   // In case we are running with a subset of packages/clusters, these are added
   // to the package/cluster indices for purposes of the thread name, so that
@@ -155,26 +139,17 @@
   size_t SkippedClusters() const { return cluster_slice_.Begin(); }
 
  private:
-  struct Package {
-    explicit Package(const LPS& enabled_lps);
-    Package(const LPS& enabled_lps, const hwy::Topology& topology,
-            size_t pkg_idx, BoundedSlice cluster_slice);
-
-    // For SortByDescendingSize.
-    size_t Size() const { return clusters.size(); }
-
-    std::vector<Cluster> clusters;
-  };  // Package
-
-  void InitFromTopology(const LPS& enabled_lps);
+  void SplitLargeCluster(const LPS& enabled_lps,
+                         hwy::Topology::Cluster tcluster);
+  bool InitFromTopology(const LPS& enabled_lps);
   void InitFromLPs(const LPS& enabled_lps);
 
 #if !GEMMA_DISABLE_TOPOLOGY
   hwy::Topology topology_;
 #endif
-  BoundedSlice package_slice_;
+  BoundedSlice package_slice_;  // Within the entire detected topology.
   BoundedSlice cluster_slice_;
-  std::vector<Package> packages_;
+  std::vector<Cluster> clusters_;
   char topology_string_[96];
   LPS nodes_;
 };
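Under the new contract, a caller picks the package via the slice arguments instead of indexing packages afterwards. A hedged construction sketch using the BoundedSlice(skip, max) constructor shown above; choosing the second package is only an example and InspectSecondPackage is hypothetical:

static void InspectSecondPackage() {
  // skip=1, max=1: use exactly one package, the second one detected
  // (asserts if the machine has fewer than two packages).
  gcpp::BoundedTopology topology(gcpp::BoundedSlice(/*skip=*/1, /*max=*/1));
  for (size_t c = 0; c < topology.NumClusters(); ++c) {
    const auto& cluster = topology.GetCluster(c);
    // cluster.NumWorkers(), cluster.Node(), cluster.LPVector() are available.
    (void)cluster;
  }
}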