server: add auto-sleep after N seconds of idle (#18228)

* implement sleeping at queue level

* implement server-context suspend

* add test

* add docs

* optimization: add fast path

* make sure to free llama_init

* nits

* fix use-after-free

* allow /models to be accessed during sleeping, fix use-after-free

* don't allow accessing /models during sleep, it is not thread-safe

* fix data race on accessing props and model_meta

* small clean up

* trailing whitespace

* rm outdated comments
Author: Xuan-Son Nguyen, 2025-12-21 02:24:42 +01:00 (committed by GitHub)
parent 52ab19df63
commit ddcb75dd8a
12 changed files with 355 additions and 122 deletions


@ -2887,6 +2887,16 @@ common_params_context common_params_parser_init(common_params & params, llama_ex
params.lora_init_without_apply = true;
}
).set_examples({LLAMA_EXAMPLE_SERVER}));
add_opt(common_arg(
{"--sleep-idle-seconds"}, "SECONDS",
string_format("number of seconds of idleness after which the server will sleep (default: %d; -1 = disabled)", params.sleep_idle_seconds),
[](common_params & params, int value) {
if (value == 0 || value < -1) {
throw std::invalid_argument("invalid value: cannot be 0 or less than -1");
}
params.sleep_idle_seconds = value;
}
).set_examples({LLAMA_EXAMPLE_SERVER}));
add_opt(common_arg(
{"--simple-io"},
"use basic IO for better compatibility in subprocesses and limited consoles",


@ -475,7 +475,8 @@ struct common_params {
bool enable_chat_template = true;
common_reasoning_format reasoning_format = COMMON_REASONING_FORMAT_DEEPSEEK;
int reasoning_budget = -1;
bool prefill_assistant = true; // if true, any trailing assistant message will be prefilled into the response
int sleep_idle_seconds = -1; // if >0, server will sleep after this many seconds of idle time
std::vector<std::string> api_keys;


@ -209,8 +209,6 @@ int main(int argc, char ** argv) {
return 1;
}
ctx_cli.ctx_server.init();
console::spinner::stop();
console::log("\n");


@ -107,6 +107,8 @@ For detailed instructions, see the [test documentation](./tests/README.md).
- Large-scale code base split into smaller files: https://github.com/ggml-org/llama.cpp/pull/17362
- Introduction of router mode: https://github.com/ggml-org/llama.cpp/pull/17470
- Speculative decoding: https://github.com/ggml-org/llama.cpp/pull/17808 and rework in https://github.com/ggml-org/llama.cpp/pull/17808
- INI presets: https://github.com/ggml-org/llama.cpp/pull/17859 (+ refactoring: https://github.com/ggml-org/llama.cpp/pull/18169)
- Sleeping mode: https://github.com/ggml-org/llama.cpp/pull/18228


@ -1621,6 +1621,16 @@ Example of an error:
}
```
## Sleeping on Idle
The server supports an automatic sleep mode that activates after a specified period of inactivity (no incoming tasks). This feature, introduced in [PR #18228](https://github.com/ggml-org/llama.cpp/pull/18228), can be enabled using the `--sleep-idle-seconds` command-line argument. It works seamlessly in both single-model and multi-model configurations.
When the server enters sleep mode, the model and its associated memory (including the KV cache) are unloaded from RAM to conserve resources. Any new incoming task will automatically trigger the model to reload.
Note that the following endpoints are exempt from being considered as incoming tasks. They do not trigger model reloading and do not reset the idle timer:
- `GET /health`
- `GET /props`
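For illustration, here is a minimal client-side sketch of the wake-on-request behavior (assumptions: a server started with `--sleep-idle-seconds 5` listening on `http://localhost:8080`, and the Python `requests` package installed; adjust to your setup). It mirrors the test added in this PR:

```python
import time

import requests

BASE_URL = "http://localhost:8080"  # hypothetical address, adjust as needed

# let the server pass the idle threshold (e.g. --sleep-idle-seconds 5)
time.sleep(6)

# GET /props is not counted as a task, so it reports the state without waking the server
print(requests.get(f"{BASE_URL}/props").json()["is_sleeping"])  # expected: True

# any task-producing request (e.g. a completion) transparently reloads the model
resp = requests.post(f"{BASE_URL}/completion", json={"prompt": "Hello", "n_predict": 8})
print(resp.status_code)  # 200 once the model has been reloaded

print(requests.get(f"{BASE_URL}/props").json()["is_sleeping"])  # expected: False
```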
## More examples
### Interactive mode


@ -544,7 +544,9 @@ struct server_context_impl {
server_metrics metrics;
json webui_settings = json::object();
// cached responses for HTTP API (read-only from HTTP threads)
json json_server_props = json::object();
json json_server_model_meta = json::object();
// Necessary similarity of prompt for slot selection
float slot_prompt_similarity = 0.0f;
@ -554,8 +556,23 @@ struct server_context_impl {
common_chat_templates_ptr chat_templates;
oaicompat_parser_options oai_parser_opt;
bool sleeping = false;
~server_context_impl() {
if (!sleeping) {
// destroy() is already called when entering sleeping state
// we don't call it again here to avoid double free
destroy();
}
}
void destroy() {
llama_init.reset();
ctx = nullptr;
model = nullptr;
mtmd_free(mctx);
mctx = nullptr;
// Clear any sampling context
for (server_slot & slot : slots) {
@ -571,22 +588,29 @@ struct server_context_impl {
llama_batch_free(batch);
}
void handle_sleeping_state(bool new_state) {
GGML_ASSERT(sleeping != new_state);
if (new_state) {
SRV_INF("%s", "server is entering sleeping state\n");
destroy();
} else {
SRV_INF("%s", "server is exiting sleeping state\n");
if (!load_model(params_base)) {
GGML_ABORT("failed to reload model after sleeping");
}
}
sleeping = new_state;
}
// load the model and initialize llama_context
// this may also be called to resume from sleeping state
bool load_model(const common_params & params) {
bool is_resume = sleeping;
SRV_INF("loading model '%s'\n", params.model.path.c_str()); SRV_INF("loading model '%s'\n", params.model.path.c_str());
params_base = params; params_base = params;
webui_settings = json::object();
if (!params_base.webui_config_json.empty()) {
try {
webui_settings = json::parse(params_base.webui_config_json);
} catch (const std::exception & e) {
SRV_ERR("%s: failed to parse webui config: %s\n", __func__, e.what());
return false;
}
}
llama_init = common_init_from_params(params_base);
model = llama_init->model();
@ -654,7 +678,9 @@ struct server_context_impl {
std::string & mmproj_path = params_base.mmproj.path; std::string & mmproj_path = params_base.mmproj.path;
if (!mmproj_path.empty()) { if (!mmproj_path.empty()) {
mtmd_helper_log_set(common_log_default_callback, nullptr); if (!is_resume) {
mtmd_helper_log_set(common_log_default_callback, nullptr);
}
mtmd_context_params mparams = mtmd_context_params_default();
mparams.use_gpu = params_base.mmproj_use_gpu;
@ -699,19 +725,6 @@ struct server_context_impl {
}
}
return true;
}
// initialize slots and server-related data
void init() {
// wiring up server queues
queue_tasks.on_new_task([this](server_task && task) {
process_single_task(std::move(task));
});
queue_tasks.on_update_slots([this]() {
update_slots();
});
// Necessary similarity of prompt for slot selection
slot_prompt_similarity = params_base.slot_prompt_similarity;
@ -726,6 +739,7 @@ struct server_context_impl {
n_ctx_slot = n_ctx_train;
}
slots.clear();
for (int i = 0; i < params_base.n_parallel; i++) {
server_slot slot;
@ -742,13 +756,13 @@ struct server_context_impl {
slot.ctx_dft = llama_init_from_model(model_dft, cparams_dft);
if (slot.ctx_dft == nullptr) {
SRV_ERR("%s", "failed to create draft context\n");
return;
return false;
}
slot.spec = common_speculative_init(slot.ctx, slot.ctx_dft);
if (slot.spec == nullptr) {
SRV_ERR("%s", "failed to create speculator\n");
return;
return false;
}
for (auto & pair : params_base.speculative.replacements) {
common_speculative_add_replacement_tgt_dft(slot.spec, pair.first.c_str(), pair.second.c_str());
@ -782,8 +796,6 @@ struct server_context_impl {
batch = llama_batch_init(std::max(n_batch, params_base.n_parallel), 0, 1);
}
metrics.init();
if (params_base.cache_ram_mib != 0) {
if (params_base.cache_ram_mib < 0) {
SRV_WRN("prompt cache is enabled, size limit: %s\n", "no limit");
@ -832,6 +844,103 @@ struct server_context_impl {
LOG_INF("%s: chat template, chat_template: %s, example_format: '%s'\n", __func__, LOG_INF("%s: chat template, chat_template: %s, example_format: '%s'\n", __func__,
common_chat_templates_source(chat_templates.get()), common_chat_templates_source(chat_templates.get()),
common_chat_format_example(chat_templates.get(), params_base.use_jinja, params_base.default_template_kwargs).c_str()); common_chat_format_example(chat_templates.get(), params_base.use_jinja, params_base.default_template_kwargs).c_str());
if (!is_resume) {
return init();
}
return true;
}
// unlike load_model(), this is only called once during initialization
bool init() {
GGML_ASSERT(ctx != nullptr);
GGML_ASSERT(model != nullptr);
GGML_ASSERT(!sleeping);
// wiring up server queues
queue_tasks.on_new_task([this](server_task && task) {
process_single_task(std::move(task));
});
queue_tasks.on_update_slots([this]() {
update_slots();
});
queue_tasks.on_sleeping_state([this](bool sleeping) {
handle_sleeping_state(sleeping);
});
metrics.init();
if (!populate_json_responses()) {
SRV_ERR("%s", "failed to populate JSON responses\n");
return false;
}
return true;
}
bool populate_json_responses() {
// populate webui settings
json json_webui_settings = json::object();
{
if (!params_base.webui_config_json.empty()) {
try {
json_webui_settings = json::parse(params_base.webui_config_json);
} catch (const std::exception & e) {
SRV_ERR("%s: failed to parse webui config: %s\n", __func__, e.what());
return false;
}
}
}
// populate server properties
{
task_params params;
params.sampling = params_base.sampling;
json default_generation_settings_for_props = json {
{"params", params.to_json(true)},
{"n_ctx", get_slot_n_ctx()},
};
json_server_props = {
{ "default_generation_settings", default_generation_settings_for_props },
{ "total_slots", params_base.n_parallel },
{ "model_alias", model_name },
{ "model_path", params_base.model.path },
{ "modalities", json {
{"vision", oai_parser_opt.allow_image},
{"audio", oai_parser_opt.allow_audio},
} },
{ "endpoint_slots", params_base.endpoint_slots },
{ "endpoint_props", params_base.endpoint_props },
{ "endpoint_metrics", params_base.endpoint_metrics },
{ "webui", params_base.webui },
{ "webui_settings", json_webui_settings },
{ "chat_template", common_chat_templates_source(chat_templates.get()) },
{ "bos_token", common_token_to_piece(ctx, llama_vocab_bos(vocab), /* special= */ true)},
{ "eos_token", common_token_to_piece(ctx, llama_vocab_eos(vocab), /* special= */ true)},
{ "build_info", build_info },
};
if (params_base.use_jinja) {
if (auto tool_use_src = common_chat_templates_source(chat_templates.get(), "tool_use")) {
json_server_props["chat_template_tool_use"] = tool_use_src;
}
}
}
// populate model metadata
{
json_server_model_meta = {
{"vocab_type", llama_vocab_type (vocab)},
{"n_vocab", llama_vocab_n_tokens (vocab)},
{"n_ctx_train", llama_model_n_ctx_train(model)},
{"n_embd", llama_model_n_embd (model)},
{"n_params", llama_model_n_params (model)},
{"size", llama_model_size (model)},
};
}
return true;
}
server_slot * get_slot_by_id(int id) {
@ -2635,17 +2744,6 @@ struct server_context_impl {
SRV_DBG("%s", "run slots completed\n"); SRV_DBG("%s", "run slots completed\n");
} }
json model_meta() const {
return json {
{"vocab_type", llama_vocab_type (vocab)},
{"n_vocab", llama_vocab_n_tokens (vocab)},
{"n_ctx_train", llama_model_n_ctx_train(model)},
{"n_embd", llama_model_n_embd (model)},
{"n_params", llama_model_n_params (model)},
{"size", llama_model_size (model)},
};
}
int get_slot_n_ctx() {
return slots.back().n_ctx;
}
@ -2662,16 +2760,13 @@ struct server_context_impl {
server_context::server_context() : impl(new server_context_impl()) {}
server_context::~server_context() = default;
void server_context::init() {
impl->init();
}
bool server_context::load_model(const common_params & params) {
return impl->load_model(params);
}
void server_context::start_loop() {
impl->queue_tasks.start_loop();
auto & params = impl->params_base;
impl->queue_tasks.start_loop(params.sleep_idle_seconds * 1000);
}
void server_context::terminate() {
@ -2698,10 +2793,17 @@ server_context_info server_context::get_info() const {
// generator-like API for HTTP response generation
// may have bypass_sleep = true if the task does not use ctx_server
struct server_res_generator : server_http_res {
server_response_reader rd;
server_res_generator(server_context_impl & ctx_server)
server_res_generator(server_context_impl & ctx_server, bool bypass_sleep = false)
: rd(ctx_server.queue_tasks, ctx_server.queue_results, HTTP_POLLING_SECONDS) {}
: rd(ctx_server.queue_tasks, ctx_server.queue_results, HTTP_POLLING_SECONDS) {
// fast path in case sleeping is disabled
bypass_sleep |= ctx_server.params_base.sleep_idle_seconds < 0;
if (!bypass_sleep) {
ctx_server.queue_tasks.wait_until_no_sleep();
}
}
void ok(const json & response_data) {
status = 200;
data = safe_json_to_str(response_data);
@ -2719,6 +2821,7 @@ struct server_res_generator : server_http_res {
//
static std::unique_ptr<server_res_generator> handle_completions_impl(
std::unique_ptr<server_res_generator> && res_ptr,
server_context_impl & ctx_server, server_context_impl & ctx_server,
server_task_type type, server_task_type type,
const json & data, const json & data,
@ -2727,7 +2830,7 @@ static std::unique_ptr<server_res_generator> handle_completions_impl(
task_response_type res_type) {
GGML_ASSERT(type == SERVER_TASK_TYPE_COMPLETION || type == SERVER_TASK_TYPE_INFILL);
auto res = std::make_unique<server_res_generator>(ctx_server);
auto res = std::move(res_ptr);
auto completion_id = gen_chatcmplid();
auto & rd = res->rd;
@ -2931,9 +3034,12 @@ static std::unique_ptr<server_res_generator> handle_completions_impl(
}
void server_routes::init_routes() {
// IMPORTANT: all lambda functions must start with std::make_unique<server_res_generator>
// this is to ensure that the server_res_generator can handle sleeping case correctly
this->get_health = [this](const server_http_req &) { this->get_health = [this](const server_http_req &) {
// error and loading states are handled by middleware // error and loading states are handled by middleware
auto res = std::make_unique<server_res_generator>(ctx_server); auto res = std::make_unique<server_res_generator>(ctx_server, true);
res->ok({{"status", "ok"}}); res->ok({{"status", "ok"}});
return res; return res;
}; };
@ -3115,46 +3221,10 @@ void server_routes::init_routes() {
};
this->get_props = [this](const server_http_req &) {
auto res = std::make_unique<server_res_generator>(ctx_server);
auto res = std::make_unique<server_res_generator>(ctx_server, true);
json default_generation_settings_for_props;
auto props = ctx_server.json_server_props;
props["is_sleeping"] = ctx_server.queue_tasks.is_sleeping();
{
res->ok(props);
task_params params;
params.sampling = ctx_server.params_base.sampling;
default_generation_settings_for_props = json {
{"params", params.to_json(true)},
{"n_ctx", ctx_server.get_slot_n_ctx()},
};
}
json data = {
{ "default_generation_settings", default_generation_settings_for_props },
{ "total_slots", ctx_server.params_base.n_parallel },
{ "model_alias", ctx_server.model_name },
{ "model_path", ctx_server.params_base.model.path },
{ "modalities", json {
{"vision", ctx_server.oai_parser_opt.allow_image},
{"audio", ctx_server.oai_parser_opt.allow_audio},
} },
{ "endpoint_slots", params.endpoint_slots },
{ "endpoint_props", params.endpoint_props },
{ "endpoint_metrics", params.endpoint_metrics },
{ "webui", params.webui },
{ "webui_settings", ctx_server.webui_settings },
{ "chat_template", common_chat_templates_source(ctx_server.chat_templates.get()) },
{ "bos_token", common_token_to_piece(ctx_server.ctx, llama_vocab_bos(ctx_server.vocab), /* special= */ true)},
{ "eos_token", common_token_to_piece(ctx_server.ctx, llama_vocab_eos(ctx_server.vocab), /* special= */ true)},
{ "build_info", build_info },
};
if (ctx_server.params_base.use_jinja) {
if (auto tool_use_src = common_chat_templates_source(ctx_server.chat_templates.get(), "tool_use")) {
data["chat_template_tool_use"] = tool_use_src;
}
}
res->ok(data);
return res;
};
@ -3272,6 +3342,7 @@ void server_routes::init_routes() {
std::vector<raw_buffer> files; // dummy std::vector<raw_buffer> files; // dummy
return handle_completions_impl( return handle_completions_impl(
std::move(res),
ctx_server,
SERVER_TASK_TYPE_INFILL,
data,
@ -3281,9 +3352,11 @@ void server_routes::init_routes() {
};
this->post_completions = [this](const server_http_req & req) {
auto res = std::make_unique<server_res_generator>(ctx_server);
std::vector<raw_buffer> files; // dummy std::vector<raw_buffer> files; // dummy
const json body = json::parse(req.body); const json body = json::parse(req.body);
return handle_completions_impl( return handle_completions_impl(
std::move(res),
ctx_server,
SERVER_TASK_TYPE_COMPLETION,
body,
@ -3293,9 +3366,11 @@ void server_routes::init_routes() {
};
this->post_completions_oai = [this](const server_http_req & req) {
auto res = std::make_unique<server_res_generator>(ctx_server);
std::vector<raw_buffer> files; // dummy std::vector<raw_buffer> files; // dummy
const json body = json::parse(req.body); const json body = json::parse(req.body);
return handle_completions_impl( return handle_completions_impl(
std::move(res),
ctx_server,
SERVER_TASK_TYPE_COMPLETION,
body,
@ -3305,6 +3380,7 @@ void server_routes::init_routes() {
};
this->post_chat_completions = [this](const server_http_req & req) {
auto res = std::make_unique<server_res_generator>(ctx_server);
std::vector<raw_buffer> files; std::vector<raw_buffer> files;
json body = json::parse(req.body); json body = json::parse(req.body);
json body_parsed = oaicompat_chat_params_parse( json body_parsed = oaicompat_chat_params_parse(
@ -3312,6 +3388,7 @@ void server_routes::init_routes() {
ctx_server.oai_parser_opt,
files);
return handle_completions_impl(
std::move(res),
ctx_server,
SERVER_TASK_TYPE_COMPLETION,
body_parsed,
@ -3321,6 +3398,7 @@ void server_routes::init_routes() {
};
this->post_anthropic_messages = [this](const server_http_req & req) {
auto res = std::make_unique<server_res_generator>(ctx_server);
std::vector<raw_buffer> files; std::vector<raw_buffer> files;
json body = convert_anthropic_to_oai(json::parse(req.body)); json body = convert_anthropic_to_oai(json::parse(req.body));
json body_parsed = oaicompat_chat_params_parse( json body_parsed = oaicompat_chat_params_parse(
@ -3328,6 +3406,7 @@ void server_routes::init_routes() {
ctx_server.oai_parser_opt,
files);
return handle_completions_impl(
std::move(res),
ctx_server,
SERVER_TASK_TYPE_COMPLETION,
body_parsed,
@ -3365,11 +3444,13 @@ void server_routes::init_routes() {
return res;
};
// TODO: this endpoint is unsafe to access during model reloading (i.e. wake up from sleeping)
// how to make it work even during load_model()?
this->get_models = [this](const server_http_req &) { this->get_models = [this](const server_http_req &) {
auto res = std::make_unique<server_res_generator>(ctx_server); auto res = std::make_unique<server_res_generator>(ctx_server);
json model_meta = nullptr; json model_meta = nullptr;
if (is_ready()) { if (is_ready()) {
model_meta = ctx_server.model_meta(); model_meta = ctx_server.json_server_model_meta;
} }
bool has_mtmd = ctx_server.mctx != nullptr; bool has_mtmd = ctx_server.mctx != nullptr;
json models = { json models = {


@ -22,9 +22,6 @@ struct server_context {
server_context();
~server_context();
// initialize slots and server-related data
void init();
// load the model and initialize llama_context
// returns true on success
bool load_model(const common_params & params);
@ -35,7 +32,7 @@ struct server_context {
// terminate main loop (will unblock start_loop)
void terminate();
// get the underlaying llama_context
// get the underlaying llama_context, can return nullptr if sleeping
llama_context * get_llama_context() const;
// get a new response reader, used by CLI application


@ -33,6 +33,7 @@ int server_queue::post(server_task && task, bool front) {
} else {
queue_tasks.push_back(std::move(task));
}
time_last_task = ggml_time_ms();
condition_tasks.notify_one();
return task_id;
}
@ -54,6 +55,7 @@ int server_queue::post(std::vector<server_task> && tasks, bool front) {
queue_tasks.push_back(std::move(task));
}
}
time_last_task = ggml_time_ms();
condition_tasks.notify_one();
return 0;
}
@ -62,6 +64,7 @@ void server_queue::defer(server_task && task) {
std::unique_lock<std::mutex> lock(mutex_tasks); std::unique_lock<std::mutex> lock(mutex_tasks);
QUE_DBG("defer task, id = %d\n", task.id); QUE_DBG("defer task, id = %d\n", task.id);
queue_tasks_deferred.push_back(std::move(task)); queue_tasks_deferred.push_back(std::move(task));
time_last_task = ggml_time_ms();
condition_tasks.notify_one();
}
@ -71,31 +74,52 @@ int server_queue::get_new_id() {
return new_id;
}
void server_queue::on_new_task(std::function<void(server_task &&)> callback) {
callback_new_task = std::move(callback);
}
void server_queue::on_update_slots(std::function<void(void)> callback) {
callback_update_slots = std::move(callback);
}
void server_queue::pop_deferred_task() {
std::unique_lock<std::mutex> lock(mutex_tasks);
if (!queue_tasks_deferred.empty()) {
queue_tasks.emplace_front(std::move(queue_tasks_deferred.front()));
queue_tasks_deferred.pop_front();
}
time_last_task = ggml_time_ms();
condition_tasks.notify_one();
}
void server_queue::wait_until_no_sleep() {
std::unique_lock<std::mutex> lock(mutex_tasks);
if (!sleeping) {
return;
} else {
if (!req_stop_sleeping) {
QUE_DBG("%s", "requesting to stop sleeping\n");
req_stop_sleeping = true;
condition_tasks.notify_one(); // only main thread is waiting on this
}
QUE_DBG("%s", "waiting until no sleep\n");
condition_tasks.wait(lock, [&]{
return !sleeping;
});
}
}
void server_queue::terminate() {
std::unique_lock<std::mutex> lock(mutex_tasks);
running = false;
condition_tasks.notify_all();
}
void server_queue::start_loop() {
void server_queue::start_loop(int64_t idle_sleep_ms) {
running = true;
time_last_task = ggml_time_ms();
constexpr auto max_wait_time = std::chrono::seconds(1);
auto should_sleep = [&]() -> bool {
// caller must hold mutex_tasks
if (idle_sleep_ms < 0) {
return false;
}
int64_t now = ggml_time_ms();
return (now - time_last_task) >= idle_sleep_ms;
};
while (true) {
QUE_DBG("%s", "processing new tasks\n");
@ -117,23 +141,53 @@ void server_queue::start_loop() {
QUE_DBG("processing task, id = %d\n", task.id); QUE_DBG("processing task, id = %d\n", task.id);
callback_new_task(std::move(task)); callback_new_task(std::move(task));
} }
// all tasks in the current loop is processed, slots data is now ready // all tasks in the current loop is processed, slots data is now ready
QUE_DBG("%s", "update slots\n"); QUE_DBG("%s", "update slots\n");
// this will run the main inference process for all slots
callback_update_slots();
{
// update_slots() may take a while to finish, we need to make sure it's not counted as idle
std::unique_lock<std::mutex> lock(mutex_tasks);
time_last_task = ggml_time_ms();
}
QUE_DBG("%s", "waiting for new tasks\n"); QUE_DBG("%s", "waiting for new tasks\n");
{ while (true) {
std::unique_lock<std::mutex> lock(mutex_tasks); std::unique_lock<std::mutex> lock(mutex_tasks);
if (!running) { if (!running || !queue_tasks.empty()) {
QUE_DBG("%s", "terminate\n"); break; // go back to process new tasks or terminate
return;
} }
if (queue_tasks.empty()) {
// no tasks, check for sleeping state
if (should_sleep()) {
QUE_INF("%s", "entering sleeping state\n");
sleeping = true;
callback_sleeping_state(true);
req_stop_sleeping = false;
// wait until we are requested to exit sleeping state
condition_tasks.wait(lock, [&]{ condition_tasks.wait(lock, [&]{
return (!running || req_stop_sleeping);
});
if (!running) { // may changed during sleep
break; // terminate
}
QUE_INF("%s", "exiting sleeping state\n");
req_stop_sleeping = false;
callback_sleeping_state(false);
sleeping = false;
time_last_task = ggml_time_ms();
condition_tasks.notify_all(); // notify wait_until_no_sleep()
break; // process new tasks
} else {
// wait for new tasks or timeout for checking sleeping condition
bool res = condition_tasks.wait_for(lock, max_wait_time, [&]{
return (!queue_tasks.empty() || !running);
});
if (res) {
break; // new task arrived or terminate
}
// otherwise, loop again to check sleeping condition
} }
}
}
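
For reference, here is a condensed, standalone sketch of the idle/sleep state machine that `start_loop()` implements above, written in Python with `threading.Condition` standing in for the mutex and condition variable (names and bookkeeping simplified; this is an illustration, not the server's code):

```python
import threading
import time


class IdleQueue:
    """Simplified model of server_queue's idle-sleep behavior (illustrative only)."""

    def __init__(self, idle_sleep_s=None):  # None mimics --sleep-idle-seconds disabled
        self.cond = threading.Condition()
        self.tasks = []
        self.running = True
        self.sleeping = False
        self.req_stop_sleeping = False
        self.idle_sleep_s = idle_sleep_s
        self.t_last_task = time.monotonic()

    def post(self, task):
        # producer side: every new task resets the idle timer
        with self.cond:
            self.tasks.append(task)
            self.t_last_task = time.monotonic()
            self.cond.notify_all()

    def wait_until_no_sleep(self):
        # HTTP-thread side: request a wake-up and block until the reload has finished
        with self.cond:
            if not self.sleeping:
                return
            self.req_stop_sleeping = True
            self.cond.notify_all()
            self.cond.wait_for(lambda: not self.sleeping)

    def start_loop(self, on_sleeping_state, process_task):
        while True:
            with self.cond:
                while not self.tasks and self.running:
                    idle = time.monotonic() - self.t_last_task
                    if self.idle_sleep_s is not None and idle >= self.idle_sleep_s:
                        self.sleeping = True
                        on_sleeping_state(True)   # e.g. unload the model
                        self.cond.wait_for(lambda: self.req_stop_sleeping or not self.running)
                        if not self.running:
                            return
                        on_sleeping_state(False)  # e.g. reload the model
                        self.sleeping = False
                        self.req_stop_sleeping = False
                        self.t_last_task = time.monotonic()
                        self.cond.notify_all()    # release wait_until_no_sleep() callers
                    else:
                        # wake up at least once per second to re-check the idle condition
                        self.cond.wait(timeout=1.0)
                if not self.running:
                    return
                batch, self.tasks = self.tasks, []
            for t in batch:
                process_task(t)
            with self.cond:
                # time spent processing must not count as idle time
                self.t_last_task = time.monotonic()
```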


@ -12,7 +12,10 @@
struct server_queue {
private:
int id = 0;
bool running;
bool running = false;
bool sleeping = false;
bool req_stop_sleeping = false;
int64_t time_last_task = 0;
// queues
std::deque<server_task> queue_tasks;
@ -24,6 +27,7 @@ private:
// callback functions
std::function<void(server_task &&)> callback_new_task;
std::function<void(void)> callback_update_slots;
std::function<void(bool)> callback_sleeping_state;
public:
// Add a new task to the end of the queue
@ -38,15 +42,18 @@ public:
// Get the next id for creating a new task
int get_new_id();
// Register function to process a new task
void on_new_task(std::function<void(server_task &&)> callback);
// Register the function to be called when all slots data is ready to be processed
void on_update_slots(std::function<void(void)> callback);
// Call when the state of one slot is changed, it will move one task from deferred to main queue
void pop_deferred_task();
// if sleeping, request exiting sleep state and wait until it is done
// returns immediately if not sleeping
void wait_until_no_sleep();
bool is_sleeping() {
std::unique_lock<std::mutex> lock(mutex_tasks);
return sleeping;
}
// end the start_loop routine
void terminate();
@ -56,8 +63,15 @@ public:
* - Process the task (i.e. maybe copy data into slot)
* - Check if multitask is finished
* - Update all slots
*
* Sleeping procedure (disabled if idle_sleep_ms < 0):
* - If there is no task after idle_sleep_ms, enter sleeping state
* - Call callback_sleeping_state(true)
* - Wait until req_stop_sleeping is set to true
* - Call callback_sleeping_state(false)
* - Exit sleeping state
*/
void start_loop();
void start_loop(int64_t idle_sleep_ms = -1);
// for metrics
size_t queue_tasks_deferred_size() {
@ -65,6 +79,27 @@ public:
return queue_tasks_deferred.size();
}
//
// Functions below are not thread-safe, must only be used before start_loop() is called
//
// Register function to process a new task
void on_new_task(std::function<void(server_task &&)> callback) {
callback_new_task = std::move(callback);
}
// Register the function to be called when all slots data is ready to be processed
void on_update_slots(std::function<void(void)> callback) {
callback_update_slots = std::move(callback);
}
// Register callback for sleeping state change
// note: when entering sleeping state, the callback is called AFTER sleeping is set to true
// when leaving sleeping state, the callback is called BEFORE sleeping is set to false
void on_sleeping_state(std::function<void(bool)> callback) {
callback_sleeping_state = std::move(callback);
}
private:
void cleanup_pending_task(int id_target);
};


@ -252,7 +252,6 @@ int main(int argc, char ** argv, char ** envp) {
return 1;
}
ctx_server.init();
ctx_http.is_ready.store(true);
LOG_INF("%s: model loaded\n", __func__);
@ -309,7 +308,11 @@ int main(int argc, char ** argv, char ** envp) {
if (monitor_thread.joinable()) {
monitor_thread.join();
}
llama_memory_breakdown_print(ctx_server.get_llama_context());
auto * ll_ctx = ctx_server.get_llama_context();
if (ll_ctx != nullptr) {
llama_memory_breakdown_print(ll_ctx);
}
}
return 0;


@ -0,0 +1,39 @@
import pytest
import time
from utils import *
server = ServerPreset.tinyllama2()
@pytest.fixture(autouse=True)
def create_server():
global server
server = ServerPreset.tinyllama2()
def test_server_sleep():
global server
server.sleep_idle_seconds = 1
server.start()
# wait a bit so that server can go to sleep
time.sleep(2)
# make sure these endpoints are still responsive after sleep
res = server.make_request("GET", "/health")
assert res.status_code == 200
res = server.make_request("GET", "/props")
assert res.status_code == 200
assert res.body["is_sleeping"] == True
# make a generation request to wake up the server
res = server.make_request("POST", "/completion", data={
"n_predict": 1,
"prompt": "Hello",
})
assert res.status_code == 200
# it should no longer be sleeping
res = server.make_request("GET", "/props")
assert res.status_code == 200
assert res.body["is_sleeping"] == False


@ -100,6 +100,7 @@ class ServerProcess:
server_path: str | None = None
mmproj_url: str | None = None
media_path: str | None = None
sleep_idle_seconds: int | None = None
# session variables
process: subprocess.Popen | None = None
@ -230,6 +231,8 @@ class ServerProcess:
server_args.extend(["--mmproj-url", self.mmproj_url]) server_args.extend(["--mmproj-url", self.mmproj_url])
if self.media_path: if self.media_path:
server_args.extend(["--media-path", self.media_path]) server_args.extend(["--media-path", self.media_path])
if self.sleep_idle_seconds is not None:
server_args.extend(["--sleep-idle-seconds", self.sleep_idle_seconds])
args = [str(arg) for arg in [server_path, *server_args]]
print(f"tests: starting server with: {' '.join(args)}")