more stable
This commit is contained in:
parent
919d3f8cbf
commit
b9ebdf616a
|
|
@ -264,7 +264,19 @@ void server_models::load(const std::string & name) {
|
||||||
inst.subproc = std::move(child_proc);
|
inst.subproc = std::move(child_proc);
|
||||||
|
|
||||||
// start a thread to manage the child process
|
// start a thread to manage the child process
|
||||||
inst.th = std::thread([this, name, child_proc = &inst.subproc]() {
|
inst.th = std::thread([this, name, child_proc = &inst.subproc, port = inst.meta.port]() {
|
||||||
|
// read stdout/stderr and forward to main server log
|
||||||
|
{
|
||||||
|
FILE * p_stdout_stderr = subprocess_stdout(child_proc);
|
||||||
|
if (!p_stdout_stderr) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
char buffer[4096];
|
||||||
|
while (fgets(buffer, sizeof(buffer), p_stdout_stderr) != nullptr) {
|
||||||
|
LOG("[%5d] %s", port, buffer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// we reach here when the child process exits
|
||||||
int exit_code = 0;
|
int exit_code = 0;
|
||||||
subprocess_join(child_proc, &exit_code);
|
subprocess_join(child_proc, &exit_code);
|
||||||
// update PID and status
|
// update PID and status
|
||||||
|
|
@ -280,23 +292,10 @@ void server_models::load(const std::string & name) {
|
||||||
}
|
}
|
||||||
SRV_INF("instance name=%s exited with status %d\n", name.c_str(), exit_code);
|
SRV_INF("instance name=%s exited with status %d\n", name.c_str(), exit_code);
|
||||||
});
|
});
|
||||||
if (inst.th.joinable()) {
|
|
||||||
inst.th.detach(); // TODO: remove this because it makes joining impossible
|
|
||||||
}
|
|
||||||
|
|
||||||
// start a logging thread to read stdout/stderr
|
// clean up old thread if exists
|
||||||
inst.th_log = std::thread([child_proc = &inst.subproc, port = inst.meta.port]() {
|
if (mapping[name].th.joinable()) {
|
||||||
FILE * p_stdout_stderr = subprocess_stdout(child_proc);
|
mapping[name].th.join();
|
||||||
if (!p_stdout_stderr) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
char buffer[4096];
|
|
||||||
while (fgets(buffer, sizeof(buffer), p_stdout_stderr) != nullptr) {
|
|
||||||
LOG("[%5d] %s", port, buffer);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
if (inst.th_log.joinable()) {
|
|
||||||
inst.th_log.detach();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
mapping[name] = std::move(inst);
|
mapping[name] = std::move(inst);
|
||||||
|
|
@ -387,15 +386,9 @@ server_http_res_ptr server_models::proxy_request(const server_http_req & req, co
|
||||||
return proxy;
|
return proxy;
|
||||||
}
|
}
|
||||||
|
|
||||||
void server_models::notify_router_server_ready(const std::string & host, const std::string & name) {
|
void server_models::setup_child_server(const std::string & host, int router_port, const std::string & name, std::function<void(int)> & shutdown_handler) {
|
||||||
// send a notification to the router server that a model instance is ready
|
// send a notification to the router server that a model instance is ready
|
||||||
const char * router_port = std::getenv("LLAMA_SERVER_ROUTER_PORT");
|
httplib::Client cli(host, router_port);
|
||||||
if (router_port == nullptr) {
|
|
||||||
// no router server to notify, this is a standalone server
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
httplib::Client cli(host, std::atoi(router_port));
|
|
||||||
cli.set_connection_timeout(0, 200000); // 200 milliseconds
|
cli.set_connection_timeout(0, 200000); // 200 milliseconds
|
||||||
|
|
||||||
httplib::Request req;
|
httplib::Request req;
|
||||||
|
|
@ -408,13 +401,28 @@ void server_models::notify_router_server_ready(const std::string & host, const s
|
||||||
body["value"] = server_model_status_to_string(SERVER_MODEL_STATUS_LOADED);
|
body["value"] = server_model_status_to_string(SERVER_MODEL_STATUS_LOADED);
|
||||||
req.body = body.dump();
|
req.body = body.dump();
|
||||||
|
|
||||||
SRV_INF("notifying router server (port=%s) that model %s is ready\n", router_port, name.c_str());
|
SRV_INF("notifying router server (port=%d) that model %s is ready\n", router_port, name.c_str());
|
||||||
auto result = cli.send(std::move(req));
|
auto result = cli.send(std::move(req));
|
||||||
if (result.error() != httplib::Error::Success) {
|
if (result.error() != httplib::Error::Success) {
|
||||||
auto err_str = httplib::to_string(result.error());
|
auto err_str = httplib::to_string(result.error());
|
||||||
SRV_ERR("failed to notify router server: %s\n", err_str.c_str());
|
SRV_ERR("failed to notify router server: %s\n", err_str.c_str());
|
||||||
// TODO: maybe force shutdown here?
|
// TODO: maybe force shutdown here?
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// setup thread for monitoring stdin
|
||||||
|
// when EOF is detected, that means the router server requested shutdown, or the parent process died
|
||||||
|
std::thread([shutdown_handler]() {
|
||||||
|
// wait for EOF on stdin
|
||||||
|
SRV_INF("%s", "child server monitoring thread started, waiting for EOF on stdin...\n");
|
||||||
|
while (true) {
|
||||||
|
int c = getchar();
|
||||||
|
if (c == EOF) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
SRV_INF("%s", "EOF on stdin detected, invoking shutdown handler...\n");
|
||||||
|
shutdown_handler(0); // invoke shutdown handler
|
||||||
|
}).detach();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,7 @@
|
||||||
#include <queue>
|
#include <queue>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
#include <condition_variable>
|
#include <condition_variable>
|
||||||
|
#include <functional>
|
||||||
|
|
||||||
enum server_model_status {
|
enum server_model_status {
|
||||||
SERVER_MODEL_STATUS_UNLOADED,
|
SERVER_MODEL_STATUS_UNLOADED,
|
||||||
|
|
@ -98,7 +99,7 @@ public:
|
||||||
server_http_res_ptr proxy_request(const server_http_req & req, const std::string & method, const std::string & name);
|
server_http_res_ptr proxy_request(const server_http_req & req, const std::string & method, const std::string & name);
|
||||||
|
|
||||||
// notify the router server that a model instance is ready
|
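// notify the router server (host:router_port) that model instance `name` is ready, then start a
// detached thread that waits for EOF on stdin and invokes shutdown_handler(0) when it is detected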
|
||||||
static void notify_router_server_ready(const std::string & host, const std::string & name);
|
static void setup_child_server(const std::string & host, int router_port, const std::string & name, std::function<void(int)> & shutdown_handler);
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -5825,7 +5825,10 @@ int main(int argc, char ** argv, char ** envp) {
|
||||||
LOG_INF("%s: starting the main loop...\n", __func__);
|
LOG_INF("%s: starting the main loop...\n", __func__);
|
||||||
|
|
||||||
// optionally, notify router server that this instance is ready
|
// optionally, notify router server that this instance is ready
|
||||||
server_models::notify_router_server_ready(params.hostname, params.model_alias);
|
const char * router_port = std::getenv("LLAMA_SERVER_ROUTER_PORT");
|
||||||
|
if (router_port != nullptr) {
|
||||||
|
server_models::setup_child_server(params.hostname, std::atoi(router_port), params.model_alias, shutdown_handler);
|
||||||
|
}
|
||||||
|
|
||||||
// this call blocks the main thread until queue_tasks.terminate() is called
|
// this call blocks the main thread until queue_tasks.terminate() is called
|
||||||
ctx_server.queue_tasks.start_loop();
|
ctx_server.queue_tasks.start_loop();
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue