commit 20e173d253 by Valeriy Dubov, 2026-03-16 00:56:17 +03:00, committed by GitHub
GPG Key ID: B5690EEEBB952194 (no known key found for this signature in database)
5 changed files with 699 additions and 33 deletions

docs/backend/RPC-RDMA.md (new file, 142 lines)

@@ -0,0 +1,142 @@
# llama.cpp RPC with RDMA Transport
- [Background](#background)
- [OS](#os)
- [Hardware](#hardware)
- [Performance](#performance)
- [Building](#building)
- [Usage](#usage)
- [Environment Variables](#environment-variables)
- [Troubleshooting](#troubleshooting)
## Background
The RPC backend in llama.cpp enables distributed inference by offloading computation to remote hosts over TCP. While TCP works well for most networks, the per-message overhead of the kernel network stack becomes a bottleneck for token generation, where each token requires a full round-trip between nodes.
RDMA (Remote Direct Memory Access) bypasses the kernel network stack entirely, allowing NICs to read and write memory directly. RoCEv2 (RDMA over Converged Ethernet v2) brings RDMA to standard Ethernet networks using commodity NICs from vendors like Mellanox/NVIDIA and Broadcom.
When built with `-DGGML_RPC_RDMA=ON`, the RPC backend auto-negotiates RDMA transport during connection setup. If both client and server have RDMA-capable NICs, the connection upgrades transparently. If either side lacks RDMA, it falls back to TCP silently.
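The negotiation rule is simple: the transport upgrades only when both sides advertise a usable queue pair in the HELLO exchange (a nonzero QPN); in every other case, including a failed activation, the connection stays on TCP. A minimal sketch of that decision (not the actual implementation; the QPN values are taken from the sample log later in this document):

```python
def choose_transport(client_qpn: int, server_qpn: int) -> str:
    """qpn == 0 means 'no RDMA capability' in the HELLO messages."""
    if client_qpn != 0 and server_qpn != 0:
        return "RDMA"
    return "TCP"

print(choose_transport(328, 488))  # RDMA
print(choose_transport(328, 0))    # TCP (server lacks RDMA)
```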
## OS
| OS | Status | Notes |
| ------- | --------- | --------------------------------------------------- |
| Linux | Supported | Requires `rdma-core` / `libibverbs-dev` |
| Windows | N/A | RDMA code not compiled; TCP-only RPC works normally |
| macOS | N/A | RDMA code not compiled; TCP-only RPC works normally |
## Hardware
RDMA transport requires RoCEv2-capable NICs on both nodes. Tested hardware:
| NIC | Link Speed | Status |
| -------------------------------------- | ---------- | --------- |
| Mellanox ConnectX-4 Lx (MT27710) 25GbE | 25 Gbps | Supported |
| Mellanox ConnectX-6 Lx (MT2894) 25GbE | 25 Gbps | Supported |
Other RoCEv2-capable NICs (ConnectX-5/7, Broadcom NetXtreme-E, etc.) should work but are untested. Mixed NIC generations across nodes are supported.
## Performance
Two-node cluster: AMD Radeon 8060S (gfx1151) iGPUs, ConnectX-4 Lx / ConnectX-6 Lx 25GbE, RoCEv2. Model: Qwen3-Coder-Next 80B Q8_K_XL, layer split (`-sm layer -ts 1/1`) across both nodes, ROCm backend.
| Metric                     | TCP (t/s) | RDMA (t/s) | Improvement |
| -------------------------- | --------- | ---------- | ----------- |
| Prompt processing (pp2048) | 651.48 | 678.42 | **+4.1%** |
| Token generation (tg256) | 30.19 | 32.16 | **+6.5%** |
Token generation benefits most because each token requires a round-trip between nodes. Results will vary with hardware, model size, and split configuration.
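The improvement column is plain arithmetic over the two throughput columns; recomputing it:

```python
tcp  = {"pp2048": 651.48, "tg256": 30.19}
rdma = {"pp2048": 678.42, "tg256": 32.16}

for metric in tcp:
    gain = (rdma[metric] - tcp[metric]) / tcp[metric] * 100
    print(f"{metric}: +{gain:.1f}%")  # pp2048: +4.1%, tg256: +6.5%
```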
## Building
Build with RDMA support by adding `-DGGML_RPC_RDMA=ON`:
```bash
cmake -B build \
-DCMAKE_BUILD_TYPE=Release \
-DGGML_RPC=ON \
-DGGML_RPC_RDMA=ON
cmake --build build --target rpc-server llama-bench -j$(nproc)
```
### Dependencies
Requires `libibverbs-dev` (part of `rdma-core`):
```bash
# Ubuntu / Debian
sudo apt install libibverbs-dev rdma-core
# Fedora / RHEL
sudo dnf install libibverbs-devel rdma-core-devel
```
This is an optional dependency. Without `-DGGML_RPC_RDMA=ON`, the build produces a standard TCP-only binary with no RDMA code or libibverbs linkage.
## Usage
### Server
```bash
bin/rpc-server -H 0.0.0.0 -p 50052 -c
```
### Client
```bash
bin/llama-bench --rpc 192.168.1.45:50052 \
-m model.gguf -p 2048 -n 256
```
The connection starts as TCP and upgrades to RDMA automatically during the handshake. The server log confirms the upgrade:
```
Accepted client connection
RDMA probed: dev=mlx5_0 gid=5 qpn=328 inline=316
RDMA activated: qpn=328->488 mtu=1024 rx_depth=24
```
When RDMA is not available (no hardware or connecting to a stock `rpc-server`), the connection works over TCP with no user action required.
## Environment Variables
| Variable | Required | Default | Description |
| --------------- | -------- | ----------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `GGML_RDMA_DEV` | No | auto-detect | RDMA device name (e.g. `mlx5_0`). When set, only this device is considered. When unset, all devices are scanned for a GID matching the TCP socket's local IP. |
| `GGML_RDMA_GID` | No | auto-detect | GID table index. When unset, the first IPv4-mapped RoCEv2 GID is used. |
These variables are only needed when auto-detection fails, typically in complex network topologies such as Linux bridges where the IP may not appear in the expected GID slot.
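For example, to pin both sides of a bridged setup to a specific device and GID index (the device name and index here are illustrative; substitute your own values from `ibv_devinfo` and the GID table):

```shell
# server
GGML_RDMA_DEV=mlx5_0 GGML_RDMA_GID=5 bin/rpc-server -H 0.0.0.0 -p 50052 -c

# client
GGML_RDMA_DEV=mlx5_0 GGML_RDMA_GID=5 bin/llama-bench --rpc 192.168.1.45:50052 -m model.gguf
```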
## Troubleshooting
### Verify RDMA devices
```bash
ibv_devices # list RDMA devices
ibv_devinfo # show device details and port state
```
### Check GID table
```bash
grep . /sys/class/infiniband/mlx5_0/ports/1/gids/* 2>/dev/null  # list all entries with their indices
```
GID entries with `fe80::` prefix are link-local (InfiniBand). Look for entries with `::ffff:` prefix -- these are IPv4-mapped RoCEv2 GIDs.
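An IPv4-mapped GID is just the 16-byte IPv6 form `::ffff:a.b.c.d`, so the embedded IPv4 address sits in the last four bytes. A small sketch of the decoding (slightly stricter than the in-tree check, which only inspects bytes 10 and 11):

```python
import ipaddress

def ipv4_from_gid(raw: bytes):
    """Return the embedded IPv4 address for a RoCEv2 ::ffff: GID, else None."""
    if len(raw) != 16 or any(raw[:10]) or raw[10:12] != b"\xff\xff":
        return None  # link-local (fe80::) and other GIDs are not IPv4-mapped
    return str(ipaddress.IPv4Address(raw[12:16]))

gid = bytes(10) + b"\xff\xff" + bytes([192, 168, 1, 45])
print(ipv4_from_gid(gid))                                # 192.168.1.45
print(ipv4_from_gid(bytes.fromhex("fe80" + "00" * 14)))  # None
```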
### RDMA not activating
- Ensure both nodes have `rdma-core` installed and RDMA devices visible in `ibv_devices`
- If using a Linux bridge, set `GGML_RDMA_DEV` and `GGML_RDMA_GID` explicitly
- Check that RoCEv2 is enabled on the NIC port
- Enable debug logging with `GGML_RPC_DEBUG=1` to see probe/activate messages


@@ -7,7 +7,7 @@ extern "C" {
#endif
#define RPC_PROTO_MAJOR_VERSION 3
#define RPC_PROTO_MINOR_VERSION 6
#define RPC_PROTO_MINOR_VERSION 7
#define RPC_PROTO_PATCH_VERSION 1
#ifdef __cplusplus


@@ -7,3 +7,11 @@ ggml_add_backend_library(ggml-rpc
if (WIN32)
target_link_libraries(ggml-rpc PRIVATE ws2_32)
endif()
option(GGML_RPC_RDMA "ggml: enable RDMA transport for RPC (requires libibverbs)" OFF)
if (GGML_RPC_RDMA)
find_library(IBVERBS_LIB ibverbs REQUIRED)
target_compile_definitions(ggml-rpc PRIVATE GGML_RPC_RDMA)
target_link_libraries(ggml-rpc PRIVATE ${IBVERBS_LIB})
message(STATUS " RDMA transport enabled")
endif()


@@ -31,6 +31,14 @@
#include <filesystem>
#include <algorithm>
#ifdef GGML_RPC_RDMA
# include <infiniband/verbs.h>
# include <time.h>
# ifndef _WIN32
# include <poll.h>
# endif
#endif // GGML_RPC_RDMA
static const char * RPC_DEBUG = std::getenv("GGML_RPC_DEBUG");
#define LOG_DBG(...) \
@@ -41,6 +49,11 @@ namespace fs = std::filesystem;
static constexpr size_t MAX_CHUNK_SIZE = 1024ull * 1024ull * 1024ull; // 1 GiB
#ifdef GGML_RPC_RDMA
static constexpr size_t RDMA_CHUNK = 256 * 1024; // 256 KiB per send/recv (fits default 8 MiB memlock)
static constexpr int RDMA_RX_DEPTH = 24; // pre-posted recv ring: 24 × 256 KiB = 6 MiB
#endif // GGML_RPC_RDMA
#ifdef _WIN32
typedef SOCKET sockfd_t;
using ssize_t = __int64;
@@ -49,15 +62,74 @@ typedef int sockfd_t;
#endif
// cross-platform socket
#ifdef GGML_RPC_RDMA
struct rdma_conn {
struct ibv_context * ctx = nullptr;
struct ibv_pd * pd = nullptr;
struct ibv_cq * scq = nullptr; // send completions
struct ibv_cq * rcq = nullptr; // recv completions
struct ibv_qp * qp = nullptr;
void * tx_buf = nullptr;
struct ibv_mr * tx_mr = nullptr;
void * rx_buf = nullptr; // RDMA_RX_DEPTH × RDMA_CHUNK contiguous
struct ibv_mr * rx_mr = nullptr;
int rx_head = 0;
uint32_t max_inline = 0;
void * rx_slot(int i) { return (char *)rx_buf + (size_t)i * RDMA_CHUNK; }
bool post_rx(int i) {
struct ibv_sge sge = {};
sge.addr = (uintptr_t)rx_slot(i);
sge.length = RDMA_CHUNK;
sge.lkey = rx_mr->lkey;
struct ibv_recv_wr wr = {}, * bad = nullptr;
wr.wr_id = (uint64_t)i;
wr.sg_list = &sge;
wr.num_sge = 1;
return ibv_post_recv(qp, &wr, &bad) == 0;
}
~rdma_conn() {
if (tx_mr) ibv_dereg_mr(tx_mr);
if (rx_mr) ibv_dereg_mr(rx_mr);
free(tx_buf);
free(rx_buf);
if (qp) ibv_destroy_qp(qp);
if (scq) ibv_destroy_cq(scq);
if (rcq) ibv_destroy_cq(rcq);
if (pd) ibv_dealloc_pd(pd);
if (ctx) ibv_close_device(ctx);
}
};
#endif // GGML_RPC_RDMA
// Forward declarations for transport function pointers
struct socket_t;
static bool tcp_send_impl(socket_t * sock, const void * data, size_t size);
static bool tcp_recv_impl(socket_t * sock, void * data, size_t size);
struct socket_t {
sockfd_t fd;
bool (*fn_send)(socket_t *, const void *, size_t) = tcp_send_impl;
bool (*fn_recv)(socket_t *, void *, size_t) = tcp_recv_impl;
#ifdef GGML_RPC_RDMA
rdma_conn * rdma = nullptr;
#endif // GGML_RPC_RDMA
socket_t(sockfd_t fd) : fd(fd) {}
~socket_t() {
#ifdef GGML_RPC_RDMA
if (rdma) { delete rdma; rdma = nullptr; }
#endif // GGML_RPC_RDMA
LOG_DBG("[%s] closing socket %d\n", __func__, this->fd);
#ifdef _WIN32
closesocket(this->fd);
if (fd != INVALID_SOCKET) closesocket(this->fd);
#else
close(this->fd);
if (fd >= 0) close(this->fd);
#endif
}
};
@@ -121,6 +193,25 @@ struct rpc_msg_hello_rsp {
uint8_t patch;
};
struct rpc_msg_hello_rdma_req {
uint32_t rdma_qpn; // 0 = no RDMA capability
uint32_t rdma_psn;
uint8_t rdma_gid[16];
};
struct rpc_msg_hello_rdma_rsp {
uint8_t major;
uint8_t minor;
uint8_t patch;
uint8_t padding;
uint32_t rdma_qpn; // 0 = no RDMA capability
uint32_t rdma_psn;
uint8_t rdma_gid[16];
};
static_assert(sizeof(rpc_msg_hello_rdma_req) == 24, "rpc_msg_hello_rdma_req must be 24 bytes");
static_assert(sizeof(rpc_msg_hello_rdma_rsp) == 28, "rpc_msg_hello_rdma_rsp must be 28 bytes");
struct rpc_msg_device_count_rsp {
uint32_t device_count;
};
@@ -414,27 +505,337 @@ static bool recv_data(sockfd_t sockfd, void * data, size_t size) {
return true;
}
static bool send_msg(sockfd_t sockfd, const void * msg, size_t msg_size) {
if (!send_data(sockfd, &msg_size, sizeof(msg_size))) {
return false;
}
return send_data(sockfd, msg, msg_size);
// TCP transport implementations (for function-pointer dispatch)
static bool tcp_send_impl(socket_t * sock, const void * data, size_t size) {
return send_data(sock->fd, data, size);
}
static bool recv_msg(sockfd_t sockfd, void * msg, size_t msg_size) {
static bool tcp_recv_impl(socket_t * sock, void * data, size_t size) {
return recv_data(sock->fd, data, size);
}
// RDMA transport (performance-optimized, auto-negotiated)
#ifdef GGML_RPC_RDMA
static bool rdma_send_impl(socket_t * sock, const void * data, size_t size);
static bool rdma_recv_impl(socket_t * sock, void * data, size_t size);
static inline bool tcp_peer_closed(int fd) {
if (fd < 0) return false;
#ifndef _WIN32
struct pollfd pfd = { fd, POLLIN | POLLRDHUP, 0 };
int r = poll(&pfd, 1, 0);
return r > 0 && (pfd.revents & (POLLHUP | POLLERR | POLLRDHUP));
#else
return false;
#endif
}
static inline bool rdma_poll(struct ibv_cq * cq, struct ibv_wc * wc, int tcp_fd = -1) {
struct timespec t0;
clock_gettime(CLOCK_MONOTONIC_COARSE, &t0);
for (uint64_t s = 0; ; s++) {
int n = ibv_poll_cq(cq, 1, wc);
if (n > 0) {
if (wc->status != IBV_WC_SUCCESS) {
GGML_LOG_ERROR("RDMA CQ wc error: status=%d (%s) vendor_err=0x%x\n",
wc->status, ibv_wc_status_str(wc->status), wc->vendor_err);
}
return wc->status == IBV_WC_SUCCESS;
}
if (n < 0) return false;
if ((s & 0xFFFFF) == 0 && s > 0) {
if (tcp_fd >= 0 && tcp_peer_closed(tcp_fd)) {
return false;
}
struct timespec now;
clock_gettime(CLOCK_MONOTONIC_COARSE, &now);
if (now.tv_sec - t0.tv_sec >= 30) {
GGML_LOG_ERROR("RDMA CQ poll timeout\n");
return false;
}
}
}
}
static bool rdma_send(rdma_conn * c, const void * data, size_t size, int tcp_fd = -1) {
const uint8_t * src = (const uint8_t *)data;
size_t rem = size;
while (rem > 0) {
size_t chunk = std::min(rem, RDMA_CHUNK);
struct ibv_sge sge = {};
struct ibv_send_wr wr = {}, * bad = nullptr;
wr.opcode = IBV_WR_SEND;
wr.sg_list = &sge;
wr.num_sge = 1;
if (chunk <= c->max_inline) {
sge.addr = (uintptr_t)src;
sge.length = chunk;
wr.send_flags = IBV_SEND_SIGNALED | IBV_SEND_INLINE;
} else {
memcpy(c->tx_buf, src, chunk);
sge.addr = (uintptr_t)c->tx_buf;
sge.length = chunk;
sge.lkey = c->tx_mr->lkey;
wr.send_flags = IBV_SEND_SIGNALED;
}
if (ibv_post_send(c->qp, &wr, &bad) != 0) return false;
struct ibv_wc wc;
if (!rdma_poll(c->scq, &wc, tcp_fd)) return false;
src += chunk;
rem -= chunk;
}
return true;
}
static bool rdma_recv(rdma_conn * c, void * data, size_t size, int tcp_fd = -1) {
uint8_t * dst = (uint8_t *)data;
size_t rem = size;
while (rem > 0) {
struct ibv_wc wc;
if (!rdma_poll(c->rcq, &wc, tcp_fd)) return false;
int slot = (int)wc.wr_id;
size_t got = wc.byte_len;
memcpy(dst, c->rx_slot(slot), got);
if (!c->post_rx(slot)) return false;
dst += got;
rem -= got;
}
return true;
}
static bool rdma_send_impl(socket_t * sock, const void * data, size_t size) {
return rdma_send(sock->rdma, data, size, sock->fd);
}
static bool rdma_recv_impl(socket_t * sock, void * data, size_t size) {
return rdma_recv(sock->rdma, data, size, sock->fd);
}
// Phase 1: Probe for RDMA device, create QP (in RESET state), return local info.
// Returns rdma_conn with QP created but NOT connected. Caller gets local QPN/PSN/GID
// to send to the remote side via the HELLO exchange.
// If RDMA is not available, returns nullptr (caller stays on TCP).
struct rdma_local_info {
uint32_t qpn;
uint32_t psn;
uint8_t gid[16];
uint8_t ib_port;
int gid_idx;
enum ibv_mtu path_mtu;
};
static rdma_conn * rdma_probe(sockfd_t tcp_fd, rdma_local_info * out) {
const char * dev_env = std::getenv("GGML_RDMA_DEV");
const char * gid_env = std::getenv("GGML_RDMA_GID");
struct sockaddr_in local_addr;
socklen_t addr_len = sizeof(local_addr);
if (getsockname(tcp_fd, (struct sockaddr *)&local_addr, &addr_len) != 0) {
return nullptr;
}
const uint8_t ib_port = 1;
int num_devs = 0;
struct ibv_device ** devs = ibv_get_device_list(&num_devs);
if (!devs || num_devs == 0) return nullptr;
struct ibv_context * ibctx = nullptr;
const char * matched_dev = nullptr;
int gid_idx = gid_env ? atoi(gid_env) : -1;
for (int d = 0; d < num_devs; d++) {
const char * dn = ibv_get_device_name(devs[d]);
if (dev_env && strcmp(dev_env, dn) != 0) continue;
struct ibv_context * ctx = ibv_open_device(devs[d]);
if (!ctx) continue;
struct ibv_port_attr pa;
if (ibv_query_port(ctx, ib_port, &pa) != 0) { ibv_close_device(ctx); continue; }
int found_gid = gid_idx;
if (found_gid < 0) {
for (int i = 0; i < pa.gid_tbl_len; i++) {
union ibv_gid g;
if (ibv_query_gid(ctx, ib_port, i, &g) != 0) continue;
if (g.raw[10] != 0xff || g.raw[11] != 0xff) continue;
uint32_t ip;
memcpy(&ip, &g.raw[12], 4);
if (dev_env) {
if (ip != 0) { found_gid = i; break; }
} else {
if (ip == local_addr.sin_addr.s_addr) { found_gid = i; break; }
}
}
}
if (found_gid >= 0) {
ibctx = ctx;
gid_idx = found_gid;
matched_dev = dn;
out->path_mtu = pa.active_mtu;
break;
}
ibv_close_device(ctx);
}
ibv_free_device_list(devs);
if (!ibctx) return nullptr;
out->ib_port = ib_port;
out->gid_idx = gid_idx;
auto * c = new rdma_conn();
c->ctx = ibctx;
c->pd = ibv_alloc_pd(ibctx);
if (!c->pd) { delete c; return nullptr; }
c->scq = ibv_create_cq(ibctx, 16, nullptr, nullptr, 0);
c->rcq = ibv_create_cq(ibctx, RDMA_RX_DEPTH + 4, nullptr, nullptr, 0);
if (!c->scq || !c->rcq) { delete c; return nullptr; }
struct ibv_qp_init_attr qia = {};
qia.send_cq = c->scq;
qia.recv_cq = c->rcq;
qia.qp_type = IBV_QPT_RC;
qia.cap.max_send_wr = 4;
qia.cap.max_recv_wr = RDMA_RX_DEPTH + 4;
qia.cap.max_send_sge = 1;
qia.cap.max_recv_sge = 1;
qia.cap.max_inline_data = 256;
c->qp = ibv_create_qp(c->pd, &qia);
if (!c->qp) { delete c; return nullptr; }
c->max_inline = qia.cap.max_inline_data;
c->tx_buf = aligned_alloc(4096, RDMA_CHUNK);
c->rx_buf = aligned_alloc(4096, (size_t)RDMA_RX_DEPTH * RDMA_CHUNK);
if (!c->tx_buf || !c->rx_buf) { delete c; return nullptr; }
c->tx_mr = ibv_reg_mr(c->pd, c->tx_buf, RDMA_CHUNK, IBV_ACCESS_LOCAL_WRITE);
c->rx_mr = ibv_reg_mr(c->pd, c->rx_buf, (size_t)RDMA_RX_DEPTH * RDMA_CHUNK,
IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
if (!c->tx_mr || !c->rx_mr) { delete c; return nullptr; }
union ibv_gid local_gid;
ibv_query_gid(ibctx, ib_port, gid_idx, &local_gid);
out->qpn = c->qp->qp_num;
out->psn = c->qp->qp_num & 0xffffff;
memcpy(out->gid, &local_gid, 16);
GGML_LOG_INFO("RDMA probed: dev=%s gid=%d qpn=%u inline=%u\n",
matched_dev, gid_idx, out->qpn, c->max_inline);
return c;
}
// Phase 2: Given remote QPN/PSN/GID, transition QP: RESET->INIT->pre-post->RTR->RTS.
// On success, the connection is live and ready for rdma_send/rdma_recv.
static bool rdma_activate(rdma_conn * c, const rdma_local_info * local,
uint32_t remote_qpn, uint32_t remote_psn, const uint8_t * remote_gid) {
// RESET -> INIT
{
struct ibv_qp_attr a = {};
a.qp_state = IBV_QPS_INIT;
a.port_num = local->ib_port;
a.pkey_index = 0;
a.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE;
if (ibv_modify_qp(c->qp, &a,
IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS) != 0) {
return false;
}
}
for (int i = 0; i < RDMA_RX_DEPTH; i++) {
if (!c->post_rx(i)) return false;
}
// INIT -> RTR
{
struct ibv_qp_attr a = {};
a.qp_state = IBV_QPS_RTR;
a.path_mtu = local->path_mtu;
a.dest_qp_num = remote_qpn;
a.rq_psn = remote_psn;
a.max_dest_rd_atomic = 1;
a.min_rnr_timer = 1;
a.ah_attr.is_global = 1;
memcpy(&a.ah_attr.grh.dgid, remote_gid, 16);
a.ah_attr.grh.hop_limit = 1;
a.ah_attr.grh.sgid_index = local->gid_idx;
a.ah_attr.dlid = 0;
a.ah_attr.port_num = local->ib_port;
if (ibv_modify_qp(c->qp, &a,
IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN |
IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER) != 0) {
return false;
}
}
// RTR -> RTS
{
struct ibv_qp_attr a = {};
a.qp_state = IBV_QPS_RTS;
a.timeout = 14;
a.retry_cnt = 7;
a.rnr_retry = 7;
a.sq_psn = local->psn;
a.max_rd_atomic = 1;
if (ibv_modify_qp(c->qp, &a,
IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY |
IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC) != 0) {
return false;
}
}
GGML_LOG_INFO("RDMA activated: qpn=%u->%u mtu=%d rx_depth=%d\n",
local->qpn, remote_qpn, 128 << local->path_mtu, RDMA_RX_DEPTH);
return true;
}
#endif // GGML_RPC_RDMA
// unified transport dispatch (via function pointers)
static bool send_data(socket_t * sock, const void * data, size_t size) {
return sock->fn_send(sock, data, size);
}
static bool recv_data(socket_t * sock, void * data, size_t size) {
return sock->fn_recv(sock, data, size);
}
static bool send_msg(socket_t * sock, const void * msg, size_t msg_size) {
if (!send_data(sock, &msg_size, sizeof(msg_size))) {
return false;
}
return send_data(sock, msg, msg_size);
}
static bool recv_msg(socket_t * sock, void * msg, size_t msg_size) {
uint64_t size;
if (!recv_data(sockfd, &size, sizeof(size))) {
if (!recv_data(sock, &size, sizeof(size))) {
return false;
}
if (size != msg_size) {
return false;
}
return recv_data(sockfd, msg, msg_size);
return recv_data(sock, msg, msg_size);
}
static bool recv_msg(sockfd_t sockfd, std::vector<uint8_t> & input) {
static bool recv_msg(socket_t * sock, std::vector<uint8_t> & input) {
uint64_t size;
if (!recv_data(sockfd, &size, sizeof(size))) {
if (!recv_data(sock, &size, sizeof(size))) {
return false;
}
try {
@@ -443,7 +844,7 @@ static bool recv_msg(sockfd_t sockfd, std::vector<uint8_t> & input) {
GGML_LOG_ERROR("Failed to allocate input buffer of size %" PRIu64 "\n", size);
return false;
}
return recv_data(sockfd, input.data(), size);
return recv_data(sock, input.data(), size);
}
static bool parse_endpoint(const std::string & endpoint, std::string & host, int & port) {
@@ -452,7 +853,11 @@ static bool parse_endpoint(const std::string & endpoint, std::string & host, int
return false;
}
host = endpoint.substr(0, pos);
port = std::stoi(endpoint.substr(pos + 1));
try {
port = std::stoi(endpoint.substr(pos + 1));
} catch (...) {
return false;
}
return true;
}
@@ -460,13 +865,13 @@ static bool parse_endpoint(const std::string & endpoint, std::string & host, int
// No response
static bool send_rpc_cmd(const std::shared_ptr<socket_t> & sock, enum rpc_cmd cmd, const void * input, size_t input_size) {
uint8_t cmd_byte = cmd;
if (!send_data(sock->fd, &cmd_byte, sizeof(cmd_byte))) {
if (!send_data(sock.get(), &cmd_byte, sizeof(cmd_byte))) {
return false;
}
if (!send_data(sock->fd, &input_size, sizeof(input_size))) {
if (!send_data(sock.get(), &input_size, sizeof(input_size))) {
return false;
}
if (!send_data(sock->fd, input, input_size)) {
if (!send_data(sock.get(), input, input_size)) {
return false;
}
return true;
@@ -478,16 +883,14 @@ static bool send_rpc_cmd(const std::shared_ptr<socket_t> & sock, enum rpc_cmd cm
if (!send_rpc_cmd(sock, cmd, input, input_size)) {
return false;
}
// TODO: currently the output_size is always known, do we need support for commands with variable output size?
// even if we do, we can skip sending output_size from the server for commands with known output size
uint64_t out_size;
if (!recv_data(sock->fd, &out_size, sizeof(out_size))) {
if (!recv_data(sock.get(), &out_size, sizeof(out_size))) {
return false;
}
if (out_size != output_size) {
return false;
}
if (!recv_data(sock->fd, output, output_size)) {
if (!recv_data(sock.get(), output, output_size)) {
return false;
}
return true;
@@ -495,7 +898,62 @@ static bool send_rpc_cmd(const std::shared_ptr<socket_t> & sock, enum rpc_cmd cm
// RPC client-side implementation
static bool check_server_version(const std::shared_ptr<socket_t> & sock) {
// Performs HELLO handshake with optional RDMA auto-negotiation.
// If both sides have RDMA, the socket is upgraded transparently.
static bool negotiate_hello(const std::shared_ptr<socket_t> & sock) {
#ifdef GGML_RPC_RDMA
rdma_local_info local_info = {};
std::unique_ptr<rdma_conn> probe(rdma_probe(sock->fd, &local_info));
if (probe) {
rpc_msg_hello_rdma_req req = {};
req.rdma_qpn = local_info.qpn;
req.rdma_psn = local_info.psn;
memcpy(req.rdma_gid, local_info.gid, 16);
if (!send_rpc_cmd(sock, RPC_CMD_HELLO, &req, sizeof(req))) {
return false;
}
uint64_t out_size = 0;
if (!recv_data(sock.get(), &out_size, sizeof(out_size))) {
return false;
}
if (out_size == sizeof(rpc_msg_hello_rdma_rsp)) {
rpc_msg_hello_rdma_rsp rsp = {};
if (!recv_data(sock.get(), &rsp, sizeof(rsp))) {
return false;
}
if (rsp.major != RPC_PROTO_MAJOR_VERSION || rsp.minor > RPC_PROTO_MINOR_VERSION) {
GGML_LOG_ERROR("RPC server version mismatch: %d.%d.%d\n", rsp.major, rsp.minor, rsp.patch);
return false;
}
if (rsp.rdma_qpn != 0) {
if (rdma_activate(probe.get(), &local_info, rsp.rdma_qpn, rsp.rdma_psn, rsp.rdma_gid)) {
sock->rdma = probe.release();
sock->fn_send = rdma_send_impl;
sock->fn_recv = rdma_recv_impl;
return true;
}
GGML_LOG_ERROR("RDMA activate failed, staying on TCP\n");
}
return true;
} else if (out_size == sizeof(rpc_msg_hello_rsp)) {
rpc_msg_hello_rsp rsp = {};
if (!recv_data(sock.get(), &rsp, sizeof(rsp))) {
return false;
}
if (rsp.major != RPC_PROTO_MAJOR_VERSION || rsp.minor > RPC_PROTO_MINOR_VERSION) {
GGML_LOG_ERROR("RPC server version mismatch: %d.%d.%d\n", rsp.major, rsp.minor, rsp.patch);
return false;
}
return true;
} else {
return false;
}
}
#endif // GGML_RPC_RDMA
rpc_msg_hello_rsp response;
bool status = send_rpc_cmd(sock, RPC_CMD_HELLO, nullptr, 0, &response, sizeof(response));
RPC_STATUS_ASSERT(status);
@@ -527,6 +985,7 @@ static std::shared_ptr<socket_t> get_socket(const std::string & endpoint) {
GGML_LOG_ERROR("Failed to parse endpoint: %s\n", endpoint.c_str());
return nullptr;
}
#ifdef _WIN32
if (!initialized) {
WSADATA wsaData;
@@ -543,10 +1002,10 @@ static std::shared_ptr<socket_t> get_socket(const std::string & endpoint) {
if (sock == nullptr) {
return nullptr;
}
if (!check_server_version(sock)) {
if (!negotiate_hello(sock)) {
return nullptr;
}
LOG_DBG("[%s] connected to %s, sockfd=%d\n", __func__, endpoint.c_str(), sock->fd);
LOG_DBG("[%s] connected to %s\n", __func__, endpoint.c_str());
sockets[endpoint] = sock;
return sock;
}
@@ -1579,24 +2038,69 @@ rpc_server::~rpc_server() {
}
static void rpc_serve_client(const std::vector<ggml_backend_t> & backends, const char * cache_dir,
sockfd_t sockfd) {
socket_t * sockfd) {
rpc_server server(backends, cache_dir);
uint8_t cmd;
if (!recv_data(sockfd, &cmd, 1)) {
return;
}
// the first command sent by the client must be HELLO
if (cmd != RPC_CMD_HELLO) {
GGML_LOG_ERROR("Expected HELLO command, update client\n");
return;
}
if (!recv_msg(sockfd, nullptr, 0)) {
// Read input_size to determine legacy vs extended HELLO
uint64_t hello_input_size;
if (!recv_data(sockfd, &hello_input_size, sizeof(hello_input_size))) {
return;
}
rpc_msg_hello_rsp response;
server.hello(response);
if (!send_msg(sockfd, &response, sizeof(response))) {
return;
if (hello_input_size == sizeof(rpc_msg_hello_rdma_req)) {
rpc_msg_hello_rdma_req req = {};
if (!recv_data(sockfd, &req, sizeof(req))) {
return;
}
rpc_msg_hello_rdma_rsp rsp = {};
{ rpc_msg_hello_rsp v; server.hello(v); rsp.major = v.major; rsp.minor = v.minor; rsp.patch = v.patch; }
#ifdef GGML_RPC_RDMA
rdma_local_info local_info = {};
std::unique_ptr<rdma_conn> probe(rdma_probe(sockfd->fd, &local_info));
if (probe && req.rdma_qpn != 0) {
rsp.rdma_qpn = local_info.qpn;
rsp.rdma_psn = local_info.psn;
memcpy(rsp.rdma_gid, local_info.gid, 16);
}
#endif // GGML_RPC_RDMA
if (!send_msg(sockfd, &rsp, sizeof(rsp))) {
return;
}
#ifdef GGML_RPC_RDMA
if (probe && rsp.rdma_qpn != 0) {
if (rdma_activate(probe.get(), &local_info, req.rdma_qpn, req.rdma_psn, req.rdma_gid)) {
sockfd->rdma = probe.release();
sockfd->fn_send = rdma_send_impl;
sockfd->fn_recv = rdma_recv_impl;
} else {
GGML_LOG_ERROR("RDMA activate failed on server, staying on TCP\n");
}
}
#endif // GGML_RPC_RDMA
} else {
if (hello_input_size > 0) {
std::vector<uint8_t> discard(hello_input_size);
if (!recv_data(sockfd, discard.data(), hello_input_size)) {
return;
}
}
rpc_msg_hello_rsp response;
server.hello(response);
if (!send_msg(sockfd, &response, sizeof(response))) {
return;
}
}
while (true) {
if (!recv_data(sockfd, &cmd, 1)) {
@@ -1866,6 +2370,12 @@ void ggml_backend_rpc_start_server(const char * endpoint, const char * cache_dir
if (!parse_endpoint(endpoint, host, port)) {
return;
}
#ifdef GGML_RPC_RDMA
printf(" transport : TCP (RDMA auto-negotiate enabled)\n");
#else
printf(" transport : TCP\n");
#endif // GGML_RPC_RDMA
#ifdef _WIN32
{
WSADATA wsaData;
@@ -1889,7 +2399,7 @@
}
printf("Accepted client connection\n");
fflush(stdout);
rpc_serve_client(backends, cache_dir, client_socket->fd);
rpc_serve_client(backends, cache_dir, client_socket.get());
printf("Client connection closed\n");
fflush(stdout);
}


@@ -95,6 +95,12 @@ $ bin/rpc-server -c
By default, the cache is stored in the `$HOME/.cache/llama.cpp/rpc` directory and can be controlled via the `LLAMA_CACHE` environment variable.
### RDMA transport
On Linux systems with RoCEv2-capable NICs (e.g. Mellanox ConnectX), the RPC backend can use RDMA instead of TCP for lower latency and higher throughput. Build with `-DGGML_RPC_RDMA=ON` and the transport is negotiated automatically -- no changes to command-line usage are required.
See [docs/backend/RPC-RDMA.md](/docs/backend/RPC-RDMA.md) for setup, hardware requirements, and configuration details.
### Troubleshooting
Use the `GGML_RPC_DEBUG` environment variable to enable debug messages from `rpc-server`: