#include "utils.hpp"
#include "server-models.h"

#include "download.h"

// NOTE(review): the targets of the angle-bracket includes below were lost in the
// mangled source and have been reconstructed from usage (httplib::Client,
// subprocess_*, BSD sockets, std library types) — confirm the vendor include
// paths match the build.
#include <cpp-httplib/httplib.h>
#include <sheredom/subprocess.h>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <filesystem>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <queue>
#include <stdexcept>
#include <string>
#include <system_error>
#include <thread>
#include <vector>

#ifdef _WIN32
#include <winsock2.h>
#else
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#endif

#if defined(__APPLE__) && defined(__MACH__)
// macOS: use _NSGetExecutablePath to get the executable path
#include <mach-o/dyld.h>
#include <limits.h>
#endif

#if defined(__APPLE__) && defined(__MACH__)
// Resolves symlinks in `path`; falls back to `path` unchanged when resolution fails.
static std::filesystem::path canonical_or_self(const std::filesystem::path & path) {
    std::error_code ec;
    std::filesystem::path resolved = std::filesystem::canonical(path, ec);
    return ec ? path : resolved;
}
#endif

// Returns the path of the currently running executable. It is used as the program
// for spawned child server instances.
// Throws std::runtime_error if the platform query fails.
static std::filesystem::path get_server_exec_path() {
#if defined(_WIN32)
    wchar_t buf[32768] = { 0 }; // Large buffer to handle long paths
    DWORD len = GetModuleFileNameW(nullptr, buf, _countof(buf));
    if (len == 0 || len >= _countof(buf)) {
        throw std::runtime_error("GetModuleFileNameW failed or path too long");
    }
    return std::filesystem::path(buf);
#elif defined(__APPLE__) && defined(__MACH__)
    char small_path[PATH_MAX];
    uint32_t size = sizeof(small_path);
    if (_NSGetExecutablePath(small_path, &size) == 0) {
        // resolve any symlinks to get absolute path
        return canonical_or_self(std::filesystem::path(small_path));
    }
    // buffer was too small: `size` now holds the required length, retry once
    std::vector<char> buf(size);
    if (_NSGetExecutablePath(buf.data(), &size) == 0) {
        return canonical_or_self(std::filesystem::path(buf.data()));
    }
    throw std::runtime_error("_NSGetExecutablePath failed after buffer resize");
#else
    char path[FILENAME_MAX];
    // readlink() does not NUL-terminate and silently truncates, so a result that
    // fills the entire buffer cannot be trusted
    ssize_t count = readlink("/proc/self/exe", path, sizeof(path));
    if (count <= 0 || count >= static_cast<ssize_t>(sizeof(path))) {
        throw std::runtime_error("failed to resolve /proc/self/exe");
    }
    return std::filesystem::path(std::string(path, count));
#endif
}

// A model discovered under the models directory.
struct local_model {
    std::string name;        // sub-directory name, or file name with ".gguf" removed
    std::string path;        // model file (first shard for split models)
    std::string path_mmproj; // projector file; empty if none was found
};

// Scans `dir` (one level deep) for GGUF models:
//  - a top-level "*.gguf" file is a single-file model named after the file
//    (with ".gguf" removed);
//  - a sub-directory is a model named after the directory. Inside it, a .gguf
//    whose name contains "mmproj" is the projector. A "-00001-of-" file is the
//    first shard of a split model and is preferred. Otherwise the last other
//    .gguf file listed is used. Directories without a usable file are skipped.
// Throws std::runtime_error if `dir` does not exist or is not a directory.
static std::vector<local_model> list_local_models(const std::string & dir) {
    if (!std::filesystem::exists(dir) || !std::filesystem::is_directory(dir)) {
        throw std::runtime_error(string_format("error: '%s' does not exist or is not a directory\n", dir.c_str()));
    }

    std::vector<local_model> models;
    auto scan_subdir = [&models](const std::string & subdir_path, const std::string & name) {
        auto files = fs_list(subdir_path, false);
        common_file_info model_file;
        common_file_info first_shard_file;
        common_file_info mmproj_file;
        for (const auto & file : files) {
            if (!string_ends_with(file.name, ".gguf")) {
                continue;
            }
            if (file.name.find("mmproj") != std::string::npos) {
                mmproj_file = file;
            } else if (file.name.find("-00001-of-") != std::string::npos) {
                first_shard_file = file;
            } else {
                model_file = file;
            }
        }
        local_model model{
            /* name        */ name,
            /* path        */ first_shard_file.path.empty() ? model_file.path : first_shard_file.path,
            /* path_mmproj */ mmproj_file.path, // can be empty
        };
        if (!model.path.empty()) {
            models.push_back(std::move(model));
        }
    };

    auto files = fs_list(dir, true);
    for (const auto & file : files) {
        if (file.is_dir) {
            scan_subdir(file.path, file.name);
        } else if (string_ends_with(file.name, ".gguf")) {
            // single file model
            std::string name = file.name;
            string_replace_all(name, ".gguf", "");
            local_model model{
                /* name        */ name,
                /* path        */ file.path,
                /* path_mmproj */ "",
            };
            models.push_back(std::move(model));
        }
    }
    return models;
}

