diff --git a/src/config.c b/src/config.c index 2b2a42e4ecc..b01c792a97d 100644 --- a/src/config.c +++ b/src/config.c @@ -2958,6 +2958,66 @@ static int setConfigSocketBindOption(standardConfig *config, sds *argv, int argc return setConfigBindOption(config, argv, argc, err, server.bindaddr, &server.bindaddr_count); } +static int setConfigRdmaBindOption(standardConfig *config, sds *argv, int argc, const char **err) { + UNUSED(config); + + return setConfigBindOption(config, argv, argc, err, server.rdma_ctx_config.bindaddr, &server.rdma_ctx_config.bindaddr_count); +} + +static sds getConfigRdmaBindOption(standardConfig *config) { + UNUSED(config); + return sdsjoin(server.rdma_ctx_config.bindaddr, server.rdma_ctx_config.bindaddr_count, " "); +} + +static void rewriteConfigRdmaBindOption(standardConfig *config, const char *name, struct rewriteConfigState *state) { + UNUSED(config); + + if (server.rdma_ctx_config.bindaddr_count) { + rewriteConfigBindOption(config, name, state, server.rdma_ctx_config.bindaddr, + server.rdma_ctx_config.bindaddr_count); + } +} + +static int applyRdmaBind(const char **err) { + connListener *rdma_listener = listenerByType(CONN_TYPE_RDMA); + + if (!rdma_listener) { + *err = "No RDMA building support."; + return 0; + } + + rdma_listener->bindaddr = server.rdma_ctx_config.bindaddr; + rdma_listener->bindaddr_count = server.rdma_ctx_config.bindaddr_count; + rdma_listener->port = server.rdma_ctx_config.port; + rdma_listener->ct = connectionByType(CONN_TYPE_RDMA); + if (changeListener(rdma_listener) == C_ERR) { + *err = "Failed to bind to specified addresses for RDMA."; + return 0; + } + + return 1; +} + +static int updateRdmaPort(const char **err) { + connListener *listener = listenerByType(CONN_TYPE_RDMA); + + if (listener != NULL) { + *err = "No RDMA building support."; + return 0; + } + + listener->bindaddr = server.rdma_ctx_config.bindaddr; + listener->bindaddr_count = server.rdma_ctx_config.bindaddr_count; + listener->port = server.rdma_ctx_config.port; + listener->ct = connectionByType(CONN_TYPE_RDMA); + if (changeListener(listener) == C_ERR) { + *err = "Unable to listen on this port for RDMA. Check server logs."; + return 0; + } + + return 1; +} + static int setConfigReplicaOfOption(standardConfig *config, sds *argv, int argc, const char **err) { UNUSED(config); @@ -3251,6 +3311,9 @@ standardConfig static_configs[] = { createIntConfig("watchdog-period", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.watchdog_period, 0, INTEGER_CONFIG, NULL, updateWatchdogPeriod), createIntConfig("shutdown-timeout", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.shutdown_timeout, 10, INTEGER_CONFIG, NULL, NULL), createIntConfig("repl-diskless-sync-max-replicas", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_diskless_sync_max_replicas, 0, INTEGER_CONFIG, NULL, NULL), + createIntConfig("rdma-port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.rdma_ctx_config.port, 0, INTEGER_CONFIG, NULL, updateRdmaPort), + createIntConfig("rdma-rx-size", NULL, IMMUTABLE_CONFIG, 64 * 1024, 16 * 1024 * 1024, server.rdma_ctx_config.rx_size, 1024 * 1024, INTEGER_CONFIG, NULL, NULL), + createIntConfig("rdma-comp-vector", NULL, IMMUTABLE_CONFIG, -1, 1024, server.rdma_ctx_config.comp_vector, -1, INTEGER_CONFIG, NULL, NULL), /* Unsigned int configs */ createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, server.maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients), @@ -3331,6 +3394,7 @@ standardConfig static_configs[] = { createSpecialConfig("oom-score-adj-values", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigOOMScoreAdjValuesOption, getConfigOOMScoreAdjValuesOption, rewriteConfigOOMScoreAdjValuesOption, updateOOMScoreAdj), createSpecialConfig("notify-keyspace-events", NULL, MODIFIABLE_CONFIG, setConfigNotifyKeyspaceEventsOption, getConfigNotifyKeyspaceEventsOption, rewriteConfigNotifyKeyspaceEventsOption, NULL), createSpecialConfig("bind", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigSocketBindOption, getConfigBindOption, rewriteConfigSocketBindOption, applyBind), + createSpecialConfig("rdma-bind", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigRdmaBindOption, getConfigRdmaBindOption, rewriteConfigRdmaBindOption, applyRdmaBind), createSpecialConfig("replicaof", "slaveof", IMMUTABLE_CONFIG | MULTI_ARG_CONFIG, setConfigReplicaOfOption, getConfigReplicaOfOption, rewriteConfigReplicaOfOption, NULL), createSpecialConfig("latency-tracking-info-percentiles", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigLatencyTrackingInfoPercentilesOutputOption, getConfigLatencyTrackingInfoPercentilesOutputOption, rewriteConfigLatencyTrackingInfoPercentilesOutputOption, NULL), diff --git a/src/connection.h b/src/connection.h index 099bbc558da..76b5298e9a6 100644 --- a/src/connection.h +++ b/src/connection.h @@ -60,6 +60,7 @@ typedef enum { #define CONN_TYPE_SOCKET "tcp" #define CONN_TYPE_UNIX "unix" #define CONN_TYPE_TLS "tls" +#define CONN_TYPE_RDMA "rdma" #define CONN_TYPE_MAX 8 /* 8 is enough to be extendable */ typedef void (*ConnectionCallbackFunc)(struct connection *conn); diff --git a/src/rdma.c b/src/rdma.c index 8f8b0df8b4f..d3c6f0acfb6 100644 --- a/src/rdma.c +++ b/src/rdma.c @@ -128,12 +128,10 @@ typedef struct rdma_listener { static list *pending_list; static rdma_listener *rdma_listeners; +static serverRdmaContextConfig *rdma_config; static ConnectionType CT_RDMA; -static int valkey_rdma_rx_size = VALKEY_RDMA_DEFAULT_RX_SIZE; -static int valkey_rdma_comp_vector = -1; /* -1 means a random one */ - static void serverRdmaError(char *err, const char *fmt, ...) { va_list ap; @@ -272,7 +270,7 @@ static int rdmaSetupIoBuf(RdmaContext *ctx, struct rdma_cm_id *cm_id) { /* setup recv buf & MR */ access = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE; - length = valkey_rdma_rx_size; + length = rdma_config->rx_size; ctx->rx.addr = page_aligned_zalloc(length); ctx->rx.length = length; ctx->rx.mr = ibv_reg_mr(ctx->pd, ctx->rx.addr, length, access); @@ -295,6 +293,7 @@ static int rdmaCreateResource(RdmaContext *ctx, struct rdma_cm_id *cm_id) { struct ibv_comp_channel *comp_channel = NULL; struct ibv_cq *cq = NULL; struct ibv_pd *pd = NULL; + int comp_vector = rdma_config->comp_vector; if (ibv_query_device(cm_id->verbs, &device_attr)) { serverLog(LL_WARNING, "RDMA: ibv ibv query device failed"); @@ -317,8 +316,13 @@ static int rdmaCreateResource(RdmaContext *ctx, struct rdma_cm_id *cm_id) { ctx->comp_channel = comp_channel; + /* negative number means a random one */ + if (comp_vector < 0) { + comp_vector = abs((int)random()); + } + cq = ibv_create_cq(cm_id->verbs, VALKEY_RDMA_MAX_WQE * 2, NULL, comp_channel, - valkey_rdma_comp_vector % cm_id->verbs->num_comp_vectors); + comp_vector % cm_id->verbs->num_comp_vectors); if (!cq) { serverLog(LL_WARNING, "RDMA: ibv create cq failed"); return C_ERR; @@ -1610,6 +1614,7 @@ int connRdmaListen(connListener *listener) { rdma_listener++; } + rdma_config = listener->priv; return C_OK; } @@ -1628,6 +1633,7 @@ static void connRdmaCloseListener(connListener *listener) { listener->count = 0; zfree(rdma_listeners); rdma_listeners = NULL; + rdma_config = NULL; } static int connRdmaAddr(connection *conn, char *ip, size_t ip_len, int *port, int remote) { @@ -1787,17 +1793,6 @@ static ConnectionType CT_RDMA = { .process_pending_data = rdmaProcessPendingData, }; -static struct connListener *rdmaListener(void) { - static struct connListener *listener = NULL; - - if (listener) return listener; - - listener = listenerByType(CONN_TYPE_RDMA); - serverAssert(listener != NULL); - - return listener; -} - ConnectionType *connectionTypeRdma(void) { static ConnectionType *ct_rdma = NULL; @@ -1809,117 +1804,15 @@ ConnectionType *connectionTypeRdma(void) { return ct_rdma; } -/* rdma listener has different create/close logic from TCP, we can't re-use 'int changeListener(connListener *listener)' - * directly */ -static int rdmaChangeListener(void) { - struct connListener *listener = rdmaListener(); - - connRdmaCloseListener(listener); - /* Just close the server if port disabled */ - if (listener->port == 0) { - if (server.set_proc_title) serverSetProcTitle(NULL); - return VALKEYMODULE_OK; - } - - /* Re-create listener */ - if (connListen(listener) != C_OK) { - return VALKEYMODULE_ERR; - } - - /* Create event handlers */ - if (createSocketAcceptHandler(listener, listener->ct->accept_handler) != C_OK) { - serverPanic("Unrecoverable error creating %s accept handler.", listener->ct->get_type(NULL)); - } - - if (server.set_proc_title) serverSetProcTitle(NULL); - - return VALKEYMODULE_OK; -} - #ifdef BUILD_RDMA_MODULE #include "release.h" -static long long rdmaGetPort(const char *name, void *privdata) { - UNUSED(name); - UNUSED(privdata); - struct connListener *listener = rdmaListener(); - - return listener->port; -} - -static int rdmaSetPort(const char *name, long long val, void *privdata, ValkeyModuleString **err) { - UNUSED(name); - UNUSED(privdata); - UNUSED(err); - struct connListener *listener = rdmaListener(); - listener->port = val; - - return VALKEYMODULE_OK; -} - -static ValkeyModuleString *rdma_bind; - -static void rdmaBuildBind(void *ctx) { - struct connListener *listener = rdmaListener(); - - if (rdma_bind) ValkeyModule_FreeString(NULL, rdma_bind); - - sds rdma_bind_str = sdsjoin(listener->bindaddr, listener->bindaddr_count, " "); - rdma_bind = ValkeyModule_CreateString(ctx, rdma_bind_str, sdslen(rdma_bind_str)); -} - -static ValkeyModuleString *rdmaGetBind(const char *name, void *privdata) { - UNUSED(name); - UNUSED(privdata); - - return rdma_bind; -} - -static int rdmaSetBind(const char *name, ValkeyModuleString *val, void *privdata, ValkeyModuleString **err) { - UNUSED(name); - UNUSED(err); - struct connListener *listener = rdmaListener(); - const char *bind = ValkeyModule_StringPtrLen(val, NULL); - int nexts; - sds *exts = sdssplitlen(bind, strlen(bind), " ", 1, &nexts); - - if (nexts > CONFIG_BINDADDR_MAX) { - serverLog(LL_WARNING, "RDMA: Unsupported bind ( > %d)", CONFIG_BINDADDR_MAX); - return VALKEYMODULE_ERR; - } - - /* Free old bind addresses */ - for (int j = 0; j < listener->bindaddr_count; j++) { - zfree(listener->bindaddr[j]); - } - - for (int j = 0; j < nexts; j++) listener->bindaddr[j] = zstrdup(exts[j]); - listener->bindaddr_count = nexts; - - sdsfreesplitres(exts, nexts); - rdmaBuildBind(privdata); - - return VALKEYMODULE_OK; -} - -static int rdmaApplyListener(ValkeyModuleCtx *ctx, void *privdata, ValkeyModuleString **err) { - UNUSED(ctx); - UNUSED(privdata); - UNUSED(err); - - return rdmaChangeListener(); -} - -static void rdmaListenerAddConfig(void *ctx) { - serverAssert(ValkeyModule_RegisterNumericConfig(ctx, "port", 0, VALKEYMODULE_CONFIG_DEFAULT, 0, 65535, rdmaGetPort, - rdmaSetPort, rdmaApplyListener, NULL) == VALKEYMODULE_OK); - serverAssert(ValkeyModule_RegisterStringConfig(ctx, "bind", "", VALKEYMODULE_CONFIG_DEFAULT, rdmaGetBind, - rdmaSetBind, rdmaApplyListener, ctx) == VALKEYMODULE_OK); - serverAssert(ValkeyModule_LoadConfigs(ctx) == VALKEYMODULE_OK); -} int ValkeyModule_OnLoad(void *ctx, ValkeyModuleString **argv, int argc) { + UNUSED(argv); + UNUSED(argc); + /* Connection modules MUST be part of the same build as valkey. */ if (strcmp(REDIS_BUILD_ID_RAW, serverBuildIdRaw())) { serverLog(LL_NOTICE, "Connection type %s was not built together with the valkey-server used.", CONN_TYPE_RDMA); @@ -1938,40 +1831,6 @@ int ValkeyModule_OnLoad(void *ctx, ValkeyModuleString **argv, int argc) { if (connTypeRegister(&CT_RDMA) != C_OK) return VALKEYMODULE_ERR; - rdmaListenerAddConfig(ctx); - - struct connListener *listener = rdmaListener(); - listener->ct = connectionTypeRdma(); - listener->bindaddr = zcalloc_num(CONFIG_BINDADDR_MAX, sizeof(listener->bindaddr[0])); - - for (int i = 0; i < argc; i++) { - robj *str = (robj *)argv[i]; - int nexts; - sds *exts = sdssplitlen(str->ptr, strlen(str->ptr), "=", 1, &nexts); - if (nexts != 2) { - serverLog(LL_WARNING, "RDMA: Unsupported argument \"%s\"", (char *)str->ptr); - return VALKEYMODULE_ERR; - } - - if (!strcasecmp(exts[0], "bind")) { - listener->bindaddr[listener->bindaddr_count++] = zstrdup(exts[1]); - } else if (!strcasecmp(exts[0], "port")) { - listener->port = atoi(exts[1]); - } else if (!strcasecmp(exts[0], "rx-size")) { - valkey_rdma_rx_size = atoi(exts[1]); - } else if (!strcasecmp(exts[0], "comp-vector")) { - valkey_rdma_comp_vector = atoi(exts[1]); - } else { - serverLog(LL_WARNING, "RDMA: Unsupported argument \"%s\"", (char *)str->ptr); - return VALKEYMODULE_ERR; - } - - sdsfreesplitres(exts, nexts); - } - - rdmaBuildBind(ctx); - if (valkey_rdma_comp_vector == -1) valkey_rdma_comp_vector = abs((int)random()); - return VALKEYMODULE_OK; } diff --git a/src/server.c b/src/server.c index 7a472de65f5..b519501ca3a 100644 --- a/src/server.c +++ b/src/server.c @@ -2884,6 +2884,17 @@ void initListeners(void) { listener->priv = &server.unix_ctx_config; /* Unix socket specified */ } + if (server.rdma_ctx_config.port != 0) { + conn_index = connectionIndexByType(CONN_TYPE_RDMA); + if (conn_index < 0) serverPanic("Failed finding connection listener of %s", CONN_TYPE_RDMA); + listener = &server.listeners[conn_index]; + listener->bindaddr = server.rdma_ctx_config.bindaddr; + listener->bindaddr_count = server.rdma_ctx_config.bindaddr_count; + listener->port = server.rdma_ctx_config.port; + listener->ct = connectionByType(CONN_TYPE_RDMA); + listener->priv = &server.rdma_ctx_config; + } + /* create all the configured listener, and add handler to start to accept */ int listen_fds = 0; for (int j = 0; j < CONN_TYPE_MAX; j++) { diff --git a/src/server.h b/src/server.h index 6a960b22e18..197ad3b46c5 100644 --- a/src/server.h +++ b/src/server.h @@ -1612,6 +1612,17 @@ typedef struct serverUnixContextConfig { unsigned int perm; /* UNIX socket permission (see mode_t) */ } serverUnixContextConfig; +/*----------------------------------------------------------------------------- + * RDMA Context Configuration + *----------------------------------------------------------------------------*/ +typedef struct serverRdmaContextConfig { + char *bindaddr[CONFIG_BINDADDR_MAX]; + int bindaddr_count; + int port; + int rx_size; + int comp_vector; +} serverRdmaContextConfig; + /*----------------------------------------------------------------------------- * AOF manifest definition *----------------------------------------------------------------------------*/ @@ -2225,6 +2236,7 @@ struct valkeyServer { int tls_auth_clients; serverTLSContextConfig tls_ctx_config; serverUnixContextConfig unix_ctx_config; + serverRdmaContextConfig rdma_ctx_config; /* cpu affinity */ char *server_cpulist; /* cpu affinity list of server main/io thread. */ char *bio_cpulist; /* cpu affinity list of bio thread. */ diff --git a/tests/rdma/run.py b/tests/rdma/run.py index 0724c27adca..09168f368ae 100755 --- a/tests/rdma/run.py +++ b/tests/rdma/run.py @@ -63,7 +63,7 @@ def test_rdma(ipaddr): rdmapath = valkeydir + "/src/valkey-rdma.so" svrcmd = [svrpath, "--port", "0", "--loglevel", "verbose", "--protected-mode", "yes", "--appendonly", "no", "--daemonize", "no", "--dir", valkeydir + "/tests/rdma/tmp", - "--loadmodule", rdmapath, "port=6379", "bind=" + ipaddr] + "--loadmodule", rdmapath, "--rdma-port", "6379", "--rdma-bind", ipaddr] svr = subprocess.Popen(svrcmd, shell=False, stdout=subprocess.PIPE) try: diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index 352f5f183e1..5a9ddc00f4b 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -558,6 +558,10 @@ start_server {tags {"introspection"}} { req-res-logfile client-default-resp dual-channel-replication-enabled + rdma-comp-vector + rdma-rx-size + rdma-bind + rdma-port } if {!$::tls} { diff --git a/valkey.conf b/valkey.conf index bf82b01874c..747ef4fc124 100644 --- a/valkey.conf +++ b/valkey.conf @@ -300,6 +300,51 @@ tcp-keepalive 300 # # tls-session-cache-timeout 60 +################################### RDMA ###################################### + +# By default, RDMA is disabled. To enable it, the "rdma-port" configuration +# directive can be used to define RDMA-listening ports. +# +# rdma-port 6379 +# rdma-bind 192.168.1.100 + +# The RDMA receive transfer buffer is 1M by default. It can be set between 64K and 16M. +# Note that page size aligned size is preferred. +# +# rdma-rx-size 1048576 + +# The RDMA completion queue will use the completion vector to signal completion events +# via hardware interrupts. A large number of hardware interrupts can affect CPU performance. +# It is possible to tune the performance using rdma-comp-vector. +# +# Example 1. a) Pin hardware interrupt vectors [0, 3] to CPU [0, 3]. +# b) Set CPU affinity for valkey to CPU [4, X]. +# c) Any valkey server uses a random RDMA completion vector. +# All valkey servers will not affect each other and will be isolated from kernel interrupts. +# +# SYS SYS SYS SYS VALKEY VALKEY VALKEY +# | | | | | | | +# CPU0 CPU1 CPU2 CPU3 CPU4 CPU5 ... CPUX +# | | | | +# INTR0 INTR1 INTR2 INTR3 +# +# Example 2. a) Pin hardware interrupt vectors [0, X] to CPU [0, X]. +# b) Set CPU affinity for valkey to CPU [0, X]. +# c) Valkey server [M] uses RDMA completion vector [M]. +# A single CPU handles hardware interrupts, the RDMA completion queue, and the valkey server. +# This avoids overhead and function calls across multiple CPUs. +# +# VALKEY VALKEY VALKEY VALKEY VALKEY VALKEY VALKEY +# | | | | | | | +# CPU0 CPU1 CPU2 CPU3 CPU4 CPU5 ... CPUX +# | | | | | | | +# INTR0 INTR1 INTR2 INTR3 INTR4 INTR5 INTRX +# +# Use 0 and positive numbers to specify the RDMA completion vector, or specify -1 to allow +# the server to use a random vector for a new connection. The default vector is -1. +# +# rdma-comp-vector 0 + ################################# GENERAL ##################################### # By default the server does not run as a daemon. Use 'yes' if you need it.