commit e5902bd7dd
Author:    Xuan-Son Nguyen
Committer: GitHub
Date:      2025-12-17 05:51:06 +02:00
3 changed files with 21 additions and 44 deletions

File 1 of 3

@@ -17,6 +17,7 @@
 #include <chrono>
 #include <queue>
 #include <filesystem>
+#include <string.h>

 #ifdef _WIN32
 #include <winsock2.h>
@@ -33,7 +34,8 @@
 #include <limits.h>
 #endif

-#define CMD_EXIT "exit"
+#define CMD_ROUTER_TO_CHILD_EXIT "cmd_router_to_child:exit"
+#define CMD_CHILD_TO_ROUTER_READY "cmd_child_to_router:ready"

 // address for child process, this is needed because router may run on 0.0.0.0
 // ref: https://github.com/ggml-org/llama.cpp/issues/17862
@@ -534,6 +536,8 @@ void server_models::load(const std::string & name) {
     std::vector<char *> argv = to_char_ptr_array(child_args);
     std::vector<char *> envp = to_char_ptr_array(child_env);

+    // TODO @ngxson : maybe separate stdout and stderr in the future
+    // so that we can use stdout for commands and stderr for logging
     int options = subprocess_option_no_window | subprocess_option_combined_stdout_stderr;
     int result = subprocess_create_ex(argv.data(), options, envp.data(), inst.subproc.get());
     if (result != 0) {
@@ -547,11 +551,17 @@ void server_models::load(const std::string & name) {
     // captured variables are guaranteed to be destroyed only after the thread is joined
     inst.th = std::thread([this, name, child_proc = inst.subproc, port = inst.meta.port]() {
         // read stdout/stderr and forward to main server log
+        bool state_received = false; // true if child state received
         FILE * p_stdout_stderr = subprocess_stdout(child_proc.get());
         if (p_stdout_stderr) {
             char buffer[4096];
             while (fgets(buffer, sizeof(buffer), p_stdout_stderr) != nullptr) {
                 LOG("[%5d] %s", port, buffer);
+                if (!state_received && strstr(buffer, CMD_CHILD_TO_ROUTER_READY) != nullptr) {
+                    // child process is ready
+                    this->update_status(name, SERVER_MODEL_STATUS_LOADED);
+                    state_received = true;
+                }
             }
         } else {
             SRV_ERR("failed to get stdout/stderr of child process for name=%s\n", name.c_str());
@@ -595,7 +605,7 @@ static void interrupt_subprocess(FILE * stdin_file) {
     // because subprocess.h does not provide a way to send SIGINT,
     // we will send a command to the child process to exit gracefully
     if (stdin_file) {
-        fprintf(stdin_file, "%s\n", CMD_EXIT);
+        fprintf(stdin_file, "%s\n", CMD_ROUTER_TO_CHILD_EXIT);
        fflush(stdin_file);
     }
 }
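
Because subprocess.h does not expose a portable way to deliver SIGINT, shutdown is handled as a plain text command written to the child's stdin, which this commit renames from CMD_EXIT to the direction-prefixed CMD_ROUTER_TO_CHILD_EXIT. The snippet below is a rough, standalone sketch of the child-side half of that contract; the function name start_stdin_monitor and the main() harness are invented for illustration, while the real loop is the one inside setup_child_server shown in the hunks further down.

// Illustrative sketch only: a thread that watches stdin for the exit command
// and triggers a graceful shutdown instead of waiting for a signal.
#include <functional>
#include <iostream>
#include <string>
#include <thread>

#define CMD_ROUTER_TO_CHILD_EXIT "cmd_router_to_child:exit"

static std::thread start_stdin_monitor(std::function<void(int)> shutdown_handler) {
    return std::thread([shutdown_handler]() {
        std::string line;
        // getline() fails on EOF, i.e. when the router closes the pipe
        while (std::getline(std::cin, line)) {
            if (line.find(CMD_ROUTER_TO_CHILD_EXIT) != std::string::npos) {
                shutdown_handler(0); // graceful shutdown requested by the router
                break;
            }
        }
    });
}

int main() {
    auto th = start_stdin_monitor([](int) { std::cout << "shutting down\n"; });
    th.join();
}
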
@@ -707,32 +717,13 @@ server_http_res_ptr server_models::proxy_request(const server_http_req & req, co
     return proxy;
 }

-std::thread server_models::setup_child_server(const common_params & base_params, int router_port, const std::string & name, std::function<void(int)> & shutdown_handler) {
+std::thread server_models::setup_child_server(std::function<void(int)> & shutdown_handler) {
     // send a notification to the router server that a model instance is ready
-    // TODO @ngxson : use HTTP client from libcommon
-    httplib::Client cli(base_params.hostname, router_port);
-    cli.set_connection_timeout(0, 200000); // 200 milliseconds
-
-    httplib::Request req;
-    req.method = "POST";
-    req.path = "/models/status";
-    req.set_header("Content-Type", "application/json");
-    if (!base_params.api_keys.empty()) {
-        req.set_header("Authorization", "Bearer " + base_params.api_keys[0]);
-    }
-    json body;
-    body["model"] = name;
-    body["value"] = server_model_status_to_string(SERVER_MODEL_STATUS_LOADED);
-    req.body = body.dump();
-    SRV_INF("notifying router server (port=%d) that model %s is ready\n", router_port, name.c_str());
-    auto result = cli.send(std::move(req));
-    if (result.error() != httplib::Error::Success) {
-        auto err_str = httplib::to_string(result.error());
-        SRV_ERR("failed to notify router server: %s\n", err_str.c_str());
-        exit(1); // force exit
-    }
+    common_log_pause(common_log_main());
+    fflush(stdout);
+    fprintf(stdout, "%s\n", CMD_CHILD_TO_ROUTER_READY);
+    fflush(stdout);
+    common_log_resume(common_log_main());

     // setup thread for monitoring stdin
     return std::thread([shutdown_handler]() {
@@ -746,7 +737,7 @@ std::thread server_models::setup_child_server(const common_params & base_params,
                 eof = true;
                 break;
             }
-            if (line.find(CMD_EXIT) != std::string::npos) {
+            if (line.find(CMD_ROUTER_TO_CHILD_EXIT) != std::string::npos) {
                 SRV_INF("%s", "exit command received, exiting...\n");
                 shutdown_handler(0);
                 break;
@@ -869,18 +860,6 @@ void server_models_routes::init_routes() {
         return res;
     };

-    // used by child process to notify the router about status change
-    // TODO @ngxson : maybe implement authentication for this endpoint in the future
-    this->post_router_models_status = [this](const server_http_req & req) {
-        auto res = std::make_unique<server_http_res>();
-        json body = json::parse(req.body);
-        std::string model = json_value(body, "model", std::string());
-        std::string value = json_value(body, "value", std::string());
-        models.update_status(model, server_model_status_from_string(value));
-        res_ok(res, {{"success", true}});
-        return res;
-    };
-
     this->get_router_models = [this](const server_http_req &) {
         auto res = std::make_unique<server_http_res>();
         json models_json = json::array();
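
This first file carries the substance of the change: instead of POSTing to /models/status over HTTP (which needed base_params, the router port and an API key, and relied on an endpoint whose own TODO noted it had no authentication), the child now simply prints CMD_CHILD_TO_ROUTER_READY on stdout, and the router's existing log-forwarding thread spots that marker and flips the model to SERVER_MODEL_STATUS_LOADED. Below is a rough, standalone sketch of the router-side half of the handshake; POSIX popen() and the echo command are stand-ins for subprocess.h and a real llama-server child, and the [child] prefix replaces the real LOG() call.

// Illustrative sketch only: forward a child's combined stdout/stderr to the
// parent's log while scanning each line for the ready marker.
#include <cstdio>
#include <cstring>

#define CMD_CHILD_TO_ROUTER_READY "cmd_child_to_router:ready"

int main() {
    // pretend child: logs one line, then announces readiness on stdout
    FILE * child_out = popen("echo 'loading model...'; echo " CMD_CHILD_TO_ROUTER_READY, "r");
    if (child_out == nullptr) {
        return 1;
    }
    bool state_received = false;
    char buffer[4096];
    while (fgets(buffer, sizeof(buffer), child_out) != nullptr) {
        printf("[child] %s", buffer); // forward to the router log
        if (!state_received && strstr(buffer, CMD_CHILD_TO_ROUTER_READY) != nullptr) {
            // the real code calls update_status(name, SERVER_MODEL_STATUS_LOADED) here
            state_received = true;
        }
    }
    pclose(child_out);
    return state_received ? 0 : 1;
}

The commit's own TODO spells out the tradeoff: with stdout and stderr combined, the command channel shares a stream with logging, which is why the child brackets its fprintf with common_log_pause()/common_log_resume().
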

File 2 of 3

@@ -144,7 +144,7 @@ public:
     // notify the router server that a model instance is ready
     // return the monitoring thread (to be joined by the caller)
-    static std::thread setup_child_server(const common_params & base_params, int router_port, const std::string & name, std::function<void(int)> & shutdown_handler);
+    static std::thread setup_child_server(std::function<void(int)> & shutdown_handler);
 };

 struct server_models_routes {
@@ -162,7 +162,6 @@ struct server_models_routes {
     server_http_context::handler_t proxy_post;
     server_http_context::handler_t get_router_models;
     server_http_context::handler_t post_router_models_load;
-    server_http_context::handler_t post_router_models_status;
     server_http_context::handler_t post_router_models_unload;
 };

File 3 of 3

@@ -153,7 +153,6 @@ int main(int argc, char ** argv, char ** envp) {
         routes.get_models = models_routes->get_router_models;
         ctx_http.post("/models/load", ex_wrapper(models_routes->post_router_models_load));
         ctx_http.post("/models/unload", ex_wrapper(models_routes->post_router_models_unload));
-        ctx_http.post("/models/status", ex_wrapper(models_routes->post_router_models_status));
     }

     ctx_http.get ("/health", ex_wrapper(routes.get_health)); // public endpoint (no API key check)
@@ -291,7 +290,7 @@ int main(int argc, char ** argv, char ** envp) {
     const char * router_port = std::getenv("LLAMA_SERVER_ROUTER_PORT");
     std::thread monitor_thread;
     if (router_port != nullptr) {
-        monitor_thread = server_models::setup_child_server(params, std::atoi(router_port), params.model_alias, shutdown_handler);
+        monitor_thread = server_models::setup_child_server(shutdown_handler);
     }

     // this call blocks the main thread until queue_tasks.terminate() is called
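
With the HTTP callback gone, setup_child_server() drops the common_params, router-port and model-alias arguments; at this call site the child only checks whether LLAMA_SERVER_ROUTER_PORT is set at all, and the port value itself is no longer consumed. The following is a minimal sketch of that wiring, with setup_child_server mocked out since the real one prints the ready marker and returns the stdin monitor thread; the stub name and the main() skeleton are invented for illustration.

// Illustrative sketch only: how a child process decides it is running under the router.
#include <cstdlib>
#include <functional>
#include <thread>

// stand-in for server_models::setup_child_server(shutdown_handler)
static std::thread setup_child_server_stub(std::function<void(int)> & shutdown_handler) {
    (void) shutdown_handler; // the real function wires this into the stdin monitor loop
    return std::thread([]() { /* print ready marker, then watch stdin ... */ });
}

int main() {
    std::function<void(int)> shutdown_handler = [](int) { /* stop the server loop */ };
    std::thread monitor_thread;
    // the router exports LLAMA_SERVER_ROUTER_PORT into the child's environment;
    // a llama-server started by hand never sees it, so this block is skipped
    if (std::getenv("LLAMA_SERVER_ROUTER_PORT") != nullptr) {
        monitor_thread = setup_child_server_stub(shutdown_handler);
    }
    // ... run the server; afterwards join the monitor thread if it was started
    if (monitor_thread.joinable()) {
        monitor_thread.join();
    }
    return 0;
}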