//
// server_models
//

// Records the router's argv/envp (reused when spawning child instances) and
// registers every known model as UNLOADED: first the cached models, then local
// models from params.models_dir. A local model whose name collides with an
// already-registered model is skipped.
server_models::server_models(
        const common_params & params,
        int argc,
        char ** argv,
        char ** envp) : base_params(params) {
    for (int i = 0; i < argc; i++) {
        base_args.push_back(std::string(argv[i]));
    }
    for (char ** env = envp; env != nullptr && *env != nullptr; env++) {
        base_env.push_back(std::string(*env));
    }

    auto register_model = [this](const server_model_meta & meta) {
        mapping[meta.name] = instance_t{
            /* subproc */ std::make_shared<subprocess_s>(),
            /* th      */ std::thread(),
            /* meta    */ meta,
        };
    };

    // TODO: allow refreshing cached model list
    // add cached models
    auto cached_models = common_list_cached_models();
    for (const auto & model : cached_models) {
        register_model(server_model_meta{
            /* name        */ model.to_string(),
            /* path        */ model.manifest_path,
            /* path_mmproj */ "", // auto-detected when loading
            /* in_cache    */ true,
            /* port        */ 0,
            /* status      */ SERVER_MODEL_STATUS_UNLOADED,
            /* last_used   */ 0,
            /* args        */ std::vector<std::string>(),
            /* exit_code   */ 0,
        });
    }

    // add local models specified via --models-dir
    if (!params.models_dir.empty()) {
        auto local_models = list_local_models(params.models_dir);
        for (const auto & model : local_models) {
            if (mapping.find(model.name) != mapping.end()) {
                // already exists in cached models, skip
                continue;
            }
            register_model(server_model_meta{
                /* name        */ model.name,
                /* path        */ model.path,
                /* path_mmproj */ model.path_mmproj,
                /* in_cache    */ false,
                /* port        */ 0,
                /* status      */ SERVER_MODEL_STATUS_UNLOADED,
                /* last_used   */ 0,
                /* args        */ std::vector<std::string>(),
                /* exit_code   */ 0,
            });
        }
    }
}

// Replaces the metadata of `name` (no-op if unknown) and wakes waiters.
void server_models::update_meta(const std::string & name, const server_model_meta & meta) {
    std::lock_guard<std::mutex> lk(mutex);
    auto it = mapping.find(name);
    if (it != mapping.end()) {
        it->second.meta = meta;
    }
    cv.notify_all(); // notify wait_until_loaded
}

// Returns true if `name` is a registered model.
bool server_models::has_model(const std::string & name) {
    std::lock_guard<std::mutex> lk(mutex);
    return mapping.find(name) != mapping.end();
}

// Returns a snapshot of the metadata of `name`, or std::nullopt if unknown.
std::optional<server_model_meta> server_models::get_meta(const std::string & name) {
    std::lock_guard<std::mutex> lk(mutex);
    auto it = mapping.find(name);
    if (it != mapping.end()) {
        return it->second.meta;
    }
    return std::nullopt;
}

