diff --git a/tools/server/server-models.cpp b/tools/server/server-models.cpp
index 3690c0bb82..09c0c28772 100644
--- a/tools/server/server-models.cpp
+++ b/tools/server/server-models.cpp
@@ -17,6 +17,7 @@
 #include
 #include
 #include
+#include
 
 #ifdef _WIN32
 #include
@@ -33,7 +34,8 @@
 #include
 #endif
 
-#define CMD_EXIT "exit"
+#define CMD_ROUTER_TO_CHILD_EXIT "cmd_router_to_child:exit"
+#define CMD_CHILD_TO_ROUTER_READY "cmd_child_to_router:ready"
 
 // address for child process, this is needed because router may run on 0.0.0.0
 // ref: https://github.com/ggml-org/llama.cpp/issues/17862
@@ -534,6 +536,8 @@ void server_models::load(const std::string & name) {
     std::vector argv = to_char_ptr_array(child_args);
     std::vector envp = to_char_ptr_array(child_env);
 
+    // TODO @ngxson : maybe separate stdout and stderr in the future
+    // so that we can use stdout for commands and stderr for logging
     int options = subprocess_option_no_window | subprocess_option_combined_stdout_stderr;
     int result = subprocess_create_ex(argv.data(), options, envp.data(), inst.subproc.get());
     if (result != 0) {
@@ -547,11 +551,17 @@
     // captured variables are guaranteed to be destroyed only after the thread is joined
     inst.th = std::thread([this, name, child_proc = inst.subproc, port = inst.meta.port]() {
         // read stdout/stderr and forward to main server log
+        bool state_received = false; // true if child state received
         FILE * p_stdout_stderr = subprocess_stdout(child_proc.get());
         if (p_stdout_stderr) {
             char buffer[4096];
             while (fgets(buffer, sizeof(buffer), p_stdout_stderr) != nullptr) {
                 LOG("[%5d] %s", port, buffer);
+                if (!state_received && strstr(buffer, CMD_CHILD_TO_ROUTER_READY) != nullptr) {
+                    // child process is ready
+                    this->update_status(name, SERVER_MODEL_STATUS_LOADED);
+                    state_received = true;
+                }
             }
         } else {
             SRV_ERR("failed to get stdout/stderr of child process for name=%s\n", name.c_str());
@@ -595,7 +605,7 @@ static void interrupt_subprocess(FILE * stdin_file) {
     // because subprocess.h does not provide a way to send SIGINT,
     // we will send a command to the child process to exit gracefully
     if (stdin_file) {
-        fprintf(stdin_file, "%s\n", CMD_EXIT);
+        fprintf(stdin_file, "%s\n", CMD_ROUTER_TO_CHILD_EXIT);
         fflush(stdin_file);
     }
 }
@@ -707,32 +717,13 @@ server_http_res_ptr server_models::proxy_request(const server_http_req & req, co
     return proxy;
 }
 
-std::thread server_models::setup_child_server(const common_params & base_params, int router_port, const std::string & name, std::function<void(int)> & shutdown_handler) {
+std::thread server_models::setup_child_server(std::function<void(int)> & shutdown_handler) {
     // send a notification to the router server that a model instance is ready
-    // TODO @ngxson : use HTTP client from libcommon
-    httplib::Client cli(base_params.hostname, router_port);
-    cli.set_connection_timeout(0, 200000); // 200 milliseconds
-
-    httplib::Request req;
-    req.method = "POST";
-    req.path = "/models/status";
-    req.set_header("Content-Type", "application/json");
-    if (!base_params.api_keys.empty()) {
-        req.set_header("Authorization", "Bearer " + base_params.api_keys[0]);
-    }
-
-    json body;
-    body["model"] = name;
-    body["value"] = server_model_status_to_string(SERVER_MODEL_STATUS_LOADED);
-    req.body = body.dump();
-
-    SRV_INF("notifying router server (port=%d) that model %s is ready\n", router_port, name.c_str());
-    auto result = cli.send(std::move(req));
-    if (result.error() != httplib::Error::Success) {
-        auto err_str = httplib::to_string(result.error());
-        SRV_ERR("failed to notify router server: %s\n", err_str.c_str());
-        exit(1); // force exit
-    }
+    common_log_pause(common_log_main());
+    fflush(stdout);
+    fprintf(stdout, "%s\n", CMD_CHILD_TO_ROUTER_READY);
+    fflush(stdout);
+    common_log_resume(common_log_main());
 
     // setup thread for monitoring stdin
     return std::thread([shutdown_handler]() {
@@ -746,7 +737,7 @@ std::thread server_models::setup_child_server(const common_params & base_params,
                 eof = true;
                 break;
             }
-            if (line.find(CMD_EXIT) != std::string::npos) {
+            if (line.find(CMD_ROUTER_TO_CHILD_EXIT) != std::string::npos) {
                 SRV_INF("%s", "exit command received, exiting...\n");
                 shutdown_handler(0);
                 break;
@@ -869,18 +860,6 @@ void server_models_routes::init_routes() {
         return res;
     };
 
-    // used by child process to notify the router about status change
-    // TODO @ngxson : maybe implement authentication for this endpoint in the future
-    this->post_router_models_status = [this](const server_http_req & req) {
-        auto res = std::make_unique();
-        json body = json::parse(req.body);
-        std::string model = json_value(body, "model", std::string());
-        std::string value = json_value(body, "value", std::string());
-        models.update_status(model, server_model_status_from_string(value));
-        res_ok(res, {{"success", true}});
-        return res;
-    };
-
     this->get_router_models = [this](const server_http_req &) {
         auto res = std::make_unique();
         json models_json = json::array();
diff --git a/tools/server/server-models.h b/tools/server/server-models.h
index 9cdbbad9b6..383df779aa 100644
--- a/tools/server/server-models.h
+++ b/tools/server/server-models.h
@@ -144,7 +144,7 @@ public:
 
     // notify the router server that a model instance is ready
     // return the monitoring thread (to be joined by the caller)
-    static std::thread setup_child_server(const common_params & base_params, int router_port, const std::string & name, std::function<void(int)> & shutdown_handler);
+    static std::thread setup_child_server(std::function<void(int)> & shutdown_handler);
 };
 
 struct server_models_routes {
@@ -162,7 +162,6 @@
     server_http_context::handler_t proxy_post;
     server_http_context::handler_t get_router_models;
     server_http_context::handler_t post_router_models_load;
-    server_http_context::handler_t post_router_models_status;
     server_http_context::handler_t post_router_models_unload;
 };
 
diff --git a/tools/server/server.cpp b/tools/server/server.cpp
index 8538427f73..3cebe174b9 100644
--- a/tools/server/server.cpp
+++ b/tools/server/server.cpp
@@ -153,7 +153,6 @@ int main(int argc, char ** argv, char ** envp) {
         routes.get_models = models_routes->get_router_models;
         ctx_http.post("/models/load", ex_wrapper(models_routes->post_router_models_load));
         ctx_http.post("/models/unload", ex_wrapper(models_routes->post_router_models_unload));
-        ctx_http.post("/models/status", ex_wrapper(models_routes->post_router_models_status));
     }
 
     ctx_http.get ("/health", ex_wrapper(routes.get_health)); // public endpoint (no API key check)
@@ -291,7 +290,7 @@ int main(int argc, char ** argv, char ** envp) {
     const char * router_port = std::getenv("LLAMA_SERVER_ROUTER_PORT");
     std::thread monitor_thread;
     if (router_port != nullptr) {
-        monitor_thread = server_models::setup_child_server(params, std::atoi(router_port), params.model_alias, shutdown_handler);
+        monitor_thread = server_models::setup_child_server(shutdown_handler);
    }
 
     // this call blocks the main thread until queue_tasks.terminate() is called
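For clarity, the handshake after this change works in two directions: the child signals readiness by printing `CMD_CHILD_TO_ROUTER_READY` on its stdout (pausing the common logger and flushing around the write so the marker lands on its own line in the combined stdout/stderr stream), and the router's reader thread detects it with `strstr` while forwarding everything else to the log; shutdown goes the other way, with the router writing `CMD_ROUTER_TO_CHILD_EXIT` to the child's stdin. The sketch below is a minimal, self-contained illustration of both directions, not the actual subprocess plumbing: `tmpfile()` and the sample log line are hypothetical stand-ins for the pipe returned by `subprocess_stdout()`.

```cpp
#include <cstdio>
#include <cstring>

#define CMD_ROUTER_TO_CHILD_EXIT  "cmd_router_to_child:exit"
#define CMD_CHILD_TO_ROUTER_READY "cmd_child_to_router:ready"

int main() {
    // Hypothetical stand-in for the child's combined stdout/stderr pipe:
    // an ordinary log line, then the ready marker on its own line.
    FILE * child_out = std::tmpfile();
    if (!child_out) {
        return 1;
    }
    std::fputs("srv  load_model: loading model ...\n", child_out);
    std::fputs(CMD_CHILD_TO_ROUTER_READY "\n", child_out);
    std::rewind(child_out);

    // Router side: the same fgets/strstr scan as the reader thread in
    // server_models::load(); a hit is where update_status(name,
    // SERVER_MODEL_STATUS_LOADED) would be called.
    bool state_received = false;
    char buffer[4096];
    while (std::fgets(buffer, sizeof(buffer), child_out) != nullptr) {
        std::printf("[12345] %s", buffer); // forward to the router log, as LOG() does
        if (!state_received && std::strstr(buffer, CMD_CHILD_TO_ROUTER_READY) != nullptr) {
            state_received = true;
            std::puts("-> child ready, mark model as loaded");
        }
    }
    std::fclose(child_out);

    // Router -> child direction: graceful shutdown is one line written to the
    // child's stdin, mirroring interrupt_subprocess() above (stdout here for demo).
    std::printf("%s\n", CMD_ROUTER_TO_CHILD_EXIT);
    return 0;
}
```

Scanning the combined stream works because the marker strings are namespaced (`cmd_child_to_router:` / `cmd_router_to_child:`), so an accidental match in ordinary log output is unlikely; separating stdout (commands) from stderr (logs), as the TODO in the patch suggests, would remove even that small risk.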