Skip to content

Commit

Permalink
DAOS-15476 client: Better protocol query handling (#14392)
Browse files Browse the repository at this point in the history
Pick from all possible ranks for protocol query handling

Backports the following patches. The last one fixes the issue; the prior two are benign and make the backport apply without conflict.

DAOS-13717 client: use d_rand() to generate random number (#12418)
DAOS-15120 crt: Misc d_rand-related fixes (#13738)
DAOS-15476 client: do not use rsvc for proto query rank selection (#14131)
DAOS-15540 cart: Change proto query default timeout (#14382)

Signed-off-by: Mohamad Chaarawi <[email protected]>
Signed-off-by: Wang Shilong <[email protected]>
Signed-off-by: Li Wei <[email protected]>
Signed-off-by: Jeff Olivier <[email protected]>
  • Loading branch information
jolivier23 authored May 20, 2024
1 parent 264cda4 commit 3b2748f
Show file tree
Hide file tree
Showing 11 changed files with 84 additions and 77 deletions.
25 changes: 12 additions & 13 deletions src/cart/crt_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
*/
#include <malloc.h>
#include <sys/mman.h>
#include <sys/time.h>
#include <sys/resource.h>
#include "crt_internal.h"

Expand Down Expand Up @@ -74,6 +73,7 @@ crt_lib_init(void)
{
int rc;
uint64_t start_rpcid;
struct timespec now;

rc = D_RWLOCK_INIT(&crt_gdata.cg_rwlock, NULL);
D_ASSERT(rc == 0);
Expand All @@ -88,7 +88,9 @@ crt_lib_init(void)
crt_gdata.cg_inited = 0;
crt_gdata.cg_primary_prov = CRT_PROV_OFI_SOCKETS;

d_srand(d_timeus_secdiff(0) + getpid());
rc = d_gettime(&now);
D_ASSERTF(rc == 0, "d_gettime: " DF_RC "\n", DP_RC(rc));
d_srand(now.tv_sec * 1000 * 1000 * 1000 + now.tv_nsec + getpid());
start_rpcid = ((uint64_t)d_rand()) << 32;

crt_gdata.cg_rpcid = start_rpcid;
Expand Down Expand Up @@ -611,12 +613,14 @@ crt_protocol_info_free(struct crt_protocol_info *protocol_info)
int
crt_init_opt(crt_group_id_t grpid, uint32_t flags, crt_init_options_t *opt)
{
char *provider, *provider_env;
char *interface, *interface_env;
char *domain, *domain_env;
char *auth_key, *auth_key_env;
struct timeval now;
unsigned int seed;
char *provider;
char *provider_env = NULL;
char *interface;
char *interface_env = NULL;
char *domain;
char *domain_env = NULL;
char *auth_key;
char *auth_key_env = NULL;
char *path;
bool server;
int rc = 0;
Expand Down Expand Up @@ -684,11 +688,6 @@ crt_init_opt(crt_group_id_t grpid, uint32_t flags, crt_init_options_t *opt)

D_RWLOCK_WRLOCK(&crt_gdata.cg_rwlock);
if (crt_gdata.cg_inited == 0) {
/* feed a seed for pseudo-random number generator */
gettimeofday(&now, NULL);
seed = (unsigned int)(now.tv_sec * 1000000 + now.tv_usec);
d_srand(seed);

crt_gdata.cg_server = server;
crt_gdata.cg_auto_swim_disable =
(flags & CRT_FLAG_BIT_AUTO_SWIM_DISABLE) ? 1 : 0;
Expand Down
30 changes: 23 additions & 7 deletions src/cart/crt_register.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* (C) Copyright 2016-2022 Intel Corporation.
* (C) Copyright 2016-2024 Intel Corporation.
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -572,13 +572,14 @@ proto_query_cb(const struct crt_cb_info *cb_info)

int
crt_proto_query_int(crt_endpoint_t *tgt_ep, crt_opcode_t base_opc, uint32_t *ver, int count,
crt_proto_query_cb_t cb, void *arg, crt_context_t ctx)
uint32_t timeout, crt_proto_query_cb_t cb, void *arg, crt_context_t ctx)
{
crt_rpc_t *rpc_req = NULL;
crt_context_t crt_ctx;
struct crt_proto_query_in *rpc_req_input;
struct proto_query_t *proto_query = NULL;
uint32_t *tmp_array = NULL;
uint32_t default_timeout;
int rc = DER_SUCCESS;

if (ver == NULL) {
Expand Down Expand Up @@ -629,6 +630,21 @@ crt_proto_query_int(crt_endpoint_t *tgt_ep, crt_opcode_t base_opc, uint32_t *ver
proto_query->pq_user_arg = arg;
proto_query->pq_coq->coq_base = base_opc;

if (timeout != 0) {
/** The global timeout may be overwritten by the per context timeout
* so let's use the API to get the actual setting.
*/
rc = crt_req_get_timeout(rpc_req, &default_timeout);
/** Should only fail if invalid parameter */
D_ASSERT(rc == 0);

if (timeout < default_timeout) {
rc = crt_req_set_timeout(rpc_req, timeout);
/** Should only fail if invalid parameter */
D_ASSERT(rc == 0);
}
}

rc = crt_req_send(rpc_req, proto_query_cb, proto_query);
if (rc != 0)
D_ERROR("crt_req_send() failed: "DF_RC"\n", DP_RC(rc));
Expand All @@ -650,17 +666,17 @@ crt_proto_query_int(crt_endpoint_t *tgt_ep, crt_opcode_t base_opc, uint32_t *ver
}

int
crt_proto_query(crt_endpoint_t *tgt_ep, crt_opcode_t base_opc,
uint32_t *ver, int count, crt_proto_query_cb_t cb, void *arg)
crt_proto_query(crt_endpoint_t *tgt_ep, crt_opcode_t base_opc, uint32_t *ver, int count,
uint32_t timeout, crt_proto_query_cb_t cb, void *arg)
{
return crt_proto_query_int(tgt_ep, base_opc, ver, count, cb, arg, NULL);
return crt_proto_query_int(tgt_ep, base_opc, ver, count, timeout, cb, arg, NULL);
}

int
crt_proto_query_with_ctx(crt_endpoint_t *tgt_ep, crt_opcode_t base_opc, uint32_t *ver, int count,
crt_proto_query_cb_t cb, void *arg, crt_context_t ctx)
uint32_t timeout, crt_proto_query_cb_t cb, void *arg, crt_context_t ctx)
{
return crt_proto_query_int(tgt_ep, base_opc, ver, count, cb, arg, ctx);
return crt_proto_query_int(tgt_ep, base_opc, ver, count, timeout, cb, arg, ctx);
}

/* local operation, query if base_opc with version number ver is registered. */
Expand Down
8 changes: 3 additions & 5 deletions src/cart/crt_rpc.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* (C) Copyright 2016-2023 Intel Corporation.
* (C) Copyright 2016-2024 Intel Corporation.
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -346,8 +346,7 @@ crt_register_proto_fi(crt_endpoint_t *ep)
if (rc != 0)
return -DER_MISC;

rc = crt_proto_query(ep, cpf.cpf_base, &cpf.cpf_ver,
1, crt_pfi_cb, &pfi);
rc = crt_proto_query(ep, cpf.cpf_base, &cpf.cpf_ver, 1, 0, crt_pfi_cb, &pfi);
if (rc != -DER_SUCCESS)
D_GOTO(out, rc);

Expand Down Expand Up @@ -386,8 +385,7 @@ crt_register_proto_ctl(crt_endpoint_t *ep)
if (rc != 0)
return -DER_MISC;

rc = crt_proto_query(ep, cpf.cpf_base, &cpf.cpf_ver,
1, crt_pfi_cb, &pfi);
rc = crt_proto_query(ep, cpf.cpf_base, &cpf.cpf_ver, 1, 0, crt_pfi_cb, &pfi);
if (rc != -DER_SUCCESS)
D_GOTO(out, rc);

Expand Down
66 changes: 27 additions & 39 deletions src/client/api/rpc.c
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
/**
* (C) Copyright 2016-2023 Intel Corporation.
* (C) Copyright 2016-2024 Intel Corporation.
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
#define D_LOGFAC DD_FAC(client)

#include <daos/rpc.h>
#include <daos/event.h>
#include <daos/rsvc.h>
#include <daos/mgmt.h>

static void
Expand Down Expand Up @@ -96,14 +95,15 @@ daos_rpc_send_wait(crt_rpc_t *rpc)
}

struct rpc_proto {
struct rsvc_client cli;
crt_endpoint_t ep;
int version;
int rc;
bool completed;
crt_opcode_t base_opc;
uint32_t *ver_array;
uint32_t array_size;
int nr_ranks;
crt_endpoint_t ep;
int version;
int rc;
bool completed;
crt_opcode_t base_opc;
uint32_t *ver_array;
uint32_t array_size;
uint32_t timeout;
};

static void
Expand All @@ -113,16 +113,11 @@ query_cb(struct crt_proto_query_cb_info *cb_info)
int rc;

if (daos_rpc_retryable_rc(cb_info->pq_rc)) {
rc = rsvc_client_choose(&rproto->cli, &rproto->ep);
if (rc) {
D_ERROR("rsvc_client_choose() failed: "DF_RC"\n", DP_RC(rc));
rproto->rc = rc;
rproto->completed = true;
}

rc = crt_proto_query_with_ctx(&rproto->ep, rproto->base_opc,
rproto->ver_array, rproto->array_size,
query_cb, rproto, daos_get_crt_ctx());
rproto->ep.ep_rank = (rproto->ep.ep_rank + 1) % rproto->nr_ranks;
rproto->timeout += 3;
rc = crt_proto_query_with_ctx(&rproto->ep, rproto->base_opc, rproto->ver_array,
rproto->array_size, rproto->timeout, query_cb, rproto,
daos_get_crt_ctx());
if (rc) {
D_ERROR("crt_proto_query_with_ctx() failed: "DF_RC"\n", DP_RC(rc));
rproto->rc = rc;
Expand All @@ -141,8 +136,7 @@ daos_rpc_proto_query(crt_opcode_t base_opc, uint32_t *ver_array, int count, int
struct dc_mgmt_sys *sys;
struct rpc_proto *rproto = NULL;
crt_context_t ctx = daos_get_crt_ctx();
int rc;
int num_ranks;
int rc;
int i;

rc = dc_mgmt_sys_attach(NULL, &sys);
Expand All @@ -155,39 +149,35 @@ daos_rpc_proto_query(crt_opcode_t base_opc, uint32_t *ver_array, int count, int
if (rproto == NULL)
D_GOTO(out_detach, rc = -DER_NOMEM);

rc = rsvc_client_init(&rproto->cli, sys->sy_info.ms_ranks);
if (rc) {
D_ERROR("rsvc_client_init() failed: "DF_RC"\n", DP_RC(rc));
D_GOTO(out_free, rc);
}

num_ranks = dc_mgmt_net_get_num_srv_ranks();
rproto->ep.ep_rank = rand() % num_ranks;
/** select a random rank to issue the proto query rpc to */
rproto->nr_ranks = dc_mgmt_net_get_num_srv_ranks();
rproto->ep.ep_rank = d_rand() % rproto->nr_ranks;
rproto->ep.ep_tag = 0;
rproto->ver_array = ver_array;
rproto->array_size = count;
rproto->ep.ep_grp = sys->sy_group;
rproto->ep.ep_tag = 0;
rproto->ep.ep_grp = sys->sy_group;
rproto->base_opc = base_opc;
rproto->timeout = 3;

rc = crt_proto_query_with_ctx(&rproto->ep, base_opc,
ver_array, count, query_cb, rproto, ctx);
rc = crt_proto_query_with_ctx(&rproto->ep, base_opc, ver_array, count, rproto->timeout,
query_cb, rproto, ctx);
if (rc) {
D_ERROR("crt_proto_query_with_ctx() failed: "DF_RC"\n", DP_RC(rc));
D_GOTO(out_rsvc, rc);
D_GOTO(out_free, rc);
}

while (!rproto->completed) {
rc = crt_progress(ctx, 0);
if (rc && rc != -DER_TIMEDOUT) {
D_ERROR("failed to progress CART context: %d\n", rc);
D_GOTO(out_rsvc, rc);
D_GOTO(out_free, rc);
}
}

if (rproto->rc != -DER_SUCCESS) {
rc = rproto->rc;
D_ERROR("crt_proto_query()failed: "DF_RC"\n", DP_RC(rc));
D_GOTO(out_rsvc, rc);
D_GOTO(out_free, rc);
}
rc = 0;

Expand All @@ -201,8 +191,6 @@ daos_rpc_proto_query(crt_opcode_t base_opc, uint32_t *ver_array, int count, int
} else {
*ret_ver = rproto->version;
}
out_rsvc:
rsvc_client_fini(&rproto->cli);
out_free:
D_FREE(rproto);
out_detach:
Expand Down
4 changes: 2 additions & 2 deletions src/common/rsvc.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ rsvc_client_choose(struct rsvc_client *client, crt_endpoint_t *ep)
chosen = client->sc_leader_index;
} else {
if (client->sc_next < 0)
client->sc_next = d_randn(client->sc_ranks->rl_nr);
client->sc_next = d_rand() % client->sc_ranks->rl_nr;
chosen = client->sc_next;
/* The hintless search is a round robin of all replicas. */
client->sc_next++;
Expand Down Expand Up @@ -148,7 +148,7 @@ rsvc_client_process_error(struct rsvc_client *client, int rc,
* search.
*/
D_DEBUG(DB_MD, "give up leader rank %u\n", ep->ep_rank);
client->sc_next = d_randn(client->sc_ranks->rl_nr);
client->sc_next = d_rand() % client->sc_ranks->rl_nr;
if (client->sc_next == leader_index) {
client->sc_next++;
client->sc_next %= client->sc_ranks->rl_nr;
Expand Down
2 changes: 1 addition & 1 deletion src/gurt/misc.c
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ d_rank_list_shuffle(d_rank_list_t *rank_list)
return;

for (i = 0; i < rank_list->rl_nr; i++) {
j = rand() % rank_list->rl_nr;
j = d_rand() % rank_list->rl_nr;
tmp = rank_list->rl_ranks[i];
rank_list->rl_ranks[i] = rank_list->rl_ranks[j];
rank_list->rl_ranks[j] = tmp;
Expand Down
10 changes: 7 additions & 3 deletions src/include/cart/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -1875,6 +1875,8 @@ crt_proto_register(struct crt_proto_format *cpf);
* \param[in] base_opc the base opcode for the protocol
* \param[in] ver array of protocol version
* \param[in] count number of elements in ver
* \param[in] timeout Timeout in seconds, ignored if 0 or greater than
* default timeout
* \param[in] cb completion callback. crt_proto_query() internally
* sends an RPC to \a tgt_ep. \a cb will be called
* upon completion of that RPC. The highest protocol
Expand All @@ -1888,8 +1890,8 @@ crt_proto_register(struct crt_proto_format *cpf);
* failure.
*/
int
crt_proto_query(crt_endpoint_t *tgt_ep, crt_opcode_t base_opc,
uint32_t *ver, int count, crt_proto_query_cb_t cb, void *arg);
crt_proto_query(crt_endpoint_t *tgt_ep, crt_opcode_t base_opc, uint32_t *ver, int count,
uint32_t timeout, crt_proto_query_cb_t cb, void *arg);

/**
* query tgt_ep if it has registered base_opc with version using a user provided cart context.
Expand All @@ -1898,6 +1900,8 @@ crt_proto_query(crt_endpoint_t *tgt_ep, crt_opcode_t base_opc,
* \param[in] base_opc the base opcode for the protocol
* \param[in] ver array of protocol version
* \param[in] count number of elements in ver
* \param[in] timeout Timeout in seconds, ignored if 0 or greater than
* default timeout
* \param[in] cb completion callback. crt_proto_query() internally
* sends an RPC to \a tgt_ep. \a cb will be called
* upon completion of that RPC. The highest protocol
Expand All @@ -1912,7 +1916,7 @@ crt_proto_query(crt_endpoint_t *tgt_ep, crt_opcode_t base_opc,
*/
int
crt_proto_query_with_ctx(crt_endpoint_t *tgt_ep, crt_opcode_t base_opc, uint32_t *ver, int count,
crt_proto_query_cb_t cb, void *arg, crt_context_t ctx);
uint32_t timeout, crt_proto_query_cb_t cb, void *arg, crt_context_t ctx);
/**
* Set self rank.
*
Expand Down
1 change: 0 additions & 1 deletion src/include/gurt/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ extern "C" {

void d_srand(long int);
long int d_rand(void);
long int d_randn(long int n);

/* memory allocating macros */
void d_free(void *);
Expand Down
4 changes: 1 addition & 3 deletions src/pool/cli.c
Original file line number Diff line number Diff line change
Expand Up @@ -1827,9 +1827,7 @@ choose_map_refresh_rank(struct map_refresh_arg *arg)

if (arg->mra_i == -1) {
/* Let i be a random integer in [0, n). */
i = ((double)rand() / RAND_MAX) * n;
if (i == n)
i = 0;
i = d_rand() % n;
} else {
/* Continue the round robin. */
i = arg->mra_i;
Expand Down
10 changes: 7 additions & 3 deletions src/tests/ftest/cart/test_proto_client.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* (C) Copyright 2018-2022 Intel Corporation.
* (C) Copyright 2018-2024 Intel Corporation.
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -65,6 +65,7 @@ test_run()
uint32_t s_high_ver = 0xFFFFFFFF;
uint32_t c_high_ver = test.tg_num_proto - 1;
int rc;
uint32_t timeout;

fprintf(stderr, "local group: %s remote group: %s\n",
test.tg_local_group_name, test.tg_remote_group_name);
Expand Down Expand Up @@ -119,8 +120,11 @@ test_run()
server_ep.ep_rank = 0;

DBG_PRINT("proto query\n");
rc = crt_proto_query(&server_ep, OPC_MY_PROTO, my_ver_array, 7,
query_cb, &s_high_ver);
timeout = 1;
do {
rc = crt_proto_query(&server_ep, OPC_MY_PROTO, my_ver_array, 7, timeout++, query_cb,
&s_high_ver);
} while (rc == -DER_TIMEDOUT);
D_ASSERT(rc == 0);

while (s_high_ver == 0xFFFFFFFF)
Expand Down
Loading

0 comments on commit 3b2748f

Please sign in to comment.