// Asks the OS for an unused TCP port. It binds port 0 on INADDR_ANY and reads
// back the assigned port. The socket is closed before returning, so another
// process may still take the port before the child binds it.
// Returns -1 on failure.
static int get_free_port() {
#ifdef _WIN32
    WSADATA wsa_data;
    if (WSAStartup(MAKEWORD(2, 2), &wsa_data) != 0) {
        return -1;
    }
    using native_socket_t = SOCKET;
    const native_socket_t invalid_sock = INVALID_SOCKET;
    int namelen = sizeof(sockaddr_in);
#else
    using native_socket_t = int;
    const native_socket_t invalid_sock = -1;
    socklen_t namelen = sizeof(sockaddr_in);
#endif

    int port = -1;
    native_socket_t sock = socket(AF_INET, SOCK_STREAM, 0);
    if (sock != invalid_sock) {
        sockaddr_in serv_addr;
        std::memset(&serv_addr, 0, sizeof(serv_addr));
        serv_addr.sin_family      = AF_INET;
        serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
        serv_addr.sin_port        = htons(0);

        if (bind(sock, reinterpret_cast<sockaddr *>(&serv_addr), sizeof(serv_addr)) == 0 &&
            getsockname(sock, reinterpret_cast<sockaddr *>(&serv_addr), &namelen) == 0) {
            port = ntohs(serv_addr.sin_port);
        }
#ifdef _WIN32
        closesocket(sock);
#else
        close(sock);
#endif
    }
#ifdef _WIN32
    WSACleanup();
#endif
    return port;
}

// helper to convert vector<string> to char **
// pointers are only valid as long as the original vector is valid
static std::vector<char *> to_char_ptr_array(const std::vector<std::string> & vec) {
    std::vector<char *> result;
    result.reserve(vec.size() + 1);
    for (const auto & s : vec) {
        result.push_back(const_cast<char *>(s.c_str()));
    }
    result.push_back(nullptr);
    return result;
}

// Builds the argv for a child server instance. It starts from the router's own
// argv, with element 0 replaced by `exec_path`. Then it adds the model source
// (-hf for cached models; -m and optional --mmproj for local files), the alias
// and port, and finally the caller's extra arguments.
static std::vector<std::string> build_child_args(
        const std::string & exec_path,
        const std::vector<std::string> & base_args,
        const server_model_meta & meta,
        const std::vector<std::string> & extra_args) {
    std::vector<std::string> args = base_args; // copy
    // the first element is the program the child is started from; argv[0] of the
    // router may be a bare name or a relative path, so use the resolved path
    if (args.empty()) {
        args.push_back(exec_path);
    } else {
        args[0] = exec_path;
    }
    if (meta.in_cache) {
        args.push_back("-hf");
        args.push_back(meta.name);
    } else {
        args.push_back("-m");
        args.push_back(meta.path);
        if (!meta.path_mmproj.empty()) {
            args.push_back("--mmproj");
            args.push_back(meta.path_mmproj);
        }
    }
    args.push_back("--alias");
    args.push_back(meta.name);
    args.push_back("--port");
    args.push_back(std::to_string(meta.port));
    args.insert(args.end(), extra_args.begin(), extra_args.end());
    return args;
}

// Returns a snapshot of the metadata of every registered model.
std::vector<server_model_meta> server_models::get_all_meta() {
    std::lock_guard<std::mutex> lk(mutex);
    std::vector<server_model_meta> result;
    result.reserve(mapping.size());
    for (const auto & entry : mapping) {
        result.push_back(entry.second.meta);
    }
    return result;
}

// When base_params.max_models > 0 and at least that many instances are active
// (per server_model_meta::is_active()), requests unloading the one with the
// oldest last_used timestamp. Unloading is asynchronous: see unload().
void server_models::unload_lru() {
    if (base_params.max_models <= 0) {
        return; // no limit
    }
    // remove one of the servers if we passed the max_models (least recently used - LRU)
    std::string lru_model_name;
    int64_t lru_last_used = 0;
    size_t count_active = 0;
    {
        std::lock_guard<std::mutex> lk(mutex);
        for (const auto & m : mapping) {
            if (!m.second.meta.is_active()) {
                continue;
            }
            count_active++;
            // the first active model is always a candidate, even if it was used
            // in the current millisecond
            if (lru_model_name.empty() || m.second.meta.last_used < lru_last_used) {
                lru_model_name = m.first;
                lru_last_used  = m.second.meta.last_used;
            }
        }
    }
    if (!lru_model_name.empty() && count_active >= (size_t)base_params.max_models) {
        SRV_INF("max_models limit reached, removing LRU name=%s\n", lru_model_name.c_str());
        unload(lru_model_name);
    }
}

