server: allow router to report child instances sleep status (#20849)
* server: allow router to report child instances sleep status * refactor * move sleeping to state * nits
This commit is contained in:
parent
bd3f1d9d65
commit
49bfddeca1
|
|
@ -1634,6 +1634,13 @@ The `status` object can be:
|
|||
}
|
||||
```
|
||||
|
||||
```json
|
||||
"status": {
|
||||
"value": "sleeping",
|
||||
"args": ["llama-server", "-ctx", "4096"]
|
||||
}
|
||||
```
|
||||
|
||||
### POST `/models/load`: Load a model
|
||||
|
||||
Load a model
|
||||
|
|
|
|||
|
|
@ -3033,6 +3033,9 @@ struct server_res_generator : server_http_res {
|
|||
}
|
||||
};
|
||||
|
||||
void server_context::on_sleeping_changed(std::function<void(bool)> callback) {
|
||||
impl->queue_tasks.on_sleeping_state(std::move(callback));
|
||||
}
|
||||
|
||||
|
||||
//
|
||||
|
|
|
|||
|
|
@ -74,6 +74,10 @@ struct server_context {
|
|||
// get server metadata (read-only), can only be called after load_model()
|
||||
// not thread-safe, should only be used from the main thread
|
||||
server_context_meta get_meta() const;
|
||||
|
||||
// register a callback to be called when sleeping state changes
|
||||
// must be set before load_model() is called
|
||||
void on_sleeping_changed(std::function<void(bool)> callback);
|
||||
};
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -39,7 +39,8 @@ extern char **environ;
|
|||
#define DEFAULT_STOP_TIMEOUT 10 // seconds
|
||||
|
||||
#define CMD_ROUTER_TO_CHILD_EXIT "cmd_router_to_child:exit"
|
||||
#define CMD_CHILD_TO_ROUTER_READY "cmd_child_to_router:ready"
|
||||
#define CMD_CHILD_TO_ROUTER_READY "cmd_child_to_router:ready" // also sent when waking up from sleep
|
||||
#define CMD_CHILD_TO_ROUTER_SLEEP "cmd_child_to_router:sleep"
|
||||
|
||||
// address for child process, this is needed because router may run on 0.0.0.0
|
||||
// ref: https://github.com/ggml-org/llama.cpp/issues/17862
|
||||
|
|
@ -380,7 +381,7 @@ void server_models::update_meta(const std::string & name, const server_model_met
|
|||
if (it != mapping.end()) {
|
||||
it->second.meta = meta;
|
||||
}
|
||||
cv.notify_all(); // notify wait_until_loaded
|
||||
cv.notify_all(); // notify wait_until_loading_finished
|
||||
}
|
||||
|
||||
bool server_models::has_model(const std::string & name) {
|
||||
|
|
@ -503,7 +504,7 @@ void server_models::unload_lru() {
|
|||
{
|
||||
std::unique_lock<std::mutex> lk(mutex);
|
||||
for (const auto & m : mapping) {
|
||||
if (m.second.meta.is_active()) {
|
||||
if (m.second.meta.is_running()) {
|
||||
count_active++;
|
||||
if (m.second.meta.last_used < lru_last_used) {
|
||||
lru_model_name = m.first;
|
||||
|
|
@ -546,7 +547,7 @@ void server_models::load(const std::string & name) {
|
|||
if (base_params.models_max > 0) {
|
||||
size_t count_active = 0;
|
||||
for (const auto & m : mapping) {
|
||||
if (m.second.meta.is_active()) {
|
||||
if (m.second.meta.is_running()) {
|
||||
count_active++;
|
||||
}
|
||||
}
|
||||
|
|
@ -605,15 +606,15 @@ void server_models::load(const std::string & name) {
|
|||
std::thread log_thread([&]() {
|
||||
// read stdout/stderr and forward to main server log
|
||||
// also handle status report from child process
|
||||
bool state_received = false; // true if child state received
|
||||
if (stdout_file) {
|
||||
char buffer[4096];
|
||||
while (fgets(buffer, sizeof(buffer), stdout_file) != nullptr) {
|
||||
LOG("[%5d] %s", port, buffer);
|
||||
if (!state_received && std::strstr(buffer, CMD_CHILD_TO_ROUTER_READY) != nullptr) {
|
||||
// child process is ready
|
||||
std::string str(buffer);
|
||||
if (string_starts_with(buffer, CMD_CHILD_TO_ROUTER_READY)) {
|
||||
this->update_status(name, SERVER_MODEL_STATUS_LOADED, 0);
|
||||
state_received = true;
|
||||
} else if (string_starts_with(buffer, CMD_CHILD_TO_ROUTER_SLEEP)) {
|
||||
this->update_status(name, SERVER_MODEL_STATUS_SLEEPING, 0);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
|
@ -706,13 +707,13 @@ void server_models::unload(const std::string & name) {
|
|||
std::lock_guard<std::mutex> lk(mutex);
|
||||
auto it = mapping.find(name);
|
||||
if (it != mapping.end()) {
|
||||
if (it->second.meta.is_active()) {
|
||||
SRV_INF("unloading model instance name=%s\n", name.c_str());
|
||||
if (it->second.meta.is_running()) {
|
||||
SRV_INF("stopping model instance name=%s\n", name.c_str());
|
||||
stopping_models.insert(name);
|
||||
cv_stop.notify_all();
|
||||
// status change will be handled by the managing thread
|
||||
} else {
|
||||
SRV_WRN("model instance name=%s is not loaded\n", name.c_str());
|
||||
SRV_WRN("model instance name=%s is not running\n", name.c_str());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -722,8 +723,8 @@ void server_models::unload_all() {
|
|||
{
|
||||
std::lock_guard<std::mutex> lk(mutex);
|
||||
for (auto & [name, inst] : mapping) {
|
||||
if (inst.meta.is_active()) {
|
||||
SRV_INF("unloading model instance name=%s\n", name.c_str());
|
||||
if (inst.meta.is_running()) {
|
||||
SRV_INF("stopping model instance name=%s\n", name.c_str());
|
||||
stopping_models.insert(name);
|
||||
cv_stop.notify_all();
|
||||
// status change will be handled by the managing thread
|
||||
|
|
@ -750,7 +751,7 @@ void server_models::update_status(const std::string & name, server_model_status
|
|||
cv.notify_all();
|
||||
}
|
||||
|
||||
void server_models::wait_until_loaded(const std::string & name) {
|
||||
void server_models::wait_until_loading_finished(const std::string & name) {
|
||||
std::unique_lock<std::mutex> lk(mutex);
|
||||
cv.wait(lk, [this, &name]() {
|
||||
auto it = mapping.find(name);
|
||||
|
|
@ -761,22 +762,25 @@ void server_models::wait_until_loaded(const std::string & name) {
|
|||
});
|
||||
}
|
||||
|
||||
bool server_models::ensure_model_loaded(const std::string & name) {
|
||||
bool server_models::ensure_model_ready(const std::string & name) {
|
||||
auto meta = get_meta(name);
|
||||
if (!meta.has_value()) {
|
||||
throw std::runtime_error("model name=" + name + " is not found");
|
||||
}
|
||||
if (meta->status == SERVER_MODEL_STATUS_LOADED) {
|
||||
return false; // already loaded
|
||||
if (meta->is_ready()) {
|
||||
return false; // ready for taking requests
|
||||
}
|
||||
if (meta->status == SERVER_MODEL_STATUS_SLEEPING) {
|
||||
return false; // child is sleeping but still running; new request will wake it up
|
||||
}
|
||||
if (meta->status == SERVER_MODEL_STATUS_UNLOADED) {
|
||||
SRV_INF("model name=%s is not loaded, loading...\n", name.c_str());
|
||||
load(name);
|
||||
}
|
||||
|
||||
// for loading state
|
||||
// wait for loading to complete
|
||||
SRV_INF("waiting until model name=%s is fully loaded...\n", name.c_str());
|
||||
wait_until_loaded(name);
|
||||
wait_until_loading_finished(name);
|
||||
|
||||
// check final status
|
||||
meta = get_meta(name);
|
||||
|
|
@ -792,8 +796,8 @@ server_http_res_ptr server_models::proxy_request(const server_http_req & req, co
|
|||
if (!meta.has_value()) {
|
||||
throw std::runtime_error("model name=" + name + " is not found");
|
||||
}
|
||||
if (meta->status != SERVER_MODEL_STATUS_LOADED) {
|
||||
throw std::invalid_argument("model name=" + name + " is not loaded");
|
||||
if (!meta->is_running()) {
|
||||
throw std::invalid_argument("model name=" + name + " is not running");
|
||||
}
|
||||
if (update_last_used) {
|
||||
std::unique_lock<std::mutex> lk(mutex);
|
||||
|
|
@ -819,6 +823,11 @@ server_http_res_ptr server_models::proxy_request(const server_http_req & req, co
|
|||
return proxy;
|
||||
}
|
||||
|
||||
bool server_models::is_child_server() {
|
||||
const char * router_port = std::getenv("LLAMA_SERVER_ROUTER_PORT");
|
||||
return router_port != nullptr;
|
||||
}
|
||||
|
||||
std::thread server_models::setup_child_server(const std::function<void(int)> & shutdown_handler) {
|
||||
// send a notification to the router server that a model instance is ready
|
||||
common_log_pause(common_log_main());
|
||||
|
|
@ -852,6 +861,13 @@ std::thread server_models::setup_child_server(const std::function<void(int)> & s
|
|||
});
|
||||
}
|
||||
|
||||
void server_models::notify_router_sleeping_state(bool is_sleeping) {
|
||||
common_log_pause(common_log_main());
|
||||
fflush(stdout);
|
||||
fprintf(stdout, "%s\n", is_sleeping ? CMD_CHILD_TO_ROUTER_SLEEP : CMD_CHILD_TO_ROUTER_READY);
|
||||
fflush(stdout);
|
||||
common_log_resume(common_log_main());
|
||||
}
|
||||
|
||||
|
||||
//
|
||||
|
|
@ -881,9 +897,9 @@ static bool router_validate_model(std::string & name, server_models & models, bo
|
|||
// resolve alias to canonical model name
|
||||
name = meta->name;
|
||||
if (models_autoload) {
|
||||
models.ensure_model_loaded(name);
|
||||
models.ensure_model_ready(name);
|
||||
} else {
|
||||
if (meta->status != SERVER_MODEL_STATUS_LOADED) {
|
||||
if (!meta->is_running()) {
|
||||
res_err(res, format_error_response("model is not loaded", ERROR_TYPE_INVALID_REQUEST));
|
||||
return false;
|
||||
}
|
||||
|
|
@ -956,8 +972,8 @@ void server_models_routes::init_routes() {
|
|||
res_err(res, format_error_response("model is not found", ERROR_TYPE_NOT_FOUND));
|
||||
return res;
|
||||
}
|
||||
if (meta->status == SERVER_MODEL_STATUS_LOADED) {
|
||||
res_err(res, format_error_response("model is already loaded", ERROR_TYPE_INVALID_REQUEST));
|
||||
if (meta->is_running()) {
|
||||
res_err(res, format_error_response("model is already running", ERROR_TYPE_INVALID_REQUEST));
|
||||
return res;
|
||||
}
|
||||
models.load(meta->name);
|
||||
|
|
@ -1015,8 +1031,8 @@ void server_models_routes::init_routes() {
|
|||
res_err(res, format_error_response("model is not found", ERROR_TYPE_INVALID_REQUEST));
|
||||
return res;
|
||||
}
|
||||
if (!model->is_active()) {
|
||||
res_err(res, format_error_response("model is not loaded", ERROR_TYPE_INVALID_REQUEST));
|
||||
if (!model->is_running()) {
|
||||
res_err(res, format_error_response("model is not running", ERROR_TYPE_INVALID_REQUEST));
|
||||
return res;
|
||||
}
|
||||
models.unload(model->name);
|
||||
|
|
|
|||
|
|
@ -14,17 +14,18 @@
|
|||
/**
|
||||
* state diagram:
|
||||
*
|
||||
* UNLOADED ──► LOADING ──► LOADED
|
||||
* ▲ │ │
|
||||
* └───failed───┘ │
|
||||
* ▲ │
|
||||
* UNLOADED ──► LOADING ──► LOADED ◄──── SLEEPING
|
||||
* ▲ │ │ ▲
|
||||
* └───failed───┘ │ │
|
||||
* ▲ └──sleeping─────┘
|
||||
* └────────unloaded─────────┘
|
||||
*/
|
||||
enum server_model_status {
|
||||
// TODO: also add downloading state when the logic is added
|
||||
SERVER_MODEL_STATUS_UNLOADED,
|
||||
SERVER_MODEL_STATUS_LOADING,
|
||||
SERVER_MODEL_STATUS_LOADED
|
||||
SERVER_MODEL_STATUS_LOADED,
|
||||
SERVER_MODEL_STATUS_SLEEPING
|
||||
};
|
||||
|
||||
static server_model_status server_model_status_from_string(const std::string & status_str) {
|
||||
|
|
@ -37,6 +38,9 @@ static server_model_status server_model_status_from_string(const std::string & s
|
|||
if (status_str == "loaded") {
|
||||
return SERVER_MODEL_STATUS_LOADED;
|
||||
}
|
||||
if (status_str == "sleeping") {
|
||||
return SERVER_MODEL_STATUS_SLEEPING;
|
||||
}
|
||||
throw std::runtime_error("invalid server model status");
|
||||
}
|
||||
|
||||
|
|
@ -45,6 +49,7 @@ static std::string server_model_status_to_string(server_model_status status) {
|
|||
case SERVER_MODEL_STATUS_UNLOADED: return "unloaded";
|
||||
case SERVER_MODEL_STATUS_LOADING: return "loading";
|
||||
case SERVER_MODEL_STATUS_LOADED: return "loaded";
|
||||
case SERVER_MODEL_STATUS_SLEEPING: return "sleeping";
|
||||
default: return "unknown";
|
||||
}
|
||||
}
|
||||
|
|
@ -61,8 +66,12 @@ struct server_model_meta {
|
|||
int exit_code = 0; // exit code of the model instance process (only valid if status == FAILED)
|
||||
int stop_timeout = 0; // seconds to wait before force-killing the model instance during shutdown
|
||||
|
||||
bool is_active() const {
|
||||
return status == SERVER_MODEL_STATUS_LOADED || status == SERVER_MODEL_STATUS_LOADING;
|
||||
bool is_ready() const {
|
||||
return status == SERVER_MODEL_STATUS_LOADED;
|
||||
}
|
||||
|
||||
bool is_running() const {
|
||||
return status == SERVER_MODEL_STATUS_LOADED || status == SERVER_MODEL_STATUS_LOADING || status == SERVER_MODEL_STATUS_SLEEPING;
|
||||
}
|
||||
|
||||
bool is_failed() const {
|
||||
|
|
@ -130,19 +139,26 @@ public:
|
|||
void update_status(const std::string & name, server_model_status status, int exit_code);
|
||||
|
||||
// wait until the model instance is fully loaded (thread-safe)
|
||||
// return when the model is loaded or failed to load
|
||||
void wait_until_loaded(const std::string & name);
|
||||
// return when the model no longer in "loading" state
|
||||
void wait_until_loading_finished(const std::string & name);
|
||||
|
||||
// load the model if not loaded, otherwise do nothing (thread-safe)
|
||||
// return false if model is already loaded; return true otherwise (meta may need to be refreshed)
|
||||
bool ensure_model_loaded(const std::string & name);
|
||||
// ensure the model is in ready state (thread-safe)
|
||||
// return false if model is ready
|
||||
// otherwise, load the model and blocking wait until it's ready, then return true (meta may need to be refreshed)
|
||||
bool ensure_model_ready(const std::string & name);
|
||||
|
||||
// proxy an HTTP request to the model instance
|
||||
server_http_res_ptr proxy_request(const server_http_req & req, const std::string & method, const std::string & name, bool update_last_used);
|
||||
|
||||
// return true if the current process is a child server instance
|
||||
static bool is_child_server();
|
||||
|
||||
// notify the router server that a model instance is ready
|
||||
// return the monitoring thread (to be joined by the caller)
|
||||
static std::thread setup_child_server(const std::function<void(int)> & shutdown_handler);
|
||||
|
||||
// notify the router server that the sleeping state has changed
|
||||
static void notify_router_sleeping_state(bool sleeping);
|
||||
};
|
||||
|
||||
struct server_models_routes {
|
||||
|
|
|
|||
|
|
@ -95,11 +95,19 @@ public:
|
|||
callback_update_slots = std::move(callback);
|
||||
}
|
||||
|
||||
// Register callback for sleeping state change
|
||||
// Register callback for sleeping state change; multiple callbacks are allowed
|
||||
// note: when entering sleeping state, the callback is called AFTER sleeping is set to true
|
||||
// when leaving sleeping state, the callback is called BEFORE sleeping is set to false
|
||||
void on_sleeping_state(std::function<void(bool)> callback) {
|
||||
callback_sleeping_state = std::move(callback);
|
||||
if (callback_sleeping_state) {
|
||||
auto prev_callback = std::move(callback_sleeping_state);
|
||||
callback_sleeping_state = [prev_callback, callback](bool sleeping) {
|
||||
prev_callback(sleeping);
|
||||
callback(sleeping);
|
||||
};
|
||||
} else {
|
||||
callback_sleeping_state = std::move(callback);
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
|
|
|
|||
|
|
@ -259,6 +259,12 @@ int main(int argc, char ** argv) {
|
|||
// load the model
|
||||
LOG_INF("%s: loading model\n", __func__);
|
||||
|
||||
if (server_models::is_child_server()) {
|
||||
ctx_server.on_sleeping_changed([&](bool sleeping) {
|
||||
server_models::notify_router_sleeping_state(sleeping);
|
||||
});
|
||||
}
|
||||
|
||||
if (!ctx_server.load_model(params)) {
|
||||
clean_up();
|
||||
if (ctx_http.thread.joinable()) {
|
||||
|
|
@ -309,9 +315,8 @@ int main(int argc, char ** argv) {
|
|||
LOG_INF("%s: starting the main loop...\n", __func__);
|
||||
|
||||
// optionally, notify router server that this instance is ready
|
||||
const char * router_port = std::getenv("LLAMA_SERVER_ROUTER_PORT");
|
||||
std::thread monitor_thread;
|
||||
if (router_port != nullptr) {
|
||||
if (server_models::is_child_server()) {
|
||||
monitor_thread = server_models::setup_child_server(shutdown_handler);
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue