diff --git a/ggml/src/ggml-rpc/feature-readme.md b/ggml/src/ggml-rpc/feature-readme.md index cb92ffbd1f..278ded4d7b 100644 --- a/ggml/src/ggml-rpc/feature-readme.md +++ b/ggml/src/ggml-rpc/feature-readme.md @@ -37,7 +37,7 @@ Client Server |<--- HELLO rsp + RDMA rsp ---------| | (version + QPN, PSN, GID) | | | - | [rdma_activate: INIT→RTR→RTS] | [rdma_activate: INIT→RTR→RTS] + | [rdma_activate: INIT->RTR->RTS] | [rdma_activate: INIT->RTR->RTS] | [swap fn_send/fn_recv to RDMA] | [swap fn_send/fn_recv to RDMA] | | |==== All subsequent data via RDMA ==| @@ -70,7 +70,7 @@ Transport is selected once at connection time by swapping function pointers. All **3. Two-phase RDMA setup** - `rdma_probe()` -- Before HELLO: opens RDMA device, creates QP (stays in RESET state), allocates and registers buffers. Returns local QPN/GID for the HELLO exchange. -- `rdma_activate()` -- After HELLO: given remote QPN/GID, transitions QP through INIT→RTR→RTS and pre-posts the receive ring. +- `rdma_activate()` -- After HELLO: given remote QPN/GID, transitions QP through INIT->RTR->RTS and pre-posts the receive ring. This split is necessary because both sides need the other's QPN before they can complete the QP state machine. @@ -95,10 +95,10 @@ The data transport layer uses several optimizations developed through iterative | Scenario | Behavior | | -------- | -------- | -| New client → old server | Client sends extended HELLO (24 bytes input). Old server treats extra bytes as unknown input, responds with standard 3-byte HELLO. Client sees no RDMA fields, stays on TCP. | -| Old client → new server | Client sends standard HELLO (0 bytes input). Server detects `input_size == 0`, responds with standard HELLO. No RDMA negotiation attempted. | -| New client → new server, no RDMA hardware | `rdma_probe()` returns nullptr. Client sends standard HELLO. Normal TCP operation. | -| New client → new server, RDMA available | Full auto-negotiation. RDMA transport activated after HELLO. | +| New client -> old server | Client sends extended HELLO (24 bytes input). Old server treats extra bytes as unknown input, responds with standard 3-byte HELLO. Client sees no RDMA fields, stays on TCP. | +| Old client -> new server | Client sends standard HELLO (0 bytes input). Server detects `input_size == 0`, responds with standard HELLO. No RDMA negotiation attempted. | +| New client -> new server, no RDMA hardware | `rdma_probe()` returns nullptr. Client sends standard HELLO. Normal TCP operation. | +| New client -> new server, RDMA available | Full auto-negotiation. RDMA transport activated after HELLO. | ## Files changed @@ -120,6 +120,9 @@ cmake -B build \ cmake --build build --target rpc-server llama-bench -j$(nproc) ``` -Requires `libibverbs-dev` (Ubuntu: `apt install libibverbs-dev`). - -Without `-DGGML_RPC_RDMA=ON`, the build produces a standard TCP-only binary with no RDMA code compiled in. +Requires `libibverbs-dev` (Ubuntu: `apt install libibverbs-dev rdma-core`). +This is an optional dependency -- without `-DGGML_RPC_RDMA=ON`, the build +produces a standard TCP-only binary with no RDMA code or libibverbs linkage. +The libibverbs library is part of rdma-core and is available on all major +Linux distributions. RDMA transport is Linux-only; Windows and macOS builds +are unaffected. diff --git a/ggml/src/ggml-rpc/ggml-rpc.cpp b/ggml/src/ggml-rpc/ggml-rpc.cpp index 6feb37379d..087093e268 100644 --- a/ggml/src/ggml-rpc/ggml-rpc.cpp +++ b/ggml/src/ggml-rpc/ggml-rpc.cpp @@ -34,7 +34,7 @@ #ifdef GGML_RPC_RDMA # include # include -#endif +#endif // GGML_RPC_RDMA static const char * RPC_DEBUG = std::getenv("GGML_RPC_DEBUG"); @@ -49,7 +49,7 @@ static constexpr size_t MAX_CHUNK_SIZE = 1024ull * 1024ull * 1024ull; // 1 GiB #ifdef GGML_RPC_RDMA static constexpr size_t RDMA_CHUNK = 256 * 1024; // 256 KiB per send/recv (fits default 8 MiB memlock) static constexpr int RDMA_RX_DEPTH = 24; // pre-posted recv ring: 24 × 256 KiB = 6 MiB -#endif +#endif // GGML_RPC_RDMA #ifdef _WIN32 typedef SOCKET sockfd_t; @@ -103,7 +103,7 @@ struct rdma_conn { if (ctx) ibv_close_device(ctx); } }; -#endif +#endif // GGML_RPC_RDMA // Forward declarations for transport function pointers struct socket_t; @@ -116,12 +116,12 @@ struct socket_t { bool (*fn_recv)(socket_t *, void *, size_t) = tcp_recv_impl; #ifdef GGML_RPC_RDMA rdma_conn * rdma = nullptr; -#endif +#endif // GGML_RPC_RDMA socket_t(sockfd_t fd) : fd(fd) {} ~socket_t() { #ifdef GGML_RPC_RDMA if (rdma) { delete rdma; rdma = nullptr; } -#endif +#endif // GGML_RPC_RDMA LOG_DBG("[%s] closing socket %d\n", __func__, this->fd); #ifdef _WIN32 if (fd != INVALID_SOCKET) closesocket(this->fd); @@ -200,11 +200,15 @@ struct rpc_msg_hello_rdma_rsp { uint8_t major; uint8_t minor; uint8_t patch; + uint8_t padding; uint32_t rdma_qpn; // 0 = no RDMA capability uint32_t rdma_psn; uint8_t rdma_gid[16]; }; +static_assert(sizeof(rpc_msg_hello_rdma_req) == 24, "rpc_msg_hello_rdma_req must be 24 bytes"); +static_assert(sizeof(rpc_msg_hello_rdma_rsp) == 28, "rpc_msg_hello_rdma_rsp must be 28 bytes"); + struct rpc_msg_device_count_rsp { uint32_t device_count; }; @@ -498,7 +502,7 @@ static bool recv_data(sockfd_t sockfd, void * data, size_t size) { return true; } -// ── TCP transport implementations (for function-pointer dispatch) ──────────── +// TCP transport implementations (for function-pointer dispatch) static bool tcp_send_impl(socket_t * sock, const void * data, size_t size) { return send_data(sock->fd, data, size); @@ -508,7 +512,7 @@ static bool tcp_recv_impl(socket_t * sock, void * data, size_t size) { return recv_data(sock->fd, data, size); } -// ── RDMA transport (performance-optimized, auto-negotiated) ───────────────── +// RDMA transport (performance-optimized, auto-negotiated) #ifdef GGML_RPC_RDMA @@ -711,11 +715,11 @@ static rdma_conn * rdma_probe(sockfd_t tcp_fd, rdma_local_info * out) { return c; } -// Phase 2: Given remote QPN/PSN/GID, transition QP: RESET→INIT→pre-post→RTR→RTS. +// Phase 2: Given remote QPN/PSN/GID, transition QP: RESET->INIT->pre-post->RTR->RTS. // On success, the connection is live and ready for rdma_send/rdma_recv. static bool rdma_activate(rdma_conn * c, const rdma_local_info * local, uint32_t remote_qpn, uint32_t remote_psn, const uint8_t * remote_gid) { - // RESET → INIT + // RESET -> INIT { struct ibv_qp_attr a = {}; a.qp_state = IBV_QPS_INIT; @@ -732,7 +736,7 @@ static bool rdma_activate(rdma_conn * c, const rdma_local_info * local, if (!c->post_rx(i)) return false; } - // INIT → RTR + // INIT -> RTR { struct ibv_qp_attr a = {}; a.qp_state = IBV_QPS_RTR; @@ -754,7 +758,7 @@ static bool rdma_activate(rdma_conn * c, const rdma_local_info * local, } } - // RTR → RTS + // RTR -> RTS { struct ibv_qp_attr a = {}; a.qp_state = IBV_QPS_RTS; @@ -770,14 +774,14 @@ static bool rdma_activate(rdma_conn * c, const rdma_local_info * local, } } - GGML_LOG_INFO("RDMA activated: qpn=%u→%u mtu=%d rx_depth=%d\n", + GGML_LOG_INFO("RDMA activated: qpn=%u->%u mtu=%d rx_depth=%d\n", local->qpn, remote_qpn, 128 << local->path_mtu, RDMA_RX_DEPTH); return true; } #endif // GGML_RPC_RDMA -// ── Unified transport dispatch (via function pointers) ────────────────────── +// unified transport dispatch (via function pointers) static bool send_data(socket_t * sock, const void * data, size_t size) { return sock->fn_send(sock, data, size); @@ -875,7 +879,7 @@ static bool send_rpc_cmd(const std::shared_ptr & sock, enum rpc_cmd cm static bool negotiate_hello(const std::shared_ptr & sock) { #ifdef GGML_RPC_RDMA rdma_local_info local_info = {}; - rdma_conn * probe = rdma_probe(sock->fd, &local_info); + std::unique_ptr probe(rdma_probe(sock->fd, &local_info)); if (probe) { rpc_msg_hello_rdma_req req = {}; @@ -883,54 +887,49 @@ static bool negotiate_hello(const std::shared_ptr & sock) { req.rdma_psn = local_info.psn; memcpy(req.rdma_gid, local_info.gid, 16); - // Send extended HELLO: cmd + input_size + input_data if (!send_rpc_cmd(sock, RPC_CMD_HELLO, &req, sizeof(req))) { - delete probe; return false; } - // Read response size -- server may respond with legacy or extended size uint64_t out_size = 0; if (!recv_data(sock.get(), &out_size, sizeof(out_size))) { - delete probe; return false; } if (out_size == sizeof(rpc_msg_hello_rdma_rsp)) { rpc_msg_hello_rdma_rsp rsp = {}; - if (!recv_data(sock.get(), &rsp, sizeof(rsp))) { delete probe; return false; } + if (!recv_data(sock.get(), &rsp, sizeof(rsp))) { + return false; + } if (rsp.major != RPC_PROTO_MAJOR_VERSION || rsp.minor > RPC_PROTO_MINOR_VERSION) { GGML_LOG_ERROR("RPC server version mismatch: %d.%d.%d\n", rsp.major, rsp.minor, rsp.patch); - delete probe; return false; } if (rsp.rdma_qpn != 0) { - if (rdma_activate(probe, &local_info, rsp.rdma_qpn, rsp.rdma_psn, rsp.rdma_gid)) { - sock->rdma = probe; + if (rdma_activate(probe.get(), &local_info, rsp.rdma_qpn, rsp.rdma_psn, rsp.rdma_gid)) { + sock->rdma = probe.release(); sock->fn_send = rdma_send_impl; sock->fn_recv = rdma_recv_impl; return true; } GGML_LOG_ERROR("RDMA activate failed, staying on TCP\n"); } - delete probe; return true; } else if (out_size == sizeof(rpc_msg_hello_rsp)) { - // Legacy server responded with standard HELLO (ignored our RDMA req) rpc_msg_hello_rsp rsp = {}; - if (!recv_data(sock.get(), &rsp, sizeof(rsp))) { delete probe; return false; } - delete probe; + if (!recv_data(sock.get(), &rsp, sizeof(rsp))) { + return false; + } if (rsp.major != RPC_PROTO_MAJOR_VERSION || rsp.minor > RPC_PROTO_MINOR_VERSION) { GGML_LOG_ERROR("RPC server version mismatch: %d.%d.%d\n", rsp.major, rsp.minor, rsp.patch); return false; } return true; } else { - delete probe; return false; } } -#endif +#endif // GGML_RPC_RDMA rpc_msg_hello_rsp response; bool status = send_rpc_cmd(sock, RPC_CMD_HELLO, nullptr, 0, &response, sizeof(response)); RPC_STATUS_ASSERT(status); @@ -2033,70 +2032,45 @@ static void rpc_serve_client(const std::vector & backends, const } if (hello_input_size == sizeof(rpc_msg_hello_rdma_req)) { - // Extended HELLO with RDMA fields rpc_msg_hello_rdma_req req = {}; if (!recv_data(sockfd, &req, sizeof(req))) { return; } + rpc_msg_hello_rdma_rsp rsp = {}; + { rpc_msg_hello_rsp v; server.hello(v); rsp.major = v.major; rsp.minor = v.minor; rsp.patch = v.patch; } + #ifdef GGML_RPC_RDMA rdma_local_info local_info = {}; - rdma_conn * probe = rdma_probe(sockfd->fd, &local_info); - - rpc_msg_hello_rdma_rsp rsp = {}; - rpc_msg_hello_rsp base_rsp; - server.hello(base_rsp); - rsp.major = base_rsp.major; - rsp.minor = base_rsp.minor; - rsp.patch = base_rsp.patch; - + std::unique_ptr probe(rdma_probe(sockfd->fd, &local_info)); if (probe && req.rdma_qpn != 0) { rsp.rdma_qpn = local_info.qpn; rsp.rdma_psn = local_info.psn; memcpy(rsp.rdma_gid, local_info.gid, 16); } +#endif // GGML_RPC_RDMA if (!send_msg(sockfd, &rsp, sizeof(rsp))) { - delete probe; return; } - if (probe && req.rdma_qpn != 0 && rsp.rdma_qpn != 0) { - if (rdma_activate(probe, &local_info, req.rdma_qpn, req.rdma_psn, req.rdma_gid)) { - sockfd->rdma = probe; +#ifdef GGML_RPC_RDMA + if (probe && rsp.rdma_qpn != 0) { + if (rdma_activate(probe.get(), &local_info, req.rdma_qpn, req.rdma_psn, req.rdma_gid)) { + sockfd->rdma = probe.release(); sockfd->fn_send = rdma_send_impl; sockfd->fn_recv = rdma_recv_impl; } else { GGML_LOG_ERROR("RDMA activate failed on server, staying on TCP\n"); - delete probe; } - } else { - delete probe; - } -#else - // Not compiled with RDMA -- respond with zeros for RDMA fields - rpc_msg_hello_rdma_rsp rsp = {}; - rpc_msg_hello_rsp base_rsp; - server.hello(base_rsp); - rsp.major = base_rsp.major; - rsp.minor = base_rsp.minor; - rsp.patch = base_rsp.patch; - if (!send_msg(sockfd, &rsp, sizeof(rsp))) { - return; - } -#endif - } else if (hello_input_size == 0) { - // Legacy HELLO (no RDMA) - rpc_msg_hello_rsp response; - server.hello(response); - if (!send_msg(sockfd, &response, sizeof(response))) { - return; } +#endif // GGML_RPC_RDMA } else { - // Unknown HELLO size -- consume and respond with legacy - std::vector discard(hello_input_size); - if (!recv_data(sockfd, discard.data(), hello_input_size)) { - return; + if (hello_input_size > 0) { + std::vector discard(hello_input_size); + if (!recv_data(sockfd, discard.data(), hello_input_size)) { + return; + } } rpc_msg_hello_rsp response; server.hello(response); @@ -2377,7 +2351,7 @@ void ggml_backend_rpc_start_server(const char * endpoint, const char * cache_dir printf(" transport : TCP (RDMA auto-negotiate enabled)\n"); #else printf(" transport : TCP\n"); -#endif +#endif // GGML_RPC_RDMA #ifdef _WIN32 { WSADATA wsaData;