// Spawns a child server instance for `name` on a free port, unless it is already
// loading or loaded (then this is a no-op). The status becomes LOADING. A
// monitoring thread forwards the child's output to the log and records
// UNLOADED (exit code 0) or FAILED (otherwise) when the child exits.
// Throws std::runtime_error if the model is unknown, no port is available, or
// spawning fails.
void server_models::load(const std::string & name, const std::vector<std::string> & extra_args) {
    if (!has_model(name)) {
        throw std::runtime_error("model name=" + name + " is not found");
    }
    unload_lru();

    std::lock_guard<std::mutex> lk(mutex);

    // no code in this file erases entries, so this is the existing slot
    auto & slot = mapping[name];
    if (slot.meta.status != SERVER_MODEL_STATUS_FAILED && slot.meta.status != SERVER_MODEL_STATUS_UNLOADED) {
        SRV_INF("model %s is not ready\n", name.c_str());
        return;
    }

    // prepare new instance info
    instance_t inst;
    inst.meta           = slot.meta;
    inst.meta.port      = get_free_port();
    inst.meta.status    = SERVER_MODEL_STATUS_LOADING;
    inst.meta.last_used = ggml_time_ms();

    if (inst.meta.port <= 0) {
        throw std::runtime_error("failed to get a port number");
    }

    inst.subproc = std::make_shared<subprocess_s>();
    {
        std::string exec_path = get_server_exec_path().string();
        SRV_INF("spawning server instance with name=%s on port %d\n", inst.meta.name.c_str(), inst.meta.port);

        std::vector<std::string> child_args = build_child_args(exec_path, base_args, inst.meta, extra_args);

        std::vector<std::string> child_env = base_env; // copy
        child_env.push_back("LLAMA_SERVER_ROUTER_PORT=" + std::to_string(base_params.port));

        SRV_INF("%s", "spawning server instance with args:\n");
        for (const auto & arg : child_args) {
            SRV_INF(" %s\n", arg.c_str());
        }
        inst.meta.args = child_args; // save for debugging

        std::vector<char *> argv = to_char_ptr_array(child_args);
        std::vector<char *> envp = to_char_ptr_array(child_env);

        int options = subprocess_option_no_window | subprocess_option_combined_stdout_stderr;
        int result = subprocess_create_ex(argv.data(), options, envp.data(), inst.subproc.get());
        if (result != 0) {
            throw std::runtime_error("failed to spawn server instance");
        }
    }

    // start a thread to manage the child process
    inst.th = std::thread([this, name, child_proc = inst.subproc, port = inst.meta.port]() {
        // read stdout/stderr and forward to main server log
        FILE * p_stdout_stderr = subprocess_stdout(child_proc.get());
        if (p_stdout_stderr) {
            char buffer[4096];
            while (fgets(buffer, sizeof(buffer), p_stdout_stderr) != nullptr) {
                LOG("[%5d] %s", port, buffer);
            }
        } else {
            SRV_ERR("failed to get stdout/stderr of child process for name=%s\n", name.c_str());
        }
        // we reach here when the child process exits
        int exit_code = 0;
        subprocess_join(child_proc.get(), &exit_code);
        subprocess_destroy(child_proc.get());
        // update status
        {
            std::lock_guard<std::mutex> lk(mutex);
            auto it = mapping.find(name);
            if (it != mapping.end()) {
                auto & meta = it->second.meta;
                meta.exit_code = exit_code;
                meta.status    = exit_code == 0 ? SERVER_MODEL_STATUS_UNLOADED : SERVER_MODEL_STATUS_FAILED;
            }
            cv.notify_all();
        }
        SRV_INF("instance name=%s exited with status %d\n", name.c_str(), exit_code);
    });

    // clean up old thread if exists
    // we only get here when the slot is FAILED/UNLOADED. In this file that state
    // is written by the old monitoring thread in its locked section, after which it
    // no longer takes `mutex`. So joining while holding the lock does not deadlock
    // (NOTE(review): assumes no external update_meta() call set that status while
    // the old child was still running)
    if (slot.th.joinable()) {
        slot.th.join();
    }
    slot = std::move(inst);
    cv.notify_all();
}

