Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DAOS-15800 client: create cart context on specific interface #14804

Merged
merged 6 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/cart/README.env
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ This file lists the environment variables used in CaRT.

. CRT_CTX_NUM
If set, specifies the limit of number of allowed CaRT contexts to be created.
Valid range is [1, 64], with default being 64 if unset.
Valid range is [1, 128], with default being 128 if unset.

. D_FI_CONFIG
Specifies the fault injection configuration file. If this variable is not set
Expand Down
2 changes: 1 addition & 1 deletion src/cart/crt_internal_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
#define CRT_CONTEXT_NULL (NULL)

#ifndef CRT_SRV_CONTEXT_NUM
#define CRT_SRV_CONTEXT_NUM (64) /* Maximum number of contexts */
#define CRT_SRV_CONTEXT_NUM (128) /* Maximum number of contexts */
mchaarawi marked this conversation as resolved.
Show resolved Hide resolved
#endif


Expand Down
2 changes: 1 addition & 1 deletion src/client/api/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def scons():

if prereqs.client_requested():
libdaos = env.d_library('daos', libdaos_tgts, SHLIBVERSION=API_VERSION,
LIBS=['daos_common'])
LIBS=['daos_common', 'numa'])
if hasattr(env, 'InstallVersionedLib'):
env.InstallVersionedLib('$PREFIX/lib64/', libdaos, SHLIBVERSION=API_VERSION)
else:
Expand Down
39 changes: 34 additions & 5 deletions src/client/api/event.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,24 @@ daos_eq_lib_init(crt_init_options_t *crt_info)
D_GOTO(unlock, rc);
}

/* use a global shared context for all eq for now */
rc = crt_context_create(&daos_eq_ctx);
if (d_dynamic_ctx_g) {
char iface[DAOS_SYS_INFO_STRING_MAX];

rc = dc_mgmt_get_iface(&iface[0]);
if (rc && rc != -DER_NONEXIST) {
D_ERROR("failed to get iface: " DF_RC "\n", DP_RC(rc));
D_GOTO(crt, rc);
}
/** if no interface returned, use the default */
if (rc == -DER_NONEXIST)
rc = crt_context_create(&daos_eq_ctx);
else
rc = crt_context_create_on_iface(iface, &daos_eq_ctx);
} else {
rc = crt_context_create(&daos_eq_ctx);
}
if (rc != 0) {
D_ERROR("failed to create client context: "DF_RC"\n",
DP_RC(rc));
D_ERROR("failed to create client context: " DF_RC "\n", DP_RC(rc));
D_GOTO(crt, rc);
}

Expand Down Expand Up @@ -656,7 +669,23 @@ daos_eq_create(daos_handle_t *eqh)

eqx = daos_eq2eqx(eq);

