Skip to content

Commit

Permalink
DAOS-15800 client: create cart context on specific interface (#14804)
Browse files Browse the repository at this point in the history
Cart has added the ability to select network interface on context creation. The daos_agent also added a numa-fabric map that can be queried at init time. Update the DAOS client to query from the agent a map of numa to network interface on daos_init(), and on EQ creation, select the best interface for the network context based on the numa of the calling thread.

Signed-off-by: Mohamad Chaarawi <[email protected]>
  • Loading branch information
mchaarawi authored Aug 30, 2024
1 parent 1b5943c commit 9662e98
Show file tree
Hide file tree
Showing 11 changed files with 181 additions and 22 deletions.
2 changes: 1 addition & 1 deletion src/cart/README.env
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ This file lists the environment variables used in CaRT.

. CRT_CTX_NUM
If set, specifies the limit of number of allowed CaRT contexts to be created.
Valid range is [1, 64], with default being 64 if unset.
Valid range is [1, 128], with default being 128 if unset.

. D_FI_CONFIG
Specifies the fault injection configuration file. If this variable is not set
Expand Down
2 changes: 1 addition & 1 deletion src/cart/crt_internal_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
#define CRT_CONTEXT_NULL (NULL)

#ifndef CRT_SRV_CONTEXT_NUM
#define CRT_SRV_CONTEXT_NUM (64) /* Maximum number of contexts */
#define CRT_SRV_CONTEXT_NUM (128) /* Maximum number of contexts */
#endif


Expand Down
2 changes: 1 addition & 1 deletion src/client/api/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def scons():

if prereqs.client_requested():
libdaos = env.d_library('daos', libdaos_tgts, SHLIBVERSION=API_VERSION,
LIBS=['daos_common'])
LIBS=['daos_common', 'numa'])
if hasattr(env, 'InstallVersionedLib'):
env.InstallVersionedLib('$PREFIX/lib64/', libdaos, SHLIBVERSION=API_VERSION)
else:
Expand Down
39 changes: 34 additions & 5 deletions src/client/api/event.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,24 @@ daos_eq_lib_init(crt_init_options_t *crt_info)
D_GOTO(unlock, rc);
}

/* use a global shared context for all eq for now */
rc = crt_context_create(&daos_eq_ctx);
if (d_dynamic_ctx_g) {
char iface[DAOS_SYS_INFO_STRING_MAX];

rc = dc_mgmt_get_iface(&iface[0]);
if (rc && rc != -DER_NONEXIST) {
D_ERROR("failed to get iface: " DF_RC "\n", DP_RC(rc));
D_GOTO(crt, rc);
}
/** if no interface returned, use the default */
if (rc == -DER_NONEXIST)
rc = crt_context_create(&daos_eq_ctx);
else
rc = crt_context_create_on_iface(iface, &daos_eq_ctx);
} else {
rc = crt_context_create(&daos_eq_ctx);
}
if (rc != 0) {
D_ERROR("failed to create client context: "DF_RC"\n",
DP_RC(rc));
D_ERROR("failed to create client context: " DF_RC "\n", DP_RC(rc));
D_GOTO(crt, rc);
}

Expand Down Expand Up @@ -656,7 +669,23 @@ daos_eq_create(daos_handle_t *eqh)

eqx = daos_eq2eqx(eq);