// Requests termination of the child instance for `name` if it is active. The
// status change is recorded asynchronously by the monitoring thread.
void server_models::unload(const std::string & name) {
    std::lock_guard<std::mutex> lk(mutex);
    auto it = mapping.find(name);
    if (it == mapping.end()) {
        return;
    }
    if (it->second.meta.is_active()) {
        SRV_INF("unloading model instance name=%s\n", name.c_str());
        subprocess_terminate(it->second.subproc.get());
        // status change will be handled by the managing thread
    } else {
        SRV_WRN("model instance name=%s is not loaded\n", name.c_str());
    }
}

// Terminates every active instance and joins all monitoring threads.
void server_models::unload_all() {
    std::vector<std::thread> to_join;
    {
        std::lock_guard<std::mutex> lk(mutex);
        for (auto & [name, inst] : mapping) {
            if (inst.meta.is_active()) {
                SRV_INF("unloading model instance name=%s\n", name.c_str());
                subprocess_terminate(inst.subproc.get());
                // status change will be handled by the managing thread
            }
            // the monitoring threads take `mutex` to record the exit status, so
            // they must be joined after the lock is released
            to_join.push_back(std::move(inst.th));
        }
    }
    for (auto & th : to_join) {
        if (th.joinable()) {
            th.join();
        }
    }
}

// Called when a child reports readiness. Only the LOADED status is accepted
// (otherwise std::runtime_error). The transition is applied atomically and only
// from LOADING. A late notification cannot mark an instance LOADED after its
// process has already exited.
void server_models::update_status(const std::string & name, server_model_status status) {
    // for now, we only allow updating to LOADED status
    if (status != SERVER_MODEL_STATUS_LOADED) {
        throw std::runtime_error("invalid status value");
    }
    std::lock_guard<std::mutex> lk(mutex);
    auto it = mapping.find(name);
    if (it == mapping.end()) {
        return;
    }
    auto & meta = it->second.meta;
    if (meta.status != SERVER_MODEL_STATUS_LOADING) {
        SRV_WRN("ignoring status update for model name=%s: instance is not loading\n", name.c_str());
        return;
    }
    meta.status = status;
    cv.notify_all(); // notify wait_until_loaded
}

// Blocks until `name` has finished loading: it reached LOADED, or it failed or
// exited (FAILED / UNLOADED). A child that exits cleanly before reporting ready
// must not block callers forever.
void server_models::wait_until_loaded(const std::string & name) {
    std::unique_lock<std::mutex> lk(mutex);
    cv.wait(lk, [this, &name]() {
        auto it = mapping.find(name);
        if (it == mapping.end()) {
            return false;
        }
        const auto st = it->second.meta.status;
        return st == SERVER_MODEL_STATUS_LOADED
            || st == SERVER_MODEL_STATUS_FAILED
            || st == SERVER_MODEL_STATUS_UNLOADED;
    });
}

// Makes sure `name` is LOADED. It starts a load if needed, or waits for a load
// already in progress.
// Returns false if the model was already LOADED, true if this call had to load
// it or wait for it (callers should refresh their metadata).
// Throws std::runtime_error if the model is unknown or does not reach LOADED.
bool server_models::ensure_model_loaded(const std::string & name) {
    auto meta = get_meta(name);
    if (!meta.has_value()) {
        throw std::runtime_error("model name=" + name + " is not found");
    }
    if (meta->status == SERVER_MODEL_STATUS_LOADED) {
        return false; // already loaded
    }
    if (meta->status == SERVER_MODEL_STATUS_LOADING) {
        // another caller started the load; do not hand out an instance that is not ready yet
        SRV_INF("model name=%s is loading, waiting...\n", name.c_str());
    } else {
        SRV_INF("model name=%s is not loaded, loading...\n", name.c_str());
        load(name, {});
    }
    wait_until_loaded(name);
    {
        // check final status
        meta = get_meta(name);
        if (!meta.has_value() || meta->status != SERVER_MODEL_STATUS_LOADED) {
            throw std::runtime_error("model name=" + name + " failed to load");
        }
    }
    return true;
}

