diff --git a/io/blob_compare.cc b/io/blob_compare.cc
index bb25843..998036e 100644
--- a/io/blob_compare.cc
+++ b/io/blob_compare.cc
@@ -28,7 +28,6 @@
 #include "util/threading_context.h"
 #include "hwy/aligned_allocator.h"  // Span
 #include "hwy/base.h"
-#include "hwy/contrib/thread_pool/thread_pool.h"
 #include "hwy/timer.h"
 
 namespace gcpp {
@@ -104,27 +103,31 @@ BlobVec ReserveMemory(const RangeVec& ranges, BytePtr& all_blobs, size_t& pos) {
 // Reads one set of blobs in parallel (helpful if in disk cache).
 // Aborts on error.
 void ReadBlobs(BlobReader& reader, const RangeVec& ranges, BlobVec& blobs,
-               hwy::ThreadPool& pool) {
+               ThreadingContext& ctx, size_t cluster_idx) {
   HWY_ASSERT(reader.Keys().size() == blobs.size());
   HWY_ASSERT(ranges.size() == blobs.size());
-  pool.Run(0, blobs.size(), [&](size_t i, size_t /*thread*/) {
-    HWY_ASSERT(ranges[i].bytes == blobs[i].size());
-    reader.file().Read(ranges[i].offset, ranges[i].bytes, blobs[i].data());
-  });
+  ParallelFor(ParallelismStrategy::kWithinCluster, blobs.size(), ctx,
+              cluster_idx, [&](size_t i, size_t /*thread*/) {
+                HWY_ASSERT(ranges[i].bytes == blobs[i].size());
+                reader.file().Read(ranges[i].offset, ranges[i].bytes,
+                                   blobs[i].data());
+              });
 }
 
 // Parallelizes ReadBlobs across (two) packages, if available.
 void ReadBothBlobs(BlobReader& reader1, BlobReader& reader2,
                    const RangeVec& ranges1, const RangeVec& ranges2,
                    size_t total_bytes, BlobVec& blobs1, BlobVec& blobs2,
-                   NestedPools& pools) {
+                   ThreadingContext& ctx) {
   const double t0 = hwy::platform::Now();
-  HWY_WARN("Reading %zu GiB, %zux%zu cores: ", total_bytes >> 30,
-           pools.AllPackages().NumWorkers(), pools.Pool().NumWorkers());
-  pools.AllPackages().Run(0, 2, [&](size_t task, size_t pkg_idx) {
-    ReadBlobs(task ? reader2 : reader1, task ? ranges2 : ranges1,
-              task ? blobs2 : blobs1, pools.Pool(pkg_idx));
-  });
+  HWY_WARN("Reading %zu GiB, %zu clusters: ", total_bytes >> 30,
+           ctx.pools.NumClusters());
+  ParallelFor(ParallelismStrategy::kAcrossClusters, 2, ctx, 0,
+              [&](const size_t task, size_t cluster_idx) {
+                ReadBlobs(task ? reader1 : reader2, task ? ranges1 : ranges2,
+                          task ? blobs1 : blobs2, ctx, cluster_idx);
+              });
+
   const double t1 = hwy::platform::Now();
   HWY_WARN("%.1f GB/s\n", total_bytes / (t1 - t0) * 1E-9);
 }
@@ -181,29 +184,23 @@ size_t BlobDifferences(const ByteSpan data1, const ByteSpan data2,
 }
 
 void CompareBlobs(const KeyVec& keys, BlobVec& blobs1, BlobVec& blobs2,
-                  size_t total_bytes, NestedPools& pools) {
+                  size_t total_bytes, ThreadingContext& ctx) {
   HWY_WARN("Comparing %zu blobs in parallel: ", keys.size());
   const double t0 = hwy::platform::Now();
   std::atomic<size_t> blobs_equal{};
   std::atomic<size_t> blobs_diff{};
-  const IndexRangePartition ranges = StaticPartition(
-      IndexRange(0, keys.size()), pools.AllPackages().NumWorkers(), 1);
-  ParallelizeOneRange(
-      ranges, pools.AllPackages(),
-      [&](const IndexRange& range, size_t pkg_idx) {
-        pools.Pool(pkg_idx).Run(
-            range.begin(), range.end(), [&](size_t i, size_t /*thread*/) {
-              const size_t mismatches =
-                  BlobDifferences(blobs1[i], blobs2[i], keys[i]);
-              if (mismatches != 0) {
-                HWY_WARN("key %s has %zu mismatches in %zu bytes!\n",
-                         keys[i].c_str(), mismatches, blobs1[i].size());
-                blobs_diff.fetch_add(1);
-              } else {
-                blobs_equal.fetch_add(1);
-              }
-            });
-      });
+  ParallelFor(ParallelismStrategy::kHierarchical, keys.size(), ctx, 0,
+              [&](size_t i, size_t /*thread*/) {
+                const size_t mismatches =
+                    BlobDifferences(blobs1[i], blobs2[i], keys[i]);
+                if (mismatches != 0) {
+                  HWY_WARN("key %s has %zu mismatches in %zu bytes!\n",
+                           keys[i].c_str(), mismatches, blobs1[i].size());
+                  blobs_diff.fetch_add(1);
+                } else {
+                  blobs_equal.fetch_add(1);
+                }
+              });
   const double t1 = hwy::platform::Now();
   HWY_WARN("%.1f GB/s; total blob matches=%zu, mismatches=%zu\n",
            total_bytes / (t1 - t0) * 1E-9, blobs_equal.load(),
@@ -230,9 +227,9 @@ void ReadAndCompareBlobs(const Path& path1, const Path& path2) {
   ThreadingArgs args;
   ThreadingContext ctx(args);
   ReadBothBlobs(reader1, reader2, ranges1, ranges2, total_bytes, blobs1, blobs2,
-                ctx.pools);
+                ctx);
 
-  CompareBlobs(reader1.Keys(), blobs1, blobs2, total_bytes, ctx.pools);
+  CompareBlobs(reader1.Keys(), blobs1, blobs2, total_bytes, ctx);
 }
 
 }  // namespace gcpp
diff --git a/ops/dot_test.cc b/ops/dot_test.cc
index d93b210..ed09429 100644
--- a/ops/dot_test.cc
+++ b/ops/dot_test.cc
@@ -1124,8 +1124,9 @@ void TestAllDot() {
                      MatPadding::kOdd);
 
   std::array all_stats;
-  ctx.pools.Cluster(0, 0).Run(
-      0, kReps, [&](const uint32_t rep, size_t thread) {
+  ParallelFor(
+      ParallelismStrategy::kWithinCluster, kReps, ctx, 0,
+      [&](size_t rep, size_t thread) {
         float* HWY_RESTRICT pa = a.Row(thread);
         float* HWY_RESTRICT pb = b.Row(thread);
         double* HWY_RESTRICT buf = bufs.Row(thread);
diff --git a/ops/matmul.cc b/ops/matmul.cc
index 6ef1412..ebeff9b 100644
--- a/ops/matmul.cc
+++ b/ops/matmul.cc
@@ -351,7 +351,7 @@ std::vector MMCandidates(const CacheInfo& cache, size_t M, size_t K,
 
 MatMulEnv::MatMulEnv(ThreadingContext& ctx)
     : ctx(ctx), A_BF(ctx.allocator), C_tiles(ctx) {
-  const size_t num_clusters = ctx.pools.AllClusters(/*pkg_idx=*/0).NumWorkers();
+  const size_t num_clusters = ctx.pools.NumClusters();
   per_cluster.resize(num_clusters);
   for (size_t cluster_idx = 0; cluster_idx < num_clusters; ++cluster_idx) {
     row_ptrs.push_back(hwy::AllocateAligned(kMaxBatchSize));  // C
@@ -368,7 +368,7 @@ void BindB(ThreadingContext& ctx, MatPtr& B, size_t sizeof_TC) {
   PROFILER_ZONE("Startup.BindB");
 
-  const size_t node = ctx.topology.GetCluster(/*pkg_idx=*/0, 0).Node();
+  const size_t node = ctx.topology.GetCluster(0).Node();
   uintptr_t begin = reinterpret_cast<uintptr_t>(B.RowBytes(0));
   uintptr_t end = begin + B.Rows() * B.Stride() * B.ElementBytes();
   // B row padding is less than the page size, so only bind the subset that
@@ -394,7 +394,7 @@ void BindC(ThreadingContext& ctx, MatPtr& C) {
   const size_t end = hwy::RoundDownTo(cols_c.end() * C.ElementBytes(),
                                       allocator.BasePageBytes());
 
-  const size_t node = ctx.topology.GetCluster(/*pkg_idx=*/0, 0).Node();
+  const size_t node = ctx.topology.GetCluster(0).Node();
   bool ok = true;
   for (size_t im = 0; im < C.Rows(); ++im) {
     ok &= allocator.BindMemory(C.RowBytes(im) + begin, end - begin, node);
diff --git a/ops/matmul.h b/ops/matmul.h
index bedee3d..e16c0f2 100644
--- a/ops/matmul.h
+++ b/ops/matmul.h
@@ -105,8 +105,7 @@ struct MMParallelWithinCluster {
                  size_t inner_tasks, size_t cluster_idx,
                  const Func& func) const {
     HWY_DASSERT(1 <= inner_tasks && inner_tasks <= 4);
-    const size_t pkg_idx = 0;
-    hwy::ThreadPool& cluster = ctx.pools.Cluster(pkg_idx, cluster_idx);
+    hwy::ThreadPool& cluster = ctx.pools.Cluster(cluster_idx);
     const size_t base = ctx.Worker(cluster_idx);
 
     const IndexRangePartition ranges_n = StaticPartition(
@@ -122,8 +121,7 @@ struct MMParallelWithinCluster {
                             const IndexRangePartition& ranges_mc,
                             const IndexRangePartition& ranges_nc,
                             size_t cluster_idx, const Func& func) const {
-    const size_t pkg_idx = 0;
-    hwy::ThreadPool& cluster = ctx.pools.Cluster(pkg_idx, cluster_idx);
+    hwy::ThreadPool& cluster = ctx.pools.Cluster(cluster_idx);
     const size_t base = ctx.Worker(cluster_idx);
 
     // Low-batch: avoid Divide/Remainder.
@@ -143,8 +141,7 @@ struct MMParallelWithinCluster {
   template <class Func>
   void ForRangeMC(ThreadingContext& ctx, const IndexRange& range_mc,
                   size_t cluster_idx, const Func& func) const {
-    const size_t pkg_idx = 0;
-    hwy::ThreadPool& cluster = ctx.pools.Cluster(pkg_idx, cluster_idx);
+    hwy::ThreadPool& cluster = ctx.pools.Cluster(cluster_idx);
     const size_t base = ctx.Worker(cluster_idx);
 
     cluster.Run(
@@ -164,12 +161,11 @@ struct MMParallelHierarchical {
     HWY_DASSERT(caller_cluster_idx == 0);
 
     // Single cluster: parallel-for over static partition of `range_n`.
-    const size_t pkg_idx = 0;
-    hwy::ThreadPool& all_clusters = ctx.pools.AllClusters(pkg_idx);
+    hwy::ThreadPool& all_clusters = ctx.pools.AllClusters();
     const size_t num_clusters = all_clusters.NumWorkers();
     if (num_clusters == 1) {
       const size_t cluster_idx = 0;
-      hwy::ThreadPool& cluster = ctx.pools.Cluster(pkg_idx, cluster_idx);
+      hwy::ThreadPool& cluster = ctx.pools.Cluster(cluster_idx);
       const IndexRangePartition ranges_n = StaticPartition(
           range_n, cluster.NumWorkers() * inner_tasks, n_multiple);
       return ParallelizeOneRange(
@@ -185,7 +181,7 @@ struct MMParallelHierarchical {
     ParallelizeOneRange(
         ranges_n, all_clusters,
         [&](const IndexRange& n_range, const size_t cluster_idx) {
-          hwy::ThreadPool& cluster = ctx.pools.Cluster(pkg_idx, cluster_idx);
+          hwy::ThreadPool& cluster = ctx.pools.Cluster(cluster_idx);
          const size_t cluster_base = ctx.Worker(cluster_idx);
          // Parallel-for over sub-ranges of `cluster_range` within the cluster.
          const IndexRangePartition worker_ranges = StaticPartition(
@@ -206,17 +202,16 @@ struct MMParallelHierarchical {
                             const IndexRangePartition& ranges_nc,
                             HWY_MAYBE_UNUSED size_t caller_cluster_idx,
                             const Func& func) const {
-    const size_t pkg_idx = 0;
     HWY_DASSERT(caller_cluster_idx == 0);
-    hwy::ThreadPool& all_clusters = ctx.pools.AllClusters(pkg_idx);
+    hwy::ThreadPool& all_clusters = ctx.pools.AllClusters();
    // `all_clusters` is a pool with one worker per cluster in a package.
    const size_t num_clusters = all_clusters.NumWorkers();
 
    // Single (big) cluster: collapse two range indices into one parallel-for
    // to reduce the number of fork-joins.
    if (num_clusters == 1) {
      const size_t cluster_idx = 0;
-      hwy::ThreadPool& cluster = ctx.pools.Cluster(pkg_idx, cluster_idx);
+      hwy::ThreadPool& cluster = ctx.pools.Cluster(cluster_idx);
      // Low-batch: avoid Divide/Remainder.
      if (HWY_UNLIKELY(ranges_mc.NumTasks() == 1)) {
        return ParallelizeOneRange(
@@ -237,7 +232,7 @@ struct MMParallelHierarchical {
        ranges_nc, all_clusters,
        [&](const IndexRange range_nc, size_t cluster_idx) {
          const size_t cluster_base = ctx.Worker(cluster_idx);
-          hwy::ThreadPool& cluster = ctx.pools.Cluster(pkg_idx, cluster_idx);
+          hwy::ThreadPool& cluster = ctx.pools.Cluster(cluster_idx);
          ParallelizeOneRange(ranges_mc, cluster,
                              [&](const IndexRange& range_mc, size_t worker) {
                                func(range_mc, range_nc, cluster_base + worker);
diff --git a/ops/matmul_test.cc b/ops/matmul_test.cc
index 2f0fde2..101707f 100644
--- a/ops/matmul_test.cc
+++ b/ops/matmul_test.cc
@@ -191,29 +191,22 @@ HWY_INLINE void MatMulSlow(const MatPtrT A, const MatPtrT B,
   const IndexRange all_cols_c(0, C.Cols());
 
   NestedPools& pools = env.ctx.pools;
-  hwy::ThreadPool& all_packages = pools.AllPackages();
-  const IndexRangePartition get_row_c =
-      StaticPartition(all_rows_c, all_packages.NumWorkers(), 1);
+  hwy::ThreadPool& all_clusters = pools.AllClusters();
+  const size_t multiple = env.ctx.allocator.QuantumBytes() / sizeof(TB);
+  const IndexRangePartition get_col_c =
+      StaticPartition(all_cols_c, all_clusters.NumWorkers(), multiple);
   ParallelizeOneRange(
-      get_row_c, all_packages,
-      [&](const IndexRange& rows_c, size_t package_idx) HWY_ATTR {
-        hwy::ThreadPool& all_clusters = pools.AllClusters(package_idx);
-        const size_t multiple = env.ctx.allocator.QuantumBytes() / sizeof(TB);
-        const IndexRangePartition get_col_c =
-            StaticPartition(all_cols_c, all_clusters.NumWorkers(), multiple);
-        ParallelizeOneRange(
-            get_col_c, all_clusters,
-            [&](const IndexRange& cols_c, size_t cluster_idx) HWY_ATTR {
-              for (size_t r : rows_c) {
-                TC* HWY_RESTRICT C_row = C.Row(r);
-                for (size_t c : cols_c) {
-                  const float add = add_row ? add_row[c] : 0.0f;
-                  const float dot =
-                      Dot(df, b_span, c * B.Stride(), A.Row(r), A.Cols());
-                  C_row[c] = hwy::ConvertScalarTo<TC>(add + scale * dot);
-                }
-              }
-            });
+      get_col_c, all_clusters,
+      [&](const IndexRange& cols_c, size_t cluster_idx) HWY_ATTR {
+        for (size_t r : all_rows_c) {
+          TC* HWY_RESTRICT C_row = C.Row(r);
+          for (size_t c : cols_c) {
+            const float add = add_row ? add_row[c] : 0.0f;
+            const float dot =
+                Dot(df, b_span, c * B.Stride(), A.Row(r), A.Cols());
+            C_row[c] = hwy::ConvertScalarTo<TC>(add + scale * dot);
+          }
+        }
       });
 }
diff --git a/util/allocator.cc b/util/allocator.cc
index f99586e..612bbb9 100644
--- a/util/allocator.cc
+++ b/util/allocator.cc
@@ -139,7 +139,7 @@ CacheInfo::CacheInfo(const BoundedTopology& topology) {
   step_bytes_ = HWY_MAX(line_bytes_, vector_bytes_);
 
-  const BoundedTopology::Cluster& cluster = topology.GetCluster(0, 0);
+  const BoundedTopology::Cluster& cluster = topology.GetCluster(0);
   if (const hwy::Cache* caches = hwy::DataCaches()) {
     l1_bytes_ = caches[1].size_kib << 10;
     l2_bytes_ = caches[2].size_kib << 10;
diff --git a/util/allocator.h b/util/allocator.h
index 086b6e9..d508d5c 100644
--- a/util/allocator.h
+++ b/util/allocator.h
@@ -169,7 +169,7 @@ class Allocator {
   bool ShouldBind() const { return should_bind_; }
 
   // Attempts to move(!) `[p, p + bytes)` to the given NUMA node, which is
-  // typically `BoundedTopology::GetCluster(package_idx, cluster_idx).node`.
+  // typically `BoundedTopology::GetCluster(cluster_idx).node`.
   // Writes zeros to SOME of the memory. Only call if `ShouldBind()`.
   // `p` and `bytes` must be multiples of `QuantumBytes()`.
   bool BindMemory(void* p, size_t bytes, size_t node) const;
diff --git a/util/threading.cc b/util/threading.cc
index 9c4cfe0..6d4a603 100644
--- a/util/threading.cc
+++ b/util/threading.cc
@@ -18,7 +18,6 @@
 #include
-#include <algorithm>  // std::sort
 #include
 #include
 #include
@@ -31,14 +30,6 @@
 namespace gcpp {
 
-// Sort T := packages/clusters by descending 'size' so that users who only use
-// one Group get the largest.
-template <class T>
-static void SortByDescendingSize(std::vector<T>& groups) {
-  std::sort(groups.begin(), groups.end(),
-            [](const T& a, const T& b) { return a.Size() > b.Size(); });
-}
-
 static bool InContainer() {
   return false;  // placeholder for container detection, do not remove
 }
@@ -55,19 +46,18 @@ PinningPolicy::PinningPolicy(Tristate pin) {
 
 // If `pinning.Want()`, tries to pin each worker in `pool` to an LP in
 // `cluster`, and calls `pinning.NotifyFailed()` if any fails.
-void MaybePin(const BoundedTopology& topology, size_t pkg_idx,
-              size_t cluster_idx, const BoundedTopology::Cluster& cluster,
-              PinningPolicy& pinning, hwy::ThreadPool& pool) {
+static void MaybePin(const BoundedTopology& topology, size_t cluster_idx,
+                     const BoundedTopology::Cluster& cluster,
+                     PinningPolicy& pinning, hwy::ThreadPool& pool) {
   const std::vector<size_t> lps = cluster.LPVector();
   HWY_ASSERT(pool.NumWorkers() <= lps.size());
   pool.Run(0, pool.NumWorkers(), [&](uint64_t task, size_t thread) {
     HWY_ASSERT(task == thread);  // each worker has one task
     char buf[16];  // Linux limitation
-    const int bytes_written = snprintf(buf, sizeof(buf), "P%zu X%02zu C%03d",
-                                       topology.SkippedPackages() + pkg_idx,
-                                       topology.SkippedClusters() + cluster_idx,
-                                       static_cast<int>(task));
+    const int bytes_written = snprintf(
+        buf, sizeof(buf), "P%zu X%02zu C%03d", topology.SkippedPackages(),
+        topology.SkippedClusters() + cluster_idx, static_cast<int>(task));
     HWY_ASSERT(bytes_written < static_cast<int>(sizeof(buf)));
     hwy::SetThreadName(buf, 0);  // does not support varargs
@@ -113,79 +103,56 @@ static size_t DivideMaxAcross(const size_t max, const size_t instances) {
   return max;
 }
 
-NestedPools::NestedPools(const BoundedTopology& topology,
-                         const Allocator& allocator, size_t max_threads,
-                         Tristate pin)
-    : pinning_(pin) {
-  packages_.resize(topology.NumPackages());
-  all_packages_ =
-      MakePool(allocator, packages_.size(), hwy::PoolWorkerMapping());
-  const size_t max_workers_per_package =
-      DivideMaxAcross(max_threads, packages_.size());
-  // Each worker in all_packages_, including the main thread, will be the
-  // calling thread of an all_clusters->Run, and hence pinned to one of the
-  // `cluster.lps` if `pin`.
-  all_packages_->Run(0, packages_.size(), [&](uint64_t pkg_idx, size_t thread) {
-    HWY_ASSERT(pkg_idx == thread);  // each thread has one task
-    packages_[pkg_idx] = Package(topology, allocator, pinning_, pkg_idx,
-                                 max_workers_per_package);
-  });
-
-  all_pinned_ = pinning_.AllPinned(&pin_string_);
-
-  // For mapping package/cluster/thread to noncontiguous TLS indices, in case
-  // cluster/thread counts differ.
-  HWY_ASSERT(!packages_.empty() && packages_.size() <= 16);
-  for (const Package& p : packages_) {
-    max_clusters_per_package_ =
-        HWY_MAX(max_clusters_per_package_, p.NumClusters());
-    max_workers_per_cluster_ =
-        HWY_MAX(max_workers_per_cluster_, p.MaxWorkersPerCluster());
-  }
-  HWY_ASSERT(max_clusters_per_package_ >= 1);
-  HWY_ASSERT(max_clusters_per_package_ <= 64);
-  HWY_ASSERT(max_workers_per_cluster_ >= 1);
-  HWY_ASSERT(max_workers_per_cluster_ <= 256);
-}
-
 // `max_or_zero` == 0 means no limit.
 static inline size_t CapIfNonZero(size_t num, size_t max_or_zero) {
   return (max_or_zero == 0) ? num : HWY_MIN(num, max_or_zero);
 }
 
-NestedPools::Package::Package(const BoundedTopology& topology,
-                              const Allocator& allocator,
-                              PinningPolicy& pinning, size_t pkg_idx,
-                              size_t max_workers_per_package) {
-  // Pre-allocate because elements are set concurrently.
-  clusters_.resize(topology.NumClusters(pkg_idx));
-  const size_t max_workers_per_cluster =
-      DivideMaxAcross(max_workers_per_package, clusters_.size());
+NestedPools::NestedPools(const BoundedTopology& topology,
+                         const Allocator& allocator, size_t max_threads,
+                         Tristate pin)
+    : pinning_(pin) {
+  const size_t num_clusters = topology.NumClusters();
+  const size_t cluster_workers_cap = DivideMaxAcross(max_threads, num_clusters);
+
+  // Precompute cluster sizes to ensure we pass the same values to `MakePool`.
+  // The max is also used for `all_clusters_mapping`, see below.
+  size_t workers_per_cluster[hwy::kMaxClusters] = {};
+  size_t all_clusters_node = 0;
+  for (size_t cluster_idx = 0; cluster_idx < num_clusters; ++cluster_idx) {
+    const BoundedTopology::Cluster& tcluster = topology.GetCluster(cluster_idx);
+    workers_per_cluster[cluster_idx] =
+        CapIfNonZero(tcluster.NumWorkers(), cluster_workers_cap);
+    // Cluster sizes can vary because individual LPs may be disabled. Use the
+    // max so that `GlobalIdx` is consistent within and across clusters. It is
+    // OK to have holes or gaps in the worker index space.
+    max_workers_per_cluster_ =
+        HWY_MAX(max_workers_per_cluster_, workers_per_cluster[cluster_idx]);
+    all_clusters_node = tcluster.Node();  // arbitrarily use the last node seen
+  }
 
-  const BoundedTopology::Cluster& cluster0 = topology.GetCluster(pkg_idx, 0);
-  // Core 0 of each cluster. The second argument is the cluster size, not
-  // number of clusters. We ensure that it is the same for all clusters so that
-  // the `GlobalIdx` computation is consistent within and across clusters.
   const hwy::PoolWorkerMapping all_clusters_mapping(hwy::kAllClusters,
-                                                    cluster0.Size());
-  all_clusters_ = MakePool(allocator, clusters_.size(), all_clusters_mapping,
-                           cluster0.Node());
+                                                    max_workers_per_cluster_);
+  all_clusters_ = MakePool(allocator, num_clusters, all_clusters_mapping,
+                           all_clusters_node);
+
+  // Pre-allocate because elements are set concurrently.
+  clusters_.resize(num_clusters);
+
   // Parallel so we also pin the calling worker in `all_clusters` to
   // `cluster.lps`.
-  all_clusters_->Run(
-      0, all_clusters_->NumWorkers(), [&](size_t cluster_idx, size_t thread) {
-        HWY_ASSERT(cluster_idx == thread);  // each thread has one task
-        const BoundedTopology::Cluster& cluster =
-            topology.GetCluster(pkg_idx, cluster_idx);
-        HWY_ASSERT(cluster.Size() == cluster0.Size());
-        clusters_[cluster_idx] = MakePool(
-            allocator, CapIfNonZero(cluster.Size(), max_workers_per_cluster),
-            hwy::PoolWorkerMapping(cluster_idx, cluster.Size()),
-            cluster.Node());
-        // Pin workers AND the calling thread from `all_clusters`.
-        MaybePin(topology, pkg_idx, cluster_idx, cluster, pinning,
-                 *clusters_[cluster_idx]);
-      });
+  all_clusters_->Run(0, num_clusters, [&](size_t cluster_idx, size_t thread) {
+    HWY_ASSERT(cluster_idx == thread);  // each thread has one task
+    const BoundedTopology::Cluster& tcluster = topology.GetCluster(cluster_idx);
+    clusters_[cluster_idx] =
+        MakePool(allocator, workers_per_cluster[cluster_idx],
+                 hwy::PoolWorkerMapping(cluster_idx, max_workers_per_cluster_),
+                 tcluster.Node());
+    // Pin workers AND the calling thread from `all_clusters_`.
+    MaybePin(topology, cluster_idx, tcluster, pinning_,
+             *clusters_[cluster_idx]);
+  });
+
+  all_pinned_ = pinning_.AllPinned(&pin_string_);
 }
 
 }  // namespace gcpp
diff --git a/util/threading.h b/util/threading.h
index 53795be..35c6e22 100644
--- a/util/threading.h
+++ b/util/threading.h
@@ -66,17 +66,14 @@ class PinningPolicy {
 };  // PinningPolicy
 
 // Creates a hierarchy of thread pools according to `BoundedTopology`: one with
-// a thread per enabled package; for each of those, one with a thread per
-// enabled cluster (CCX/shared L3), and for each of those, the remaining
-// enabled cores in that cluster.
+// a thread per enabled cluster (CCX/shared L3), and for each of those, the
+// remaining enabled cores in that cluster.
 //
 // Note that we support spin waits, thus it is important for each thread to be
 // responsive, hence we do not create more than one thread per enabled core.
-// For example, when there are two packages with four clusters of 8 cores,
-// `AllPackages` has the main thread plus one extra thread, each `AllClusters`
-// has one of the `AllPackages` threads plus three extras, each `Cluster` runs
-// on one `AllClusters` thread plus seven extra workers, for a total of
-// 1 + 2*3 + 2*(4*7) = 63 extras plus the main thread.
+// For example, when there are four clusters of 8 cores, `AllClusters` has the
+// main thread plus three extras, each `Cluster` runs on one of `AllClusters`
+// plus seven extras, for a total of 3 + (4*7) = 31 extras plus the main thread.
 //
 // Useful when there are tasks which should be parallelized by workers sharing a
 // cache, or on the same NUMA node. In both cases, individual pools have lower
@@ -96,6 +93,10 @@ class NestedPools {
   NestedPools(NestedPools&&) = delete;
   NestedPools& operator=(NestedPools&&) = delete;
 
+  // Because cross-package latency is high, this interface assumes only one
+  // package is used. The `skip_packages` argument to `BoundedTopology` selects
+  // which package that is for this `NestedPools` instance.
+  //
   // `max_threads` is the maximum number of threads to divide among all
   // clusters. This is more intuitive than a per-cluster limit for users who
   // may not be aware of the CPU topology. This should be zero (meaning no
   //
   // To ensure we do not create more threads than there are HW cores, which
   // would cause huge slowdowns when spinning, the `BoundedSlice` arguments
-  // only impose upper bounds on the number of detected packages and clusters
-  // rather than defining the actual number of threads.
+  // only impose upper bounds on the number of detected clusters rather than
+  // defining the actual number of threads.
   NestedPools(const BoundedTopology& topology, const Allocator& allocator,
               size_t max_threads = 0, Tristate pin = Tristate::kDefault);
@@ -133,98 +134,37 @@
     }
   }
 
-  size_t NumPackages() const { return packages_.size(); }
-  hwy::ThreadPool& AllPackages() { return *all_packages_; }
-  hwy::ThreadPool& AllClusters(size_t pkg_idx) {
-    HWY_DASSERT(pkg_idx < NumPackages());
-    return packages_[pkg_idx].AllClusters();
-  }
-  hwy::ThreadPool& Cluster(size_t pkg_idx, size_t cluster_idx) {
-    HWY_DASSERT(pkg_idx < NumPackages());
-    return packages_[pkg_idx].Cluster(cluster_idx);
+  size_t NumClusters() const { return clusters_.size(); }
+  hwy::ThreadPool& AllClusters() { return *all_clusters_; }
+  hwy::ThreadPool& Cluster(size_t cluster_idx) {
+    HWY_DASSERT(cluster_idx < clusters_.size());
+    return *clusters_[cluster_idx];
   }
 
   // Reasonably tight upper bounds for allocating thread-local storage (TLS).
   size_t MaxWorkersPerCluster() const { return max_workers_per_cluster_; }
-  size_t MaxWorkersPerPackage() const {
-    return max_clusters_per_package_ * MaxWorkersPerCluster();
-  }
-  size_t MaxWorkers() const { return NumPackages() * MaxWorkersPerPackage(); }
-
-  // Actual number of workers.
-  size_t TotalWorkers() const {
-    size_t total_workers = 0;
-    for (size_t pkg_idx = 0; pkg_idx < NumPackages(); ++pkg_idx) {
-      total_workers += packages_[pkg_idx].TotalWorkers();
-    }
-    return total_workers;
-  }
+  size_t MaxWorkers() const { return NumClusters() * MaxWorkersPerCluster(); }
 
   // For ShowConfig
   const char* PinString() const { return pin_string_; }
 
   // Returns a single pool on the given package: either one thread per cluster
   // if there is more than one, which maximizes available memory bandwidth, or
-  // the first cluster, which is typically the whole package. For use by callers
-  // that only have a single parallel-for.
+  // the first cluster, which is typically the whole package. For use by
+  // callers that only have a single parallel-for.
+  // DEPRECATED: use ParallelFor instead.
   hwy::ThreadPool& Pool(size_t pkg_idx = 0) {
     // Only one cluster: use its pool, typically a whole socket.
-    if (AllClusters(pkg_idx).NumWorkers() == 1) {
-      return Cluster(pkg_idx, 0);
-    }
+    if (NumClusters() == 1) return Cluster(0);
     // One worker per cluster to maximize bandwidth availability.
-    return AllClusters(pkg_idx);
+    return AllClusters();
   }
 
  private:
-  class Package {
-   public:
-    Package() = default;  // for vector
-    Package(const BoundedTopology& topology, const Allocator& allocator,
-            PinningPolicy& pinning, size_t pkg_idx,
-            size_t max_workers_per_package);
-
-    size_t NumClusters() const { return clusters_.size(); }
-    size_t MaxWorkersPerCluster() const {
-      size_t max_workers_per_cluster = 0;
-      for (const PoolPtr& cluster : clusters_) {
-        max_workers_per_cluster =
-            HWY_MAX(max_workers_per_cluster, cluster->NumWorkers());
-      }
-      return max_workers_per_cluster;
-    }
-    size_t TotalWorkers() const {
-      size_t total_workers = 0;
-      for (const PoolPtr& cluster : clusters_) {
-        total_workers += cluster->NumWorkers();
-      }
-      return total_workers;
-    }
-
-    hwy::ThreadPool& AllClusters() { return *all_clusters_; }
-    hwy::ThreadPool& Cluster(size_t cluster_idx) {
-      HWY_DASSERT(cluster_idx < clusters_.size());
-      return *clusters_[cluster_idx];
-    }
-
-    void SetWaitMode(hwy::PoolWaitMode wait_mode) {
-      all_clusters_->SetWaitMode(wait_mode);
-      for (PoolPtr& cluster : clusters_) {
-        cluster->SetWaitMode(wait_mode);
-      }
-    }
-
-   private:
-    // Must be freed after `clusters_` because it reserves threads which are
-    // the main threads of `clusters_`.
-    PoolPtr all_clusters_;
-    std::vector<PoolPtr> clusters_;
-  };  // Package
-
   void SetWaitMode(hwy::PoolWaitMode wait_mode) {
-    all_packages_->SetWaitMode(wait_mode);
-    for (Package& package : packages_) {
-      package.SetWaitMode(wait_mode);
+    all_clusters_->SetWaitMode(wait_mode);
+    for (PoolPtr& cluster : clusters_) {
+      cluster->SetWaitMode(wait_mode);
     }
   }
@@ -232,12 +172,13 @@ class NestedPools {
   bool all_pinned_;
   const char* pin_string_;
 
-  std::vector<Package> packages_;
-  PoolPtr all_packages_;
+  // Must be freed after `clusters_` because it reserves threads which are
+  // the main threads of `clusters_`.
+  PoolPtr all_clusters_;
+  std::vector<PoolPtr> clusters_;
 
-  // For TLS indices. One might think this belongs in BoundedTopology, but it
-  // depends on max_threads, which is passed to the NestedPools constructor.
-  size_t max_clusters_per_package_ = 0;
+  // Used by `PoolWorkerMapping`. This depends on the `max_threads` argument,
+  // hence we can only compute this here, not in `BoundedTopology`.
   size_t max_workers_per_cluster_ = 0;
 };
@@ -362,14 +303,11 @@ void ParallelizeTwoRanges(const IndexRangePartition& get1,
 template <class Func>
 void HierarchicalParallelFor(size_t num_tasks, NestedPools& pools,
                              const Func& func) {
-  // Even if there are multiple packages, we only use the first.
-  const size_t pkg_idx = 0;
-
   // If few tasks, run on a single cluster. Also avoids a bit of overhead if
   // there is only one cluster.
-  hwy::ThreadPool& all_clusters = pools.AllClusters(pkg_idx);
+  hwy::ThreadPool& all_clusters = pools.AllClusters();
   const size_t num_clusters = all_clusters.NumWorkers();
-  hwy::ThreadPool& cluster = pools.Cluster(pkg_idx, 0);
+  hwy::ThreadPool& cluster = pools.Cluster(0);
   if (num_clusters == 1 || num_tasks <= cluster.NumWorkers()) {
     return cluster.Run(0, num_tasks, [&](uint64_t task, size_t thread) {
       func(task, thread);
@@ -382,7 +320,7 @@ void HierarchicalParallelFor(size_t num_tasks, NestedPools& pools,
   ParallelizeOneRange(
       ranges, all_clusters,
       [&](const IndexRange& range, const size_t cluster_idx) {
-        hwy::ThreadPool& cluster = pools.Cluster(pkg_idx, cluster_idx);
+        hwy::ThreadPool& cluster = pools.Cluster(cluster_idx);
        const size_t cluster_base = cluster_idx * pools.MaxWorkersPerCluster();
        cluster.Run(range.begin(), range.end(),
                    [&](uint64_t task, size_t thread) {
diff --git a/util/threading_context.cc b/util/threading_context.cc
index e2c4d03..0a349fc 100644
--- a/util/threading_context.cc
+++ b/util/threading_context.cc
@@ -79,18 +79,15 @@ static void TunePool(hwy::PoolWaitMode wait_mode, hwy::ThreadPool& pool) {
 }
 
 static void TunePools(hwy::PoolWaitMode wait_mode, NestedPools& pools) {
-  TunePool(wait_mode, pools.AllPackages());
-  for (size_t pkg_idx = 0; pkg_idx < pools.NumPackages(); ++pkg_idx) {
-    hwy::ThreadPool& clusters = pools.AllClusters(pkg_idx);
-    TunePool(wait_mode, clusters);
+  hwy::ThreadPool& clusters = pools.AllClusters();
+  TunePool(wait_mode, clusters);
 
-    // Run in parallel because Turin CPUs have 16, and in real usage, we often
-    // run all at the same time.
-    clusters.Run(0, clusters.NumWorkers(),
-                 [&](uint64_t cluster_idx, size_t /*thread*/) {
-                   TunePool(wait_mode, pools.Cluster(pkg_idx, cluster_idx));
-                 });
-  }
+  // Run in parallel because Turin CPUs have 16, and in real usage, we often
+  // run all at the same time.
+  clusters.Run(0, clusters.NumWorkers(),
+               [&](uint64_t cluster_idx, size_t /*thread*/) {
+                 TunePool(wait_mode, pools.Cluster(cluster_idx));
+               });
 }
 
 ThreadingContext::ThreadingContext(const ThreadingArgs& args)
diff --git a/util/threading_context.h b/util/threading_context.h
index ff4ff62..5c55fc4 100644
--- a/util/threading_context.h
+++ b/util/threading_context.h
@@ -153,10 +153,7 @@ enum class ParallelismStrategy : uint8_t {
 template <class Func>
 void ParallelFor(ParallelismStrategy parallelism, size_t num_tasks,
                  ThreadingContext& ctx, size_t cluster_idx, const Func& func) {
-  HWY_DASSERT(ctx.topology.NumPackages() == 1);
-  const size_t pkg_idx = 0;
-
-  HWY_DASSERT(cluster_idx < ctx.topology.NumClusters(pkg_idx));
+  HWY_DASSERT(cluster_idx < ctx.topology.NumClusters());
   if (cluster_idx != 0) {
     // If already running across clusters, only use within-cluster modes.
     HWY_DASSERT(parallelism == ParallelismStrategy::kNone ||
@@ -173,7 +170,7 @@
     }
 
     case ParallelismStrategy::kAcrossClusters:
-      return ctx.pools.AllClusters(pkg_idx).Run(
+      return ctx.pools.AllClusters().Run(
           0, num_tasks,
           [&](uint64_t task, size_t cluster_idx) { func(task, cluster_idx); });
@@ -181,7 +178,7 @@
      // Ensure the worker argument is unique across clusters, because it is
      // used for TLS indexing for example in profiler.h.
      const size_t base = ctx.Worker(cluster_idx);
-      return ctx.pools.Cluster(pkg_idx, cluster_idx)
+      return ctx.pools.Cluster(cluster_idx)
          .Run(0, num_tasks,
               [&](uint64_t task, size_t worker) { func(task, base + worker); });
@@ -190,15 +187,15 @@
    case ParallelismStrategy::kFlat: {
      // Check for single cluster; if not, we must compute `cluster_base` for
      // consistent and non-overlapping worker indices.
-      hwy::ThreadPool& all_clusters = ctx.pools.AllClusters(pkg_idx);
+      hwy::ThreadPool& all_clusters = ctx.pools.AllClusters();
      const size_t num_clusters = all_clusters.NumWorkers();
      if (num_clusters == 1) {
-        return ctx.pools.Cluster(pkg_idx, cluster_idx)
+        return ctx.pools.Cluster(cluster_idx)
            .Run(0, num_tasks,
                 [&](uint64_t task, size_t worker) { func(task, worker); });
      }
 
-      return ctx.pools.AllClusters(pkg_idx).Run(
+      return ctx.pools.AllClusters().Run(
          0, num_tasks, [&](uint64_t task, size_t cluster_idx) {
            const size_t worker = ctx.Worker(cluster_idx);
            func(task, worker);
diff --git a/util/threading_test.cc b/util/threading_test.cc
index ac2746b..4cd8554 100644
--- a/util/threading_test.cc
+++ b/util/threading_test.cc
@@ -99,23 +99,16 @@ TEST(ThreadingTest, TestBoundedTopology) {
   const BoundedSlice all;
   const BoundedSlice one(0, 1);
   // All
-  {
-    BoundedTopology topology(all, all, all);
-    fprintf(stderr, "%s\n", topology.TopologyString());
-  }
-
-  // Max one package
   {
     BoundedTopology topology(one, all, all);
     fprintf(stderr, "%s\n", topology.TopologyString());
-    ASSERT_EQ(1, topology.NumPackages());
   }
 
   // Max one cluster
   {
-    BoundedTopology topology(all, one, all);
+    BoundedTopology topology(one, one, all);
     fprintf(stderr, "%s\n", topology.TopologyString());
-    ASSERT_EQ(1, topology.NumClusters(0));
+    ASSERT_EQ(1, topology.NumClusters());
   }
 }
@@ -380,24 +373,32 @@ TEST(ThreadingTest, BenchJoin) {
   ThreadingArgs threading_args;
   ThreadingContext ctx(threading_args);
   NestedPools& pools = ctx.pools;
-  // Use last package because the main thread has been pinned to it.
-  const size_t pkg_idx = pools.NumPackages() - 1;
-  measure(pools.AllPackages(), false, "block packages");
-  if (pools.AllClusters(pkg_idx).NumWorkers() > 1) {
-    measure(pools.AllClusters(pkg_idx), false, "block clusters");
+  if (pools.NumClusters() > 1) {
+    measure(pools.AllClusters(), false, "block clusters");
   }
-  measure(pools.Cluster(pkg_idx, 0), false, "block in_cluster");
+  measure(pools.Cluster(0), false, "block in_cluster");
 
   if (pools.AllPinned()) {
     const bool kSpin = true;
-    measure(pools.AllPackages(), kSpin, "spin packages");
-    if (pools.AllClusters(pkg_idx).NumWorkers() > 1) {
-      measure(pools.AllClusters(pkg_idx), kSpin, "spin clusters");
+    if (pools.NumClusters() > 1) {
+      measure(pools.AllClusters(), kSpin, "spin clusters");
     }
-    measure(pools.Cluster(pkg_idx, 0), kSpin, "spin in_cluster");
+    measure(pools.Cluster(0), kSpin, "spin in_cluster");
   }
 }
 
+TEST(ThreadingTest, TestUnequalClusters) {
+  ThreadingArgs threading_args;
+  threading_args.max_lps = 13;
+  ThreadingContext ctx(threading_args);
+  const size_t last_workers =
+      ctx.pools.Cluster(ctx.topology.NumClusters() - 1).NumWorkers();
+  const size_t max_workers = ctx.pools.MaxWorkersPerCluster();
+  fprintf(stderr, "%zu clusters, last with %zu (max %zu)\n",
+          ctx.topology.NumClusters(), last_workers, max_workers);
+  HWY_ASSERT(last_workers <= max_workers);
+}
+
 }  // namespace
 }  // namespace gcpp
diff --git a/util/topology.cc b/util/topology.cc
index 0d32f22..f20b7f9 100644
--- a/util/topology.cc
+++ b/util/topology.cc
@@ -18,21 +18,12 @@
 #include
 #include <algorithm>  // std::sort
-#include <utility>  // std::move
 #include
 
 #include "hwy/base.h"
 
 namespace gcpp {
 
-// Sort T := packages/clusters by descending 'size' so that users who only use
-// one Group get the largest.
-template <class T>
-static void SortByDescendingSize(std::vector<T>& groups) {
-  std::sort(groups.begin(), groups.end(),
-            [](const T& a, const T& b) { return a.Size() > b.Size(); });
-}
-
 // Returns set of LPs available for use.
 static LPS EnabledLPs(const BoundedSlice& lp_slice) {
   LPS enabled_lps;
@@ -88,21 +79,23 @@ BoundedTopology::BoundedTopology(BoundedSlice package_slice,
                                  BoundedSlice cluster_slice,
                                  BoundedSlice lp_slice)
     : package_slice_(package_slice), cluster_slice_(cluster_slice) {
+  HWY_ASSERT(package_slice_.Max() == 1);
   const LPS enabled_lps = EnabledLPs(lp_slice);
 
+  bool topology_ok = false;
 #if !GEMMA_DISABLE_TOPOLOGY
   if (HWY_LIKELY(!topology_.packages.empty())) {
-    InitFromTopology(enabled_lps);
+    topology_ok = InitFromTopology(enabled_lps);
   }
 #endif
 
   // Topology unknown or no packages with enabled LPs: create a single
   // package with one cluster, and one node.
-  if (HWY_UNLIKELY(NumPackages() == 0)) {
+  if (HWY_UNLIKELY(!topology_ok)) {
     InitFromLPs(enabled_lps);
   }
 
-  HWY_ASSERT(NumPackages() != 0 && NumClusters(0) != 0 && NumNodes() != 0);
+  HWY_ASSERT(NumClusters() != 0 && NumNodes() != 0);
 }
 
 // Topology is unknown, take the given set of LPs.
@@ -161,9 +154,113 @@
 constexpr bool kSplitLargeClusters = false;
 constexpr size_t kMaxClusters = 8;
 constexpr size_t kMaxLPsPerCluster = 6;
 
-// Topology is unknown, use only the given LPs which derive from OS affinity
-// and `lp_slice`.
-BoundedTopology::Package::Package(const LPS& enabled_lps) {
+#if !GEMMA_DISABLE_TOPOLOGY
+
+static size_t CoresFromLPs(const LPS& lps, const hwy::Topology& topology) {
+  LPS cores;
+  lps.Foreach([&](size_t lp) {
+    if (topology.lps[lp].smt == 0) cores.Set(lp);
+  });
+  return cores.Count();
+}
+
+// tcluster is a modifiable copy of the first cluster in the package.
+void BoundedTopology::SplitLargeCluster(const LPS& enabled_lps,
+                                        hwy::Topology::Cluster tcluster) {
+  const LPS lps = clusters_[0].LPSet();  // copy so we can clear
+  clusters_.clear();
+
+  // Split `lps` into several clusters.
+  LPS clusters_lps[kMaxClusters];
+  const size_t num_clusters =
+      HWY_MIN(kMaxClusters, hwy::DivCeil(lps.Count(), kMaxLPsPerCluster));
+  size_t num_lps = 0;
+  lps.Foreach(
+      [&](size_t lp) { clusters_lps[num_lps++ % num_clusters].Set(lp); });
+  HWY_DASSERT(num_lps == lps.Count());
+
+  // Create new clusters, just inserting the new LPS.
+  for (size_t cluster_idx = 0; cluster_idx < num_clusters; ++cluster_idx) {
+    tcluster.lps = clusters_lps[cluster_idx];
+    // Keep same `private_kib` and `shared_kib`.
+    clusters_.push_back(Cluster(enabled_lps, topology_.lps, tcluster));
+  }
+}
+
+// Main part of ctor, called when topology is known.
+bool BoundedTopology::InitFromTopology(const LPS& enabled_lps) {
+  const size_t tpkg_idx = package_slice_.Begin();
+  HWY_ASSERT(tpkg_idx < topology_.packages.size());
+  const hwy::Topology::Package& tpackage = topology_.packages[tpkg_idx];
+  const std::vector<hwy::Topology::Cluster>& tclusters = tpackage.clusters;
+  if (HWY_UNLIKELY(tclusters.empty())) {
+    HWY_WARN("Topology: no clusters found in package %zu.", tpkg_idx);
+    return false;
+  }
+
+  size_t max_tcluster_cores = 0;
+  size_t max_tcluster_lps = 0;
+  for (const hwy::Topology::Cluster& tcluster : tclusters) {
+    const size_t cores = CoresFromLPs(tcluster.lps, topology_);
+    const size_t lps = tcluster.lps.Count();
+    max_tcluster_cores = HWY_MAX(max_tcluster_cores, cores);
+    max_tcluster_lps = HWY_MAX(max_tcluster_lps, lps);
+  }
+  HWY_ASSERT(max_tcluster_cores != 0);
+  HWY_ASSERT(max_tcluster_lps >= max_tcluster_cores);
+
+  // Populate `clusters` with the subset of clusters in `cluster_slice` that
+  // have any enabled LPs.
+  clusters_.reserve(cluster_slice_.Num(tclusters.size()));
+  cluster_slice_.Foreach("cluster", tclusters.size(), [&](size_t cluster_idx) {
+    const hwy::Topology::Cluster& tcluster = tpackage.clusters[cluster_idx];
+    Cluster cluster(enabled_lps, topology_.lps, tcluster);
+
+    // Skip if empty, i.e. too few `enabled_lps`.
+    if (HWY_LIKELY(cluster.NumWorkers() != 0)) {
+      clusters_.push_back(cluster);
+      // Remember NUMA nodes that we are actually using (not just enabled).
+      nodes_.Set(cluster.Node());
+    }
+  });
+  if (HWY_UNLIKELY(clusters_.empty())) {
+    HWY_WARN("Too restrictive cluster_slice or enabled_lps, no clusters left.");
+    return false;
+  }
+
+  if (kSplitLargeClusters && clusters_.size() == 1 &&
+      enabled_lps.Count() >= 16) {
+    SplitLargeCluster(enabled_lps, tpackage.clusters[0]);
+  }
+
+  // Sort by descending 'size' so that users who only use one get the largest.
+  std::sort(clusters_.begin(), clusters_.end(),
+            [](const Cluster& a, const Cluster& b) {
+              return a.NumWorkers() > b.NumWorkers();
+            });
+
+  // Largest number of enabled workers in any cluster, for `topology_string_`.
+  // This may be less than `max_tcluster_cores` if `enabled_lps` excludes some.
+  size_t max_cluster_workers = 0;
+  for (const Cluster& c : clusters_) {
+    max_cluster_workers = HWY_MAX(max_cluster_workers, c.NumWorkers());
+  }
+  HWY_ASSERT(max_cluster_workers <= max_tcluster_cores);
+  // Do not warn about large clusters: GNR has 40.
+
+  snprintf(topology_string_, sizeof(topology_string_),
+           "%zuS %zuX %zuC %zuH, using %zuX %zuC (nodes=%zu)",
+           topology_.packages.size(), tclusters.size(), max_tcluster_cores,
+           max_tcluster_lps / max_tcluster_cores, NumClusters(),
+           max_cluster_workers, nodes_.Count());
+  return true;
+}
+
+#endif  // !GEMMA_DISABLE_TOPOLOGY
+
+// Called when topology is unknown or `GEMMA_DISABLE_TOPOLOGY`. Uses only the
+// given LPs which derive from OS affinity and `lp_slice`.
+void BoundedTopology::InitFromLPs(const LPS& enabled_lps) {
   LPS clusters_lps[kMaxClusters];
   const size_t num_clusters =
       kSplitLargeClusters
@@ -178,157 +275,11 @@
   });
 
   for (size_t cluster_idx = 0; cluster_idx < num_clusters; ++cluster_idx) {
-    clusters.push_back(Cluster(clusters_lps[cluster_idx]));
+    clusters_.push_back(Cluster(clusters_lps[cluster_idx]));
   }
-}
-
-// NOTE: caller is responsible for checking whether `clusters` is empty.
-BoundedTopology::Package::Package(const LPS& enabled_lps,
-                                  const hwy::Topology& topology, size_t pkg_idx,
-                                  BoundedSlice cluster_slice) {
-  const hwy::Topology::Package& tpackage = topology.packages[pkg_idx];
-  // Populate `clusters` with the subset of clusters in `cluster_slice` that
-  // have any enabled LPs. If `clusters` remains empty, the caller will
-  // skip this `Package`.
-  clusters.reserve(cluster_slice.Num(tpackage.clusters.size()));
-  cluster_slice.Foreach(
-      "cluster", tpackage.clusters.size(), [&](size_t cluster_idx) {
-        const hwy::Topology::Cluster& tcluster = tpackage.clusters[cluster_idx];
-        Cluster cluster(enabled_lps, topology.lps, tcluster);
-
-        // Skip if empty, i.e. too few `enabled_lps`.
-        if (HWY_LIKELY(cluster.Size() != 0)) {
-          clusters.push_back(cluster);
-        }
-      });
-  SortByDescendingSize(clusters);
-
-  // If there is only one large cluster, split it into smaller ones.
-  if (kSplitLargeClusters && clusters.size() == 1 &&
-      enabled_lps.Count() >= 16) {
-    const LPS lps = clusters[0].LPSet();  // copy so we can clear
-    clusters.clear();
-
-    // Split `lps` into several clusters.
-    LPS clusters_lps[kMaxClusters];
-    const size_t num_clusters =
-        HWY_MIN(kMaxClusters, hwy::DivCeil(lps.Count(), kMaxLPsPerCluster));
-    size_t num_lps = 0;
-    lps.Foreach(
-        [&](size_t lp) { clusters_lps[num_lps++ % num_clusters].Set(lp); });
-    HWY_DASSERT(num_lps == lps.Count());
-
-    // Create new clusters, just inserting the new LPS.
-    hwy::Topology::Cluster tcluster = tpackage.clusters[0];  // modifiable copy
-    for (size_t cluster_idx = 0; cluster_idx < num_clusters; ++cluster_idx) {
-      tcluster.lps = clusters_lps[cluster_idx];
-      // Keep same `private_kib` and `shared_kib`.
-      clusters.push_back(Cluster(enabled_lps, topology.lps, tcluster));
-    }
-  }
-}
-
-#if !GEMMA_DISABLE_TOPOLOGY
-
-static size_t CoresFromLPs(const LPS& lps, const hwy::Topology& topology) {
-  LPS cores;
-  lps.Foreach([&](size_t lp) {
-    if (topology.lps[lp].smt == 0) cores.Set(lp);
-  });
-  return cores.Count();
-}
-
-// Scans hwy::Topology for clusters and their size, for use by topology_string_.
-static void ScanTClusters(hwy::Topology& topology_, size_t& max_tclusters,
-                          size_t& max_tcluster_cores,
-                          size_t& max_tcluster_lps) {
-  max_tclusters = 0;
-  max_tcluster_cores = 0;
-  max_tcluster_lps = 0;
-  for (size_t pkg_idx = 0; pkg_idx < topology_.packages.size(); ++pkg_idx) {
-    const std::vector<hwy::Topology::Cluster>& tclusters =
-        topology_.packages[pkg_idx].clusters;
-    max_tclusters = HWY_MAX(max_tclusters, tclusters.size());
-    size_t tcluster_cores = 0;
-    size_t tcluster_lps = 0;
-    for (size_t cluster_idx = 0; cluster_idx < tclusters.size();
-         ++cluster_idx) {
-      const size_t cores = CoresFromLPs(tclusters[cluster_idx].lps, topology_);
-      const size_t lps = tclusters[cluster_idx].lps.Count();
-      tcluster_cores = HWY_MAX(tcluster_cores, cores);
-      tcluster_lps = HWY_MAX(tcluster_lps, lps);
-    }
-
-    if (tclusters.size() > 1 && tcluster_cores > 8) {
-      HWY_WARN(
-          "Package %zu: multiple clusters with max size %zu, whereas CCX "
-          "only have 8, may indicate a bug in hwy::Topology.",
-          pkg_idx, tcluster_cores);
-    }
-    max_tcluster_cores = HWY_MAX(max_tcluster_cores, tcluster_cores);
-    max_tcluster_lps = HWY_MAX(max_tcluster_lps, tcluster_lps);
-  }
-  HWY_ASSERT(max_tclusters != 0);
-  HWY_ASSERT(max_tcluster_cores != 0);
-  HWY_ASSERT(max_tcluster_lps >= max_tcluster_cores);
-}
-
-// Main part of ctor, called when topology is known.
-void BoundedTopology::InitFromTopology(const LPS& enabled_lps) {
-  size_t max_tclusters, max_tcluster_cores, max_tcluster_lps;
-  ScanTClusters(topology_, max_tclusters, max_tcluster_cores, max_tcluster_lps);
-
-  // (Possibly empty) subset of `Topology` packages that have `enabled_lps`.
-  package_slice_.Foreach(
-      "package", topology_.packages.size(), [&](size_t pkg_idx) {
-        Package package(enabled_lps, topology_, pkg_idx, cluster_slice_);
-        // Skip if empty, i.e. too few `enabled_lps`.
-        if (HWY_LIKELY(!package.clusters.empty())) {
-          packages_.push_back(std::move(package));
-        }
-      });
-  if (NumPackages() == 0) return;
-  SortByDescendingSize(packages_);
-
-  // Remember NUMA nodes that we are actually using (not just enabled).
-  for (const Package& p : packages_) {
-    for (const Cluster& c : p.clusters) {
-      nodes_.Set(c.Node());
-    }
-  }
-
-  // Scan for max BoundedTopology clusters and their size, for topology_string_.
-  size_t all_max_cluster_size = 0;
-  for (size_t pkg_idx = 0; pkg_idx < NumPackages(); ++pkg_idx) {
-    size_t max_cluster_size = 0;
-    for (size_t cluster_idx = 0; cluster_idx < NumClusters(pkg_idx);
-         ++cluster_idx) {
-      max_cluster_size =
-          HWY_MAX(max_cluster_size, GetCluster(pkg_idx, cluster_idx).Size());
-    }
-    if (NumClusters(pkg_idx) > 1 && max_cluster_size > 8) {
-      HWY_WARN(
-          "Package %zu: multiple clusters with max size %zu, whereas CCX "
-          "only have 8, may indicate a bug in BoundedTopology.",
-          pkg_idx, max_cluster_size);
-    }
-    all_max_cluster_size = HWY_MAX(all_max_cluster_size, max_cluster_size);
-  }
-
-  snprintf(topology_string_, sizeof(topology_string_),
-           "%zuS %zuX %zuC %zuH, using %zuS %zuX %zuC (nodes=%zu)",
-           topology_.packages.size(), max_tclusters, max_tcluster_cores,
-           max_tcluster_lps / max_tcluster_cores, packages_.size(),
-           NumClusters(0), all_max_cluster_size, nodes_.Count());
-}
-
-#endif  // !GEMMA_DISABLE_TOPOLOGY
-
-void BoundedTopology::InitFromLPs(const LPS& enabled_lps) {
-  packages_.push_back(Package(enabled_lps));
   snprintf(topology_string_, sizeof(topology_string_), "LPs=%zu",
-           GetCluster(0, 0).Size());
+           GetCluster(0).NumWorkers());
 
   // Assume a single NUMA node.
   nodes_.Set(0);
diff --git a/util/topology.h b/util/topology.h
index b844bd9..d4f80cc 100644
--- a/util/topology.h
+++ b/util/topology.h
@@ -40,6 +40,7 @@ class BoundedSlice {
   BoundedSlice(size_t skip = 0, size_t max = 0) : skip_(skip), max_(max) {}
 
   size_t Begin() const { return skip_; }
+  size_t Max() const { return max_; }
 
   // STL-style one past the end.
   size_t End(size_t detected) const {
@@ -82,12 +83,11 @@ using LPS = hwy::LogicalProcessorSet;
 // back to a single package and cluster.
 class BoundedTopology {
  public:
-  // Defaults to "use all detected".
-  BoundedTopology(BoundedSlice package_slice = BoundedSlice(),
+  // `package_slice` must have `Max() == 1`. Others default to "use all".
+  BoundedTopology(BoundedSlice package_slice,
                   BoundedSlice cluster_slice = BoundedSlice(),
                   BoundedSlice lp_slice = BoundedSlice());
 
-  size_t NumPackages() const { return packages_.size(); }
   size_t NumNodes() const { return nodes_.Count(); }
   const char* TopologyString() const { return topology_string_; }
@@ -98,8 +98,7 @@ class BoundedTopology {
             const std::vector& all_lps,
             const hwy::Topology::Cluster& tcluster);
 
-    // For SortByDescendingSize.
-    size_t Size() const { return num_workers_; }
+    size_t NumWorkers() const { return num_workers_; }
 
     // Returns vector with all enabled LPs, used for pinning.
     std::vector<size_t> LPVector() const {
@@ -127,26 +126,11 @@ class BoundedTopology {
     size_t shared_kib_ = 0;
   };  // Cluster
 
-  size_t NumClusters(size_t pkg_idx) const {
-    HWY_ASSERT(pkg_idx < NumPackages());
-    return packages_[pkg_idx].clusters.size();
+  size_t NumClusters() const { return clusters_.size(); }
+  const Cluster& GetCluster(size_t cluster_idx) const {
+    HWY_ASSERT(cluster_idx < clusters_.size());
+    return clusters_[cluster_idx];
   }
-  const Cluster& GetCluster(size_t pkg_idx, size_t cluster_idx) const {
-    HWY_ASSERT(pkg_idx < NumPackages());
-    const Package& package = packages_[pkg_idx];
-    HWY_ASSERT(cluster_idx < package.clusters.size());
-    return package.clusters[cluster_idx];
-  }
-  Cluster& GetCluster(size_t pkg_idx, size_t cluster_idx) {
-    HWY_ASSERT(pkg_idx < NumPackages());
-    Package& package = packages_[pkg_idx];
-    HWY_ASSERT(cluster_idx < package.clusters.size());
-    return package.clusters[cluster_idx];
-  }
-
-#if !GEMMA_DISABLE_TOPOLOGY
-  const hwy::Topology& FullTopology() const { return topology_; }
-#endif
 
   // In case we are running with a subset of packages/clusters, these are added
   // to the package/cluster indices for purposes of the thread name, so that
@@ -155,26 +139,17 @@ class BoundedTopology {
   size_t SkippedClusters() const { return cluster_slice_.Begin(); }
 
  private:
-  struct Package {
-    explicit Package(const LPS& enabled_lps);
-    Package(const LPS& enabled_lps, const hwy::Topology& topology,
-            size_t pkg_idx, BoundedSlice cluster_slice);
-
-    // For SortByDescendingSize.
-    size_t Size() const { return clusters.size(); }
-
-    std::vector<Cluster> clusters;
-  };  // Package
-
-  void InitFromTopology(const LPS& enabled_lps);
+  void SplitLargeCluster(const LPS& enabled_lps,
+                         hwy::Topology::Cluster tcluster);
+  bool InitFromTopology(const LPS& enabled_lps);
   void InitFromLPs(const LPS& enabled_lps);
 
 #if !GEMMA_DISABLE_TOPOLOGY
   hwy::Topology topology_;
 #endif
-  BoundedSlice package_slice_;
+  BoundedSlice package_slice_;  // Within the entire detected topology.
   BoundedSlice cluster_slice_;
-  std::vector<Package> packages_;
+  std::vector<Cluster> clusters_;
   char topology_string_[96];
   LPS nodes_;
 };
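
Note (illustrative sketch, not part of the patch): after this change, call sites no longer select a package pool; they pass the ThreadingContext plus a cluster index and pick a ParallelismStrategy, as blob_compare.cc does above. The following sketch uses only identifiers that appear in this diff (ThreadingArgs, ThreadingContext, ParallelFor, ParallelismStrategy, NestedPools::NumClusters, BoundedTopology::GetCluster, ThreadingContext::Worker); the function name ExampleParallelWork and the item count are hypothetical placeholders.

    #include "util/threading_context.h"

    namespace gcpp {

    // Hypothetical call site; the loop bodies are placeholders.
    void ExampleParallelWork(ThreadingContext& ctx, size_t num_items) {
      // One task per cluster; this level replaces the former AllPackages() pool.
      ParallelFor(ParallelismStrategy::kAcrossClusters, ctx.pools.NumClusters(),
                  ctx, /*cluster_idx=*/0,
                  [&](uint64_t task, size_t cluster_idx) {
                    (void)task;
                    // Nested loop over the workers of the cluster chosen above.
                    // The `worker` argument already includes the offset
                    // ctx.Worker(cluster_idx), so TLS indices stay unique
                    // across clusters.
                    ParallelFor(ParallelismStrategy::kWithinCluster, num_items,
                                ctx, cluster_idx,
                                [&](uint64_t i, size_t worker) {
                                  (void)i;
                                  (void)worker;  // ... process item i ...
                                });
                  });

      // NUMA node lookups also drop the pkg_idx argument.
      const size_t node = ctx.topology.GetCluster(0).Node();
      (void)node;
    }

    }  // namespace gcpp

    // Usage:
    //   ThreadingArgs args;
    //   ThreadingContext ctx(args);
    //   ExampleParallelWork(ctx, 1024);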