From 3ace5fa277c840f61508143bf8f35d8e17b57934 Mon Sep 17 00:00:00 2001 From: Radoslav Gerganov Date: Thu, 19 Mar 2026 17:03:13 +0200 Subject: [PATCH] server : add special handling for /health in httplib When the number of parallel requests to llama-server exceed the number of http threads, llama-server stop responding to /health which is very disruptive in k8s deployments, causing restarts of properly working inference endpoints. Unfortunately, there is no way to fix this outside of httplib and this patch adds a rather ugly hack for handling GET /health requests before dispatching them to the thread pool. No changes are made in the HTTPS implementation. closes: #20684 --- vendor/cpp-httplib/httplib.cpp | 182 ++++++++++++++++++++++++++++++--- vendor/cpp-httplib/httplib.h | 5 +- 2 files changed, 174 insertions(+), 13 deletions(-) diff --git a/vendor/cpp-httplib/httplib.cpp b/vendor/cpp-httplib/httplib.cpp index fa0718218e..4e6442c14a 100644 --- a/vendor/cpp-httplib/httplib.cpp +++ b/vendor/cpp-httplib/httplib.cpp @@ -1668,6 +1668,124 @@ bool is_socket_alive(socket_t sock) { return detail::read_socket(sock, &buf[0], sizeof(buf), MSG_PEEK) > 0; } +// Read the first HTTP request line (until \r\n) with timeout. Returns empty string on timeout/error. +std::string read_first_line(socket_t sock, time_t timeout_sec, time_t timeout_usec) { + std::string line; + line.reserve(128); + char prev = 0; + const time_t sec = (timeout_sec > 0 || timeout_usec > 0) ? (timeout_sec ? timeout_sec : 1) : 1; + const time_t usec = timeout_usec; + for (;;) { + if (line.size() >= CPPHTTPLIB_MAX_LINE_LENGTH) { return std::string(); } + if (select_read(sock, sec, usec) <= 0) { return std::string(); } + char byte; + auto n = read_socket(sock, &byte, 1, CPPHTTPLIB_RECV_FLAGS); + if (n <= 0) { return std::string(); } + line += byte; +#ifndef CPPHTTPLIB_ALLOW_LF_AS_LINE_TERMINATOR + if (prev == '\r' && byte == '\n') { break; } + prev = byte; +#else + if (byte == '\n') { break; } +#endif + } + return line; +} + +int shutdown_socket(socket_t sock); // forward declaration + +// If first_line is GET /health or GET /v1/health, consume the request, send 200 OK, close sock, return true. +bool try_handle_health(socket_t sock, const std::string &first_line) { + if (first_line.size() < 14) { return false; } + if (first_line.compare(0, 4, "GET ") != 0) { return false; } + auto space2 = first_line.find(' ', 4); + if (space2 == std::string::npos) { return false; } + std::string path(first_line.data() + 4, first_line.data() + space2); + if (path.find('?') != std::string::npos) { + path.resize(path.find('?')); + } + if (path != "/health" && path != "/v1/health") { return false; } + + char buf[4096]; + size_t n = 0; + for (;;) { + if (n >= sizeof(buf) - 1) { return false; } + ssize_t r = read_socket(sock, buf + n, sizeof(buf) - 1 - n, CPPHTTPLIB_RECV_FLAGS); + if (r <= 0) { return false; } + n += static_cast(r); + buf[n] = '\0'; + if (n >= 4 && std::strstr(buf, "\r\n\r\n") != nullptr) { break; } + } + + const char body[] = "{\"status\":\"ok\"}"; + const size_t body_len = sizeof(body) - 1; + std::string msg = "HTTP/1.1 200 OK\r\nContent-Type: application/json; charset=utf-8\r\nContent-Length: "; + msg += std::to_string(body_len); + msg += "\r\nConnection: close\r\n\r\n"; + msg.append(body, body_len); + + const char *p = msg.data(); + size_t left = msg.size(); + while (left > 0) { + size_t chunk = (left < 8192) ? left : 8192; + ssize_t w = send_socket(sock, p, chunk, CPPHTTPLIB_SEND_FLAGS); + if (w <= 0) { break; } + p += static_cast(w); + left -= static_cast(w); + } + + shutdown_socket(sock); + close_socket(sock); + return true; +} + +// Stream that prefixes reads with a pre-read buffer (e.g. first request line) +class PrefixStream final : public Stream { +public: + PrefixStream(Stream &inner, std::string prefix) + : inner_(inner), prefix_(std::move(prefix)), prefix_off_(0) {} + + bool is_readable() const override { + return prefix_off_ < prefix_.size() || inner_.is_readable(); + } + bool wait_readable() const override { + if (prefix_off_ < prefix_.size()) { return true; } + return inner_.wait_readable(); + } + bool wait_writable() const override { return inner_.wait_writable(); } + bool is_peer_alive() const override { return inner_.is_peer_alive(); } + + ssize_t read(char *ptr, size_t size) override { + if (prefix_off_ < prefix_.size()) { + size_t from_prefix = + (std::min)(size, prefix_.size() - prefix_off_); + memcpy(ptr, prefix_.data() + prefix_off_, from_prefix); + prefix_off_ += from_prefix; + return static_cast(from_prefix); + } + return inner_.read(ptr, size); + } + ssize_t write(const char *ptr, size_t size) override { + return inner_.write(ptr, size); + } + void get_remote_ip_and_port(std::string &ip, int &port) const override { + inner_.get_remote_ip_and_port(ip, port); + } + void get_local_ip_and_port(std::string &ip, int &port) const override { + inner_.get_local_ip_and_port(ip, port); + } + socket_t socket() const override { return inner_.socket(); } + time_t duration() const override { return inner_.duration(); } + void set_read_timeout(time_t sec, time_t usec) override { + inner_.set_read_timeout(sec, usec); + } + +private: + Stream &inner_; + std::string prefix_; + size_t prefix_off_ = 0; +}; + class SocketStream final : public Stream { public: SocketStream(socket_t sock, time_t read_timeout_sec, time_t read_timeout_usec, @@ -1772,6 +1890,29 @@ process_server_socket(const std::atomic &svr_sock, socket_t sock, }); } +template +bool +process_server_socket_with_prefix(const std::atomic &svr_sock, + socket_t sock, size_t keep_alive_max_count, + time_t keep_alive_timeout_sec, + time_t read_timeout_sec, time_t read_timeout_usec, + time_t write_timeout_sec, time_t write_timeout_usec, + std::string first_line, T callback) { + bool first = true; + return process_server_socket_core( + svr_sock, sock, keep_alive_max_count, keep_alive_timeout_sec, + [&](bool close_connection, bool &connection_closed) { + SocketStream strm(sock, read_timeout_sec, read_timeout_usec, + write_timeout_sec, write_timeout_usec); + if (first && !first_line.empty()) { + first = false; + PrefixStream pstrm(strm, std::move(first_line)); + return callback(pstrm, close_connection, connection_closed); + } + return callback(strm, close_connection, connection_closed); + }); +} + bool process_client_socket( socket_t sock, time_t read_timeout_sec, time_t read_timeout_usec, time_t write_timeout_sec, time_t write_timeout_usec, @@ -7682,8 +7823,14 @@ bool Server::listen_internal() { detail::set_socket_opt(sock, IPPROTO_TCP, TCP_NODELAY, 1); } - if (!task_queue->enqueue( - [this, sock]() { process_and_close_socket(sock); })) { + std::string first_line = detail::read_first_line(sock, 1, 0); + if (!first_line.empty() && detail::try_handle_health(sock, first_line)) { + continue; + } + + if (!task_queue->enqueue([this, sock, first_line]() { + process_and_close_socket(sock, first_line); + })) { output_error_log(Error::ResourceExhaustion, nullptr); detail::shutdown_socket(sock); detail::close_socket(sock); @@ -8243,7 +8390,8 @@ Server::process_request(Stream &strm, const std::string &remote_addr, bool Server::is_valid() const { return true; } -bool Server::process_and_close_socket(socket_t sock) { +bool Server::process_and_close_socket(socket_t sock, + const std::string &first_line) { std::string remote_addr; int remote_port = 0; detail::get_remote_ip_and_port(sock, remote_addr, remote_port); @@ -8253,15 +8401,25 @@ bool Server::process_and_close_socket(socket_t sock) { detail::get_local_ip_and_port(sock, local_addr, local_port); bool websocket_upgraded = false; - auto ret = detail::process_server_socket( - svr_sock_, sock, keep_alive_max_count_, keep_alive_timeout_sec_, - read_timeout_sec_, read_timeout_usec_, write_timeout_sec_, - write_timeout_usec_, - [&](Stream &strm, bool close_connection, bool &connection_closed) { - return process_request(strm, remote_addr, remote_port, local_addr, - local_port, close_connection, connection_closed, - nullptr, &websocket_upgraded); - }); + auto req_cb = [&](Stream &strm, bool close_connection, + bool &connection_closed) { + return process_request(strm, remote_addr, remote_port, local_addr, + local_port, close_connection, connection_closed, + nullptr, &websocket_upgraded); + }; + + bool ret; + if (first_line.empty()) { + ret = detail::process_server_socket( + svr_sock_, sock, keep_alive_max_count_, keep_alive_timeout_sec_, + read_timeout_sec_, read_timeout_usec_, write_timeout_sec_, + write_timeout_usec_, req_cb); + } else { + ret = detail::process_server_socket_with_prefix( + svr_sock_, sock, keep_alive_max_count_, keep_alive_timeout_sec_, + read_timeout_sec_, read_timeout_usec_, write_timeout_sec_, + write_timeout_usec_, first_line, req_cb); + } detail::shutdown_socket(sock); detail::close_socket(sock); diff --git a/vendor/cpp-httplib/httplib.h b/vendor/cpp-httplib/httplib.h index 6ec949ac51..537afa9b0c 100644 --- a/vendor/cpp-httplib/httplib.h +++ b/vendor/cpp-httplib/httplib.h @@ -1685,6 +1685,7 @@ public: std::function new_task_queue; protected: + bool process_and_close_socket(socket_t sock, const std::string &first_line); bool process_request(Stream &strm, const std::string &remote_addr, int remote_port, const std::string &local_addr, int local_port, bool close_connection, @@ -1763,7 +1764,9 @@ private: FormDataHeader multipart_header, ContentReceiver multipart_receiver) const; - virtual bool process_and_close_socket(socket_t sock); + virtual bool process_and_close_socket(socket_t sock) { + return process_and_close_socket(sock, std::string()); + } void output_log(const Request &req, const Response &res) const; void output_pre_compression_log(const Request &req,