// Forwards `req` to the child instance serving `name`, loading it first if
// necessary. Optionally refreshes its last_used timestamp (used by the LRU).
// Throws std::runtime_error if the model is unknown or fails to load.
server_http_res_ptr server_models::proxy_request(const server_http_req & req, const std::string & method, const std::string & name, bool update_last_used) {
    auto meta = get_meta(name);
    if (!meta.has_value()) {
        throw std::runtime_error("model name=" + name + " is not found");
    }
    if (ensure_model_loaded(name)) {
        meta = get_meta(name); // refresh meta
    }
    if (update_last_used) {
        std::lock_guard<std::mutex> lk(mutex);
        auto it = mapping.find(name);
        if (it != mapping.end()) {
            it->second.meta.last_used = ggml_time_ms();
        }
    }
    SRV_INF("proxying request to model %s on port %d\n", name.c_str(), meta->port);
    auto proxy = std::make_unique<server_http_proxy>(
            method,
            base_params.hostname,
            meta->port,
            req.path,
            req.headers,
            req.body,
            req.should_stop);
    return proxy;
}

// Runs inside a child instance once its model is ready. It POSTs
// {"model": name, "value": "<LOADED>"} to the router's /models/status endpoint,
// authenticating with the first API key if any. It then starts a detached
// thread that calls shutdown_handler(0) when stdin reaches EOF.
void server_models::setup_child_server(const common_params & base_params, int router_port, const std::string & name, std::function<void(int)> & shutdown_handler) {
    // send a notification to the router server that a model instance is ready
    httplib::Client cli(base_params.hostname, router_port);
    cli.set_connection_timeout(0, 200000); // 200 milliseconds

    httplib::Request req;
    req.method = "POST";
    req.path = "/models/status";
    req.set_header("Content-Type", "application/json");
    if (!base_params.api_keys.empty()) {
        req.set_header("Authorization", "Bearer " + base_params.api_keys[0]);
    }

    json body;
    body["model"] = name;
    body["value"] = server_model_status_to_string(SERVER_MODEL_STATUS_LOADED);
    req.body = body.dump();

    SRV_INF("notifying router server (port=%d) that model %s is ready\n", router_port, name.c_str());
    auto result = cli.send(std::move(req));
    if (result.error() != httplib::Error::Success) {
        auto err_str = httplib::to_string(result.error());
        SRV_ERR("failed to notify router server: %s\n", err_str.c_str());
        // TODO: maybe force shutdown here?
    }

    // setup thread for monitoring stdin
    // when EOF is detected, that means the router server requested shutdown, or the parent process died
    std::thread([shutdown_handler]() {
        // wait for EOF on stdin
        SRV_INF("%s", "child server monitoring thread started, waiting for EOF on stdin...\n");
        while (true) {
            int c = getchar();
            if (c == EOF) {
                break;
            }
        }
        SRV_INF("%s", "EOF on stdin detected, invoking shutdown handler...\n");
        shutdown_handler(0); // invoke shutdown handler
    }).detach();
}

//
// server_http_proxy
//

// simple implementation of a pipe
// used for streaming data from the HTTP client thread (writer) to the response reader
template <typename T>
struct pipe_t {
    std::mutex mutex;
    std::condition_variable cv;
    std::queue<T> queue;
    std::atomic<bool> writer_closed{false};
    std::atomic<bool> reader_closed{false};

    // Signals EOF to the reader. The flag is set under `mutex` so a reader that
    // is about to wait cannot miss the notification.
    void close_write() {
        {
            std::lock_guard<std::mutex> lk(mutex);
            writer_closed.store(true);
        }
        cv.notify_all();
    }

    // Signals a broken pipe to the writer (subsequent write() calls return false).
    void close_read() {
        {
            std::lock_guard<std::mutex> lk(mutex);
            reader_closed.store(true);
        }
        cv.notify_all();
    }

