#include "server-common.h" #include "server-models.h" #include "download.h" #include // TODO: remove this once we use HTTP client from download.h #include #include #include #include #include #include #include #include #include #ifdef _WIN32 #include #else #include #include #include #include #endif #if defined(__APPLE__) && defined(__MACH__) // macOS: use _NSGetExecutablePath to get the executable path #include #include #endif #define CMD_EXIT "exit" static std::filesystem::path get_server_exec_path() { #if defined(_WIN32) wchar_t buf[32768] = { 0 }; // Large buffer to handle long paths DWORD len = GetModuleFileNameW(nullptr, buf, _countof(buf)); if (len == 0 || len >= _countof(buf)) { throw std::runtime_error("GetModuleFileNameW failed or path too long"); } return std::filesystem::path(buf); #elif defined(__APPLE__) && defined(__MACH__) char small_path[PATH_MAX]; uint32_t size = sizeof(small_path); if (_NSGetExecutablePath(small_path, &size) == 0) { // resolve any symlinks to get absolute path try { return std::filesystem::canonical(std::filesystem::path(small_path)); } catch (...) { return std::filesystem::path(small_path); } } else { // buffer was too small, allocate required size and call again std::vector buf(size); if (_NSGetExecutablePath(buf.data(), &size) == 0) { try { return std::filesystem::canonical(std::filesystem::path(buf.data())); } catch (...) { return std::filesystem::path(buf.data()); } } throw std::runtime_error("_NSGetExecutablePath failed after buffer resize"); } #else char path[FILENAME_MAX]; ssize_t count = readlink("/proc/self/exe", path, FILENAME_MAX); if (count <= 0) { throw std::runtime_error("failed to resolve /proc/self/exe"); } return std::filesystem::path(std::string(path, count)); #endif } struct local_model { std::string name; std::string path; std::string path_mmproj; }; static std::vector list_local_models(const std::string & dir) { if (!std::filesystem::exists(dir) || !std::filesystem::is_directory(dir)) { throw std::runtime_error(string_format("error: '%s' does not exist or is not a directory\n", dir.c_str())); } std::vector models; auto scan_subdir = [&models](const std::string & subdir_path, const std::string & name) { auto files = fs_list(subdir_path, false); common_file_info model_file; common_file_info first_shard_file; common_file_info mmproj_file; for (const auto & file : files) { if (string_ends_with(file.name, ".gguf")) { if (file.name.find("mmproj") != std::string::npos) { mmproj_file = file; } else if (file.name.find("-00001-of-") != std::string::npos) { first_shard_file = file; } else { model_file = file; } } } // single file model local_model model{ /* name */ name, /* path */ first_shard_file.path.empty() ? 
//
// server_models
//

server_models::server_models(
        const common_params & params,
        int argc,
        char ** argv,
        char ** envp) : base_params(params) {
    for (int i = 0; i < argc; i++) {
        base_args.push_back(std::string(argv[i]));
    }
    for (char ** env = envp; *env != nullptr; env++) {
        base_env.push_back(std::string(*env));
    }
    GGML_ASSERT(!base_args.empty());
    // set binary path
    try {
        base_args[0] = get_server_exec_path().string();
    } catch (const std::exception & e) {
        LOG_WRN("failed to get server executable path: %s\n", e.what());
        LOG_WRN("using original argv[0] as fallback: %s\n", base_args[0].c_str());
    }
    // TODO: allow refreshing cached model list
    // add cached models
    auto cached_models = common_list_cached_models();
    for (const auto & model : cached_models) {
        server_model_meta meta{
            /* name        */ model.to_string(),
            /* path        */ model.manifest_path,
            /* path_mmproj */ "", // auto-detected when loading
            /* in_cache    */ true,
            /* port        */ 0,
            /* status      */ SERVER_MODEL_STATUS_UNLOADED,
            /* last_used   */ 0,
            /* args        */ std::vector<std::string>(),
            /* exit_code   */ 0
        };
        mapping[meta.name] = instance_t{
            /* subproc */ std::make_shared<subprocess_s>(),
            /* th      */ std::thread(),
            /* meta    */ meta
        };
    }
    // add local models specified via --models-dir
    if (!params.models_dir.empty()) {
        auto local_models = list_local_models(params.models_dir);
        for (const auto & model : local_models) {
            if (mapping.find(model.name) != mapping.end()) {
                // already exists in cached models, skip
                continue;
            }
            server_model_meta meta{
                /* name        */ model.name,
                /* path        */ model.path,
                /* path_mmproj */ model.path_mmproj,
                /* in_cache    */ false,
                /* port        */ 0,
                /* status      */ SERVER_MODEL_STATUS_UNLOADED,
                /* last_used   */ 0,
                /* args        */ std::vector<std::string>(),
                /* exit_code   */ 0
            };
            mapping[meta.name] = instance_t{
                /* subproc */ std::make_shared<subprocess_s>(),
                /* th      */ std::thread(),
                /* meta    */ meta
            };
        }
    }
}

void server_models::update_meta(const std::string & name, const server_model_meta & meta) {
    std::lock_guard<std::mutex> lk(mutex);
    auto it = mapping.find(name);
    if (it != mapping.end()) {
        it->second.meta = meta;
    }
    cv.notify_all(); // notify wait_until_loaded
}

bool server_models::has_model(const std::string & name) {
    std::lock_guard<std::mutex> lk(mutex);
    return mapping.find(name) != mapping.end();
}

std::optional<server_model_meta> server_models::get_meta(const std::string & name) {
    std::lock_guard<std::mutex> lk(mutex);
    auto it = mapping.find(name);
    if (it != mapping.end()) {
        return it->second.meta;
    }
    return std::nullopt;
}

static int get_free_port() {
#ifdef _WIN32
    WSADATA wsaData;
    if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) {
        return -1;
    }
    typedef SOCKET native_socket_t;
#define INVALID_SOCKET_VAL INVALID_SOCKET
#define CLOSE_SOCKET(s) closesocket(s)
#else
    typedef int native_socket_t;
#define INVALID_SOCKET_VAL -1
#define CLOSE_SOCKET(s) close(s)
#endif
    native_socket_t sock = socket(AF_INET, SOCK_STREAM, 0);
    if (sock == INVALID_SOCKET_VAL) {
#ifdef _WIN32
        WSACleanup();
#endif
        return -1;
    }
    struct sockaddr_in serv_addr;
    std::memset(&serv_addr, 0, sizeof(serv_addr));
    serv_addr.sin_family      = AF_INET;
    serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    serv_addr.sin_port        = htons(0); // let the OS pick a free port
    if (bind(sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) != 0) {
        CLOSE_SOCKET(sock);
#ifdef _WIN32
        WSACleanup();
#endif
        return -1;
    }
#ifdef _WIN32
    int namelen = sizeof(serv_addr);
#else
    socklen_t namelen = sizeof(serv_addr);
#endif
    if (getsockname(sock, (struct sockaddr *) &serv_addr, &namelen) != 0) {
        CLOSE_SOCKET(sock);
#ifdef _WIN32
        WSACleanup();
#endif
        return -1;
    }
    int port = ntohs(serv_addr.sin_port);
    CLOSE_SOCKET(sock);
#ifdef _WIN32
    WSACleanup();
#endif
    return port;
}
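// Note on the port-0 trick used above: binding to port 0 makes the OS pick an
// unused ephemeral port, which getsockname() then reports back. The socket is
// closed before the child process re-binds that port, so there is a small
// window in which another process could claim it; if that happens, the child
// fails to bind and its managing thread marks the model UNLOADED again.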
// helper to convert a vector<std::string> to a null-terminated char ** array
// pointers are only valid as long as the original vector is valid
static std::vector<char *> to_char_ptr_array(const std::vector<std::string> & vec) {
    std::vector<char *> result;
    result.reserve(vec.size() + 1);
    for (const auto & s : vec) {
        result.push_back(const_cast<char *>(s.c_str()));
    }
    result.push_back(nullptr); // null terminator, as expected by exec-style APIs
    return result;
}

std::vector<server_model_meta> server_models::get_all_meta() {
    std::lock_guard<std::mutex> lk(mutex);
    std::vector<server_model_meta> result;
    result.reserve(mapping.size());
    for (const auto & [name, inst] : mapping) {
        result.push_back(inst.meta);
    }
    return result;
}

void server_models::unload_lru() {
    if (base_params.models_max <= 0) {
        return; // no limit
    }
    // if we exceed models_max, evict the least recently used (LRU) instance
    std::string lru_model_name = "";
    int64_t lru_last_used = ggml_time_ms();
    size_t count_active = 0;
    {
        std::lock_guard<std::mutex> lk(mutex);
        for (const auto & m : mapping) {
            if (m.second.meta.is_active()) {
                count_active++;
                if (m.second.meta.last_used < lru_last_used) {
                    lru_model_name = m.first;
                    lru_last_used  = m.second.meta.last_used;
                }
            }
        }
    }
    if (!lru_model_name.empty() && count_active >= (size_t) base_params.models_max) {
        SRV_INF("models_max limit reached, removing LRU name=%s\n", lru_model_name.c_str());
        unload(lru_model_name);
    }
}

static void add_or_replace_arg(std::vector<std::string> & args, const std::string & key, const std::string & value) {
    for (size_t i = 0; i < args.size(); i++) {
        if (args[i] == key && i + 1 < args.size()) {
            args[i + 1] = value;
            return;
        }
    }
    // not found, append
    args.push_back(key);
    args.push_back(value);
}
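// Illustrative behavior of add_or_replace_arg (example values only): given
// args = {"llama-server", "--port", "8080"}, calling
// add_or_replace_arg(args, "--port", "9090") rewrites the existing value in
// place, while add_or_replace_arg(args, "--alias", "foo") appends both tokens
// since the key is not present yet.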
void server_models::load(const std::string & name, bool auto_load) {
    if (!has_model(name)) {
        throw std::runtime_error("model name=" + name + " is not found");
    }
    unload_lru();
    std::lock_guard<std::mutex> lk(mutex);
    auto meta = mapping[name].meta;
    if (meta.status != SERVER_MODEL_STATUS_UNLOADED) {
        SRV_INF("model %s is already loading or loaded, skipping\n", name.c_str());
        return;
    }
    // prepare new instance info
    instance_t inst;
    inst.meta           = meta;
    inst.meta.port      = get_free_port();
    inst.meta.status    = SERVER_MODEL_STATUS_LOADING;
    inst.meta.last_used = ggml_time_ms();
    if (inst.meta.port <= 0) {
        throw std::runtime_error("failed to get a port number");
    }
    inst.subproc = std::make_shared<subprocess_s>();
    {
        SRV_INF("spawning server instance with name=%s on port %d\n", inst.meta.name.c_str(), inst.meta.port);
        std::vector<std::string> child_args;
        if (auto_load && !meta.args.empty()) {
            child_args = meta.args; // copy previous args
        } else {
            child_args = base_args; // copy
            if (inst.meta.in_cache) {
                add_or_replace_arg(child_args, "-hf", inst.meta.name);
            } else {
                add_or_replace_arg(child_args, "-m", inst.meta.path);
                if (!inst.meta.path_mmproj.empty()) {
                    add_or_replace_arg(child_args, "--mmproj", inst.meta.path_mmproj);
                }
            }
        }
        // set model args
        add_or_replace_arg(child_args, "--port", std::to_string(inst.meta.port));
        add_or_replace_arg(child_args, "--alias", inst.meta.name);
        std::vector<std::string> child_env = base_env; // copy
        child_env.push_back("LLAMA_SERVER_ROUTER_PORT=" + std::to_string(base_params.port));
        SRV_INF("%s", "spawning server instance with args:\n");
        for (const auto & arg : child_args) {
            SRV_INF("  %s\n", arg.c_str());
        }
        inst.meta.args = child_args; // save for debugging
        std::vector<char *> argv = to_char_ptr_array(child_args);
        std::vector<char *> envp = to_char_ptr_array(child_env);
        int options = subprocess_option_no_window | subprocess_option_combined_stdout_stderr;
        int result  = subprocess_create_ex(argv.data(), options, envp.data(), inst.subproc.get());
        if (result != 0) {
            throw std::runtime_error("failed to spawn server instance");
        }
        inst.stdin_file = subprocess_stdin(inst.subproc.get());
    }
    // start a thread to manage the child process
    // captured variables are guaranteed to be destroyed only after the thread is joined
    inst.th = std::thread([this, name, child_proc = inst.subproc, port = inst.meta.port]() {
        // read stdout/stderr and forward to the main server log
        FILE * p_stdout_stderr = subprocess_stdout(child_proc.get());
        if (p_stdout_stderr) {
            char buffer[4096];
            while (fgets(buffer, sizeof(buffer), p_stdout_stderr) != nullptr) {
                LOG("[%5d] %s", port, buffer);
            }
        } else {
            SRV_ERR("failed to get stdout/stderr of child process for name=%s\n", name.c_str());
        }
        // we reach here when the child process exits
        int exit_code = 0;
        subprocess_join(child_proc.get(), &exit_code);
        subprocess_destroy(child_proc.get());
        // update exit code and status
        {
            std::lock_guard<std::mutex> lk(mutex);
            auto it = mapping.find(name);
            if (it != mapping.end()) {
                auto & meta    = it->second.meta;
                meta.exit_code = exit_code;
                meta.status    = SERVER_MODEL_STATUS_UNLOADED;
            }
            cv.notify_all();
        }
        SRV_INF("instance name=%s exited with status %d\n", name.c_str(), exit_code);
    });
    // clean up the old process/thread if they exist
    {
        auto & old_instance = mapping[name];
        // the old process should have exited already, but just in case, we clean it up here
        if (subprocess_alive(old_instance.subproc.get())) {
            SRV_WRN("old process for model name=%s is still alive, this is unexpected\n", name.c_str());
            subprocess_terminate(old_instance.subproc.get()); // force kill
        }
        if (old_instance.th.joinable()) {
            old_instance.th.join();
        }
    }
    mapping[name] = std::move(inst);
    cv.notify_all();
}

static void interrupt_subprocess(FILE * stdin_file) {
    // subprocess.h does not provide a way to send SIGINT,
    // so we send a command over stdin asking the child process to exit gracefully
    if (stdin_file) {
        fprintf(stdin_file, "%s\n", CMD_EXIT);
        fflush(stdin_file);
    }
}

void server_models::unload(const std::string & name) {
    std::lock_guard<std::mutex> lk(mutex);
    auto it = mapping.find(name);
    if (it != mapping.end()) {
        if (it->second.meta.is_active()) {
            SRV_INF("unloading model instance name=%s\n", name.c_str());
            interrupt_subprocess(it->second.stdin_file);
            // status change will be handled by the managing thread
        } else {
            SRV_WRN("model instance name=%s is not loaded\n", name.c_str());
        }
    }
}

void server_models::unload_all() {
    std::vector<std::thread> to_join;
    {
        std::lock_guard<std::mutex> lk(mutex);
        for (auto & [name, inst] : mapping) {
            if (inst.meta.is_active()) {
                SRV_INF("unloading model instance name=%s\n", name.c_str());
                interrupt_subprocess(inst.stdin_file);
                // status change will be handled by the managing thread
            }
            // move the thread to the join list so we can join without holding the lock (avoids deadlock)
            to_join.push_back(std::move(inst.th));
        }
    }
    for (auto & th : to_join) {
        if (th.joinable()) {
            th.join();
        }
    }
}

void server_models::update_status(const std::string & name, server_model_status status) {
    // for now, we only allow updating to LOADED status
    if (status != SERVER_MODEL_STATUS_LOADED) {
        throw std::runtime_error("invalid status value");
    }
    auto meta = get_meta(name);
    if (meta.has_value()) {
        meta->status = status;
        update_meta(name, meta.value());
    }
}
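// Lifecycle summary (derived from the code above): a model starts UNLOADED;
// load() spawns a child llama-server and marks it LOADING; the child reports
// back via POST /models/status (see setup_child_server below), which moves it
// to LOADED; when the child exits for any reason, the managing thread marks
// it UNLOADED again and records the exit code.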
void server_models::wait_until_loaded(const std::string & name) {
    std::unique_lock<std::mutex> lk(mutex);
    cv.wait(lk, [this, &name]() {
        auto it = mapping.find(name);
        if (it != mapping.end()) {
            return it->second.meta.status != SERVER_MODEL_STATUS_LOADING;
        }
        return false;
    });
}

bool server_models::ensure_model_loaded(const std::string & name) {
    auto meta = get_meta(name);
    if (!meta.has_value()) {
        throw std::runtime_error("model name=" + name + " is not found");
    }
    if (meta->status == SERVER_MODEL_STATUS_LOADED) {
        return false; // already loaded
    }
    if (meta->status == SERVER_MODEL_STATUS_UNLOADED) {
        SRV_INF("model name=%s is not loaded, loading...\n", name.c_str());
        load(name, true);
    }
    SRV_INF("waiting until model name=%s is fully loaded...\n", name.c_str());
    wait_until_loaded(name);
    // check final status
    meta = get_meta(name);
    if (!meta.has_value() || meta->is_failed()) {
        throw std::runtime_error("model name=" + name + " failed to load");
    }
    return true;
}

server_http_res_ptr server_models::proxy_request(const server_http_req & req, const std::string & method, const std::string & name, bool update_last_used) {
    auto meta = get_meta(name);
    if (!meta.has_value()) {
        throw std::runtime_error("model name=" + name + " is not found");
    }
    if (meta->status != SERVER_MODEL_STATUS_LOADED) {
        throw std::invalid_argument("model name=" + name + " is not loaded");
    }
    if (update_last_used) {
        std::unique_lock<std::mutex> lk(mutex);
        mapping[name].meta.last_used = ggml_time_ms();
    }
    SRV_INF("proxying request to model %s on port %d\n", name.c_str(), meta->port);
    auto proxy = std::make_unique<server_http_proxy>(
        method,
        base_params.hostname,
        meta->port,
        req.path,
        req.headers,
        req.body,
        req.should_stop);
    return proxy;
}

std::thread server_models::setup_child_server(
        const common_params & base_params,
        int router_port,
        const std::string & name,
        std::function<void(int)> & shutdown_handler) {
    // send a notification to the router server that a model instance is ready
    // TODO @ngxson : use HTTP client from libcommon
    httplib::Client cli(base_params.hostname, router_port);
    cli.set_connection_timeout(0, 200000); // 200 milliseconds
    httplib::Request req;
    req.method = "POST";
    req.path   = "/models/status";
    req.set_header("Content-Type", "application/json");
    if (!base_params.api_keys.empty()) {
        req.set_header("Authorization", "Bearer " + base_params.api_keys[0]);
    }
    json body;
    body["model"] = name;
    body["value"] = server_model_status_to_string(SERVER_MODEL_STATUS_LOADED);
    req.body = body.dump();
    SRV_INF("notifying router server (port=%d) that model %s is ready\n", router_port, name.c_str());
    auto result = cli.send(std::move(req));
    if (result.error() != httplib::Error::Success) {
        auto err_str = httplib::to_string(result.error());
        SRV_ERR("failed to notify router server: %s\n", err_str.c_str());
        exit(1); // force exit
    }
    // set up a thread for monitoring stdin
    return std::thread([shutdown_handler]() {
        // wait for EOF on stdin
        SRV_INF("%s", "child server monitoring thread started, waiting for EOF on stdin...\n");
        bool eof = false;
        while (true) {
            std::string line;
            if (!std::getline(std::cin, line)) {
                // EOF detected; this means the router server exited unexpectedly or was killed
                eof = true;
                break;
            }
            if (line.find(CMD_EXIT) != std::string::npos) {
                SRV_INF("%s", "exit command received, exiting...\n");
                shutdown_handler(0);
                break;
            }
        }
        if (eof) {
            SRV_INF("%s", "EOF on stdin detected, forcing shutdown...\n");
            exit(1);
        }
    });
}
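// Router/child control channel, as implemented above: subprocess.h offers no
// portable way to deliver SIGINT, so the router writes the line "exit" to a
// child's stdin to request a graceful shutdown (see interrupt_subprocess),
// while a child that sees EOF on stdin concludes that the router is gone and
// force-exits instead of lingering as an orphan process.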
//
// server_models_routes
//

static void res_ok(std::unique_ptr<server_http_res> & res, const json & response_data) {
    res->status = 200;
    res->data   = safe_json_to_str(response_data);
}

static void res_error(std::unique_ptr<server_http_res> & res, const json & error_data) {
    res->status = json_value(error_data, "code", 500);
    res->data   = safe_json_to_str({{ "error", error_data }});
}

static bool router_validate_model(const std::string & name, server_models & models, bool models_autoload, std::unique_ptr<server_http_res> & res) {
    if (name.empty()) {
        res_error(res, format_error_response("model name is missing from the request", ERROR_TYPE_INVALID_REQUEST));
        return false;
    }
    auto meta = models.get_meta(name);
    if (!meta.has_value()) {
        res_error(res, format_error_response("model not found", ERROR_TYPE_INVALID_REQUEST));
        return false;
    }
    if (models_autoload) {
        models.ensure_model_loaded(name);
    } else {
        if (meta->status != SERVER_MODEL_STATUS_LOADED) {
            res_error(res, format_error_response("model is not loaded", ERROR_TYPE_INVALID_REQUEST));
            return false;
        }
    }
    return true;
}

static bool is_autoload(const common_params & params, const server_http_req & req) {
    std::string autoload = req.get_param("autoload");
    if (autoload.empty()) {
        return params.models_autoload;
    }
    return autoload == "true" || autoload == "1";
}

void server_models_routes::init_routes() {
    this->get_router_props = [this](const server_http_req & req) {
        std::string name = req.get_param("model");
        if (name.empty()) {
            // main instance
            auto res = std::make_unique<server_http_res>();
            res_ok(res, {
                // TODO: add support for this on web UI
                {"role", "router"},
                {"max_instances", 4}, // dummy value for testing
                // the fields below are dummy values to make sure the webui doesn't break
                {"model_alias", "llama-server"},
                {"model_path", "none"},
                {"default_generation_settings", {
                    {"params", json{}},
                    {"n_ctx", 0},
                }},
            });
            return res;
        }
        return proxy_get(req);
    };

    this->proxy_get = [this](const server_http_req & req) {
        std::string method = "GET";
        std::string name   = req.get_param("model");
        bool autoload      = is_autoload(params, req);
        auto error_res = std::make_unique<server_http_res>();
        if (!router_validate_model(name, models, autoload, error_res)) {
            return error_res;
        }
        return models.proxy_request(req, method, name, false);
    };

    this->proxy_post = [this](const server_http_req & req) {
        std::string method = "POST";
        json body          = json::parse(req.body);
        std::string name   = json_value(body, "model", std::string());
        bool autoload      = is_autoload(params, req);
        auto error_res = std::make_unique<server_http_res>();
        if (!router_validate_model(name, models, autoload, error_res)) {
            return error_res;
        }
        return models.proxy_request(req, method, name, true); // update last usage for POST requests only
    };

    this->get_router_models = [this](const server_http_req &) {
        auto res = std::make_unique<server_http_res>();
        json models_json = json::array();
        auto all_models  = models.get_all_meta();
        std::time_t t = std::time(nullptr);
        for (const auto & meta : all_models) {
            json status {
                {"value", server_model_status_to_string(meta.status)},
                {"args", meta.args},
            };
            if (meta.is_failed()) {
                status["exit_code"] = meta.exit_code;
                status["failed"]    = true;
            }
            models_json.push_back(json {
                {"id",       meta.name},
                {"object",   "model"},     // for OAI-compat
                {"owned_by", "llamacpp"},  // for OAI-compat
                {"created",  t},           // for OAI-compat
                {"in_cache", meta.in_cache},
                {"path",     meta.path},
                {"status",   status},
                // TODO: add other fields, may require reading GGUF metadata
            });
        }
        res_ok(res, {
            {"data",   models_json},
            {"object", "list"},
        });
        return res;
    };

    this->post_router_models_load = [this](const server_http_req & req) {
        auto res = std::make_unique<server_http_res>();
        json body = json::parse(req.body);
        std::string name = json_value(body, "model", std::string());
        auto model = models.get_meta(name);
        if (!model.has_value()) {
            res_error(res, format_error_response("model is not found", ERROR_TYPE_NOT_FOUND));
            return res;
        }
        if (model->status == SERVER_MODEL_STATUS_LOADED) {
            res_error(res, format_error_response("model is already loaded", ERROR_TYPE_INVALID_REQUEST));
            return res;
        }
        models.load(name, false);
        res_ok(res, {{"success", true}});
        return res;
    };
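    // Illustrative shape of the response built by get_router_models above
    // (field values are examples only):
    //
    //   {
    //     "object": "list",
    //     "data": [
    //       {
    //         "id": "tiny",
    //         "object": "model",
    //         "owned_by": "llamacpp",
    //         "created": 1700000000,
    //         "in_cache": false,
    //         "path": "/models/tiny.gguf",
    //         "status": { "value": "unloaded", "args": [] }
    //       }
    //     ]
    //   }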
    // used by the child process to notify the router about a status change
    // TODO @ngxson : maybe implement authentication for this endpoint in the future
    this->post_router_models_status = [this](const server_http_req & req) {
        auto res = std::make_unique<server_http_res>();
        json body = json::parse(req.body);
        std::string model = json_value(body, "model", std::string());
        std::string value = json_value(body, "value", std::string());
        models.update_status(model, server_model_status_from_string(value));
        res_ok(res, {{"success", true}});
        return res;
    };

    this->post_router_models_unload = [this](const server_http_req & req) {
        auto res = std::make_unique<server_http_res>();
        json body = json::parse(req.body);
        std::string name = json_value(body, "model", std::string());
        auto model = models.get_meta(name);
        if (!model.has_value()) {
            res_error(res, format_error_response("model is not found", ERROR_TYPE_INVALID_REQUEST));
            return res;
        }
        if (model->status != SERVER_MODEL_STATUS_LOADED) {
            res_error(res, format_error_response("model is not loaded", ERROR_TYPE_INVALID_REQUEST));
            return res;
        }
        models.unload(name);
        res_ok(res, {{"success", true}});
        return res;
    };
}

//
// server_http_proxy
//

// simple implementation of a pipe
// used for streaming data between threads
template<typename T>
struct pipe_t {
    std::mutex mutex;
    std::condition_variable cv;
    std::queue<T> queue;
    std::atomic<bool> writer_closed{false};
    std::atomic<bool> reader_closed{false};

    void close_write() {
        writer_closed.store(true, std::memory_order_relaxed);
        cv.notify_all();
    }

    void close_read() {
        reader_closed.store(true, std::memory_order_relaxed);
        cv.notify_all();
    }

    // blocking read; returns false on clean EOF or cancellation
    bool read(T & output, const std::function<bool()> & should_stop) {
        std::unique_lock<std::mutex> lk(mutex);
        constexpr auto poll_interval = std::chrono::milliseconds(500);
        while (true) {
            if (!queue.empty()) {
                output = std::move(queue.front());
                queue.pop();
                return true;
            }
            if (writer_closed.load()) {
                return false; // clean EOF
            }
            if (should_stop()) {
                close_read(); // signal broken pipe to the writer
                return false; // cancelled / reader no longer alive
            }
            cv.wait_for(lk, poll_interval);
        }
    }

    // returns false if the reader is gone (broken pipe)
    bool write(T && data) {
        std::lock_guard<std::mutex> lk(mutex);
        if (reader_closed.load()) {
            return false; // broken pipe
        }
        queue.push(std::move(data));
        cv.notify_one();
        return true;
    }
};
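// Minimal usage sketch for pipe_t (hypothetical, not part of this file):
//
//   auto p = std::make_shared<pipe_t<std::string>>();
//   std::thread producer([p]() {
//       p->write("hello");
//       p->close_write(); // EOF for the reader
//   });
//   std::string chunk;
//   while (p->read(chunk, []() { return false; })) { // never cancel
//       // consume chunk
//   }
//   producer.join();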
server_http_proxy::server_http_proxy(
        const std::string & method,
        const std::string & host,
        int port,
        const std::string & path,
        const std::map<std::string, std::string> & headers,
        const std::string & body,
        const std::function<bool()> should_stop) {
    // shared between reader and writer threads
    auto cli  = std::make_shared<httplib::Client>(host, port);
    auto pipe = std::make_shared<pipe_t<msg_t>>();
    // set up the client
    cli->set_connection_timeout(0, 200000); // 200 milliseconds
    this->status = 500; // to be overwritten upon response
    this->cleanup = [pipe]() {
        pipe->close_read();
        pipe->close_write();
    };
    // wire up the receiving end of the pipe
    this->next = [pipe, should_stop](std::string & out) -> bool {
        msg_t msg;
        bool has_next = pipe->read(msg, should_stop);
        if (!msg.data.empty()) {
            out = std::move(msg.data);
        }
        return has_next; // false if EOF or pipe broken
    };
    // wire up the HTTP client
    // note: do NOT capture the `this` pointer, as it may be destroyed before the thread ends
    httplib::ResponseHandler response_handler = [pipe, cli](const httplib::Response & response) {
        msg_t msg;
        msg.status = response.status;
        for (const auto & [key, value] : response.headers) {
            msg.headers[key] = value;
        }
        return pipe->write(std::move(msg)); // send headers first
    };
    httplib::ContentReceiverWithProgress content_receiver = [pipe](const char * data, size_t data_length, size_t, size_t) {
        // send data chunks
        // returns false if the pipe is closed / broken (signal to stop receiving)
        return pipe->write({{}, 0, std::string(data, data_length)});
    };
    // prepare the request to the destination server
    httplib::Request req;
    {
        req.method = method;
        req.path   = path;
        for (const auto & [key, value] : headers) {
            req.set_header(key, value);
        }
        req.body             = body;
        req.response_handler = response_handler;
        req.content_receiver = content_receiver;
    }
    // start the proxy thread
    SRV_DBG("start proxy thread %s %s\n", req.method.c_str(), req.path.c_str());
    this->thread = std::thread([cli, pipe, req]() {
        auto result = cli->send(std::move(req));
        if (result.error() != httplib::Error::Success) {
            auto err_str = httplib::to_string(result.error());
            SRV_ERR("http client error: %s\n", err_str.c_str());
            pipe->write({{}, 500, ""});                        // header
            pipe->write({{}, 0, "proxy error: " + err_str});   // body
        }
        pipe->close_write(); // signal EOF to the reader
        SRV_DBG("%s", "client request thread ended\n");
    });
    this->thread.detach();
    // wait for the first chunk (headers)
    msg_t header;
    if (pipe->read(header, should_stop)) {
        SRV_DBG("%s", "received response headers\n");
        this->status  = header.status;
        this->headers = header.headers;
    } else {
        SRV_DBG("%s", "no response headers received (request cancelled?)\n");
    }
}
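// Design note: the constructor above blocks only until the first pipe message
// (status + headers) arrives; body chunks are then pulled lazily through
// `next`. The client thread is detached on purpose: it captures only
// shared_ptrs (`cli`, `pipe`) and never `this`, so the proxy object may be
// destroyed first, and `cleanup` closing both pipe ends is what makes the
// detached thread wind down.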