Merge 3ace5fa277 into ec2b787ebe
This commit is contained in:
commit
cf03d7d4c0
|
|
@ -1668,6 +1668,124 @@ bool is_socket_alive(socket_t sock) {
|
|||
return detail::read_socket(sock, &buf[0], sizeof(buf), MSG_PEEK) > 0;
|
||||
}
|
||||
|
||||
// Read the first HTTP request line (until \r\n) with timeout. Returns empty string on timeout/error.
|
||||
std::string read_first_line(socket_t sock, time_t timeout_sec, time_t timeout_usec) {
|
||||
std::string line;
|
||||
line.reserve(128);
|
||||
char prev = 0;
|
||||
const time_t sec = (timeout_sec > 0 || timeout_usec > 0) ? (timeout_sec ? timeout_sec : 1) : 1;
|
||||
const time_t usec = timeout_usec;
|
||||
for (;;) {
|
||||
if (line.size() >= CPPHTTPLIB_MAX_LINE_LENGTH) { return std::string(); }
|
||||
if (select_read(sock, sec, usec) <= 0) { return std::string(); }
|
||||
char byte;
|
||||
auto n = read_socket(sock, &byte, 1, CPPHTTPLIB_RECV_FLAGS);
|
||||
if (n <= 0) { return std::string(); }
|
||||
line += byte;
|
||||
#ifndef CPPHTTPLIB_ALLOW_LF_AS_LINE_TERMINATOR
|
||||
if (prev == '\r' && byte == '\n') { break; }
|
||||
prev = byte;
|
||||
#else
|
||||
if (byte == '\n') { break; }
|
||||
#endif
|
||||
}
|
||||
return line;
|
||||
}
|
||||
|
||||
int shutdown_socket(socket_t sock); // forward declaration
|
||||
|
||||
// If first_line is GET /health or GET /v1/health, consume the request, send 200 OK, close sock, return true.
|
||||
bool try_handle_health(socket_t sock, const std::string &first_line) {
|
||||
if (first_line.size() < 14) { return false; }
|
||||
if (first_line.compare(0, 4, "GET ") != 0) { return false; }
|
||||
auto space2 = first_line.find(' ', 4);
|
||||
if (space2 == std::string::npos) { return false; }
|
||||
std::string path(first_line.data() + 4, first_line.data() + space2);
|
||||
if (path.find('?') != std::string::npos) {
|
||||
path.resize(path.find('?'));
|
||||
}
|
||||
if (path != "/health" && path != "/v1/health") { return false; }
|
||||
|
||||
char buf[4096];
|
||||
size_t n = 0;
|
||||
for (;;) {
|
||||
if (n >= sizeof(buf) - 1) { return false; }
|
||||
ssize_t r = read_socket(sock, buf + n, sizeof(buf) - 1 - n, CPPHTTPLIB_RECV_FLAGS);
|
||||
if (r <= 0) { return false; }
|
||||
n += static_cast<size_t>(r);
|
||||
buf[n] = '\0';
|
||||
if (n >= 4 && std::strstr(buf, "\r\n\r\n") != nullptr) { break; }
|
||||
}
|
||||
|
||||
const char body[] = "{\"status\":\"ok\"}";
|
||||
const size_t body_len = sizeof(body) - 1;
|
||||
std::string msg = "HTTP/1.1 200 OK\r\nContent-Type: application/json; charset=utf-8\r\nContent-Length: ";
|
||||
msg += std::to_string(body_len);
|
||||
msg += "\r\nConnection: close\r\n\r\n";
|
||||
msg.append(body, body_len);
|
||||
|
||||
const char *p = msg.data();
|
||||
size_t left = msg.size();
|
||||
while (left > 0) {
|
||||
size_t chunk = (left < 8192) ? left : 8192;
|
||||
ssize_t w = send_socket(sock, p, chunk, CPPHTTPLIB_SEND_FLAGS);
|
||||
if (w <= 0) { break; }
|
||||
p += static_cast<size_t>(w);
|
||||
left -= static_cast<size_t>(w);
|
||||
}
|
||||
|
||||
shutdown_socket(sock);
|
||||
close_socket(sock);
|
||||
return true;
|
||||
}
|
||||
|
||||
// Stream that prefixes reads with a pre-read buffer (e.g. first request line)
|
||||
class PrefixStream final : public Stream {
|
||||
public:
|
||||
PrefixStream(Stream &inner, std::string prefix)
|
||||
: inner_(inner), prefix_(std::move(prefix)), prefix_off_(0) {}
|
||||
|
||||
bool is_readable() const override {
|
||||
return prefix_off_ < prefix_.size() || inner_.is_readable();
|
||||
}
|
||||
bool wait_readable() const override {
|
||||
if (prefix_off_ < prefix_.size()) { return true; }
|
||||
return inner_.wait_readable();
|
||||
}
|
||||
bool wait_writable() const override { return inner_.wait_writable(); }
|
||||
bool is_peer_alive() const override { return inner_.is_peer_alive(); }
|
||||
|
||||
ssize_t read(char *ptr, size_t size) override {
|
||||
if (prefix_off_ < prefix_.size()) {
|
||||
size_t from_prefix =
|
||||
(std::min)(size, prefix_.size() - prefix_off_);
|
||||
memcpy(ptr, prefix_.data() + prefix_off_, from_prefix);
|
||||
prefix_off_ += from_prefix;
|
||||
return static_cast<ssize_t>(from_prefix);
|
||||
}
|
||||
return inner_.read(ptr, size);
|
||||
}
|
||||
ssize_t write(const char *ptr, size_t size) override {
|
||||
return inner_.write(ptr, size);
|
||||
}
|
||||
void get_remote_ip_and_port(std::string &ip, int &port) const override {
|
||||
inner_.get_remote_ip_and_port(ip, port);
|
||||
}
|
||||
void get_local_ip_and_port(std::string &ip, int &port) const override {
|
||||
inner_.get_local_ip_and_port(ip, port);
|
||||
}
|
||||
socket_t socket() const override { return inner_.socket(); }
|
||||
time_t duration() const override { return inner_.duration(); }
|
||||
void set_read_timeout(time_t sec, time_t usec) override {
|
||||
inner_.set_read_timeout(sec, usec);
|
||||
}
|
||||
|
||||
private:
|
||||
Stream &inner_;
|
||||
std::string prefix_;
|
||||
size_t prefix_off_ = 0;
|
||||
};
|
||||
|
||||
class SocketStream final : public Stream {
|
||||
public:
|
||||
SocketStream(socket_t sock, time_t read_timeout_sec, time_t read_timeout_usec,
|
||||
|
|
@ -1772,6 +1890,29 @@ process_server_socket(const std::atomic<socket_t> &svr_sock, socket_t sock,
|
|||
});
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
bool
|
||||
process_server_socket_with_prefix(const std::atomic<socket_t> &svr_sock,
|
||||
socket_t sock, size_t keep_alive_max_count,
|
||||
time_t keep_alive_timeout_sec,
|
||||
time_t read_timeout_sec, time_t read_timeout_usec,
|
||||
time_t write_timeout_sec, time_t write_timeout_usec,
|
||||
std::string first_line, T callback) {
|
||||
bool first = true;
|
||||
return process_server_socket_core(
|
||||
svr_sock, sock, keep_alive_max_count, keep_alive_timeout_sec,
|
||||
[&](bool close_connection, bool &connection_closed) {
|
||||
SocketStream strm(sock, read_timeout_sec, read_timeout_usec,
|
||||
write_timeout_sec, write_timeout_usec);
|
||||
if (first && !first_line.empty()) {
|
||||
first = false;
|
||||
PrefixStream pstrm(strm, std::move(first_line));
|
||||
return callback(pstrm, close_connection, connection_closed);
|
||||
}
|
||||
return callback(strm, close_connection, connection_closed);
|
||||
});
|
||||
}
|
||||
|
||||
bool process_client_socket(
|
||||
socket_t sock, time_t read_timeout_sec, time_t read_timeout_usec,
|
||||
time_t write_timeout_sec, time_t write_timeout_usec,
|
||||
|
|
@ -7682,8 +7823,14 @@ bool Server::listen_internal() {
|
|||
detail::set_socket_opt(sock, IPPROTO_TCP, TCP_NODELAY, 1);
|
||||
}
|
||||
|
||||
if (!task_queue->enqueue(
|
||||
[this, sock]() { process_and_close_socket(sock); })) {
|
||||
std::string first_line = detail::read_first_line(sock, 1, 0);
|
||||
if (!first_line.empty() && detail::try_handle_health(sock, first_line)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!task_queue->enqueue([this, sock, first_line]() {
|
||||
process_and_close_socket(sock, first_line);
|
||||
})) {
|
||||
output_error_log(Error::ResourceExhaustion, nullptr);
|
||||
detail::shutdown_socket(sock);
|
||||
detail::close_socket(sock);
|
||||
|
|
@ -8243,7 +8390,8 @@ Server::process_request(Stream &strm, const std::string &remote_addr,
|
|||
|
||||
bool Server::is_valid() const { return true; }
|
||||
|
||||
bool Server::process_and_close_socket(socket_t sock) {
|
||||
bool Server::process_and_close_socket(socket_t sock,
|
||||
const std::string &first_line) {
|
||||
std::string remote_addr;
|
||||
int remote_port = 0;
|
||||
detail::get_remote_ip_and_port(sock, remote_addr, remote_port);
|
||||
|
|
@ -8253,15 +8401,25 @@ bool Server::process_and_close_socket(socket_t sock) {
|
|||
detail::get_local_ip_and_port(sock, local_addr, local_port);
|
||||
|
||||
bool websocket_upgraded = false;
|
||||
auto ret = detail::process_server_socket(
|
||||
svr_sock_, sock, keep_alive_max_count_, keep_alive_timeout_sec_,
|
||||
read_timeout_sec_, read_timeout_usec_, write_timeout_sec_,
|
||||
write_timeout_usec_,
|
||||
[&](Stream &strm, bool close_connection, bool &connection_closed) {
|
||||
return process_request(strm, remote_addr, remote_port, local_addr,
|
||||
local_port, close_connection, connection_closed,
|
||||
nullptr, &websocket_upgraded);
|
||||
});
|
||||
auto req_cb = [&](Stream &strm, bool close_connection,
|
||||
bool &connection_closed) {
|
||||
return process_request(strm, remote_addr, remote_port, local_addr,
|
||||
local_port, close_connection, connection_closed,
|
||||
nullptr, &websocket_upgraded);
|
||||
};
|
||||
|
||||
bool ret;
|
||||
if (first_line.empty()) {
|
||||
ret = detail::process_server_socket(
|
||||
svr_sock_, sock, keep_alive_max_count_, keep_alive_timeout_sec_,
|
||||
read_timeout_sec_, read_timeout_usec_, write_timeout_sec_,
|
||||
write_timeout_usec_, req_cb);
|
||||
} else {
|
||||
ret = detail::process_server_socket_with_prefix(
|
||||
svr_sock_, sock, keep_alive_max_count_, keep_alive_timeout_sec_,
|
||||
read_timeout_sec_, read_timeout_usec_, write_timeout_sec_,
|
||||
write_timeout_usec_, first_line, req_cb);
|
||||
}
|
||||
|
||||
detail::shutdown_socket(sock);
|
||||
detail::close_socket(sock);
|
||||
|
|
|
|||
|
|
@ -1685,6 +1685,7 @@ public:
|
|||
std::function<TaskQueue *(void)> new_task_queue;
|
||||
|
||||
protected:
|
||||
bool process_and_close_socket(socket_t sock, const std::string &first_line);
|
||||
bool process_request(Stream &strm, const std::string &remote_addr,
|
||||
int remote_port, const std::string &local_addr,
|
||||
int local_port, bool close_connection,
|
||||
|
|
@ -1763,7 +1764,9 @@ private:
|
|||
FormDataHeader multipart_header,
|
||||
ContentReceiver multipart_receiver) const;
|
||||
|
||||
virtual bool process_and_close_socket(socket_t sock);
|
||||
virtual bool process_and_close_socket(socket_t sock) {
|
||||
return process_and_close_socket(sock, std::string());
|
||||
}
|
||||
|
||||
void output_log(const Request &req, const Response &res) const;
|
||||
void output_pre_compression_log(const Request &req,
|
||||
|
|
|
|||
Loading…
Reference in New Issue