diff --git a/tools/server/README.md b/tools/server/README.md
index 554444d74b..f584b60026 100644
--- a/tools/server/README.md
+++ b/tools/server/README.md
@@ -1634,6 +1634,13 @@ The `status` object can be:
 }
 ```
 
+```json
+"status": {
+    "value": "sleeping",
+    "args": ["llama-server", "-ctx", "4096"]
+}
+```
+
 ### POST `/models/load`: Load a model
 
 Load a model
diff --git a/tools/server/server-context.cpp b/tools/server/server-context.cpp
index 9de554e900..b79a5270b5 100644
--- a/tools/server/server-context.cpp
+++ b/tools/server/server-context.cpp
@@ -3033,6 +3033,9 @@ struct server_res_generator : server_http_res {
     }
 };
 
+void server_context::on_sleeping_changed(std::function<void(bool)> callback) {
+    impl->queue_tasks.on_sleeping_state(std::move(callback));
+}
 
 //
 
diff --git a/tools/server/server-context.h b/tools/server/server-context.h
index 75f3d2de56..a4d2201cbe 100644
--- a/tools/server/server-context.h
+++ b/tools/server/server-context.h
@@ -74,6 +74,10 @@ struct server_context {
     // get server metadata (read-only), can only be called after load_model()
     // not thread-safe, should only be used from the main thread
     server_context_meta get_meta() const;
+
+    // register a callback to be called when sleeping state changes
+    // must be set before load_model() is called
+    void on_sleeping_changed(std::function<void(bool)> callback);
 };
 
diff --git a/tools/server/server-models.cpp b/tools/server/server-models.cpp
index 62dae2db7a..7e61844f08 100644
--- a/tools/server/server-models.cpp
+++ b/tools/server/server-models.cpp
@@ -39,7 +39,8 @@ extern char **environ;
 #define DEFAULT_STOP_TIMEOUT 10 // seconds
 
 #define CMD_ROUTER_TO_CHILD_EXIT  "cmd_router_to_child:exit"
-#define CMD_CHILD_TO_ROUTER_READY "cmd_child_to_router:ready"
+#define CMD_CHILD_TO_ROUTER_READY "cmd_child_to_router:ready" // also sent when waking up from sleep
+#define CMD_CHILD_TO_ROUTER_SLEEP "cmd_child_to_router:sleep"
 
 // address for child process, this is needed because router may run on 0.0.0.0
 // ref: https://github.com/ggml-org/llama.cpp/issues/17862
@@ -380,7 +381,7 @@ void server_models::update_meta(const std::string & name, const server_model_met
     if (it != mapping.end()) {
         it->second.meta = meta;
     }
-    cv.notify_all(); // notify wait_until_loaded
+    cv.notify_all(); // notify wait_until_loading_finished
 }
 
 bool server_models::has_model(const std::string & name) {
@@ -503,7 +504,7 @@ void server_models::unload_lru() {
     {
         std::unique_lock lk(mutex);
         for (const auto & m : mapping) {
-            if (m.second.meta.is_active()) {
+            if (m.second.meta.is_running()) {
                 count_active++;
                 if (m.second.meta.last_used < lru_last_used) {
                     lru_model_name = m.first;
@@ -546,7 +547,7 @@ void server_models::load(const std::string & name) {
     if (base_params.models_max > 0) {
         size_t count_active = 0;
         for (const auto & m : mapping) {
-            if (m.second.meta.is_active()) {
+            if (m.second.meta.is_running()) {
                 count_active++;
             }
         }
@@ -605,15 +606,15 @@ void server_models::load(const std::string & name) {
     std::thread log_thread([&]() {
         // read stdout/stderr and forward to main server log
         // also handle status report from child process
-        bool state_received = false; // true if child state received
         if (stdout_file) {
             char buffer[4096];
             while (fgets(buffer, sizeof(buffer), stdout_file) != nullptr) {
                 LOG("[%5d] %s", port, buffer);
-                if (!state_received && std::strstr(buffer, CMD_CHILD_TO_ROUTER_READY) != nullptr) {
-                    // child process is ready
+                std::string str(buffer);
+                if (string_starts_with(buffer, CMD_CHILD_TO_ROUTER_READY)) {
                     this->update_status(name, SERVER_MODEL_STATUS_LOADED, 0);
-                    state_received = true;
+                } else if (string_starts_with(buffer, CMD_CHILD_TO_ROUTER_SLEEP)) {
+                    this->update_status(name, SERVER_MODEL_STATUS_SLEEPING, 0);
                 }
             }
         } else {
@@ -706,13 +707,13 @@ void server_models::unload(const std::string & name) {
     std::lock_guard lk(mutex);
     auto it = mapping.find(name);
     if (it != mapping.end()) {
-        if (it->second.meta.is_active()) {
-            SRV_INF("unloading model instance name=%s\n", name.c_str());
+        if (it->second.meta.is_running()) {
+            SRV_INF("stopping model instance name=%s\n", name.c_str());
             stopping_models.insert(name);
             cv_stop.notify_all();
             // status change will be handled by the managing thread
         } else {
-            SRV_WRN("model instance name=%s is not loaded\n", name.c_str());
+            SRV_WRN("model instance name=%s is not running\n", name.c_str());
         }
     }
 }
@@ -722,8 +723,8 @@ void server_models::unload_all() {
     {
         std::lock_guard lk(mutex);
         for (auto & [name, inst] : mapping) {
-            if (inst.meta.is_active()) {
-                SRV_INF("unloading model instance name=%s\n", name.c_str());
+            if (inst.meta.is_running()) {
+                SRV_INF("stopping model instance name=%s\n", name.c_str());
                 stopping_models.insert(name);
                 cv_stop.notify_all();
                 // status change will be handled by the managing thread
@@ -750,7 +751,7 @@ void server_models::update_status(const std::string & name, server_model_status
     cv.notify_all();
 }
 
-void server_models::wait_until_loaded(const std::string & name) {
+void server_models::wait_until_loading_finished(const std::string & name) {
     std::unique_lock lk(mutex);
     cv.wait(lk, [this, &name]() {
         auto it = mapping.find(name);
@@ -761,22 +762,25 @@ void server_models::wait_until_loaded(const std::string & name) {
     });
 }
 
-bool server_models::ensure_model_loaded(const std::string & name) {
+bool server_models::ensure_model_ready(const std::string & name) {
     auto meta = get_meta(name);
     if (!meta.has_value()) {
         throw std::runtime_error("model name=" + name + " is not found");
     }
-    if (meta->status == SERVER_MODEL_STATUS_LOADED) {
-        return false; // already loaded
+    if (meta->is_ready()) {
+        return false; // ready for taking requests
+    }
+    if (meta->status == SERVER_MODEL_STATUS_SLEEPING) {
+        return false; // child is sleeping but still running; new request will wake it up
     }
     if (meta->status == SERVER_MODEL_STATUS_UNLOADED) {
         SRV_INF("model name=%s is not loaded, loading...\n", name.c_str());
         load(name);
     }
 
-    // for loading state
+    // wait for loading to complete
     SRV_INF("waiting until model name=%s is fully loaded...\n", name.c_str());
-    wait_until_loaded(name);
+    wait_until_loading_finished(name);
 
     // check final status
     meta = get_meta(name);
@@ -792,8 +796,8 @@ server_http_res_ptr server_models::proxy_request(const server_http_req & req, co
     if (!meta.has_value()) {
         throw std::runtime_error("model name=" + name + " is not found");
     }
-    if (meta->status != SERVER_MODEL_STATUS_LOADED) {
-        throw std::invalid_argument("model name=" + name + " is not loaded");
+    if (!meta->is_running()) {
+        throw std::invalid_argument("model name=" + name + " is not running");
     }
     if (update_last_used) {
         std::unique_lock lk(mutex);
@@ -819,6 +823,11 @@ server_http_res_ptr server_models::proxy_request(const server_http_req & req, co
     return proxy;
 }
 
+bool server_models::is_child_server() {
+    const char * router_port = std::getenv("LLAMA_SERVER_ROUTER_PORT");
+    return router_port != nullptr;
+}
+
 std::thread server_models::setup_child_server(const std::function<void(int)> & shutdown_handler) {
     // send a notification to the router server that a model instance is ready
     common_log_pause(common_log_main());
@@ -852,6 +861,13 @@ std::thread server_models::setup_child_server(const std::function<void(int)> & s
     });
 }
 
+void server_models::notify_router_sleeping_state(bool is_sleeping) {
+    common_log_pause(common_log_main());
+    fflush(stdout);
+    fprintf(stdout, "%s\n", is_sleeping ? CMD_CHILD_TO_ROUTER_SLEEP : CMD_CHILD_TO_ROUTER_READY);
+    fflush(stdout);
+    common_log_resume(common_log_main());
+}
 
 //
 
@@ -881,9 +897,9 @@ static bool router_validate_model(std::string & name, server_models & models, bo
     // resolve alias to canonical model name
     name = meta->name;
     if (models_autoload) {
-        models.ensure_model_loaded(name);
+        models.ensure_model_ready(name);
     } else {
-        if (meta->status != SERVER_MODEL_STATUS_LOADED) {
+        if (!meta->is_running()) {
             res_err(res, format_error_response("model is not loaded", ERROR_TYPE_INVALID_REQUEST));
             return false;
         }
@@ -956,8 +972,8 @@ void server_models_routes::init_routes() {
             res_err(res, format_error_response("model is not found", ERROR_TYPE_NOT_FOUND));
             return res;
         }
-        if (meta->status == SERVER_MODEL_STATUS_LOADED) {
-            res_err(res, format_error_response("model is already loaded", ERROR_TYPE_INVALID_REQUEST));
+        if (meta->is_running()) {
+            res_err(res, format_error_response("model is already running", ERROR_TYPE_INVALID_REQUEST));
             return res;
         }
         models.load(meta->name);
@@ -1015,8 +1031,8 @@ void server_models_routes::init_routes() {
             res_err(res, format_error_response("model is not found", ERROR_TYPE_INVALID_REQUEST));
             return res;
         }
-        if (!model->is_active()) {
-            res_err(res, format_error_response("model is not loaded", ERROR_TYPE_INVALID_REQUEST));
+        if (!model->is_running()) {
+            res_err(res, format_error_response("model is not running", ERROR_TYPE_INVALID_REQUEST));
             return res;
         }
         models.unload(model->name);
diff --git a/tools/server/server-models.h b/tools/server/server-models.h
index 2b392f299a..1db34b6c4d 100644
--- a/tools/server/server-models.h
+++ b/tools/server/server-models.h
@@ -14,17 +14,18 @@
 
 /**
  * state diagram:
  *
- * UNLOADED ──► LOADING ──► LOADED
- *    ▲            │          │
- *    └───failed───┘          │
- *    ▲                       │
+ * UNLOADED ──► LOADING ──► LOADED ◄──── SLEEPING
+ *    ▲            │          │    ▲
+ *    └───failed───┘          │    │
+ *    ▲                       └──sleeping─────┘
  *    └────────unloaded─────────┘
  */
 enum server_model_status { // TODO: also add downloading state when the logic is added
     SERVER_MODEL_STATUS_UNLOADED,
     SERVER_MODEL_STATUS_LOADING,
-    SERVER_MODEL_STATUS_LOADED
+    SERVER_MODEL_STATUS_LOADED,
+    SERVER_MODEL_STATUS_SLEEPING
 };
 
@@ -37,6 +38,9 @@ static server_model_status server_model_status_from_string(const std::string & s
     if (status_str == "loaded") {
         return SERVER_MODEL_STATUS_LOADED;
     }
+    if (status_str == "sleeping") {
+        return SERVER_MODEL_STATUS_SLEEPING;
+    }
     throw std::runtime_error("invalid server model status");
 }
 
@@ -45,6 +49,7 @@ static std::string server_model_status_to_string(server_model_status status) {
         case SERVER_MODEL_STATUS_UNLOADED: return "unloaded";
         case SERVER_MODEL_STATUS_LOADING:  return "loading";
         case SERVER_MODEL_STATUS_LOADED:   return "loaded";
+        case SERVER_MODEL_STATUS_SLEEPING: return "sleeping";
         default: return "unknown";
     }
 }
@@ -61,8 +66,12 @@ struct server_model_meta {
     int exit_code = 0;    // exit code of the model instance process (only valid if status == FAILED)
     int stop_timeout = 0; // seconds to wait before force-killing the model instance during shutdown
 
-    bool is_active() const {
-        return status == SERVER_MODEL_STATUS_LOADED || status == SERVER_MODEL_STATUS_LOADING;
+    bool is_ready() const {
+        return status == SERVER_MODEL_STATUS_LOADED;
+    }
+
+    bool is_running() const {
+        return status == SERVER_MODEL_STATUS_LOADED || status == SERVER_MODEL_STATUS_LOADING || status == SERVER_MODEL_STATUS_SLEEPING;
     }
 
     bool is_failed() const {
@@ -130,19 +139,26 @@ public:
     void update_status(const std::string & name, server_model_status status, int exit_code);
 
     // wait until the model instance is fully loaded (thread-safe)
-    // return when the model is loaded or failed to load
-    void wait_until_loaded(const std::string & name);
+    // return when the model is no longer in "loading" state
+    void wait_until_loading_finished(const std::string & name);
 
-    // load the model if not loaded, otherwise do nothing (thread-safe)
-    // return false if model is already loaded; return true otherwise (meta may need to be refreshed)
-    bool ensure_model_loaded(const std::string & name);
+    // ensure the model is in ready state (thread-safe)
+    // return false if model is ready
+    // otherwise, load the model, block and wait until it's ready, then return true (meta may need to be refreshed)
+    bool ensure_model_ready(const std::string & name);
 
     // proxy an HTTP request to the model instance
     server_http_res_ptr proxy_request(const server_http_req & req, const std::string & method, const std::string & name, bool update_last_used);
 
+    // return true if the current process is a child server instance
+    static bool is_child_server();
+
     // notify the router server that a model instance is ready
     // return the monitoring thread (to be joined by the caller)
     static std::thread setup_child_server(const std::function<void(int)> & shutdown_handler);
+
+    // notify the router server that the sleeping state has changed
+    static void notify_router_sleeping_state(bool sleeping);
 };
 
 struct server_models_routes {
diff --git a/tools/server/server-queue.h b/tools/server/server-queue.h
index 164f09b195..35f010401f 100644
--- a/tools/server/server-queue.h
+++ b/tools/server/server-queue.h
@@ -95,11 +95,19 @@ public:
         callback_update_slots = std::move(callback);
     }
 
-    // Register callback for sleeping state change
+    // Register callback for sleeping state change; multiple callbacks are allowed
     // note: when entering sleeping state, the callback is called AFTER sleeping is set to true
     //       when leaving sleeping state, the callback is called BEFORE sleeping is set to false
     void on_sleeping_state(std::function<void(bool)> callback) {
-        callback_sleeping_state = std::move(callback);
+        if (callback_sleeping_state) {
+            auto prev_callback = std::move(callback_sleeping_state);
+            callback_sleeping_state = [prev_callback, callback](bool sleeping) {
+                prev_callback(sleeping);
+                callback(sleeping);
+            };
+        } else {
+            callback_sleeping_state = std::move(callback);
+        }
     }
 
 private:
diff --git a/tools/server/server.cpp b/tools/server/server.cpp
index 0bd6fda17d..73e3aa44e0 100644
--- a/tools/server/server.cpp
+++ b/tools/server/server.cpp
@@ -259,6 +259,12 @@ int main(int argc, char ** argv) {
     // load the model
     LOG_INF("%s: loading model\n", __func__);
 
+    if (server_models::is_child_server()) {
+        ctx_server.on_sleeping_changed([&](bool sleeping) {
+            server_models::notify_router_sleeping_state(sleeping);
+        });
+    }
+
     if (!ctx_server.load_model(params)) {
         clean_up();
         if (ctx_http.thread.joinable()) {
@@ -309,9 +315,8 @@ int main(int argc, char ** argv) {
     LOG_INF("%s: starting the main loop...\n", __func__);
 
     // optionally, notify router server that this instance is ready
-    const char * router_port = std::getenv("LLAMA_SERVER_ROUTER_PORT");
     std::thread monitor_thread;
-    if (router_port != nullptr) {
+    if (server_models::is_child_server()) {
         monitor_thread = server_models::setup_child_server(shutdown_handler);
     }
 