    // Pops the next item into `output`. Returns false on clean EOF (writer closed
    // and queue drained) or when should_stop() reports the reader is gone; the
    // latter also closes the read end. should_stop() is polled every 500 ms.
    bool read(T & output, const std::function<bool()> & should_stop) {
        std::unique_lock<std::mutex> lk(mutex);
        constexpr auto poll_interval = std::chrono::milliseconds(500);
        while (true) {
            if (!queue.empty()) {
                output = std::move(queue.front());
                queue.pop();
                return true;
            }
            if (writer_closed.load()) {
                return false; // clean EOF
            }
            if (should_stop()) {
                // `mutex` is already held here, so set the flag directly
                // (close_read() would try to lock it again)
                reader_closed.store(true); // signal broken pipe to writer
                cv.notify_all();
                return false; // cancelled / reader no longer alive
            }
            cv.wait_for(lk, poll_interval);
        }
    }

    // Enqueues `data`; returns false if the reader has closed the pipe.
    bool write(T && data) {
        std::lock_guard<std::mutex> lk(mutex);
        if (reader_closed.load()) {
            return false; // broken pipe
        }
        queue.push(std::move(data));
        cv.notify_one();
        return true;
    }
};

// Starts forwarding an HTTP request to host:port on a detached thread. It then
// blocks until the response headers arrive (or the request is cancelled or
// fails). The body is streamed afterwards through `next`. If no headers arrive,
// `status` stays 500.
server_http_proxy::server_http_proxy(
        const std::string & method,
        const std::string & host,
        int port,
        const std::string & path,
        const std::map<std::string, std::string> & headers,
        const std::string & body,
        const std::function<bool()> should_stop) {
    // shared between reader and writer threads
    auto cli  = std::make_shared<httplib::Client>(host, port);
    auto pipe = std::make_shared<pipe_t<msg_t>>();

    // setup Client
    cli->set_connection_timeout(0, 200000); // 200 milliseconds

    this->status = 500; // to be overwritten upon response
    this->cleanup = [pipe]() {
        pipe->close_read();
        pipe->close_write();
    };

    // wire up the receive end of the pipe
    this->next = [pipe, should_stop](std::string & out) -> bool {
        msg_t msg;
        bool has_next = pipe->read(msg, should_stop);
        if (!msg.data.empty()) {
            out = std::move(msg.data);
        }
        return has_next; // false if EOF or pipe broken
    };

    // wire up the HTTP client
    // note: do NOT capture `this` pointer, as it may be destroyed before the thread ends
    httplib::ResponseHandler response_handler = [pipe](const httplib::Response & response) {
        msg_t msg;
        msg.status = response.status;
        for (const auto & [key, value] : response.headers) {
            msg.headers[key] = value;
        }
        return pipe->write(std::move(msg)); // send headers first
    };
    httplib::ContentReceiverWithProgress content_receiver = [pipe](const char * data, size_t data_length, size_t, size_t) {
        // send data chunks
        // returns false if pipe is closed / broken (signal to stop receiving)
        return pipe->write({{}, 0, std::string(data, data_length)});
    };

    // prepare the request to destination server
    httplib::Request req;
    {
        req.method = method;
        req.path = path;
        for (const auto & [key, value] : headers) {
            req.set_header(key, value);
        }
        req.body = body;
        req.response_handler = response_handler;
        req.content_receiver = content_receiver;
    }

    // start the proxy thread
    SRV_DBG("start proxy thread %s %s\n", req.method.c_str(), req.path.c_str());
    // `mutable` so the captured request is really moved into send() instead of copied
    this->thread = std::thread([cli, pipe, req = std::move(req)]() mutable {
        auto result = cli->send(std::move(req));
        if (result.error() != httplib::Error::Success) {
            auto err_str = httplib::to_string(result.error());
            SRV_ERR("http client error: %s\n", err_str.c_str());
            pipe->write({{}, 500, ""}); // header
            pipe->write({{}, 0, "proxy error: " + err_str}); // body
        }
        pipe->close_write(); // signal EOF to reader
        SRV_DBG("%s", "client request thread ended\n");
    });
    this->thread.detach();

    // wait for the first chunk (headers)
    msg_t header;
    if (pipe->read(header, should_stop)) {
        SRV_DBG("%s", "received response headers\n");
        this->status = header.status;
        this->headers = header.headers;
    } else {
        SRV_DBG("%s", "no response headers received (request cancelled?)\n");
    }
}