From 759c28de3ae9814d0191794323a51b0bf8e6798b Mon Sep 17 00:00:00 2001 From: Valeriy Dubov Date: Mon, 16 Mar 2026 00:56:08 +0300 Subject: [PATCH] faster handle closed socket from previous operation --- ggml/src/ggml-rpc/ggml-rpc.cpp | 40 +++++++++++++++++++++++++++------- 1 file changed, 32 insertions(+), 8 deletions(-) diff --git a/ggml/src/ggml-rpc/ggml-rpc.cpp b/ggml/src/ggml-rpc/ggml-rpc.cpp index 087093e268..068e9af990 100644 --- a/ggml/src/ggml-rpc/ggml-rpc.cpp +++ b/ggml/src/ggml-rpc/ggml-rpc.cpp @@ -34,6 +34,9 @@ #ifdef GGML_RPC_RDMA # include # include +# ifndef _WIN32 +# include +# endif #endif // GGML_RPC_RDMA static const char * RPC_DEBUG = std::getenv("GGML_RPC_DEBUG"); @@ -519,14 +522,34 @@ static bool tcp_recv_impl(socket_t * sock, void * data, size_t size) { static bool rdma_send_impl(socket_t * sock, const void * data, size_t size); static bool rdma_recv_impl(socket_t * sock, void * data, size_t size); -static inline bool rdma_poll(struct ibv_cq * cq, struct ibv_wc * wc) { +static inline bool tcp_peer_closed(int fd) { + if (fd < 0) return false; +#ifndef _WIN32 + struct pollfd pfd = { fd, POLLIN | POLLRDHUP, 0 }; + int r = poll(&pfd, 1, 0); + return r > 0 && (pfd.revents & (POLLHUP | POLLERR | POLLRDHUP)); +#else + return false; +#endif +} + +static inline bool rdma_poll(struct ibv_cq * cq, struct ibv_wc * wc, int tcp_fd = -1) { struct timespec t0; clock_gettime(CLOCK_MONOTONIC_COARSE, &t0); for (uint64_t s = 0; ; s++) { int n = ibv_poll_cq(cq, 1, wc); - if (n > 0) return wc->status == IBV_WC_SUCCESS; + if (n > 0) { + if (wc->status != IBV_WC_SUCCESS) { + GGML_LOG_ERROR("RDMA CQ wc error: status=%d (%s) vendor_err=0x%x\n", + wc->status, ibv_wc_status_str(wc->status), wc->vendor_err); + } + return wc->status == IBV_WC_SUCCESS; + } if (n < 0) return false; if ((s & 0xFFFFF) == 0 && s > 0) { + if (tcp_fd >= 0 && tcp_peer_closed(tcp_fd)) { + return false; + } struct timespec now; clock_gettime(CLOCK_MONOTONIC_COARSE, &now); if (now.tv_sec - t0.tv_sec >= 30) { @@ -537,7 +560,7 @@ static inline bool rdma_poll(struct ibv_cq * cq, struct ibv_wc * wc) { } } -static bool rdma_send(rdma_conn * c, const void * data, size_t size) { +static bool rdma_send(rdma_conn * c, const void * data, size_t size, int tcp_fd = -1) { const uint8_t * src = (const uint8_t *)data; size_t rem = size; while (rem > 0) { @@ -563,7 +586,7 @@ static bool rdma_send(rdma_conn * c, const void * data, size_t size) { if (ibv_post_send(c->qp, &wr, &bad) != 0) return false; struct ibv_wc wc; - if (!rdma_poll(c->scq, &wc)) return false; + if (!rdma_poll(c->scq, &wc, tcp_fd)) return false; src += chunk; rem -= chunk; @@ -571,12 +594,13 @@ static bool rdma_send(rdma_conn * c, const void * data, size_t size) { return true; } -static bool rdma_recv(rdma_conn * c, void * data, size_t size) { + +static bool rdma_recv(rdma_conn * c, void * data, size_t size, int tcp_fd = -1) { uint8_t * dst = (uint8_t *)data; size_t rem = size; while (rem > 0) { struct ibv_wc wc; - if (!rdma_poll(c->rcq, &wc)) return false; + if (!rdma_poll(c->rcq, &wc, tcp_fd)) return false; int slot = (int)wc.wr_id; size_t got = wc.byte_len; @@ -591,11 +615,11 @@ static bool rdma_recv(rdma_conn * c, void * data, size_t size) { } static bool rdma_send_impl(socket_t * sock, const void * data, size_t size) { - return rdma_send(sock->rdma, data, size); + return rdma_send(sock->rdma, data, size, sock->fd); } static bool rdma_recv_impl(socket_t * sock, void * data, size_t size) { - return rdma_recv(sock->rdma, data, size); + return rdma_recv(sock->rdma, data, size, sock->fd); } // Phase 1: Probe for RDMA device, create QP (in RESET state), return local info.