ggml-rpc: contribution guidelines fixes

This commit is contained in:
Valeriy Dubov 2026-03-15 15:02:17 +03:00
parent 83463249a0
commit 737da44540
2 changed files with 55 additions and 78 deletions

View File

@ -37,7 +37,7 @@ Client Server
|<--- HELLO rsp + RDMA rsp ---------| |<--- HELLO rsp + RDMA rsp ---------|
| (version + QPN, PSN, GID) | | (version + QPN, PSN, GID) |
| | | |
| [rdma_activate: INIT→RTR→RTS] | [rdma_activate: INIT→RTR→RTS] | [rdma_activate: INIT->RTR->RTS] | [rdma_activate: INIT->RTR->RTS]
| [swap fn_send/fn_recv to RDMA] | [swap fn_send/fn_recv to RDMA] | [swap fn_send/fn_recv to RDMA] | [swap fn_send/fn_recv to RDMA]
| | | |
|==== All subsequent data via RDMA ==| |==== All subsequent data via RDMA ==|
@ -70,7 +70,7 @@ Transport is selected once at connection time by swapping function pointers. All
**3. Two-phase RDMA setup** **3. Two-phase RDMA setup**
- `rdma_probe()` -- Before HELLO: opens RDMA device, creates QP (stays in RESET state), allocates and registers buffers. Returns local QPN/GID for the HELLO exchange. - `rdma_probe()` -- Before HELLO: opens RDMA device, creates QP (stays in RESET state), allocates and registers buffers. Returns local QPN/GID for the HELLO exchange.
- `rdma_activate()` -- After HELLO: given remote QPN/GID, transitions QP through INIT→RTR→RTS and pre-posts the receive ring. - `rdma_activate()` -- After HELLO: given remote QPN/GID, transitions QP through INIT->RTR->RTS and pre-posts the receive ring.
This split is necessary because both sides need the other's QPN before they can complete the QP state machine. This split is necessary because both sides need the other's QPN before they can complete the QP state machine.
@ -95,10 +95,10 @@ The data transport layer uses several optimizations developed through iterative
| Scenario | Behavior | | Scenario | Behavior |
| -------- | -------- | | -------- | -------- |
| New client → old server | Client sends extended HELLO (24 bytes input). Old server treats extra bytes as unknown input, responds with standard 3-byte HELLO. Client sees no RDMA fields, stays on TCP. | | New client -> old server | Client sends extended HELLO (24 bytes input). Old server treats extra bytes as unknown input, responds with standard 3-byte HELLO. Client sees no RDMA fields, stays on TCP. |
| Old client → new server | Client sends standard HELLO (0 bytes input). Server detects `input_size == 0`, responds with standard HELLO. No RDMA negotiation attempted. | | Old client -> new server | Client sends standard HELLO (0 bytes input). Server detects `input_size == 0`, responds with standard HELLO. No RDMA negotiation attempted. |
| New client → new server, no RDMA hardware | `rdma_probe()` returns nullptr. Client sends standard HELLO. Normal TCP operation. | | New client -> new server, no RDMA hardware | `rdma_probe()` returns nullptr. Client sends standard HELLO. Normal TCP operation. |
| New client → new server, RDMA available | Full auto-negotiation. RDMA transport activated after HELLO. | | New client -> new server, RDMA available | Full auto-negotiation. RDMA transport activated after HELLO. |
## Files changed ## Files changed
@ -120,6 +120,9 @@ cmake -B build \
cmake --build build --target rpc-server llama-bench -j$(nproc) cmake --build build --target rpc-server llama-bench -j$(nproc)
``` ```
Requires `libibverbs-dev` (Ubuntu: `apt install libibverbs-dev`). Requires `libibverbs-dev` (Ubuntu: `apt install libibverbs-dev rdma-core`).
This is an optional dependency -- without `-DGGML_RPC_RDMA=ON`, the build
Without `-DGGML_RPC_RDMA=ON`, the build produces a standard TCP-only binary with no RDMA code compiled in. produces a standard TCP-only binary with no RDMA code or libibverbs linkage.
The libibverbs library is part of rdma-core and is available on all major
Linux distributions. RDMA transport is Linux-only; Windows and macOS builds
are unaffected.

View File