rc = crt_context_create(&eqx->eqx_ctx);
if (d_dynamic_ctx_g) {
char iface[DAOS_SYS_INFO_STRING_MAX];

rc = dc_mgmt_get_iface(&iface[0]);
if (rc && rc != -DER_NONEXIST) {
D_ERROR("failed to get iface: " DF_RC "\n", DP_RC(rc));
return rc;
}

/** if no interface returned, use the default */
if (rc == -DER_NONEXIST)
rc = crt_context_create(&eqx->eqx_ctx);
else
rc = crt_context_create_on_iface(iface, &eqx->eqx_ctx);
} else {
rc = crt_context_create(&eqx->eqx_ctx);
}
if (rc) {
D_WARN("Failed to create CART context; using the global one, "DF_RC"\n", DP_RC(rc));
eqx->eqx_ctx = daos_eq_ctx;
Expand Down
2 changes: 1 addition & 1 deletion src/engine/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def scons():
denv.Append(CPPDEFINES=['-DDAOS_PMEM_BUILD'])
libraries = ['daos_common_pmem', 'gurt', 'cart', 'vos_srv']
libraries += ['bio', 'dl', 'uuid', 'pthread', 'abt']
libraries += ['hwloc', 'pmemobj', 'protobuf-c', 'isal']
libraries += ['hwloc', 'pmemobj', 'protobuf-c', 'isal', 'numa']

denv.require('argobots', 'protobufc', 'pmdk', 'isal')

Expand Down
6 changes: 6 additions & 0 deletions src/include/daos/mgmt.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#include <daos/pool.h>
#include "svc.pb-c.h"

extern bool d_dynamic_ctx_g;

int dc_mgmt_init(void);

void dc_mgmt_fini(void);
Expand All @@ -41,6 +43,8 @@ struct dc_mgmt_sys_info {
d_rank_list_t *ms_ranks;
char system_name[DAOS_SYS_INFO_STRING_MAX + 1];
uint32_t provider_idx; /* Provider index (if more than one available) */
daos_size_t numa_entries_nr;
daos_size_t *numa_iface_idx_rr;
};

/** Client system handle */
Expand Down Expand Up @@ -78,5 +82,7 @@ int dc_get_attach_info(const char *name, bool all_ranks, struct dc_mgmt_sys_info
void dc_put_attach_info(struct dc_mgmt_sys_info *info, Mgmt__GetAttachInfoResp *resp);
int dc_mgmt_cache_attach_info(const char *name);
void dc_mgmt_drop_attach_info(void);
int
dc_mgmt_get_iface(char *iface);
int dc_mgmt_tm_register(const char *sys, const char *jobid, key_t shm_key, uid_t *owner_uid);
#endif
136 changes: 126 additions & 10 deletions src/mgmt/cli_mgmt.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@

#define D_LOGFAC DD_FAC(mgmt)

#include <daos/mgmt.h>

#include <daos/agent.h>
#include <daos/drpc_modules.h>
#include <daos/event.h>
#include <daos/job.h>
#include <daos/mgmt.h>
#include <daos/pool.h>
#include <daos/security.h>
#include "svc.pb-c.h"
#include "rpc.h"
#include <errno.h>
#include <numa.h>
#include <stdlib.h>
#include <sys/ipc.h>

Expand All @@ -31,6 +31,7 @@ char agent_sys_name[DAOS_SYS_NAME_MAX + 1] = DAOS_DEFAULT_SYS_NAME;
static struct dc_mgmt_sys_info info_g;
static Mgmt__GetAttachInfoResp *resp_g;

bool d_dynamic_ctx_g;
int dc_mgmt_proto_version;

int
Expand Down Expand Up @@ -241,6 +242,7 @@ put_attach_info(struct dc_mgmt_sys_info *info, Mgmt__GetAttachInfoResp *resp)
if (resp != NULL)
free_get_attach_info_resp(resp);
d_rank_list_free(info->ms_ranks);
D_FREE(info->numa_iface_idx_rr);
}

void
Expand Down Expand Up @@ -413,9 +415,23 @@ dc_get_attach_info(const char *name, bool all_ranks, struct dc_mgmt_sys_info *in
int
dc_mgmt_cache_attach_info(const char *name)
{
int rc;

if (name != NULL && strcmp(name, agent_sys_name) != 0)
return -DER_INVAL;
return get_attach_info(name, true, &info_g, &resp_g);
rc = get_attach_info(name, true, &info_g, &resp_g);
if (rc)
return rc;

info_g.numa_entries_nr = resp_g->n_numa_fabric_interfaces;
D_ALLOC_ARRAY(info_g.numa_iface_idx_rr, info_g.numa_entries_nr);
if (info_g.numa_iface_idx_rr == NULL)
D_GOTO(err_rank_list, rc = -DER_NOMEM);
return 0;

err_rank_list:
put_attach_info(&info_g, resp_g);
return rc;
}

static void
Expand Down Expand Up @@ -625,14 +641,56 @@ dc_mgmt_net_cfg(const char *name, crt_init_options_t *crt_info)
D_STRNDUP(crt_info->cio_provider, info->provider, DAOS_SYS_INFO_STRING_MAX);
if (NULL == crt_info->cio_provider)
D_GOTO(cleanup, rc = -DER_NOMEM);
D_STRNDUP(crt_info->cio_interface, info->interface, DAOS_SYS_INFO_STRING_MAX);
if (NULL == crt_info->cio_interface)
D_GOTO(cleanup, rc = -DER_NOMEM);
D_STRNDUP(crt_info->cio_domain, info->domain, DAOS_SYS_INFO_STRING_MAX);
if (NULL == crt_info->cio_domain)
D_GOTO(cleanup, rc = -DER_NOMEM);

D_INFO("Network interface: %s, Domain: %s\n", info->interface, info->domain);
d_getenv_bool("D_DYNAMIC_CTX", &d_dynamic_ctx_g);
if (d_dynamic_ctx_g) {
int i;
daos_size_t size = 0;

for (i = 0; i < resp->n_numa_fabric_interfaces; i++)
size += resp_g->numa_fabric_interfaces[i]->n_ifaces *
(DAOS_SYS_INFO_STRING_MAX + 1);

D_ALLOC(crt_info->cio_interface, size);
if (crt_info->cio_interface == NULL)
D_GOTO(cleanup, rc = -DER_NOMEM);
D_ALLOC(crt_info->cio_domain, size);
if (crt_info->cio_domain == NULL)
D_GOTO(cleanup, rc = -DER_NOMEM);

for (i = 0; i < resp->n_numa_fabric_interfaces; i++) {
Mgmt__FabricInterfaces *numa_ifaces = resp_g->numa_fabric_interfaces[i];
int j;

for (j = 0; j < numa_ifaces->n_ifaces; j++) {
if (i != 0 || j != 0) {
strcat(crt_info->cio_interface, ",");
strcat(crt_info->cio_domain, ",");
}
strncat(crt_info->cio_interface, numa_ifaces->ifaces[j]->interface,
DAOS_SYS_INFO_STRING_MAX);
strncat(crt_info->cio_domain, numa_ifaces->ifaces[j]->domain,
DAOS_SYS_INFO_STRING_MAX);
}
/*
* If we have multiple interfaces per numa node, we want to randomize the
* first interface selected in case we have multiple processes running
* there. So initialize the index array at that interface to -1 to know that
* this is the first selection later.
*/
if (numa_ifaces->n_ifaces > 1)
info_g.numa_iface_idx_rr[i] = -1;
}
} else {
D_STRNDUP(crt_info->cio_interface, info->interface, DAOS_SYS_INFO_STRING_MAX);
if (NULL == crt_info->cio_interface)
D_GOTO(cleanup, rc = -DER_NOMEM);
D_STRNDUP(crt_info->cio_domain, info->domain, DAOS_SYS_INFO_STRING_MAX);
if (NULL == crt_info->cio_domain)
D_GOTO(cleanup, rc = -DER_NOMEM);
}
D_INFO("Network interface: %s, Domain: %s, Provider: %s\n", crt_info->cio_interface,
crt_info->cio_domain, crt_info->cio_provider);
D_DEBUG(DB_MGMT,
"CaRT initialization with:\n"
"\tD_PROVIDER: %s, CRT_TIMEOUT: %d, CRT_SECONDARY_PROVIDER: %s\n",
Expand Down Expand Up @@ -667,6 +725,64 @@ int dc_mgmt_net_cfg_check(const char *name)
return 0;
}

int
dc_mgmt_get_iface(char *iface)
{
int cpu;
int numa;
int i;

cpu = sched_getcpu();
if (cpu < 0) {
D_ERROR("sched_getcpu() failed: %d (%s)\n", errno, strerror(errno));
return d_errno2der(errno);
}

numa = numa_node_of_cpu(cpu);
if (numa < 0) {
D_ERROR("numa_node_of_cpu() failed: %d (%s)\n", errno, strerror(errno));
return d_errno2der(errno);
}

if (resp_g->n_numa_fabric_interfaces <= 0) {
D_ERROR("No fabric interfaces initialized.\n");
return -DER_INVAL;
}

for (i = 0; i < resp_g->n_numa_fabric_interfaces; i++) {
Mgmt__FabricInterfaces *numa_ifaces = resp_g->numa_fabric_interfaces[i];
int idx;

if (numa_ifaces->numa_node != numa)
continue;

/*
* Randomize the first interface used to avoid multiple processes starting on the
* first interface (if there is more than 1).
*/
if (info_g.numa_iface_idx_rr[i] == -1) {
d_srand(getpid());
info_g.numa_iface_idx_rr[i] = d_rand() % numa_ifaces->n_ifaces;
}
idx = info_g.numa_iface_idx_rr[i] % numa_ifaces->n_ifaces;
D_ASSERT(numa_ifaces->ifaces[idx]->numa_node == numa);
info_g.numa_iface_idx_rr[i]++;

if (copy_str(iface, numa_ifaces->ifaces[idx]->interface) != 0) {
D_ERROR("Interface string too long.\n");
return -DER_INVAL;
}
D_DEBUG(DB_MGMT, "Numa: %d, Interface Selected: IDX: %d, Name = %s\n", numa, idx,
iface);
break;
}
if (i == resp_g->n_numa_fabric_interfaces) {
D_DEBUG(DB_MGMT, "No iface on numa %d\n", numa);
return -DER_NONEXIST;
}
return 0;
}

static int send_monitor_request(struct dc_pool *pool, int request_type)
{
struct drpc *ctx;
Expand Down
2 changes: 1 addition & 1 deletion src/rdb/tests/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def scons():
# rdbt client
rdbt = tenv.d_program('rdbt', ['rdbt.c', 'rpc.c'] + libdaos_tgts,
LIBS=['daos_common_pmem', 'cart', 'gurt', 'uuid', 'isal', 'protobuf-c',
'pthread'])
'pthread', 'numa'])
tenv.Install('$PREFIX/bin', rdbt)


Expand Down
2 changes: 1 addition & 1 deletion src/tests/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def build_tests(env):
daos_perf = denv.d_program('daos_perf', ['daos_perf.c', perf_common], LIBS=libs_client)
denv.Install('$PREFIX/bin/', daos_perf)

libs_server += ['vos', 'bio', 'abt']
libs_server += ['vos', 'bio', 'abt', 'numa']
vos_engine = denv.StaticObject(['vos_engine.c'])

if denv["STACK_MMAP"] == 1:
Expand Down
8 changes: 7 additions & 1 deletion src/tests/ftest/dfuse/pil4dfs_fio.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from cpu_utils import CpuInfo
from dfuse_utils import get_dfuse, start_dfuse
from fio_utils import FioCommand
from general_utils import bytes_to_human, percent_change
from general_utils import bytes_to_human, get_log_file, percent_change


class Pil4dfsFio(TestWithServers):
Expand Down Expand Up @@ -115,6 +115,9 @@ def _run_fio_pil4dfs(self, ioengine):
"global", "cpus_allowed", self.fio_cpus_allowed,
f"fio --name=global --cpus_allowed={self.fio_cpus_allowed}")
fio_cmd.env['LD_PRELOAD'] = os.path.join(self.prefix, 'lib64', 'libpil4dfs.so')
fio_cmd.env['D_DYNAMIC_CTX'] = 1
fio_cmd.env["D_LOG_FILE"] = get_log_file(self.client_log)
fio_cmd.env["D_LOG_MASK"] = 'INFO'
fio_cmd.hosts = self.hostlist_clients

bws = {}
Expand Down Expand Up @@ -154,6 +157,9 @@ def _run_fio_dfs(self):
fio_cmd.update(
"job", "pool", container.pool.uuid, f"fio --name=job --pool={container.pool.uuid}")
fio_cmd.update("job", "cont", container.uuid, f"fio --name=job --cont={container.uuid}")
fio_cmd.env['D_DYNAMIC_CTX'] = 1
fio_cmd.env["D_LOG_FILE"] = get_log_file(self.client_log)
fio_cmd.env["D_LOG_MASK"] = 'INFO'
fio_cmd.hosts = self.hostlist_clients

bws = {}
Expand Down
2 changes: 2 additions & 0 deletions src/tests/ftest/dfuse/pil4dfs_fio.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ server_config:
fabric_iface: ib0
fabric_iface_port: 31317
log_file: daos_server0.log
log_mask: INFO
storage: auto
1:
pinned_numa_node: 1
fabric_iface: ib1
fabric_iface_port: 31417
log_file: daos_server1.log
log_mask: INFO
storage: auto

pool:
Expand Down

0 comments on commit 9662e98

Please sign in to comment.