From b9ebdf616a19851e972885699d27a1ced06d072e Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Thu, 20 Nov 2025 15:49:40 +0100 Subject: [PATCH 1/4] more stable --- tools/server/server-models.cpp | 60 +++++++++++++++++++--------------- tools/server/server-models.h | 3 +- tools/server/server.cpp | 5 ++- 3 files changed, 40 insertions(+), 28 deletions(-) diff --git a/tools/server/server-models.cpp b/tools/server/server-models.cpp index 74baed9c60..776bd4c26d 100644 --- a/tools/server/server-models.cpp +++ b/tools/server/server-models.cpp @@ -264,7 +264,19 @@ void server_models::load(const std::string & name) { inst.subproc = std::move(child_proc); // start a thread to manage the child process - inst.th = std::thread([this, name, child_proc = &inst.subproc]() { + inst.th = std::thread([this, name, child_proc = &inst.subproc, port = inst.meta.port]() { + // read stdout/stderr and forward to main server log + { + FILE * p_stdout_stderr = subprocess_stdout(child_proc); + if (!p_stdout_stderr) { + return; + } + char buffer[4096]; + while (fgets(buffer, sizeof(buffer), p_stdout_stderr) != nullptr) { + LOG("[%5d] %s", port, buffer); + } + } + // we reach here when the child process exits int exit_code = 0; subprocess_join(child_proc, &exit_code); // update PID and status @@ -280,23 +292,10 @@ void server_models::load(const std::string & name) { } SRV_INF("instance name=%s exited with status %d\n", name.c_str(), exit_code); }); - if (inst.th.joinable()) { - inst.th.detach(); // TODO: remove this because it makes joining impossible - } - // start a logging thread to read stdout/stderr - inst.th_log = std::thread([child_proc = &inst.subproc, port = inst.meta.port]() { - FILE * p_stdout_stderr = subprocess_stdout(child_proc); - if (!p_stdout_stderr) { - return; - } - char buffer[4096]; - while (fgets(buffer, sizeof(buffer), p_stdout_stderr) != nullptr) { - LOG("[%5d] %s", port, buffer); - } - }); - if (inst.th_log.joinable()) { - inst.th_log.detach(); + // clean up old thread if exists + if (mapping[name].th.joinable()) { + mapping[name].th.join(); } mapping[name] = std::move(inst); @@ -387,15 +386,9 @@ server_http_res_ptr server_models::proxy_request(const server_http_req & req, co return proxy; } -void server_models::notify_router_server_ready(const std::string & host, const std::string & name) { +void server_models::setup_child_server(const std::string & host, int router_port, const std::string & name, std::function & shutdown_handler) { // send a notification to the router server that a model instance is ready - const char * router_port = std::getenv("LLAMA_SERVER_ROUTER_PORT"); - if (router_port == nullptr) { - // no router server to notify, this is a standalone server - return; - } - - httplib::Client cli(host, std::atoi(router_port)); + httplib::Client cli(host, router_port); cli.set_connection_timeout(0, 200000); // 200 milliseconds httplib::Request req; @@ -408,13 +401,28 @@ void server_models::notify_router_server_ready(const std::string & host, const s body["value"] = server_model_status_to_string(SERVER_MODEL_STATUS_LOADED); req.body = body.dump(); - SRV_INF("notifying router server (port=%s) that model %s is ready\n", router_port, name.c_str()); + SRV_INF("notifying router server (port=%d) that model %s is ready\n", router_port, name.c_str()); auto result = cli.send(std::move(req)); if (result.error() != httplib::Error::Success) { auto err_str = httplib::to_string(result.error()); SRV_ERR("failed to notify router server: %s\n", err_str.c_str()); // TODO: maybe force shutdown here? } + + // setup thread for monitoring stdin + // when EOF is detected, that means the router server requested shutdown, or the parent process died + std::thread([shutdown_handler]() { + // wait for EOF on stdin + SRV_INF("%s", "child server monitoring thread started, waiting for EOF on stdin...\n"); + while (true) { + int c = getchar(); + if (c == EOF) { + break; + } + } + SRV_INF("%s", "EOF on stdin detected, invoking shutdown handler...\n"); + shutdown_handler(0); // invoke shutdown handler + }).detach(); } diff --git a/tools/server/server-models.h b/tools/server/server-models.h index 3e159c18e3..00c4e88da8 100644 --- a/tools/server/server-models.h +++ b/tools/server/server-models.h @@ -8,6 +8,7 @@ #include #include #include +#include enum server_model_status { SERVER_MODEL_STATUS_UNLOADED, @@ -98,7 +99,7 @@ public: server_http_res_ptr proxy_request(const server_http_req & req, const std::string & method, const std::string & name); // notify the router server that a model instance is ready - static void notify_router_server_ready(const std::string & host, const std::string & name); + static void setup_child_server(const std::string & host, int router_port, const std::string & name, std::function & shutdown_handler); }; /** diff --git a/tools/server/server.cpp b/tools/server/server.cpp index 9aa0cb9f50..2c8bb595b1 100644 --- a/tools/server/server.cpp +++ b/tools/server/server.cpp @@ -5825,7 +5825,10 @@ int main(int argc, char ** argv, char ** envp) { LOG_INF("%s: starting the main loop...\n", __func__); // optionally, notify router server that this instance is ready - server_models::notify_router_server_ready(params.hostname, params.model_alias); + const char * router_port = std::getenv("LLAMA_SERVER_ROUTER_PORT"); + if (router_port != nullptr) { + server_models::setup_child_server(params.hostname, std::atoi(router_port), params.model_alias, shutdown_handler); + } // this call blocks the main thread until queue_tasks.terminate() is called ctx_server.queue_tasks.start_loop(); From 6610724f8e4543a217389f27abf8feacc48ad02a Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Thu, 20 Nov 2025 16:13:30 +0100 Subject: [PATCH 2/4] fix unsafe pointer --- tools/server/server-models.cpp | 25 ++++++++++++------------- tools/server/server-models.h | 4 ++-- tools/server/server.cpp | 4 +++- 3 files changed, 17 insertions(+), 16 deletions(-) diff --git a/tools/server/server-models.cpp b/tools/server/server-models.cpp index 776bd4c26d..9c03d5ecb7 100644 --- a/tools/server/server-models.cpp +++ b/tools/server/server-models.cpp @@ -91,9 +91,8 @@ server_models::server_models( /* status */ SERVER_MODEL_STATUS_UNLOADED }; mapping[meta.name] = instance_t{ - /* subproc */ subprocess_s(), + /* subproc */ std::make_shared(), /* th */ std::thread(), - /* th_log */ std::thread(), /* meta */ meta }; } @@ -220,7 +219,7 @@ void server_models::load(const std::string & name) { inst.meta.port = get_free_port(); inst.meta.status = SERVER_MODEL_STATUS_LOADING; - subprocess_s child_proc; + inst.subproc = std::make_shared(); { std::string exec_path = get_server_exec_path().string(); SRV_INF("spawning server instance with name=%s on port %d\n", inst.meta.name.c_str(), inst.meta.port); @@ -255,19 +254,17 @@ void server_models::load(const std::string & name) { std::vector envp = to_char_ptr_array(child_env); int options = subprocess_option_no_window | subprocess_option_combined_stdout_stderr; - int result = subprocess_create_ex(argv.data(), options, envp.data(), &child_proc); + int result = subprocess_create_ex(argv.data(), options, envp.data(), inst.subproc.get()); if (result != 0) { throw std::runtime_error("failed to spawn server instance"); } } - inst.subproc = std::move(child_proc); - // start a thread to manage the child process - inst.th = std::thread([this, name, child_proc = &inst.subproc, port = inst.meta.port]() { + inst.th = std::thread([this, name, child_proc = inst.subproc, port = inst.meta.port]() { // read stdout/stderr and forward to main server log { - FILE * p_stdout_stderr = subprocess_stdout(child_proc); + FILE * p_stdout_stderr = subprocess_stdout(child_proc.get()); if (!p_stdout_stderr) { return; } @@ -278,7 +275,7 @@ void server_models::load(const std::string & name) { } // we reach here when the child process exits int exit_code = 0; - subprocess_join(child_proc, &exit_code); + subprocess_join(child_proc.get(), &exit_code); // update PID and status { std::lock_guard lk(mutex); @@ -308,7 +305,7 @@ void server_models::unload(const std::string & name) { if (it != mapping.end()) { if (it->second.meta.status == SERVER_MODEL_STATUS_LOADED) { SRV_INF("unloading model instance name=%s\n", name.c_str()); - subprocess_destroy(&it->second.subproc); + subprocess_destroy(it->second.subproc.get()); // status change will be handled by the managing thread } else { SRV_WRN("model instance name=%s is not loaded\n", name.c_str()); @@ -323,10 +320,11 @@ void server_models::unload_all() { for (auto & [name, inst] : mapping) { if (inst.meta.status == SERVER_MODEL_STATUS_LOADED) { SRV_INF("unloading model instance name=%s\n", name.c_str()); - subprocess_destroy(&inst.subproc); + subprocess_destroy(inst.subproc.get()); // status change will be handled by the managing thread - to_join.push_back(std::move(inst.th)); } + // moving the thread to join list to avoid deadlock + to_join.push_back(std::move(inst.th)); } } for (auto & th : to_join) { @@ -361,9 +359,10 @@ void server_models::ensure_model_loaded(const std::string & name) { if (!meta.has_value()) { throw std::runtime_error("model name=" + name + " is not found"); } - if (meta->status == SERVER_MODEL_STATUS_LOADED) { + if (meta->status == SERVER_MODEL_STATUS_LOADED || meta->status == SERVER_MODEL_STATUS_LOADING) { return; // already loaded } + SRV_INF("model name=%s is not loaded, loading...\n", name.c_str()); load(name); wait_until_loaded(name); } diff --git a/tools/server/server-models.h b/tools/server/server-models.h index 00c4e88da8..f5080a84ca 100644 --- a/tools/server/server-models.h +++ b/tools/server/server-models.h @@ -9,6 +9,7 @@ #include #include #include +#include enum server_model_status { SERVER_MODEL_STATUS_UNLOADED, @@ -53,9 +54,8 @@ struct server_model_meta { struct server_models { private: struct instance_t { - subprocess_s subproc; + std::shared_ptr subproc; // shared between main thread and monitoring thread std::thread th; - std::thread th_log; // logging thread server_model_meta meta; }; diff --git a/tools/server/server.cpp b/tools/server/server.cpp index 2c8bb595b1..37ef9d96ea 100644 --- a/tools/server/server.cpp +++ b/tools/server/server.cpp @@ -5816,7 +5816,9 @@ int main(int argc, char ** argv, char ** envp) { if (is_router_server) { LOG_INF("%s: router server is listening on %s\n", __func__, ctx_http.listening_address.c_str()); ctx_http.is_ready.store(true); - ctx_http.thread.join(); // keep the main thread alive + if (ctx_http.thread.joinable()) { + ctx_http.thread.join(); // keep the main thread alive + } // when the HTTP server stops, clean up and exit clean_up(); From d0ea9e0830e96179500e188abf710dbe56343014 Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Thu, 20 Nov 2025 16:20:14 +0100 Subject: [PATCH 3/4] also allow terminate loading model --- tools/server/server-models.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/server/server-models.cpp b/tools/server/server-models.cpp index 9c03d5ecb7..54ac5b0de8 100644 --- a/tools/server/server-models.cpp +++ b/tools/server/server-models.cpp @@ -303,7 +303,7 @@ void server_models::unload(const std::string & name) { std::lock_guard lk(mutex); auto it = mapping.find(name); if (it != mapping.end()) { - if (it->second.meta.status == SERVER_MODEL_STATUS_LOADED) { + if (it->second.meta.status == SERVER_MODEL_STATUS_LOADED || it->second.meta.status == SERVER_MODEL_STATUS_LOADING) { SRV_INF("unloading model instance name=%s\n", name.c_str()); subprocess_destroy(it->second.subproc.get()); // status change will be handled by the managing thread @@ -318,7 +318,7 @@ void server_models::unload_all() { { std::lock_guard lk(mutex); for (auto & [name, inst] : mapping) { - if (inst.meta.status == SERVER_MODEL_STATUS_LOADED) { + if (inst.meta.status == SERVER_MODEL_STATUS_LOADED || inst.meta.status == SERVER_MODEL_STATUS_LOADING) { SRV_INF("unloading model instance name=%s\n", name.c_str()); subprocess_destroy(inst.subproc.get()); // status change will be handled by the managing thread From 5805ca79602b4dc469d4be725d91484d6079ae82 Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Thu, 20 Nov 2025 16:26:31 +0100 Subject: [PATCH 4/4] add is_active() --- tools/server/server-models.cpp | 6 +++--- tools/server/server-models.h | 12 ++++++++++++ 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/tools/server/server-models.cpp b/tools/server/server-models.cpp index 54ac5b0de8..be3226ada3 100644 --- a/tools/server/server-models.cpp +++ b/tools/server/server-models.cpp @@ -303,7 +303,7 @@ void server_models::unload(const std::string & name) { std::lock_guard lk(mutex); auto it = mapping.find(name); if (it != mapping.end()) { - if (it->second.meta.status == SERVER_MODEL_STATUS_LOADED || it->second.meta.status == SERVER_MODEL_STATUS_LOADING) { + if (it->second.meta.is_active()) { SRV_INF("unloading model instance name=%s\n", name.c_str()); subprocess_destroy(it->second.subproc.get()); // status change will be handled by the managing thread @@ -318,7 +318,7 @@ void server_models::unload_all() { { std::lock_guard lk(mutex); for (auto & [name, inst] : mapping) { - if (inst.meta.status == SERVER_MODEL_STATUS_LOADED || inst.meta.status == SERVER_MODEL_STATUS_LOADING) { + if (inst.meta.is_active()) { SRV_INF("unloading model instance name=%s\n", name.c_str()); subprocess_destroy(inst.subproc.get()); // status change will be handled by the managing thread @@ -359,7 +359,7 @@ void server_models::ensure_model_loaded(const std::string & name) { if (!meta.has_value()) { throw std::runtime_error("model name=" + name + " is not found"); } - if (meta->status == SERVER_MODEL_STATUS_LOADED || meta->status == SERVER_MODEL_STATUS_LOADING) { + if (meta->is_active()) { return; // already loaded } SRV_INF("model name=%s is not loaded, loading...\n", name.c_str()); diff --git a/tools/server/server-models.h b/tools/server/server-models.h index f5080a84ca..3cd070f89a 100644 --- a/tools/server/server-models.h +++ b/tools/server/server-models.h @@ -11,7 +11,16 @@ #include #include +/** + * state diagram: + * + * UNLOADED ──► LOADING ──► LOADED + * ▲ │ + * │ │ + * FAILED ◄───────┘ + */ enum server_model_status { + // TODO: also add downloading state SERVER_MODEL_STATUS_UNLOADED, SERVER_MODEL_STATUS_LOADING, SERVER_MODEL_STATUS_LOADED, @@ -49,6 +58,9 @@ struct server_model_meta { bool in_cache = false; // if true, use -hf; use -m otherwise int port = 0; server_model_status status = SERVER_MODEL_STATUS_UNLOADED; + bool is_active() const { + return status == SERVER_MODEL_STATUS_LOADED || status == SERVER_MODEL_STATUS_LOADING; + } }; struct server_models {