@ -34,7 +34,7 @@
#ifdef GGML_RPC_RDMA #ifdef GGML_RPC_RDMA
# include <infiniband/verbs.h> # include <infiniband/verbs.h>
# include <time.h> # include <time.h>
#endif #endif // GGML_RPC_RDMA
static const char * RPC_DEBUG = std::getenv("GGML_RPC_DEBUG"); static const char * RPC_DEBUG = std::getenv("GGML_RPC_DEBUG");
@ -49,7 +49,7 @@ static constexpr size_t MAX_CHUNK_SIZE = 1024ull * 1024ull * 1024ull; // 1 GiB
#ifdef GGML_RPC_RDMA #ifdef GGML_RPC_RDMA
static constexpr size_t RDMA_CHUNK = 256 * 1024; // 256 KiB per send/recv (fits default 8 MiB memlock) static constexpr size_t RDMA_CHUNK = 256 * 1024; // 256 KiB per send/recv (fits default 8 MiB memlock)
static constexpr int RDMA_RX_DEPTH = 24; // pre-posted recv ring: 24 × 256 KiB = 6 MiB static constexpr int RDMA_RX_DEPTH = 24; // pre-posted recv ring: 24 × 256 KiB = 6 MiB
#endif #endif // GGML_RPC_RDMA
#ifdef _WIN32 #ifdef _WIN32
typedef SOCKET sockfd_t; typedef SOCKET sockfd_t;
@ -103,7 +103,7 @@ struct rdma_conn {
if (ctx) ibv_close_device(ctx); if (ctx) ibv_close_device(ctx);
} }
}; };
#endif #endif // GGML_RPC_RDMA
// Forward declarations for transport function pointers // Forward declarations for transport function pointers
struct socket_t; struct socket_t;
@ -116,12 +116,12 @@ struct socket_t {
bool (*fn_recv)(socket_t *, void *, size_t) = tcp_recv_impl; bool (*fn_recv)(socket_t *, void *, size_t) = tcp_recv_impl;
#ifdef GGML_RPC_RDMA #ifdef GGML_RPC_RDMA
rdma_conn * rdma = nullptr; rdma_conn * rdma = nullptr;
#endif #endif // GGML_RPC_RDMA
socket_t(sockfd_t fd) : fd(fd) {} socket_t(sockfd_t fd) : fd(fd) {}
~socket_t() { ~socket_t() {
#ifdef GGML_RPC_RDMA #ifdef GGML_RPC_RDMA
if (rdma) { delete rdma; rdma = nullptr; } if (rdma) { delete rdma; rdma = nullptr; }
#endif #endif // GGML_RPC_RDMA
LOG_DBG("[%s] closing socket %d\n", __func__, this->fd); LOG_DBG("[%s] closing socket %d\n", __func__, this->fd);
#ifdef _WIN32 #ifdef _WIN32
if (fd != INVALID_SOCKET) closesocket(this->fd); if (fd != INVALID_SOCKET) closesocket(this->fd);
@ -200,11 +200,15 @@ struct rpc_msg_hello_rdma_rsp {
uint8_t major; uint8_t major;
uint8_t minor; uint8_t minor;
uint8_t patch; uint8_t patch;
uint8_t padding;
uint32_t rdma_qpn; // 0 = no RDMA capability uint32_t rdma_qpn; // 0 = no RDMA capability
uint32_t rdma_psn; uint32_t rdma_psn;
uint8_t rdma_gid[16]; uint8_t rdma_gid[16];
}; };
static_assert(sizeof(rpc_msg_hello_rdma_req) == 24, "rpc_msg_hello_rdma_req must be 24 bytes");
static_assert(sizeof(rpc_msg_hello_rdma_rsp) == 28, "rpc_msg_hello_rdma_rsp must be 28 bytes");
struct rpc_msg_device_count_rsp { struct rpc_msg_device_count_rsp {
uint32_t device_count; uint32_t device_count;
}; };
@ -498,7 +502,7 @@ static bool recv_data(sockfd_t sockfd, void * data, size_t size) {
return true; return true;
} }
// ── TCP transport implementations (for function-pointer dispatch) ──────────── // TCP transport implementations (for function-pointer dispatch)
static bool tcp_send_impl(socket_t * sock, const void * data, size_t size) { static bool tcp_send_impl(socket_t * sock, const void * data, size_t size) {
return send_data(sock->fd, data, size); return send_data(sock->fd, data, size);
@ -508,7 +512,7 @@ static bool tcp_recv_impl(socket_t * sock, void * data, size_t size) {
return recv_data(sock->fd, data, size); return recv_data(sock->fd, data, size);
} }
// ── RDMA transport (performance-optimized, auto-negotiated) ───────────────── // RDMA transport (performance-optimized, auto-negotiated)
#ifdef GGML_RPC_RDMA #ifdef GGML_RPC_RDMA
@ -711,11 +715,11 @@ static rdma_conn * rdma_probe(sockfd_t tcp_fd, rdma_local_info * out) {
return c; return c;
} }
// Phase 2: Given remote QPN/PSN/GID, transition QP: RESET→INIT→pre-post→RTR→RTS. // Phase 2: Given remote QPN/PSN/GID, transition QP: RESET->INIT->pre-post->RTR->RTS.
// On success, the connection is live and ready for rdma_send/rdma_recv. // On success, the connection is live and ready for rdma_send/rdma_recv.
static bool rdma_activate(rdma_conn * c, const rdma_local_info * local, static bool rdma_activate(rdma_conn * c, const rdma_local_info * local,
uint32_t remote_qpn, uint32_t remote_psn, const uint8_t * remote_gid) { uint32_t remote_qpn, uint32_t remote_psn, const uint8_t * remote_gid) {
// RESET → INIT // RESET -> INIT
{ {
struct ibv_qp_attr a = {}; struct ibv_qp_attr a = {};
a.qp_state = IBV_QPS_INIT; a.qp_state = IBV_QPS_INIT;
@ -732,7 +736,7 @@ static bool rdma_activate(rdma_conn * c, const rdma_local_info * local,
if (!c->post_rx(i)) return false; if (!c->post_rx(i)) return false;
} }
// INIT → RTR // INIT -> RTR
{ {
struct ibv_qp_attr a = {}; struct ibv_qp_attr a = {};
a.qp_state = IBV_QPS_RTR; a.qp_state = IBV_QPS_RTR;
@ -754,7 +758,7 @@ static bool rdma_activate(rdma_conn * c, const rdma_local_info * local,
} }
} }
// RTR → RTS // RTR -> RTS
{ {
struct ibv_qp_attr a = {}; struct ibv_qp_attr a = {};
a.qp_state = IBV_QPS_RTS; a.qp_state = IBV_QPS_RTS;
@ -770,14 +774,14 @@ static bool rdma_activate(rdma_conn * c, const rdma_local_info * local,
} }
} }
GGML_LOG_INFO("RDMA activated: qpn=%u%u mtu=%d rx_depth=%d\n", GGML_LOG_INFO("RDMA activated: qpn=%u->%u mtu=%d rx_depth=%d\n",
local->qpn, remote_qpn, 128 << local->path_mtu, RDMA_RX_DEPTH); local->qpn, remote_qpn, 128 << local->path_mtu, RDMA_RX_DEPTH);
return true; return true;
} }
#endif // GGML_RPC_RDMA #endif // GGML_RPC_RDMA
// ── Unified transport dispatch (via function pointers) ────────────────────── // unified transport dispatch (via function pointers)
static bool send_data(socket_t * sock, const void * data, size_t size) { static bool send_data(socket_t * sock, const void * data, size_t size) {
return sock->fn_send(sock, data, size); return sock->fn_send(sock, data, size);
@ -875,7 +879,7 @@ static bool send_rpc_cmd(const std::shared_ptr<socket_t> & sock, enum rpc_cmd cm
static bool negotiate_hello(const std::shared_ptr<socket_t> & sock) { static bool negotiate_hello(const std::shared_ptr<socket_t> & sock) {
#ifdef GGML_RPC_RDMA #ifdef GGML_RPC_RDMA
rdma_local_info local_info = {}; rdma_local_info local_info = {};
rdma_conn * probe = rdma_probe(sock->fd, &local_info); std::unique_ptr<rdma_conn> probe(rdma_probe(sock->fd, &local_info));
if (probe) { if (probe) {
rpc_msg_hello_rdma_req req = {}; rpc_msg_hello_rdma_req req = {};
@ -883,54 +887,49 @@ static bool negotiate_hello(const std::shared_ptr<socket_t> & sock) {
req.rdma_psn = local_info.psn; req.rdma_psn = local_info.psn;
memcpy(req.rdma_gid, local_info.gid, 16); memcpy(req.rdma_gid, local_info.gid, 16);
// Send extended HELLO: cmd + input_size + input_data
if (!send_rpc_cmd(sock, RPC_CMD_HELLO, &req, sizeof(req))) { if (!send_rpc_cmd(sock, RPC_CMD_HELLO, &req, sizeof(req))) {
delete probe;
return false; return false;
} }
// Read response size -- server may respond with legacy or extended size
uint64_t out_size = 0; uint64_t out_size = 0;
if (!recv_data(sock.get(), &out_size, sizeof(out_size))) { if (!recv_data(sock.get(), &out_size, sizeof(out_size))) {
delete probe;
return false; return false;
} }
if (out_size == sizeof(rpc_msg_hello_rdma_rsp)) { if (out_size == sizeof(rpc_msg_hello_rdma_rsp)) {
rpc_msg_hello_rdma_rsp rsp = {}; rpc_msg_hello_rdma_rsp rsp = {};
if (!recv_data(sock.get(), &rsp, sizeof(rsp))) { delete probe; return false; } if (!recv_data(sock.get(), &rsp, sizeof(rsp))) {
return false;
}
if (rsp.major != RPC_PROTO_MAJOR_VERSION || rsp.minor > RPC_PROTO_MINOR_VERSION) { if (rsp.major != RPC_PROTO_MAJOR_VERSION || rsp.minor > RPC_PROTO_MINOR_VERSION) {
GGML_LOG_ERROR("RPC server version mismatch: %d.%d.%d\n", rsp.major, rsp.minor, rsp.patch); GGML_LOG_ERROR("RPC server version mismatch: %d.%d.%d\n", rsp.major, rsp.minor, rsp.patch);
delete probe;
return false; return false;
} }
if (rsp.rdma_qpn != 0) { if (rsp.rdma_qpn != 0) {
if (rdma_activate(probe, &local_info, rsp.rdma_qpn, rsp.rdma_psn, rsp.rdma_gid)) { if (rdma_activate(probe.get(), &local_info, rsp.rdma_qpn, rsp.rdma_psn, rsp.rdma_gid)) {
sock->rdma = probe; sock->rdma = probe.release();
sock->fn_send = rdma_send_impl; sock->fn_send = rdma_send_impl;
sock->fn_recv = rdma_recv_impl; sock->fn_recv = rdma_recv_impl;
return true; return true;
} }
GGML_LOG_ERROR("RDMA activate failed, staying on TCP\n"); GGML_LOG_ERROR("RDMA activate failed, staying on TCP\n");
} }
delete probe;
return true; return true;
} else if (out_size == sizeof(rpc_msg_hello_rsp)) { } else if (out_size == sizeof(rpc_msg_hello_rsp)) {
// Legacy server responded with standard HELLO (ignored our RDMA req)
rpc_msg_hello_rsp rsp = {}; rpc_msg_hello_rsp rsp = {};
if (!recv_data(sock.get(), &rsp, sizeof(rsp))) { delete probe; return false; } if (!recv_data(sock.get(), &rsp, sizeof(rsp))) {
delete probe; return false;
}
if (rsp.major != RPC_PROTO_MAJOR_VERSION || rsp.minor > RPC_PROTO_MINOR_VERSION) { if (rsp.major != RPC_PROTO_MAJOR_VERSION || rsp.minor > RPC_PROTO_MINOR_VERSION) {
GGML_LOG_ERROR("RPC server version mismatch: %d.%d.%d\n", rsp.major, rsp.minor, rsp.patch); GGML_LOG_ERROR("RPC server version mismatch: %d.%d.%d\n", rsp.major, rsp.minor, rsp.patch);
return false; return false;
} }
return true; return true;
} else { } else {
delete probe;
return false; return false;
} }
} }
#endif #endif // GGML_RPC_RDMA
rpc_msg_hello_rsp response; rpc_msg_hello_rsp response;
bool status = send_rpc_cmd(sock, RPC_CMD_HELLO, nullptr, 0, &response, sizeof(response)); bool status = send_rpc_cmd(sock, RPC_CMD_HELLO, nullptr, 0, &response, sizeof(response));
RPC_STATUS_ASSERT(status); RPC_STATUS_ASSERT(status);
@ -2033,70 +2032,45 @@ static void rpc_serve_client(const std::vector<ggml_backend_t> & backends, const
} }
if (hello_input_size == sizeof(rpc_msg_hello_rdma_req)) { if (hello_input_size == sizeof(rpc_msg_hello_rdma_req)) {
// Extended HELLO with RDMA fields
rpc_msg_hello_rdma_req req = {}; rpc_msg_hello_rdma_req req = {};
if (!recv_data(sockfd, &req, sizeof(req))) { if (!recv_data(sockfd, &req, sizeof(req))) {
return; return;
} }
rpc_msg_hello_rdma_rsp rsp = {};
{ rpc_msg_hello_rsp v; server.hello(v); rsp.major = v.major; rsp.minor = v.minor; rsp.patch = v.patch; }
#ifdef GGML_RPC_RDMA #ifdef GGML_RPC_RDMA
rdma_local_info local_info = {}; rdma_local_info local_info = {};
rdma_conn * probe = rdma_probe(sockfd->fd, &local_info); std::unique_ptr<rdma_conn> probe(rdma_probe(sockfd->fd, &local_info));
rpc_msg_hello_rdma_rsp rsp = {};
rpc_msg_hello_rsp base_rsp;
server.hello(base_rsp);
rsp.major = base_rsp.major;
rsp.minor = base_rsp.minor;
rsp.patch = base_rsp.patch;
if (probe && req.rdma_qpn != 0) { if (probe && req.rdma_qpn != 0) {
rsp.rdma_qpn = local_info.qpn; rsp.rdma_qpn = local_info.qpn;
rsp.rdma_psn = local_info.psn; rsp.rdma_psn = local_info.psn;
memcpy(rsp.rdma_gid, local_info.gid, 16); memcpy(rsp.rdma_gid, local_info.gid, 16);
} }
#endif // GGML_RPC_RDMA
if (!send_msg(sockfd, &rsp, sizeof(rsp))) { if (!send_msg(sockfd, &rsp, sizeof(rsp))) {
delete probe;
return; return;
} }
if (probe && req.rdma_qpn != 0 && rsp.rdma_qpn != 0) { #ifdef GGML_RPC_RDMA
if (rdma_activate(probe, &local_info, req.rdma_qpn, req.rdma_psn, req.rdma_gid)) { if (probe && rsp.rdma_qpn != 0) {
sockfd->rdma = probe; if (rdma_activate(probe.get(), &local_info, req.rdma_qpn, req.rdma_psn, req.rdma_gid)) {
sockfd->rdma = probe.release();
sockfd->fn_send = rdma_send_impl; sockfd->fn_send = rdma_send_impl;
sockfd->fn_recv = rdma_recv_impl; sockfd->fn_recv = rdma_recv_impl;
} else { } else {
GGML_LOG_ERROR("RDMA activate failed on server, staying on TCP\n"); GGML_LOG_ERROR("RDMA activate failed on server, staying on TCP\n");
delete probe;
} }
} else {
delete probe;
}
#else
// Not compiled with RDMA -- respond with zeros for RDMA fields
rpc_msg_hello_rdma_rsp rsp = {};
rpc_msg_hello_rsp base_rsp;
server.hello(base_rsp);
rsp.major = base_rsp.major;
rsp.minor = base_rsp.minor;
rsp.patch = base_rsp.patch;
if (!send_msg(sockfd, &rsp, sizeof(rsp))) {
return;
}
#endif
} else if (hello_input_size == 0) {
// Legacy HELLO (no RDMA)
rpc_msg_hello_rsp response;
server.hello(response);
if (!send_msg(sockfd, &response, sizeof(response))) {
return;
} }
#endif // GGML_RPC_RDMA
} else { } else {
// Unknown HELLO size -- consume and respond with legacy if (hello_input_size > 0) {
std::vector<uint8_t> discard(hello_input_size); std::vector<uint8_t> discard(hello_input_size);
if (!recv_data(sockfd, discard.data(), hello_input_size)) { if (!recv_data(sockfd, discard.data(), hello_input_size)) {
return; return;
}
} }
rpc_msg_hello_rsp response; rpc_msg_hello_rsp response;
server.hello(response); server.hello(response);
@ -2377,7 +2351,7 @@ void ggml_backend_rpc_start_server(const char * endpoint, const char * cache_dir
printf(" transport : TCP (RDMA auto-negotiate enabled)\n"); printf(" transport : TCP (RDMA auto-negotiate enabled)\n");
#else #else
printf(" transport : TCP\n"); printf(" transport : TCP\n");
#endif #endif // GGML_RPC_RDMA
#ifdef _WIN32 #ifdef _WIN32
{ {
WSADATA wsaData; WSADATA wsaData;