Merge remote-tracking branch 'ngxson/xsn/server_model_management_v1_2' into allozaur/server_model_management_v1_2
This commit is contained in:
commit
c35dee3bd7
|
|
@ -91,9 +91,8 @@ server_models::server_models(
|
|||
/* status */ SERVER_MODEL_STATUS_UNLOADED
|
||||
};
|
||||
mapping[meta.name] = instance_t{
|
||||
/* subproc */ subprocess_s(),
|
||||
/* subproc */ std::make_shared<subprocess_s>(),
|
||||
/* th */ std::thread(),
|
||||
/* th_log */ std::thread(),
|
||||
/* meta */ meta
|
||||
};
|
||||
}
|
||||
|
|
@ -220,7 +219,7 @@ void server_models::load(const std::string & name) {
|
|||
inst.meta.port = get_free_port();
|
||||
inst.meta.status = SERVER_MODEL_STATUS_LOADING;
|
||||
|
||||
subprocess_s child_proc;
|
||||
inst.subproc = std::make_shared<subprocess_s>();
|
||||
{
|
||||
std::string exec_path = get_server_exec_path().string();
|
||||
SRV_INF("spawning server instance with name=%s on port %d\n", inst.meta.name.c_str(), inst.meta.port);
|
||||
|
|
@ -255,18 +254,28 @@ void server_models::load(const std::string & name) {
|
|||
std::vector<char *> envp = to_char_ptr_array(child_env);
|
||||
|
||||
int options = subprocess_option_no_window | subprocess_option_combined_stdout_stderr;
|
||||
int result = subprocess_create_ex(argv.data(), options, envp.data(), &child_proc);
|
||||
int result = subprocess_create_ex(argv.data(), options, envp.data(), inst.subproc.get());
|
||||
if (result != 0) {
|
||||
throw std::runtime_error("failed to spawn server instance");
|
||||
}
|
||||
}
|
||||
|
||||
inst.subproc = std::move(child_proc);
|
||||
|
||||
// start a thread to manage the child process
|
||||
inst.th = std::thread([this, name, child_proc = &inst.subproc]() {
|
||||
inst.th = std::thread([this, name, child_proc = inst.subproc, port = inst.meta.port]() {
|
||||
// read stdout/stderr and forward to main server log
|
||||
{
|
||||
FILE * p_stdout_stderr = subprocess_stdout(child_proc.get());
|
||||
if (!p_stdout_stderr) {
|
||||
return;
|
||||
}
|
||||
char buffer[4096];
|
||||
while (fgets(buffer, sizeof(buffer), p_stdout_stderr) != nullptr) {
|
||||
LOG("[%5d] %s", port, buffer);
|
||||
}
|
||||
}
|
||||
// we reach here when the child process exits
|
||||
int exit_code = 0;
|
||||
subprocess_join(child_proc, &exit_code);
|
||||
subprocess_join(child_proc.get(), &exit_code);
|
||||
// update PID and status
|
||||
{
|
||||
std::lock_guard<std::mutex> lk(mutex);
|
||||
|
|
@ -280,23 +289,10 @@ void server_models::load(const std::string & name) {
|
|||
}
|
||||
SRV_INF("instance name=%s exited with status %d\n", name.c_str(), exit_code);
|
||||
});
|
||||
if (inst.th.joinable()) {
|
||||
inst.th.detach(); // TODO: remove this because it makes joining impossible
|
||||
}
|
||||
|
||||
// start a logging thread to read stdout/stderr
|
||||
inst.th_log = std::thread([child_proc = &inst.subproc, port = inst.meta.port]() {
|
||||
FILE * p_stdout_stderr = subprocess_stdout(child_proc);
|
||||
if (!p_stdout_stderr) {
|
||||
return;
|
||||
}
|
||||
char buffer[4096];
|
||||
while (fgets(buffer, sizeof(buffer), p_stdout_stderr) != nullptr) {
|
||||
LOG("[%5d] %s", port, buffer);
|
||||
}
|
||||
});
|
||||
if (inst.th_log.joinable()) {
|
||||
inst.th_log.detach();
|
||||
// clean up old thread if exists
|
||||
if (mapping[name].th.joinable()) {
|
||||
mapping[name].th.join();
|
||||
}
|
||||
|
||||
mapping[name] = std::move(inst);
|
||||
|
|
@ -307,9 +303,9 @@ void server_models::unload(const std::string & name) {
|
|||
std::lock_guard<std::mutex> lk(mutex);
|
||||
auto it = mapping.find(name);
|
||||
if (it != mapping.end()) {
|
||||
if (it->second.meta.status == SERVER_MODEL_STATUS_LOADED) {
|
||||
if (it->second.meta.is_active()) {
|
||||
SRV_INF("unloading model instance name=%s\n", name.c_str());
|
||||
subprocess_destroy(&it->second.subproc);
|
||||
subprocess_destroy(it->second.subproc.get());
|
||||
// status change will be handled by the managing thread
|
||||
} else {
|
||||
SRV_WRN("model instance name=%s is not loaded\n", name.c_str());
|
||||
|
|
@ -322,12 +318,13 @@ void server_models::unload_all() {
|
|||
{
|
||||
std::lock_guard<std::mutex> lk(mutex);
|
||||
for (auto & [name, inst] : mapping) {
|
||||
if (inst.meta.status == SERVER_MODEL_STATUS_LOADED) {
|
||||
if (inst.meta.is_active()) {
|
||||
SRV_INF("unloading model instance name=%s\n", name.c_str());
|
||||
subprocess_destroy(&inst.subproc);
|
||||
subprocess_destroy(inst.subproc.get());
|
||||
// status change will be handled by the managing thread
|
||||
to_join.push_back(std::move(inst.th));
|
||||
}
|
||||
// moving the thread to join list to avoid deadlock
|
||||
to_join.push_back(std::move(inst.th));
|
||||
}
|
||||
}
|
||||
for (auto & th : to_join) {
|
||||
|
|
@ -362,9 +359,10 @@ void server_models::ensure_model_loaded(const std::string & name) {
|
|||
if (!meta.has_value()) {
|
||||
throw std::runtime_error("model name=" + name + " is not found");
|
||||
}
|
||||
if (meta->status == SERVER_MODEL_STATUS_LOADED) {
|
||||
if (meta->is_active()) {
|
||||
return; // already loaded
|
||||
}
|
||||
SRV_INF("model name=%s is not loaded, loading...\n", name.c_str());
|
||||
load(name);
|
||||
wait_until_loaded(name);
|
||||
}
|
||||
|
|
@ -387,15 +385,9 @@ server_http_res_ptr server_models::proxy_request(const server_http_req & req, co
|
|||
return proxy;
|
||||
}
|
||||
|
||||
void server_models::notify_router_server_ready(const std::string & host, const std::string & name) {
|
||||
void server_models::setup_child_server(const std::string & host, int router_port, const std::string & name, std::function<void(int)> & shutdown_handler) {
|
||||
// send a notification to the router server that a model instance is ready
|
||||
const char * router_port = std::getenv("LLAMA_SERVER_ROUTER_PORT");
|
||||
if (router_port == nullptr) {
|
||||
// no router server to notify, this is a standalone server
|
||||
return;
|
||||
}
|
||||
|
||||
httplib::Client cli(host, std::atoi(router_port));
|
||||
httplib::Client cli(host, router_port);
|
||||
cli.set_connection_timeout(0, 200000); // 200 milliseconds
|
||||
|
||||
httplib::Request req;
|
||||
|
|
@ -408,13 +400,28 @@ void server_models::notify_router_server_ready(const std::string & host, const s
|
|||
body["value"] = server_model_status_to_string(SERVER_MODEL_STATUS_LOADED);
|
||||
req.body = body.dump();
|
||||
|
||||
SRV_INF("notifying router server (port=%s) that model %s is ready\n", router_port, name.c_str());
|
||||
SRV_INF("notifying router server (port=%d) that model %s is ready\n", router_port, name.c_str());
|
||||
auto result = cli.send(std::move(req));
|
||||
if (result.error() != httplib::Error::Success) {
|
||||
auto err_str = httplib::to_string(result.error());
|
||||
SRV_ERR("failed to notify router server: %s\n", err_str.c_str());
|
||||
// TODO: maybe force shutdown here?
|
||||
}
|
||||
|
||||
// setup thread for monitoring stdin
|
||||
// when EOF is detected, that means the router server requested shutdown, or the parent process died
|
||||
std::thread([shutdown_handler]() {
|
||||
// wait for EOF on stdin
|
||||
SRV_INF("%s", "child server monitoring thread started, waiting for EOF on stdin...\n");
|
||||
while (true) {
|
||||
int c = getchar();
|
||||
if (c == EOF) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
SRV_INF("%s", "EOF on stdin detected, invoking shutdown handler...\n");
|
||||
shutdown_handler(0); // invoke shutdown handler
|
||||
}).detach();
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -8,8 +8,19 @@
|
|||
#include <queue>
|
||||
#include <mutex>
|
||||
#include <condition_variable>
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
|
||||
/**
|
||||
* state diagram:
|
||||
*
|
||||
* UNLOADED ──► LOADING ──► LOADED
|
||||
* ▲ │
|
||||
* │ │
|
||||
* FAILED ◄───────┘
|
||||
*/
|
||||
enum server_model_status {
|
||||
// TODO: also add downloading state
|
||||
SERVER_MODEL_STATUS_UNLOADED,
|
||||
SERVER_MODEL_STATUS_LOADING,
|
||||
SERVER_MODEL_STATUS_LOADED,
|
||||
|
|
@ -47,14 +58,16 @@ struct server_model_meta {
|
|||
bool in_cache = false; // if true, use -hf; use -m otherwise
|
||||
int port = 0;
|
||||
server_model_status status = SERVER_MODEL_STATUS_UNLOADED;
|
||||
bool is_active() const {
|
||||
return status == SERVER_MODEL_STATUS_LOADED || status == SERVER_MODEL_STATUS_LOADING;
|
||||
}
|
||||
};
|
||||
|
||||
struct server_models {
|
||||
private:
|
||||
struct instance_t {
|
||||
subprocess_s subproc;
|
||||
std::shared_ptr<subprocess_s> subproc; // shared between main thread and monitoring thread
|
||||
std::thread th;
|
||||
std::thread th_log; // logging thread
|
||||
server_model_meta meta;
|
||||
};
|
||||
|
||||
|
|
@ -98,7 +111,7 @@ public:
|
|||
server_http_res_ptr proxy_request(const server_http_req & req, const std::string & method, const std::string & name);
|
||||
|
||||
// notify the router server that a model instance is ready
|
||||
static void notify_router_server_ready(const std::string & host, const std::string & name);
|
||||
static void setup_child_server(const std::string & host, int router_port, const std::string & name, std::function<void(int)> & shutdown_handler);
|
||||
};
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -5816,7 +5816,9 @@ int main(int argc, char ** argv, char ** envp) {
|
|||
if (is_router_server) {
|
||||
LOG_INF("%s: router server is listening on %s\n", __func__, ctx_http.listening_address.c_str());
|
||||
ctx_http.is_ready.store(true);
|
||||
ctx_http.thread.join(); // keep the main thread alive
|
||||
if (ctx_http.thread.joinable()) {
|
||||
ctx_http.thread.join(); // keep the main thread alive
|
||||
}
|
||||
|
||||
// when the HTTP server stops, clean up and exit
|
||||
clean_up();
|
||||
|
|
@ -5825,7 +5827,10 @@ int main(int argc, char ** argv, char ** envp) {
|
|||
LOG_INF("%s: starting the main loop...\n", __func__);
|
||||
|
||||
// optionally, notify router server that this instance is ready
|
||||
server_models::notify_router_server_ready(params.hostname, params.model_alias);
|
||||
const char * router_port = std::getenv("LLAMA_SERVER_ROUTER_PORT");
|
||||
if (router_port != nullptr) {
|
||||
server_models::setup_child_server(params.hostname, std::atoi(router_port), params.model_alias, shutdown_handler);
|
||||
}
|
||||
|
||||
// this call blocks the main thread until queue_tasks.terminate() is called
|
||||
ctx_server.queue_tasks.start_loop();
|
||||
|
|
|
|||
Loading…
Reference in New Issue