parallel load: multi-context model loading

This commit is contained in:
Marko Tombak 2026-03-03 11:31:14 +02:00
parent bd6992180b
commit 74faaaf7d2
5 changed files with 114 additions and 14 deletions

View File

@ -2364,6 +2364,21 @@ common_params_context common_params_parser_init(common_params & params, llama_ex
}
}
).set_env("LLAMA_ARG_SPLIT_MODE"));
add_opt(common_arg(
{"-pl", "--parallel-load"}, "N",
string_format("max parallel jobs for model loading (default: %d, -1 = no cap (up to #contexts), 1 = sequential)", params.n_parallel_load),
[](common_params & params, int value) {
params.n_parallel_load = value;
if (params.n_parallel_load <= 0) {
params.n_parallel_load = -1; // unlimited
}
#ifdef _WIN32
_putenv_s("LLAMA_ARG_PARALLEL_LOAD", std::to_string(params.n_parallel_load).c_str());
#else
setenv("LLAMA_ARG_PARALLEL_LOAD", std::to_string(params.n_parallel_load).c_str(), 1);
#endif
}
).set_env("LLAMA_ARG_PARALLEL_LOAD"));
add_opt(common_arg(
{"-ts", "--tensor-split"}, "N0,N1,N2,...",
"fraction of the model to offload to each GPU, comma-separated list of proportions, e.g. 3,1",

View File

@ -444,6 +444,8 @@ struct common_params {
enum llama_split_mode split_mode = LLAMA_SPLIT_MODE_LAYER; // how to split the model across GPUs
int32_t n_parallel_load = 4; // max parallel jobs for model loading (-1 = unlimited, i.e. up to #contexts; 1 = sequential)
struct cpu_params cpuparams;
struct cpu_params cpuparams_batch;

View File

@ -12,6 +12,7 @@
#include <cstring>
#include <future>
#include <regex>
#include <unordered_map>
static const size_t kiB = 1024;
static const size_t MiB = 1024*kiB;
@ -549,6 +550,7 @@ llama_model_loader::llama_model_loader(
llm_kv = LLM_KV(llm_arch_from_string(arch_name));
files.emplace_back(new llama_file(fname.c_str(), "rb", use_direct_io));
file_paths.emplace_back(fname);
contexts.emplace_back(ctx);
if (use_mmap && use_direct_io) {
@ -631,6 +633,7 @@ llama_model_loader::llama_model_loader(
}
files.emplace_back(new llama_file(fname_split, "rb", use_direct_io));
file_paths.emplace_back(fname_split);
contexts.emplace_back(ctx);
// Save tensors data offset info of the shard.
@ -1373,6 +1376,21 @@ bool llama_model_loader::load_all_data(
std::vector<no_init<uint8_t>> read_buf;
std::vector<std::future<std::pair<ggml_tensor *, bool>>> validation_result;
// Thread-local file handles to avoid seek/read races when loading multiple
// contexts in parallel.
thread_local std::unordered_map<std::string, std::unique_ptr<llama_file>> local_files;
auto get_local_file = [this](size_t idx) -> llama_file * {
const std::string key = file_paths.at(idx) + (use_direct_io ? "#dio1" : "#dio0");
auto it = local_files.find(key);
if (it == local_files.end()) {
auto local_file = std::make_unique<llama_file>(file_paths.at(idx).c_str(), "rb", use_direct_io);
auto * ptr = local_file.get();
local_files.emplace(key, std::move(local_file));
return ptr;
}
return it->second.get();
};
// 4 staging buffers for async uploads, each sized 1MB seems to be a good default for single NVMe drives.
// NVMe raid configurations might require more / larger buffers.
constexpr size_t n_buffers = 4;
@ -1479,7 +1497,7 @@ bool llama_model_loader::load_all_data(
}
if (progress_callback) {
if (!progress_callback((float) size_done / size_data, progress_callback_user_data)) {
if (!progress_callback((float) size_done.load(std::memory_order_relaxed) / size_data, progress_callback_user_data)) {
return false;
}
}
@ -1503,19 +1521,21 @@ bool llama_model_loader::load_all_data(
GGML_ASSERT(buf_mmap || cur->data); // either we have a buffer to allocate the tensor in, or it is already allocated
if (buf_mmap && cur->data == nullptr) {
ggml_backend_tensor_alloc(buf_mmap, cur, data);
if (lmlocks) {
const auto & lmlock = lmlocks->at(weight->idx);
lmlock->grow_to(weight->offs + n_size);
{
std::lock_guard<std::mutex> lock(mmaps_used_mutex);
if (lmlocks) {
const auto & lmlock = lmlocks->at(weight->idx);
lmlock->grow_to(weight->offs + n_size);
}
auto & mmap_used = mmaps_used[weight->idx];
mmap_used.first = std::min(mmap_used.first, weight->offs);
mmap_used.second = std::max(mmap_used.second, weight->offs + n_size);
}
auto & mmap_used = mmaps_used[weight->idx];
mmap_used.first = std::min(mmap_used.first, weight->offs);
mmap_used.second = std::max(mmap_used.second, weight->offs + n_size);
} else {
ggml_backend_tensor_set(cur, data, 0, n_size);
}
} else {
const auto & file = files.at(weight->idx);
llama_file * file = get_local_file(weight->idx);
if (ggml_backend_buffer_is_host(cur->buffer)) {
file->seek(weight->offs, SEEK_SET);
@ -1591,7 +1611,7 @@ bool llama_model_loader::load_all_data(
}
}
size_done += n_size;
size_done.fetch_add(n_size, std::memory_order_relaxed);
}
// free temporary resources used for async uploads
@ -1618,7 +1638,12 @@ bool llama_model_loader::load_all_data(
}
// check if this is the last call and do final cleanup
if (size_done >= size_data) {
if (size_done.load(std::memory_order_relaxed) >= size_data) {
bool expected = false;
if (!final_cleanup_done.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
return true;
}
// unmap offloaded tensors and metadata
if (use_mmap) {
for (uint32_t idx = 0; idx < mappings.size(); idx++) {

View File

@ -9,9 +9,11 @@
#include "ggml-cpp.h"
#include <atomic>
#include <cstddef>
#include <cstring>
#include <map>
#include <mutex>
#include <stdexcept>
#include <unordered_map>
@ -81,6 +83,7 @@ struct llama_model_loader {
bool no_alloc;
llama_files files;
std::vector<std::string> file_paths;
llama_ftype ftype;
llama_fver fver;
@ -99,9 +102,11 @@ struct llama_model_loader {
std::string arch_name;
LLM_KV llm_kv = LLM_KV(LLM_ARCH_UNKNOWN);
size_t size_done = 0;
std::atomic<size_t> size_done = 0;
size_t size_data = 0;
std::vector<std::pair<size_t, size_t>> mmaps_used;
std::mutex mmaps_used_mutex;
std::atomic<bool> final_cleanup_done = false;
// define a comparator for the buft -> ctx map to ensure that the order is well-defined:
struct ggml_backend_buft_comparator {

View File

@ -17,12 +17,15 @@
#include "models/models.h"
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstdlib>
#include <cfloat>
#include <cstdint>
#include <cstring>
#include <cmath>
#include <functional>
#include <future>
#include <map>
#include <regex>
#include <sstream>
@ -7654,10 +7657,60 @@ bool llama_model::load_tensors(llama_model_loader & ml) {
}
// load tensor data
for (auto & [ctx, buf_map] : ctx_buf_maps) {
if (!ml.load_all_data(ctx, buf_map, use_mlock ? &pimpl->mlock_mmaps : NULL, params.progress_callback, params.progress_callback_user_data)) {
const char * limit_env = getenv("LLAMA_ARG_PARALLEL_LOAD");
const size_t default_limit = 4;
const int limit_val = limit_env ? atoi(limit_env) : (int) default_limit;
const size_t n_contexts = ctx_buf_maps.size();
const size_t parallel_limit = limit_val <= 0 ? n_contexts : (size_t) limit_val;
const bool use_parallel = n_contexts > 1 && parallel_limit > 1;
if (use_parallel) {
LLAMA_LOG_INFO("%s: using parallel loading for %zu GPU contexts (limit=%zu)\n", __func__, n_contexts, parallel_limit);
std::atomic<bool> load_failed{false};
std::vector<std::future<bool>> futures;
futures.reserve(n_contexts);
for (auto & [ctx, buf_map] : ctx_buf_maps) {
if (futures.size() >= parallel_limit) {
if (!futures.front().get()) {
load_failed.store(true, std::memory_order_relaxed);
}
futures.erase(futures.begin());
}
auto * ctx_ptr = ctx;
auto * buf_map_ptr = &buf_map;
auto * mlock_ptr = use_mlock ? &pimpl->mlock_mmaps : nullptr;
futures.emplace_back(std::async(std::launch::async, [&ml, ctx_ptr, buf_map_ptr, mlock_ptr, &load_failed]() {
if (load_failed.load(std::memory_order_relaxed)) {
return false;
}
return ml.load_all_data(
ctx_ptr,
*buf_map_ptr,
mlock_ptr,
nullptr,
nullptr);
}));
}
for (auto & future : futures) {
if (!future.get()) {
load_failed.store(true, std::memory_order_relaxed);
}
}
if (load_failed.load(std::memory_order_relaxed)) {
return false;
}
} else {
for (auto & [ctx, buf_map] : ctx_buf_maps) {
if (!ml.load_all_data(ctx, buf_map, use_mlock ? &pimpl->mlock_mmaps : NULL, params.progress_callback, params.progress_callback_user_data)) {
return false;
}
}
}
if (use_mmap_buffer) {