rc = crt_context_create(&eqx->eqx_ctx);
if (d_dynamic_ctx_g) {
char iface[DAOS_SYS_INFO_STRING_MAX];

rc = dc_mgmt_get_iface(&iface[0]);
if (rc) {
D_ERROR("failed to get iface: " DF_RC "\n", DP_RC(rc));
daos_eq_free(&eqx->eqx_hlink);
return rc;
}
/** if no interface returned, use the default */
if (rc == -DER_NONEXIST)
kjacque marked this conversation as resolved.
Show resolved Hide resolved
rc = crt_context_create(&eqx->eqx_ctx);
else
rc = crt_context_create_on_iface(iface, &eqx->eqx_ctx);
} else {
rc = crt_context_create(&eqx->eqx_ctx);
}
if (rc) {
D_WARN("Failed to create CART context; using the global one, "DF_RC"\n", DP_RC(rc));
eqx->eqx_ctx = daos_eq_ctx;
Expand Down
2 changes: 1 addition & 1 deletion src/engine/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def scons():
denv.Append(CPPDEFINES=['-DDAOS_PMEM_BUILD'])
libraries = ['daos_common_pmem', 'gurt', 'cart', 'vos_srv']
libraries += ['bio', 'dl', 'uuid', 'pthread', 'abt']
libraries += ['hwloc', 'pmemobj', 'protobuf-c', 'isal']
libraries += ['hwloc', 'pmemobj', 'protobuf-c', 'isal', 'numa']

denv.require('argobots', 'protobufc', 'pmdk', 'isal')

Expand Down
6 changes: 6 additions & 0 deletions src/include/daos/mgmt.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#include <daos/pool.h>
#include "svc.pb-c.h"

extern bool d_dynamic_ctx_g;

int dc_mgmt_init(void);

void dc_mgmt_fini(void);
Expand All @@ -41,6 +43,8 @@ struct dc_mgmt_sys_info {
d_rank_list_t *ms_ranks;
char system_name[DAOS_SYS_INFO_STRING_MAX + 1];
uint32_t provider_idx; /* Provider index (if more than one available) */
daos_size_t numa_entries_nr;
daos_size_t *numa_iface_idx_rr;
};

/** Client system handle */
Expand Down Expand Up @@ -78,5 +82,7 @@ int dc_get_attach_info(const char *name, bool all_ranks, struct dc_mgmt_sys_info
void dc_put_attach_info(struct dc_mgmt_sys_info *info, Mgmt__GetAttachInfoResp *resp);
int dc_mgmt_cache_attach_info(const char *name);
void dc_mgmt_drop_attach_info(void);
int
dc_mgmt_get_iface(char *iface);
int dc_mgmt_tm_register(const char *sys, const char *jobid, key_t shm_key, uid_t *owner_uid);
#endif
131 changes: 121 additions & 10 deletions src/mgmt/cli_mgmt.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@

#define D_LOGFAC DD_FAC(mgmt)

#include <daos/mgmt.h>

#include <daos/agent.h>
#include <daos/drpc_modules.h>
#include <daos/event.h>
#include <daos/job.h>
#include <daos/mgmt.h>
#include <daos/pool.h>
#include <daos/security.h>
#include "svc.pb-c.h"
#include "rpc.h"
#include <errno.h>
#include <numa.h>
#include <stdlib.h>
#include <sys/ipc.h>

Expand All @@ -31,6 +31,7 @@ char agent_sys_name[DAOS_SYS_NAME_MAX + 1] = DAOS_DEFAULT_SYS_NAME;
static struct dc_mgmt_sys_info info_g;
static Mgmt__GetAttachInfoResp *resp_g;

bool d_dynamic_ctx_g;
int dc_mgmt_proto_version;

int
Expand Down Expand Up @@ -241,6 +242,7 @@ put_attach_info(struct dc_mgmt_sys_info *info, Mgmt__GetAttachInfoResp *resp)
if (resp != NULL)
free_get_attach_info_resp(resp);
d_rank_list_free(info->ms_ranks);
D_FREE(info->numa_iface_idx_rr);
}

void
Expand Down Expand Up @@ -413,9 +415,23 @@ dc_get_attach_info(const char *name, bool all_ranks, struct dc_mgmt_sys_info *in
int
dc_mgmt_cache_attach_info(const char *name)
{
int rc;

if (name != NULL && strcmp(name, agent_sys_name) != 0)
return -DER_INVAL;
return get_attach_info(name, true, &info_g, &resp_g);
rc = get_attach_info(name, true, &info_g, &resp_g);
if (rc)
return rc;

info_g.numa_entries_nr = resp_g->n_numa_fabric_interfaces;
D_ALLOC_ARRAY(info_g.numa_iface_idx_rr, info_g.numa_entries_nr);
if (info_g.numa_iface_idx_rr == NULL)
D_GOTO(err_rank_list, rc = -DER_NOMEM);
return 0;

err_rank_list:
put_attach_info(&info_g, resp_g);
return rc;
}

static void
Expand Down Expand Up @@ -625,14 +641,51 @@ dc_mgmt_net_cfg(const char *name, crt_init_options_t *crt_info)
D_STRNDUP(crt_info->cio_provider, info->provider, DAOS_SYS_INFO_STRING_MAX);
if (NULL == crt_info->cio_provider)
D_GOTO(cleanup, rc = -DER_NOMEM);
D_STRNDUP(crt_info->cio_interface, info->interface, DAOS_SYS_INFO_STRING_MAX);
if (NULL == crt_info->cio_interface)
D_GOTO(cleanup, rc = -DER_NOMEM);
D_STRNDUP(crt_info->cio_domain, info->domain, DAOS_SYS_INFO_STRING_MAX);
if (NULL == crt_info->cio_domain)
D_GOTO(cleanup, rc = -DER_NOMEM);

D_INFO("Network interface: %s, Domain: %s\n", info->interface, info->domain);
d_getenv_bool("D_DYNAMIC_CTX", &d_dynamic_ctx_g);
if (d_dynamic_ctx_g) {
int i;

D_ALLOC(crt_info->cio_interface,
DAOS_SYS_INFO_STRING_MAX * resp->n_numa_fabric_interfaces);
if (crt_info->cio_interface == NULL)
D_GOTO(cleanup, rc = -DER_NOMEM);
D_ALLOC(crt_info->cio_domain,
DAOS_SYS_INFO_STRING_MAX * resp->n_numa_fabric_interfaces);
if (crt_info->cio_domain == NULL)
D_GOTO(cleanup, rc = -DER_NOMEM);

for (i = 0; i < resp->n_numa_fabric_interfaces; i++) {
Mgmt__FabricInterfaces *numa_ifaces = resp_g->numa_fabric_interfaces[i];
int j;

for (j = 0; j < numa_ifaces->n_ifaces; j++) {
if (i != 0 || j != 0) {
strcat(crt_info->cio_interface, ",");
frostedcmos marked this conversation as resolved.
Show resolved Hide resolved
strcat(crt_info->cio_domain, ",");
}
strcat(crt_info->cio_interface, numa_ifaces->ifaces[j]->interface);
strcat(crt_info->cio_domain, numa_ifaces->ifaces[j]->domain);
mchaarawi marked this conversation as resolved.
Show resolved Hide resolved
}
/*
* If we have multiple interfaces per numa node, we want to randomize the
* first interface selected in case we have multiple processes running
* there. So initialize the index array at that interface to -1 to know that
* this is the first selection later.
*/
if (numa_ifaces->n_ifaces)
frostedcmos marked this conversation as resolved.
Show resolved Hide resolved
info_g.numa_iface_idx_rr[i] = -1;
}
} else {
D_STRNDUP(crt_info->cio_interface, info->interface, DAOS_SYS_INFO_STRING_MAX);
if (NULL == crt_info->cio_interface)
D_GOTO(cleanup, rc = -DER_NOMEM);
D_STRNDUP(crt_info->cio_domain, info->domain, DAOS_SYS_INFO_STRING_MAX);
if (NULL == crt_info->cio_domain)
D_GOTO(cleanup, rc = -DER_NOMEM);
}
D_INFO("Network interface: %s, Domain: %s, Provider: %s\n", crt_info->cio_interface,
crt_info->cio_domain, crt_info->cio_provider);
D_DEBUG(DB_MGMT,
"CaRT initialization with:\n"
"\tD_PROVIDER: %s, CRT_TIMEOUT: %d, CRT_SECONDARY_PROVIDER: %s\n",
Expand Down Expand Up @@ -667,6 +720,64 @@ int dc_mgmt_net_cfg_check(const char *name)
return 0;
}

int
dc_mgmt_get_iface(char *iface)
{
int cpu;
int numa;
int i;

cpu = sched_getcpu();
if (cpu < 0) {
D_ERROR("sched_getcpu() failed: %d (%s)\n", errno, strerror(errno));
return d_errno2der(errno);
}

numa = numa_node_of_cpu(cpu);
if (numa < 0) {
D_ERROR("numa_node_of_cpu() failed: %d (%s)\n", errno, strerror(errno));
return d_errno2der(errno);
}

if (resp_g->n_numa_fabric_interfaces <= 0) {
D_ERROR("No fabric interfaces initialized.\n");
return -DER_INVAL;
}

for (i = 0; i < resp_g->n_numa_fabric_interfaces; i++) {
Mgmt__FabricInterfaces *numa_ifaces = resp_g->numa_fabric_interfaces[i];
int idx;

if (numa_ifaces->numa_node != numa)
continue;
kjacque marked this conversation as resolved.
Show resolved Hide resolved

/*
* Randomize the first interface used to avoid multiple processes starting on the
* first interface (if there is more than 1).
*/
if (info_g.numa_iface_idx_rr[i] == -1) {
d_srand(getpid());
info_g.numa_iface_idx_rr[i] = d_rand() % numa_ifaces->n_ifaces;
}
idx = info_g.numa_iface_idx_rr[i] % numa_ifaces->n_ifaces;
D_ASSERT(numa_ifaces->ifaces[idx]->numa_node == numa);
info_g.numa_iface_idx_rr[i]++;

if (copy_str(iface, numa_ifaces->ifaces[idx]->interface) != 0) {
D_ERROR("Interface string too long.\n");
return -DER_INVAL;
}
D_DEBUG(DB_MGMT, "Numa: %d, Interface Selected: IDX: %d, Name = %s\n", numa, idx,
iface);
break;
}
if (i == resp_g->n_numa_fabric_interfaces) {
D_DEBUG(DB_MGMT, "No iface on numa %d\n", numa);
return -DER_NONEXIST;
}
return 0;
}

static int send_monitor_request(struct dc_pool *pool, int request_type)
{
struct drpc *ctx;
Expand Down
2 changes: 1 addition & 1 deletion src/rdb/tests/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def scons():
# rdbt client
rdbt = tenv.d_program('rdbt', ['rdbt.c', 'rpc.c'] + libdaos_tgts,
LIBS=['daos_common_pmem', 'cart', 'gurt', 'uuid', 'isal', 'protobuf-c',
'pthread'])
'pthread', 'numa'])
tenv.Install('$PREFIX/bin', rdbt)


Expand Down
2 changes: 1 addition & 1 deletion src/tests/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def build_tests(env):
daos_perf = denv.d_program('daos_perf', ['daos_perf.c', perf_common], LIBS=libs_client)
denv.Install('$PREFIX/bin/', daos_perf)

libs_server += ['vos', 'bio', 'abt']
libs_server += ['vos', 'bio', 'abt', 'numa']
vos_engine = denv.StaticObject(['vos_engine.c'])

if denv["STACK_MMAP"] == 1:
Expand Down
8 changes: 7 additions & 1 deletion src/tests/ftest/dfuse/pil4dfs_fio.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from cpu_utils import CpuInfo
from dfuse_utils import get_dfuse, start_dfuse
from fio_utils import FioCommand
from general_utils import bytes_to_human, percent_change
from general_utils import bytes_to_human, get_log_file, percent_change


class Pil4dfsFio(TestWithServers):
Expand Down Expand Up @@ -115,6 +115,9 @@ def _run_fio_pil4dfs(self, ioengine):
"global", "cpus_allowed", self.fio_cpus_allowed,
f"fio --name=global --cpus_allowed={self.fio_cpus_allowed}")
fio_cmd.env['LD_PRELOAD'] = os.path.join(self.prefix, 'lib64', 'libpil4dfs.so')
fio_cmd.env['D_DYNAMIC_CTX'] = 1
fio_cmd.env["D_LOG_FILE"] = get_log_file(self.client_log)
fio_cmd.env["D_LOG_MASK"] = 'INFO'
fio_cmd.hosts = self.hostlist_clients

bws = {}
Expand Down Expand Up @@ -154,6 +157,9 @@ def _run_fio_dfs(self):
fio_cmd.update(
"job", "pool", container.pool.uuid, f"fio --name=job --pool={container.pool.uuid}")
fio_cmd.update("job", "cont", container.uuid, f"fio --name=job --cont={container.uuid}")
fio_cmd.env['D_DYNAMIC_CTX'] = 1
fio_cmd.env["D_LOG_FILE"] = get_log_file(self.client_log)
fio_cmd.env["D_LOG_MASK"] = 'INFO'
fio_cmd.hosts = self.hostlist_clients

bws = {}
Expand Down
2 changes: 2 additions & 0 deletions src/tests/ftest/dfuse/pil4dfs_fio.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ server_config:
fabric_iface: ib0
fabric_iface_port: 31317
log_file: daos_server0.log
log_mask: INFO
storage: auto
1:
pinned_numa_node: 1
fabric_iface: ib1
fabric_iface_port: 31417
log_file: daos_server1.log
log_mask: INFO
storage: auto

pool:
Expand Down
Loading