server : add special handling for /health in httplib

When the number of parallel requests to llama-server exceed the number
of http threads, llama-server stop responding to /health which is very
disruptive in k8s deployments, causing restarts of properly working
inference endpoints.

Unfortunately, there is no way to fix this outside of httplib and this
patch adds a rather ugly hack for handling GET /health requests before
dispatching them to the thread pool.

No changes are made in the HTTPS implementation.

closes: #20684
This commit is contained in:
Radoslav Gerganov 2026-03-19 17:03:13 +02:00
parent fb78ad29bb
commit 3ace5fa277
2 changed files with 174 additions and 13 deletions

View File

@ -1668,6 +1668,124 @@ bool is_socket_alive(socket_t sock) {
return detail::read_socket(sock, &buf[0], sizeof(buf), MSG_PEEK) > 0;
}
// Read the first HTTP request line (until \r\n) with timeout. Returns empty string on timeout/error.
std::string read_first_line(socket_t sock, time_t timeout_sec, time_t timeout_usec) {
std::string line;
line.reserve(128);
char prev = 0;
const time_t sec = (timeout_sec > 0 || timeout_usec > 0) ? (timeout_sec ? timeout_sec : 1) : 1;
const time_t usec = timeout_usec;
for (;;) {
if (line.size() >= CPPHTTPLIB_MAX_LINE_LENGTH) { return std::string(); }
if (select_read(sock, sec, usec) <= 0) { return std::string(); }
char byte;
auto n = read_socket(sock, &byte, 1, CPPHTTPLIB_RECV_FLAGS);
if (n <= 0) { return std::string(); }
line += byte;
#ifndef CPPHTTPLIB_ALLOW_LF_AS_LINE_TERMINATOR
if (prev == '\r' && byte == '\n') { break; }
prev = byte;
#else
if (byte == '\n') { break; }
#endif
}
return line;
}
int shutdown_socket(socket_t sock); // forward declaration
// If first_line is GET /health or GET /v1/health, consume the request, send 200 OK, close sock, return true.
bool try_handle_health(socket_t sock, const std::string &first_line) {
if (first_line.size() < 14) { return false; }
if (first_line.compare(0, 4, "GET ") != 0) { return false; }
auto space2 = first_line.find(' ', 4);
if (space2 == std::string::npos) { return false; }
std::string path(first_line.data() + 4, first_line.data() + space2);
if (path.find('?') != std::string::npos) {
path.resize(path.find('?'));
}
if (path != "/health" && path != "/v1/health") { return false; }
char buf[4096];
size_t n = 0;
for (;;) {
if (n >= sizeof(buf) - 1) { return false; }
ssize_t r = read_socket(sock, buf + n, sizeof(buf) - 1 - n, CPPHTTPLIB_RECV_FLAGS);
if (r <= 0) { return false; }
n += static_cast<size_t>(r);
buf[n] = '\0';
if (n >= 4 && std::strstr(buf, "\r\n\r\n") != nullptr) { break; }
}
const char body[] = "{\"status\":\"ok\"}";
const size_t body_len = sizeof(body) - 1;
std::string msg = "HTTP/1.1 200 OK\r\nContent-Type: application/json; charset=utf-8\r\nContent-Length: ";
msg += std::to_string(body_len);
msg += "\r\nConnection: close\r\n\r\n";
msg.append(body, body_len);
const char *p = msg.data();
size_t left = msg.size();
while (left > 0) {
size_t chunk = (left < 8192) ? left : 8192;
ssize_t w = send_socket(sock, p, chunk, CPPHTTPLIB_SEND_FLAGS);
if (w <= 0) { break; }
p += static_cast<size_t>(w);
left -= static_cast<size_t>(w);
}
shutdown_socket(sock);
close_socket(sock);
return true;
}
// Stream that prefixes reads with a pre-read buffer (e.g. first request line)
class PrefixStream final : public Stream {
public:
PrefixStream(Stream &inner, std::string prefix)
: inner_(inner), prefix_(std::move(prefix)), prefix_off_(0) {}
bool is_readable() const override {
return prefix_off_ < prefix_.size() || inner_.is_readable();
}
bool wait_readable() const override {
if (prefix_off_ < prefix_.size()) { return true; }
return inner_.wait_readable();
}
bool wait_writable() const override { return inner_.wait_writable(); }
bool is_peer_alive() const override { return inner_.is_peer_alive(); }
ssize_t read(char *ptr, size_t size) override {
if (prefix_off_ < prefix_.size()) {
size_t from_prefix =
(std::min)(size, prefix_.size() - prefix_off_);
memcpy(ptr, prefix_.data() + prefix_off_, from_prefix);
prefix_off_ += from_prefix;
return static_cast<ssize_t>(from_prefix);
}
return inner_.read(ptr, size);
}
ssize_t write(const char *ptr, size_t size) override {
return inner_.write(ptr, size);
}
void get_remote_ip_and_port(std::string &ip, int &port) const override {
inner_.get_remote_ip_and_port(ip, port);
}
void get_local_ip_and_port(std::string &ip, int &port) const override {
inner_.get_local_ip_and_port(ip, port);
}
socket_t socket() const override { return inner_.socket(); }
time_t duration() const override { return inner_.duration(); }
void set_read_timeout(time_t sec, time_t usec) override {
inner_.set_read_timeout(sec, usec);
}
private:
Stream &inner_;
std::string prefix_;
size_t prefix_off_ = 0;
};
class SocketStream final : public Stream {
public:
SocketStream(socket_t sock, time_t read_timeout_sec, time_t read_timeout_usec,
@ -1772,6 +1890,29 @@ process_server_socket(const std::atomic<socket_t> &svr_sock, socket_t sock,
});
}
template <typename T>
bool
process_server_socket_with_prefix(const std::atomic<socket_t> &svr_sock,
socket_t sock, size_t keep_alive_max_count,
time_t keep_alive_timeout_sec,
time_t read_timeout_sec, time_t read_timeout_usec,
time_t write_timeout_sec, time_t write_timeout_usec,
std::string first_line, T callback) {
bool first = true;
return process_server_socket_core(
svr_sock, sock, keep_alive_max_count, keep_alive_timeout_sec,
[&](bool close_connection, bool &connection_closed) {
SocketStream strm(sock, read_timeout_sec, read_timeout_usec,
write_timeout_sec, write_timeout_usec);
if (first && !first_line.empty()) {
first = false;
PrefixStream pstrm(strm, std::move(first_line));
return callback(pstrm, close_connection, connection_closed);
}
return callback(strm, close_connection, connection_closed);
});
}
bool process_client_socket(
socket_t sock, time_t read_timeout_sec, time_t read_timeout_usec,
time_t write_timeout_sec, time_t write_timeout_usec,
@ -7682,8 +7823,14 @@ bool Server::listen_internal() {
detail::set_socket_opt(sock, IPPROTO_TCP, TCP_NODELAY, 1);
}
if (!task_queue->enqueue(
[this, sock]() { process_and_close_socket(sock); })) {
std::string first_line = detail::read_first_line(sock, 1, 0);
if (!first_line.empty() && detail::try_handle_health(sock, first_line)) {
continue;
}
if (!task_queue->enqueue([this, sock, first_line]() {
process_and_close_socket(sock, first_line);
})) {
output_error_log(Error::ResourceExhaustion, nullptr);
detail::shutdown_socket(sock);
detail::close_socket(sock);
@ -8243,7 +8390,8 @@ Server::process_request(Stream &strm, const std::string &remote_addr,
bool Server::is_valid() const { return true; }
bool Server::process_and_close_socket(socket_t sock) {
bool Server::process_and_close_socket(socket_t sock,
const std::string &first_line) {
std::string remote_addr;
int remote_port = 0;
detail::get_remote_ip_and_port(sock, remote_addr, remote_port);
@ -8253,15 +8401,25 @@ bool Server::process_and_close_socket(socket_t sock) {
detail::get_local_ip_and_port(sock, local_addr, local_port);
bool websocket_upgraded = false;
auto ret = detail::process_server_socket(
svr_sock_, sock, keep_alive_max_count_, keep_alive_timeout_sec_,
read_timeout_sec_, read_timeout_usec_, write_timeout_sec_,
write_timeout_usec_,
[&](Stream &strm, bool close_connection, bool &connection_closed) {
return process_request(strm, remote_addr, remote_port, local_addr,
local_port, close_connection, connection_closed,
nullptr, &websocket_upgraded);
});
auto req_cb = [&](Stream &strm, bool close_connection,
bool &connection_closed) {
return process_request(strm, remote_addr, remote_port, local_addr,
local_port, close_connection, connection_closed,
nullptr, &websocket_upgraded);
};
bool ret;
if (first_line.empty()) {
ret = detail::process_server_socket(
svr_sock_, sock, keep_alive_max_count_, keep_alive_timeout_sec_,
read_timeout_sec_, read_timeout_usec_, write_timeout_sec_,
write_timeout_usec_, req_cb);
} else {
ret = detail::process_server_socket_with_prefix(
svr_sock_, sock, keep_alive_max_count_, keep_alive_timeout_sec_,
read_timeout_sec_, read_timeout_usec_, write_timeout_sec_,
write_timeout_usec_, first_line, req_cb);
}
detail::shutdown_socket(sock);
detail::close_socket(sock);

View File

@ -1685,6 +1685,7 @@ public:
std::function<TaskQueue *(void)> new_task_queue;
protected:
bool process_and_close_socket(socket_t sock, const std::string &first_line);
bool process_request(Stream &strm, const std::string &remote_addr,
int remote_port, const std::string &local_addr,
int local_port, bool close_connection,
@ -1763,7 +1764,9 @@ private:
FormDataHeader multipart_header,
ContentReceiver multipart_receiver) const;
virtual bool process_and_close_socket(socket_t sock);
virtual bool process_and_close_socket(socket_t sock) {
return process_and_close_socket(sock, std::string());
}
void output_log(const Request &req, const Response &res) const;
void output_pre_compression_log(const Request &req,