From 74faaaf7d2b90757aa666b41e7f2db5a38c7fa9e Mon Sep 17 00:00:00 2001 From: Marko Tombak Date: Tue, 3 Mar 2026 11:31:14 +0200 Subject: [PATCH] parallel load: multi-context model loading --- common/arg.cpp | 15 ++++++++++ common/common.h | 2 ++ src/llama-model-loader.cpp | 47 +++++++++++++++++++++++-------- src/llama-model-loader.h | 7 ++++- src/llama-model.cpp | 57 ++++++++++++++++++++++++++++++++++++-- 5 files changed, 114 insertions(+), 14 deletions(-) diff --git a/common/arg.cpp b/common/arg.cpp index c6a2dcbf2d..4a7d36e537 100644 --- a/common/arg.cpp +++ b/common/arg.cpp @@ -2364,6 +2364,21 @@ common_params_context common_params_parser_init(common_params & params, llama_ex } } ).set_env("LLAMA_ARG_SPLIT_MODE")); + add_opt(common_arg( + {"-pl", "--parallel-load"}, "N", + string_format("max parallel jobs for model loading (default: %d, -1 = no cap (up to #contexts), 1 = sequential)", params.n_parallel_load), + [](common_params & params, int value) { + params.n_parallel_load = value; + if (params.n_parallel_load <= 0) { + params.n_parallel_load = -1; // unlimited + } +#ifdef _WIN32 + _putenv_s("LLAMA_ARG_PARALLEL_LOAD", std::to_string(params.n_parallel_load).c_str()); +#else + setenv("LLAMA_ARG_PARALLEL_LOAD", std::to_string(params.n_parallel_load).c_str(), 1); +#endif + } + ).set_env("LLAMA_ARG_PARALLEL_LOAD")); add_opt(common_arg( {"-ts", "--tensor-split"}, "N0,N1,N2,...", "fraction of the model to offload to each GPU, comma-separated list of proportions, e.g. 3,1", diff --git a/common/common.h b/common/common.h index 62201ea1ad..471f60789e 100644 --- a/common/common.h +++ b/common/common.h @@ -444,6 +444,8 @@ struct common_params { enum llama_split_mode split_mode = LLAMA_SPLIT_MODE_LAYER; // how to split the model across GPUs + int32_t n_parallel_load = 4; // number of threads for parallel model loading (-1 = unlimited, 1 = sequential) + struct cpu_params cpuparams; struct cpu_params cpuparams_batch; diff --git a/src/llama-model-loader.cpp b/src/llama-model-loader.cpp index 413f34c226..6f1060e4fd 100644 --- a/src/llama-model-loader.cpp +++ b/src/llama-model-loader.cpp @@ -12,6 +12,7 @@ #include #include #include +#include static const size_t kiB = 1024; static const size_t MiB = 1024*kiB; @@ -549,6 +550,7 @@ llama_model_loader::llama_model_loader( llm_kv = LLM_KV(llm_arch_from_string(arch_name)); files.emplace_back(new llama_file(fname.c_str(), "rb", use_direct_io)); + file_paths.emplace_back(fname); contexts.emplace_back(ctx); if (use_mmap && use_direct_io) { @@ -631,6 +633,7 @@ llama_model_loader::llama_model_loader( } files.emplace_back(new llama_file(fname_split, "rb", use_direct_io)); + file_paths.emplace_back(fname_split); contexts.emplace_back(ctx); // Save tensors data offset info of the shard. @@ -1373,6 +1376,21 @@ bool llama_model_loader::load_all_data( std::vector> read_buf; std::vector>> validation_result; + // Thread-local file handles to avoid seek/read races when loading multiple + // contexts in parallel. + thread_local std::unordered_map> local_files; + auto get_local_file = [this](size_t idx) -> llama_file * { + const std::string key = file_paths.at(idx) + (use_direct_io ? "#dio1" : "#dio0"); + auto it = local_files.find(key); + if (it == local_files.end()) { + auto local_file = std::make_unique(file_paths.at(idx).c_str(), "rb", use_direct_io); + auto * ptr = local_file.get(); + local_files.emplace(key, std::move(local_file)); + return ptr; + } + return it->second.get(); + }; + // 4 staging buffers for async uploads, each sized 1MB seems to be a good default for single NVMe drives. // NVMe raid configurations might require more / larger buffers. constexpr size_t n_buffers = 4; @@ -1479,7 +1497,7 @@ bool llama_model_loader::load_all_data( } if (progress_callback) { - if (!progress_callback((float) size_done / size_data, progress_callback_user_data)) { + if (!progress_callback((float) size_done.load(std::memory_order_relaxed) / size_data, progress_callback_user_data)) { return false; } } @@ -1503,19 +1521,21 @@ bool llama_model_loader::load_all_data( GGML_ASSERT(buf_mmap || cur->data); // either we have a buffer to allocate the tensor in, or it is already allocated if (buf_mmap && cur->data == nullptr) { ggml_backend_tensor_alloc(buf_mmap, cur, data); - if (lmlocks) { - const auto & lmlock = lmlocks->at(weight->idx); - lmlock->grow_to(weight->offs + n_size); + { + std::lock_guard lock(mmaps_used_mutex); + if (lmlocks) { + const auto & lmlock = lmlocks->at(weight->idx); + lmlock->grow_to(weight->offs + n_size); + } + auto & mmap_used = mmaps_used[weight->idx]; + mmap_used.first = std::min(mmap_used.first, weight->offs); + mmap_used.second = std::max(mmap_used.second, weight->offs + n_size); } - - auto & mmap_used = mmaps_used[weight->idx]; - mmap_used.first = std::min(mmap_used.first, weight->offs); - mmap_used.second = std::max(mmap_used.second, weight->offs + n_size); } else { ggml_backend_tensor_set(cur, data, 0, n_size); } } else { - const auto & file = files.at(weight->idx); + llama_file * file = get_local_file(weight->idx); if (ggml_backend_buffer_is_host(cur->buffer)) { file->seek(weight->offs, SEEK_SET); @@ -1591,7 +1611,7 @@ bool llama_model_loader::load_all_data( } } - size_done += n_size; + size_done.fetch_add(n_size, std::memory_order_relaxed); } // free temporary resources used for async uploads @@ -1618,7 +1638,12 @@ bool llama_model_loader::load_all_data( } // check if this is the last call and do final cleanup - if (size_done >= size_data) { + if (size_done.load(std::memory_order_relaxed) >= size_data) { + bool expected = false; + if (!final_cleanup_done.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) { + return true; + } + // unmap offloaded tensors and metadata if (use_mmap) { for (uint32_t idx = 0; idx < mappings.size(); idx++) { diff --git a/src/llama-model-loader.h b/src/llama-model-loader.h index ed5de729ca..4cc3a3d732 100644 --- a/src/llama-model-loader.h +++ b/src/llama-model-loader.h @@ -9,9 +9,11 @@ #include "ggml-cpp.h" +#include #include #include #include +#include #include #include @@ -81,6 +83,7 @@ struct llama_model_loader { bool no_alloc; llama_files files; + std::vector file_paths; llama_ftype ftype; llama_fver fver; @@ -99,9 +102,11 @@ struct llama_model_loader { std::string arch_name; LLM_KV llm_kv = LLM_KV(LLM_ARCH_UNKNOWN); - size_t size_done = 0; + std::atomic size_done = 0; size_t size_data = 0; std::vector> mmaps_used; + std::mutex mmaps_used_mutex; + std::atomic final_cleanup_done = false; // define a comparator for the buft -> ctx map to ensure that the order is well-defined: struct ggml_backend_buft_comparator { diff --git a/src/llama-model.cpp b/src/llama-model.cpp index f8caad2889..2297fd75f2 100644 --- a/src/llama-model.cpp +++ b/src/llama-model.cpp @@ -17,12 +17,15 @@ #include "models/models.h" #include +#include #include +#include #include #include #include #include #include +#include #include #include #include @@ -7654,10 +7657,60 @@ bool llama_model::load_tensors(llama_model_loader & ml) { } // load tensor data - for (auto & [ctx, buf_map] : ctx_buf_maps) { - if (!ml.load_all_data(ctx, buf_map, use_mlock ? &pimpl->mlock_mmaps : NULL, params.progress_callback, params.progress_callback_user_data)) { + const char * limit_env = getenv("LLAMA_ARG_PARALLEL_LOAD"); + const size_t default_limit = 4; + const int limit_val = limit_env ? atoi(limit_env) : (int) default_limit; + const size_t n_contexts = ctx_buf_maps.size(); + const size_t parallel_limit = limit_val <= 0 ? n_contexts : (size_t) limit_val; + const bool use_parallel = n_contexts > 1 && parallel_limit > 1; + + if (use_parallel) { + LLAMA_LOG_INFO("%s: using parallel loading for %zu GPU contexts (limit=%zu)\n", __func__, n_contexts, parallel_limit); + + std::atomic load_failed{false}; + std::vector> futures; + futures.reserve(n_contexts); + + for (auto & [ctx, buf_map] : ctx_buf_maps) { + if (futures.size() >= parallel_limit) { + if (!futures.front().get()) { + load_failed.store(true, std::memory_order_relaxed); + } + futures.erase(futures.begin()); + } + + auto * ctx_ptr = ctx; + auto * buf_map_ptr = &buf_map; + auto * mlock_ptr = use_mlock ? &pimpl->mlock_mmaps : nullptr; + + futures.emplace_back(std::async(std::launch::async, [&ml, ctx_ptr, buf_map_ptr, mlock_ptr, &load_failed]() { + if (load_failed.load(std::memory_order_relaxed)) { + return false; + } + return ml.load_all_data( + ctx_ptr, + *buf_map_ptr, + mlock_ptr, + nullptr, + nullptr); + })); + } + + for (auto & future : futures) { + if (!future.get()) { + load_failed.store(true, std::memory_order_relaxed); + } + } + + if (load_failed.load(std::memory_order_relaxed)) { return false; } + } else { + for (auto & [ctx, buf_map] : ctx_buf_maps) { + if (!ml.load_all_data(ctx, buf_map, use_mlock ? &pimpl->mlock_mmaps : NULL, params.progress_callback, params.progress_callback_user_data)) { + return false; + } + } } if (use_mmap_buffer) {