diff --git a/debian/changelog b/debian/changelog index 5448a3d686b..bddf3f36d0b 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,9 @@ +daos (2.7.100-5) unstable; urgency=medium + [ Michael MacDonald ] + * Add libdaos_self_test.so to client package + + -- Michael MacDonald Thu, 15 Aug 2024 12:00:00 -0500 + daos (2.7.100-4) unstable; urgency=medium [ Jerome Soumagne ] * Bump mercury version to 2.4.0rc4 diff --git a/debian/daos-client.install b/debian/daos-client.install index e9a5b2b5a3d..f65ea638503 100644 --- a/debian/daos-client.install +++ b/debian/daos-client.install @@ -12,6 +12,7 @@ usr/lib64/libioil.so usr/lib64/libpil4dfs.so usr/lib64/libdaos.so.* usr/lib64/libdaos_cmd_hdlrs.so +usr/lib64/libdaos_self_test.so usr/lib64/python3.8/site-packages/pydaos/*.py usr/lib64/python3.8/site-packages/pydaos/pydaos_shim.so usr/lib64/python3.8/site-packages/pydaos/raw/*.py diff --git a/docs/user/filesystem.md b/docs/user/filesystem.md index d955b79c37f..cf9ca813dc2 100644 --- a/docs/user/filesystem.md +++ b/docs/user/filesystem.md @@ -719,7 +719,7 @@ These are two command line options to control the DFuse process itself. | **Command line option** | **Description** | | ----------------------- | ------------------------- | | --disable-caching | Disables all caching | -| --disable-wb-caching | Disables write-back cache | +| --disable-wb-cache | Disables write-back cache | These will affect all containers accessed via DFuse, regardless of any container attributes. diff --git a/src/bio/bio_internal.h b/src/bio/bio_internal.h index 302c73f6411..6f17054846b 100644 --- a/src/bio/bio_internal.h +++ b/src/bio/bio_internal.h @@ -1,5 +1,5 @@ /** - * (C) Copyright 2018-2023 Intel Corporation. + * (C) Copyright 2018-2024 Intel Corporation. * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -359,9 +359,9 @@ struct bio_blobstore { * layer, teardown procedure needs be postponed. */ int bb_holdings; - /* Flags indicating blobstore load/unload is in-progress */ - unsigned bb_loading:1, - bb_unloading:1; + unsigned bb_loading:1, /* Blobstore is loading */ + bb_unloading:1, /* Blobstore is unloading */ + bb_faulty_done:1; /* Faulty reaction is done */ }; /* Per-xstream blobstore */ @@ -650,6 +650,7 @@ uint64_t default_wal_sz(uint64_t meta_sz); /* bio_recovery.c */ int bio_bs_state_transit(struct bio_blobstore *bbs); int bio_bs_state_set(struct bio_blobstore *bbs, enum bio_bs_state new_state); +void trigger_faulty_reaction(struct bio_blobstore *bbs); /* bio_device.c */ int fill_in_traddr(struct bio_dev_info *b_info, char *dev_name); diff --git a/src/bio/bio_monitor.c b/src/bio/bio_monitor.c index 34b0cc8b992..8330071127b 100644 --- a/src/bio/bio_monitor.c +++ b/src/bio/bio_monitor.c @@ -728,17 +728,54 @@ is_bbs_faulty(struct bio_blobstore *bbs) void auto_faulty_detect(struct bio_blobstore *bbs) { - int rc; + struct smd_dev_info *dev_info; + int rc; + + /* The in-memory device is already in FAULTY state */ + if (bbs->bb_state == BIO_BS_STATE_FAULTY) + return; - if (bbs->bb_state != BIO_BS_STATE_NORMAL) + /* To make things simpler, don't detect faulty in SETUP phase */ + if (bbs->bb_state == BIO_BS_STATE_SETUP) return; if (!is_bbs_faulty(bbs)) return; - rc = bio_bs_state_set(bbs, BIO_BS_STATE_FAULTY); - if (rc) - D_ERROR("Failed to set FAULTY state. "DF_RC"\n", DP_RC(rc)); + /* + * The device might have been unplugged before marked as FAULTY, and the bbs is + * already in teardown. + */ + if (bbs->bb_state != BIO_BS_STATE_NORMAL) { + /* Faulty reaction is already successfully performed */ + if (bbs->bb_faulty_done) + return; + + rc = smd_dev_get_by_id(bbs->bb_dev->bb_uuid, &dev_info); + if (rc) { + DL_ERROR(rc, "Get device info "DF_UUID" failed.", + DP_UUID(bbs->bb_dev->bb_uuid)); + return; + } + + /* The device is already marked as FAULTY */ + if (dev_info->sdi_state == SMD_DEV_FAULTY) { + smd_dev_free_info(dev_info); + trigger_faulty_reaction(bbs); + return; + } + smd_dev_free_info(dev_info); + + rc = smd_dev_set_state(bbs->bb_dev->bb_uuid, SMD_DEV_FAULTY); + if (rc) + DL_ERROR(rc, "Set device state failed."); + else + trigger_faulty_reaction(bbs); + } else { + rc = bio_bs_state_set(bbs, BIO_BS_STATE_FAULTY); + if (rc) + DL_ERROR(rc, "Failed to set FAULTY state."); + } if (rc == 0) ras_notify_eventf(RAS_DEVICE_SET_FAULTY, RAS_TYPE_INFO, RAS_SEV_NOTICE, NULL, NULL, diff --git a/src/bio/bio_recovery.c b/src/bio/bio_recovery.c index bc779046b6c..1847fd86c6e 100644 --- a/src/bio/bio_recovery.c +++ b/src/bio/bio_recovery.c @@ -1,5 +1,5 @@ /** - * (C) Copyright 2018-2023 Intel Corporation. + * (C) Copyright 2018-2024 Intel Corporation. * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -53,10 +53,19 @@ on_faulty(struct bio_blobstore *bbs) rc = ract_ops->faulty_reaction(tgt_ids, tgt_cnt); if (rc < 0) D_ERROR("Faulty reaction failed. "DF_RC"\n", DP_RC(rc)); + else if (rc == 0) + bbs->bb_faulty_done = 1; return rc; } +void +trigger_faulty_reaction(struct bio_blobstore *bbs) +{ + D_ASSERT(!bbs->bb_faulty_done); + on_faulty(bbs); +} + static void teardown_xs_bs(void *arg) { @@ -460,9 +469,10 @@ bio_bs_state_set(struct bio_blobstore *bbs, enum bio_bs_state new_state) } int -bio_xsctxt_health_check(struct bio_xs_context *xs_ctxt) +bio_xsctxt_health_check(struct bio_xs_context *xs_ctxt, bool log_err, bool update) { struct bio_xs_blobstore *bxb; + struct media_error_msg *mem; enum smd_dev_type st; /* sys xstream in pmem mode doesn't have NVMe context */ @@ -475,8 +485,21 @@ bio_xsctxt_health_check(struct bio_xs_context *xs_ctxt) if (!bxb || !bxb->bxb_blobstore) continue; - if (bxb->bxb_blobstore->bb_state != BIO_BS_STATE_NORMAL) + if (bxb->bxb_blobstore->bb_state != BIO_BS_STATE_NORMAL) { + if (log_err && bxb->bxb_blobstore->bb_state != BIO_BS_STATE_SETUP) { + D_ALLOC_PTR(mem); + if (mem == NULL) { + D_ERROR("Failed to allocate media error msg.\n"); + return -DER_NVME_IO; + } + + mem->mem_err_type = update ? MET_WRITE : MET_READ; + mem->mem_bs = bxb->bxb_blobstore; + mem->mem_tgt_id = xs_ctxt->bxc_tgt_id; + spdk_thread_send_msg(owner_thread(mem->mem_bs), bio_media_error, mem); + } return -DER_NVME_IO; + } } return 0; @@ -492,7 +515,7 @@ is_reint_ready(struct bio_blobstore *bbs) xs_ctxt = bbs->bb_xs_ctxts[i]; D_ASSERT(xs_ctxt != NULL); - if (bio_xsctxt_health_check(xs_ctxt)) + if (bio_xsctxt_health_check(xs_ctxt, false, false)) return false; } return true; diff --git a/src/bio/bio_xstream.c b/src/bio/bio_xstream.c index 72f71dabd8b..4bba7359a7e 100644 --- a/src/bio/bio_xstream.c +++ b/src/bio/bio_xstream.c @@ -1705,6 +1705,18 @@ bio_nvme_ctl(unsigned int cmd, void *arg) return rc; } +static inline void +reset_media_errors(struct bio_blobstore *bbs) +{ + struct nvme_stats *dev_stats = &bbs->bb_dev_health.bdh_health_state; + + dev_stats->bio_read_errs = 0; + dev_stats->bio_write_errs = 0; + dev_stats->bio_unmap_errs = 0; + dev_stats->checksum_errs = 0; + bbs->bb_faulty_done = 0; +} + void setup_bio_bdev(void *arg) { @@ -1736,6 +1748,7 @@ setup_bio_bdev(void *arg) goto out; } + reset_media_errors(bbs); rc = bio_bs_state_set(bbs, BIO_BS_STATE_SETUP); D_ASSERT(rc == 0); out: diff --git a/src/cart/crt_rpc.c b/src/cart/crt_rpc.c index b78a3df8afb..d740211f714 100644 --- a/src/cart/crt_rpc.c +++ b/src/cart/crt_rpc.c @@ -965,16 +965,20 @@ uri_lookup_cb(const struct crt_cb_info *cb_info) retry: if (rc != 0) { - if (chained_rpc_priv->crp_ul_retry++ < MAX_URI_LOOKUP_RETRIES) { - rc = crt_issue_uri_lookup_retry(lookup_rpc->cr_ctx, - grp_priv, - ul_in->ul_rank, - ul_in->ul_tag, - chained_rpc_priv); - D_GOTO(out, rc); + /* PROTO_QUERY will be retried by the caller, no need to retry URI lookups */ + if (chained_rpc_priv->crp_pub.cr_opc != CRT_OPC_PROTO_QUERY) { + if (chained_rpc_priv->crp_ul_retry++ < MAX_URI_LOOKUP_RETRIES) { + rc = crt_issue_uri_lookup_retry(lookup_rpc->cr_ctx, grp_priv, + ul_in->ul_rank, ul_in->ul_tag, + chained_rpc_priv); + D_GOTO(out, rc); + } else { + D_ERROR("URI lookups exceeded %d retries\n", + chained_rpc_priv->crp_ul_retry); + } } else { - D_ERROR("URI lookups exceeded %d retries\n", - chained_rpc_priv->crp_ul_retry); + DL_INFO(rc, "URI_LOOKUP for (%d:%d) failed during PROTO_QUERY", + ul_in->ul_rank, ul_in->ul_tag); } } diff --git a/src/client/dfuse/pil4dfs/int_dfs.c b/src/client/dfuse/pil4dfs/int_dfs.c index ad748852888..eaecdfabc9a 100644 --- a/src/client/dfuse/pil4dfs/int_dfs.c +++ b/src/client/dfuse/pil4dfs/int_dfs.c @@ -940,13 +940,6 @@ child_hdlr(void) DL_WARN(rc, "daos_eq_lib_init() failed in child process"); daos_dti_reset(); td_eqh = main_eqh = DAOS_HDL_INVAL; - if (d_eq_count_max > 0) { - rc = daos_eq_create(&td_eqh); - if (rc) - DL_WARN(rc, "daos_eq_create() failed"); - else - main_eqh = td_eqh; - } context_reset = true; } diff --git a/src/common/lru.c b/src/common/lru.c index bcd3f582a33..bb270500ab7 100644 --- a/src/common/lru.c +++ b/src/common/lru.c @@ -216,7 +216,12 @@ daos_lru_ref_hold(struct daos_lru_cache *lcache, void *key, llink = link2llink(link); D_ASSERT(llink->ll_evicted == 0); if (llink->ll_evicting) { - daos_lru_ref_release(lcache, llink); + /** + * Avoid calling `lru_hop_rec_decref()` at this point + * to prevent `wakeup()` from being invoked twice. + */ + D_ASSERT(llink->ll_ref > 1); + llink->ll_ref--; D_GOTO(out, rc = -DER_SHUTDOWN); } /* remove busy item from LRU */ diff --git a/src/dtx/dtx_rpc.c b/src/dtx/dtx_rpc.c index cc18a1ac915..1c0e73c9640 100644 --- a/src/dtx/dtx_rpc.c +++ b/src/dtx/dtx_rpc.c @@ -1220,6 +1220,9 @@ dtx_refresh_internal(struct ds_cont_child *cont, int *check_count, d_list_t *che /* Handle the entries whose leaders are on current server. */ d_list_for_each_entry_safe(dsp, tmp, &self, dsp_link) { struct dtx_entry dte; + struct dtx_entry *pdte = &dte; + struct dtx_cos_key dck; + d_list_del(&dsp->dsp_link); @@ -1228,13 +1231,31 @@ dtx_refresh_internal(struct ds_cont_child *cont, int *check_count, d_list_t *che dte.dte_refs = 1; dte.dte_mbs = dsp->dsp_mbs; + if (for_io) { + rc = vos_dtx_check(cont->sc_hdl, &dsp->dsp_xid, NULL, NULL, NULL, false); + switch(rc) { + case DTX_ST_COMMITTABLE: + dck.oid = dsp->dsp_oid; + dck.dkey_hash = dsp->dsp_dkey_hash; + rc = dtx_commit(cont, &pdte, &dck, 1); + if (rc < 0 && rc != -DER_NONEXIST && for_io) + d_list_add_tail(&dsp->dsp_link, cmt_list); + else + dtx_dsp_free(dsp); + continue; + case DTX_ST_COMMITTED: + case -DER_NONEXIST: /* Aborted */ + dtx_dsp_free(dsp); + continue; + default: + break; + } + } + rc = dtx_status_handle_one(cont, &dte, dsp->dsp_oid, dsp->dsp_dkey_hash, dsp->dsp_epoch, NULL, NULL); switch (rc) { case DSHR_NEED_COMMIT: { - struct dtx_entry *pdte = &dte; - struct dtx_cos_key dck; - dck.oid = dsp->dsp_oid; dck.dkey_hash = dsp->dsp_dkey_hash; rc = dtx_commit(cont, &pdte, &dck, 1); diff --git a/src/gurt/errno.c b/src/gurt/errno.c index 57bf346f7a0..684726c2e65 100644 --- a/src/gurt/errno.c +++ b/src/gurt/errno.c @@ -1,15 +1,13 @@ /* - * (C) Copyright 2016-2023 Intel Corporation. + * (C) Copyright 2016-2024 Intel Corporation. * * SPDX-License-Identifier: BSD-2-Clause-Patent */ /** * This file is part of GURT. */ -#include - +#include #include -#include #include #include diff --git a/src/include/daos_srv/bio.h b/src/include/daos_srv/bio.h index 21f1801a3f4..a69f456e232 100644 --- a/src/include/daos_srv/bio.h +++ b/src/include/daos_srv/bio.h @@ -486,11 +486,13 @@ void bio_xsctxt_free(struct bio_xs_context *ctxt); * Health check on the per-xstream NVMe context * * \param[in] xs_ctxt Per-xstream NVMe context + * \param[in] log_err Log media error if the device is not healthy + * \param[in] update The check is called for an update operation or not * * \returns 0: NVMe context is healthy * -DER_NVME_IO: NVMe context is faulty */ -int bio_xsctxt_health_check(struct bio_xs_context *xs_ctxt); +int bio_xsctxt_health_check(struct bio_xs_context *xs_ctxt, bool log_err, bool update); /** * NVMe poller to poll NVMe I/O completions. diff --git a/src/include/gurt/list.h b/src/include/gurt/list.h index c339ac3c601..2b033e9f1e3 100644 --- a/src/include/gurt/list.h +++ b/src/include/gurt/list.h @@ -1,5 +1,5 @@ /* - * (C) Copyright 2016-2022 Intel Corporation. + * (C) Copyright 2016-2024 Intel Corporation. * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -242,8 +242,7 @@ d_list_splice_init(d_list_t *list, d_list_t *head) * \param[in] type the type of the struct this is embedded in. * \param[in] member the member name of the list within the struct. */ -#define d_list_entry(ptr, type, member) \ - ((type *)((char *)(ptr)-(char *)(&((type *)0)->member))) +#define d_list_entry(ptr, type, member) ((type *)((char *)(ptr)-offsetof(type, member))) #define d_list_pop_entry(list, type, member) \ ({ \ diff --git a/src/object/srv_obj.c b/src/object/srv_obj.c index a2cb1f41adb..3c950615034 100644 --- a/src/object/srv_obj.c +++ b/src/object/srv_obj.c @@ -5655,7 +5655,8 @@ ds_obj_coll_punch_handler(crt_rpc_t *rpc) 1 /* start, [0] is for current engine */, ocpi->ocpi_disp_width, &exec_arg.coll_cur); - rc = dtx_leader_begin(ioc.ioc_vos_coh, &odm->odm_xid, &epoch, 1, version, + rc = dtx_leader_begin(ioc.ioc_vos_coh, &odm->odm_xid, &epoch, + dcts[0].dct_shards[dmi->dmi_tgt_id].dcs_nr, version, &ocpi->ocpi_oid, NULL /* dti_cos */, 0 /* dti_cos_cnt */, NULL /* tgts */, exec_arg.coll_cur.grp_nr /* tgt_cnt */, dtx_flags, odm->odm_mbs, dce, &dlh); diff --git a/src/tests/ftest/dfuse/bash.py b/src/tests/ftest/dfuse/bash.py index d8bce4c7b48..964d0295954 100644 --- a/src/tests/ftest/dfuse/bash.py +++ b/src/tests/ftest/dfuse/bash.py @@ -128,7 +128,10 @@ def run_bashcmd(self, il_lib=None, compatible_mode=False): f"bzip2 -z {fuse_root_dir}/lib.a", f"chmod u-r {fuse_root_dir}/lib.a.bz2", 'fio --readwrite=randwrite --name=test --size="2M" --directory ' - f'{fuse_root_dir}/ --bs=1M --numjobs="4" --ioengine=psync ' + f'{fuse_root_dir}/ --bs=1M --numjobs="4" --ioengine=psync --thread=0' + "--group_reporting --exitall_on_error --continue_on_error=none", + 'fio --readwrite=randwrite --name=test --size="2M" --directory ' + f'{fuse_root_dir}/ --bs=1M --numjobs="4" --ioengine=psync --thread=1' "--group_reporting --exitall_on_error --continue_on_error=none", 'fio --readwrite=randwrite --name=test --size="2M" --directory ' f'{fuse_root_dir}/ --bs=1M --numjobs="1" --ioengine=libaio --iodepth=16' diff --git a/src/tests/ftest/soak/harassers.yaml b/src/tests/ftest/soak/harassers.yaml index c02a0ae90ce..af7065fb3d8 100644 --- a/src/tests/ftest/soak/harassers.yaml +++ b/src/tests/ftest/soak/harassers.yaml @@ -108,6 +108,7 @@ soak_harassers: enable_intercept_lib: false enable_remote_logging: false enable_scrubber: false + enable_rebuild_logmasks: false # Commandline parameters # Benchmark and application params # IOR params -a DFS and -a MPIIO diff --git a/src/tests/ftest/util/soak_test_base.py b/src/tests/ftest/util/soak_test_base.py index 8ee45aba1fd..f32e068cb16 100644 --- a/src/tests/ftest/util/soak_test_base.py +++ b/src/tests/ftest/util/soak_test_base.py @@ -78,6 +78,7 @@ def __init__(self, *args, **kwargs): self.soak_log_dir = None self.soak_dir = None self.enable_scrubber = False + self.enable_rebuild_logmasks = False def setUp(self): """Define test setup to be done.""" @@ -574,6 +575,8 @@ def run_soak(self, test_param): "enable_remote_logging", os.path.join(test_param, "*"), False) self.enable_scrubber = self.params.get( "enable_scrubber", os.path.join(test_param, "*"), False) + self.enable_rebuild_logmasks = self.params.get( + "enable_rebuild_logmasks", os.path.join(test_param, "*"), False) if harassers: run_harasser = True self.log.info("<< Initial harasser list = %s>>", harassers) diff --git a/src/tests/ftest/util/soak_utils.py b/src/tests/ftest/util/soak_utils.py index f73adf88b99..d190a06e759 100644 --- a/src/tests/ftest/util/soak_utils.py +++ b/src/tests/ftest/util/soak_utils.py @@ -320,7 +320,8 @@ def wait_for_pool_rebuild(self, pool, name): """ rebuild_status = False self.log.info("<> at %s", name, pool.identifier, time.ctime()) - self.dmg_command.server_set_logmasks("DEBUG", raise_exception=False) + if self.enable_rebuild_logmasks: + self.dmg_command.server_set_logmasks("DEBUG", raise_exception=False) try: # # Wait for rebuild to start # pool.wait_for_rebuild_to_start() @@ -328,12 +329,14 @@ def wait_for_pool_rebuild(self, pool, name): pool.wait_for_rebuild_to_end() rebuild_status = True except DaosTestError as error: - self.log.error(f"<< #include +#include "self_test_lib.h" #include "crt_utils.h" #include #include #include -#define CRT_SELF_TEST_AUTO_BULK_THRESH (1 << 20) -#define CRT_SELF_TEST_GROUP_NAME ("crt_self_test") - -struct st_size_params { - uint32_t send_size; - uint32_t reply_size; - union { - struct { - enum crt_st_msg_type send_type: 2; - enum crt_st_msg_type reply_type: 2; - }; - uint32_t flags; - }; -}; - -struct st_endpoint { - uint32_t rank; - uint32_t tag; -}; - -struct st_master_endpt { - crt_endpoint_t endpt; - struct crt_st_status_req_out reply; - int32_t test_failed; - int32_t test_completed; -}; - -static const char * const crt_st_msg_type_str[] = { "EMPTY", - "IOV", - "BULK_PUT", - "BULK_GET" }; - /* User input maximum values */ #define SELF_TEST_MAX_REPETITIONS (0x40000000) #define SELF_TEST_MAX_INFLIGHT (0x40000000) #define SELF_TEST_MAX_LIST_STR_LEN (1 << 16) #define SELF_TEST_MAX_NUM_ENDPOINTS (UINT32_MAX) -/* Global shutdown flag, used to terminate the progress thread */ -static int g_shutdown_flag; -static bool g_randomize_endpoints; -static bool g_group_inited; -static bool g_context_created; -static bool g_cart_inited; - -static void *progress_fn(void *arg) -{ - int ret; - crt_context_t *crt_ctx = NULL; - - crt_ctx = (crt_context_t *)arg; - D_ASSERT(crt_ctx != NULL); - D_ASSERT(*crt_ctx != NULL); - - while (!g_shutdown_flag) { - ret = crt_progress(*crt_ctx, 1); - if (ret != 0 && ret != -DER_TIMEDOUT) { - D_ERROR("crt_progress failed; ret = %d\n", ret); - break; - } - }; - - pthread_exit(NULL); -} - -static int -self_test_init(char *dest_name, crt_context_t *crt_ctx, crt_group_t **srv_grp, pthread_t *tid, - char *attach_info_path, bool listen, bool use_agent, bool no_sync) -{ - uint32_t init_flags = 0; - uint32_t grp_size; - d_rank_list_t *rank_list = NULL; - int attach_retries = 40; - int i; - d_rank_t max_rank = 0; - int ret; - crt_init_options_t opt = {0}; - crt_init_options_t *init_opt; - - /* rank, num_attach_retries, is_server, assert_on_error */ - crtu_test_init(0, attach_retries, false, false); - - if (use_agent) { - ret = dc_agent_init(); - if (ret != 0) { - fprintf(stderr, "dc_agent_init() failed. ret: %d\n", ret); - return ret; - } - ret = crtu_dc_mgmt_net_cfg_setenv(dest_name, &opt); - if (ret != 0) { - D_ERROR("crtu_dc_mgmt_net_cfg_setenv() failed; ret = %d\n", ret); - return ret; - } - - init_opt = &opt; - } else { - init_opt = NULL; - } - - if (listen) - init_flags |= (CRT_FLAG_BIT_SERVER | CRT_FLAG_BIT_AUTO_SWIM_DISABLE); - - ret = crt_init_opt(CRT_SELF_TEST_GROUP_NAME, init_flags, init_opt); - if (ret != 0) - return ret; - - D_FREE(opt.cio_provider); - D_FREE(opt.cio_interface); - D_FREE(opt.cio_domain); - g_cart_inited = true; - - if (attach_info_path) { - ret = crt_group_config_path_set(attach_info_path); - D_ASSERTF(ret == 0, - "crt_group_config_path_set failed, ret = %d\n", ret); - } - - ret = crt_context_create(crt_ctx); - if (ret != 0) { - D_ERROR("crt_context_create failed; ret = %d\n", ret); - return ret; - } - g_context_created = true; - - if (use_agent) { - ret = crt_group_view_create(dest_name, srv_grp); - if (!*srv_grp || ret != 0) { - D_ERROR("Failed to create group view; ret=%d\n", ret); - assert(0); - } - - ret = crtu_dc_mgmt_net_cfg_rank_add(dest_name, *srv_grp, *crt_ctx); - if (ret != 0) { - fprintf(stderr, "crtu_dc_mgmt_net_cfg_rank_add() failed. ret: %d\n", ret); - return ret; - } - } else { - /* DAOS-8839: Do not limit retries, instead rely on global test timeout */ - while (1) { - ret = crt_group_attach(dest_name, srv_grp); - if (ret == 0) - break; - sleep(1); - } - } - - if (ret != 0) { - D_ERROR("crt_group_attach failed; ret = %d\n", ret); - return ret; - } - - g_group_inited = true; - - D_ASSERTF(*srv_grp != NULL, - "crt_group_attach succeeded but returned group is NULL\n"); - - DBG_PRINT("Attached %s\n", dest_name); - - g_shutdown_flag = 0; - - ret = pthread_create(tid, NULL, progress_fn, crt_ctx); - if (ret != 0) { - D_ERROR("failed to create progress thread: %s\n", - strerror(errno)); - return -DER_MISC; - } - - ret = crt_group_size(*srv_grp, &grp_size); - D_ASSERTF(ret == 0, "crt_group_size() failed; rc=%d\n", ret); - - ret = crt_group_ranks_get(*srv_grp, &rank_list); - D_ASSERTF(ret == 0, - "crt_group_ranks_get() failed; rc=%d\n", ret); - - D_ASSERTF(rank_list != NULL, "Rank list is NULL\n"); - - D_ASSERTF(rank_list->rl_nr == grp_size, - "rank_list differs in size. expected %d got %d\n", - grp_size, rank_list->rl_nr); - - ret = crt_group_psr_set(*srv_grp, rank_list->rl_ranks[0]); - D_ASSERTF(ret == 0, "crt_group_psr_set() failed; rc=%d\n", ret); - - /* waiting to sync with the following parameters - * 0 - tag 0 - * 1 - total ctx - * 60 - ping timeout - * 120 - total timeout - */ - /* Only ping ranks if not using agent, and user didn't ask for no-sync */ - if (!use_agent && !no_sync) { - ret = crtu_wait_for_ranks(*crt_ctx, *srv_grp, rank_list, 0, 1, 60, 120); - D_ASSERTF(ret == 0, "wait_for_ranks() failed; ret=%d\n", ret); - } - - max_rank = rank_list->rl_ranks[0]; - for (i = 1; i < rank_list->rl_nr; i++) { - if (rank_list->rl_ranks[i] > max_rank) - max_rank = rank_list->rl_ranks[i]; - } - - d_rank_list_free(rank_list); - - ret = crt_rank_self_set(max_rank+1, 1 /* group_version_min */); - if (ret != 0) { - D_ERROR("crt_rank_self_set failed; ret = %d\n", ret); - return ret; - } - - return 0; -} - -static int st_compare_endpts(const void *a_in, const void *b_in) -{ - struct st_endpoint *a = (struct st_endpoint *)a_in; - struct st_endpoint *b = (struct st_endpoint *)b_in; - - if (a->rank != b->rank) - return a->rank > b->rank; - return a->tag > b->tag; -} - -static int st_compare_latencies_by_vals(const void *a_in, const void *b_in) -{ - struct st_latency *a = (struct st_latency *)a_in; - struct st_latency *b = (struct st_latency *)b_in; - - if (a->val != b->val) - return a->val > b->val; - return a->cci_rc > b->cci_rc; -} - -static int st_compare_latencies_by_ranks(const void *a_in, const void *b_in) -{ - struct st_latency *a = (struct st_latency *)a_in; - struct st_latency *b = (struct st_latency *)b_in; - - if (a->rank != b->rank) - return a->rank > b->rank; - if (a->tag != b->tag) - return a->tag > b->tag; - if (a->val != b->val) - return a->val > b->val; - return a->cci_rc > b->cci_rc; -} - -static void -start_test_cb(const struct crt_cb_info *cb_info) -{ - /* Result returned to main thread */ - int32_t *return_status = cb_info->cci_arg; - - /* Status retrieved from the RPC result payload */ - int32_t *reply_status; - - /* Check the status of the RPC transport itself */ - if (cb_info->cci_rc != 0) { - *return_status = cb_info->cci_rc; - return; - } - - /* Get the status from the payload */ - reply_status = (int32_t *)crt_reply_get(cb_info->cci_rpc); - D_ASSERT(reply_status != NULL); - - /* Return whatever result we got to the main thread */ - *return_status = *reply_status; -} - -static void -status_req_cb(const struct crt_cb_info *cb_info) -{ - /* Result returned to main thread */ - struct crt_st_status_req_out *return_status = cb_info->cci_arg; - - /* Status retrieved from the RPC result payload */ - struct crt_st_status_req_out *reply_status; - - /* Check the status of the RPC transport itself */ - if (cb_info->cci_rc != 0) { - return_status->status = cb_info->cci_rc; - return; - } - - /* Get the status from the payload */ - reply_status = crt_reply_get(cb_info->cci_rpc); - D_ASSERT(reply_status != NULL); - - /* - * Return whatever result we got to the main thread - * - * Write these in specific order so we can avoid locking - * TODO: This assumes int32 writes are atomic - * (they are on x86 if 4-byte aligned) - */ - return_status->test_duration_ns = reply_status->test_duration_ns; - return_status->num_remaining = reply_status->num_remaining; - return_status->status = reply_status->status; -} - /* * Iterates over a list of failing latency measurements and prints out the * count of each type of failure, along with the error string and code @@ -535,556 +242,6 @@ static void print_results(struct st_latency *latencies, } -static int test_msg_size(crt_context_t crt_ctx, - struct st_master_endpt *ms_endpts, - uint32_t num_ms_endpts, - struct crt_st_start_params *test_params, - struct st_latency **latencies, - crt_bulk_t *latencies_bulk_hdl, int output_megabits) -{ - - int ret; - int done; - uint32_t failed_count; - uint32_t complete_count; - crt_rpc_t *new_rpc; - struct crt_st_start_params *start_args; - uint32_t m_idx; - - /* - * Launch self-test 1:many sessions on each master endpoint - * as simultaneously as possible (don't wait for acknowledgment) - */ - for (m_idx = 0; m_idx < num_ms_endpts; m_idx++) { - crt_endpoint_t *endpt = &ms_endpts[m_idx].endpt; - - /* Create and send a new RPC starting the test */ - ret = crt_req_create(crt_ctx, endpt, CRT_OPC_SELF_TEST_START, - &new_rpc); - if (ret != 0) { - D_ERROR("Creating start RPC failed to endpoint" - " %u:%u; ret = %d\n", endpt->ep_rank, - endpt->ep_tag, ret); - ms_endpts[m_idx].test_failed = 1; - ms_endpts[m_idx].test_completed = 1; - continue; - } - - start_args = (struct crt_st_start_params *) - crt_req_get(new_rpc); - D_ASSERTF(start_args != NULL, - "crt_req_get returned NULL\n"); - memcpy(start_args, test_params, sizeof(*test_params)); - start_args->srv_grp = test_params->srv_grp; - - /* Set the launch status to a known impossible value */ - ms_endpts[m_idx].reply.status = INT32_MAX; - - ret = crt_req_send(new_rpc, start_test_cb, - &ms_endpts[m_idx].reply.status); - if (ret != 0) { - D_ERROR("Failed to send start RPC to endpoint %u:%u; " - "ret = %d\n", endpt->ep_rank, endpt->ep_tag, - ret); - ms_endpts[m_idx].test_failed = 1; - ms_endpts[m_idx].test_completed = 1; - continue; - } - } - - /* - * Wait for each node to report whether or not the test launched - * successfully - */ - do { - /* Flag indicating all test launches have returned a status */ - done = 1; - - /* Wait a bit for tests to finish launching */ - sched_yield(); - - for (m_idx = 0; m_idx < num_ms_endpts; m_idx++) - if (ms_endpts[m_idx].reply.status == INT32_MAX) { - /* No response yet... */ - done = 0; - break; - } - } while (done != 1); - - /* Print a warning for any 1:many sessions that failed to launch */ - failed_count = 0; - for (m_idx = 0; m_idx < num_ms_endpts; m_idx++) - if (ms_endpts[m_idx].reply.status != 0) { - D_ERROR("Failed to launch self-test 1:many session on" - " %u:%u; ret = %d\n", - ms_endpts[m_idx].endpt.ep_rank, - ms_endpts[m_idx].endpt.ep_tag, - ms_endpts[m_idx].reply.status); - ms_endpts[m_idx].test_failed = 1; - ms_endpts[m_idx].test_completed = 1; - failed_count++; - } else if (ms_endpts[m_idx].test_failed != 0) { - ms_endpts[m_idx].test_failed = 1; - ms_endpts[m_idx].test_completed = 1; - failed_count++; - } else { - ms_endpts[m_idx].test_failed = 0; - ms_endpts[m_idx].test_completed = 0; - - } - - - /* Check to make sure that at least one 1:many session was started */ - if (failed_count >= num_ms_endpts) { - D_ERROR("Failed to launch any 1:many test sessions\n"); - return ms_endpts[0].reply.status; - } - - /* - * Poll the master nodes until all tests complete - * (either successfully or by returning an error) - */ - do { - /* Wait a small amount of time for tests to progress */ - sleep(1); - - /* Send status requests to every non-finished node */ - for (m_idx = 0; m_idx < num_ms_endpts; m_idx++) { - /* Skip endpoints that have finished */ - if (ms_endpts[m_idx].test_completed != 0) - continue; - - /* Set result status to impossible guard value */ - ms_endpts[m_idx].reply.status = INT32_MAX; - - /* Create a new RPC to check the status */ - ret = crt_req_create(crt_ctx, &ms_endpts[m_idx].endpt, - CRT_OPC_SELF_TEST_STATUS_REQ, - &new_rpc); - if (ret != 0) { - D_ERROR("Creating status request RPC to" - " endpoint %u:%u; ret = %d\n", - ms_endpts[m_idx].endpt.ep_rank, - ms_endpts[m_idx].endpt.ep_tag, - ret); - ms_endpts[m_idx].test_failed = 1; - ms_endpts[m_idx].test_completed = 1; - continue; - } - - /* - * Sent data is the bulk handle where results should - * be written - */ - *((crt_bulk_t *)crt_req_get(new_rpc)) = - latencies_bulk_hdl[m_idx]; - - /* Send the status request */ - ret = crt_req_send(new_rpc, status_req_cb, - &ms_endpts[m_idx].reply); - if (ret != 0) { - D_ERROR("Failed to send status RPC to endpoint" - " %u:%u; ret = %d\n", - ms_endpts[m_idx].endpt.ep_rank, - ms_endpts[m_idx].endpt.ep_tag, ret); - ms_endpts[m_idx].test_failed = 1; - ms_endpts[m_idx].test_completed = 1; - continue; - } - } - - /* Wait for all status request results to come back */ - do { - /* Flag indicating all status requests have returned */ - done = 1; - - /* Wait a bit for status requests to be handled */ - sched_yield(); - - for (m_idx = 0; m_idx < num_ms_endpts; m_idx++) - if (ms_endpts[m_idx].reply.status == - INT32_MAX && - ms_endpts[m_idx].test_completed == 0) { - /* No response yet... */ - done = 0; - break; - } - } while (done != 1); - - complete_count = 0; - for (m_idx = 0; m_idx < num_ms_endpts; m_idx++) { - /* Skip endpoints that have already finished */ - if (ms_endpts[m_idx].test_completed != 0) { - complete_count++; - continue; - } - - switch (ms_endpts[m_idx].reply.status) { - case CRT_ST_STATUS_TEST_IN_PROGRESS: - D_DEBUG(DB_TEST, "Test still processing on " - "%u:%u - # RPCs remaining: %u\n", - ms_endpts[m_idx].endpt.ep_rank, - ms_endpts[m_idx].endpt.ep_tag, - ms_endpts[m_idx].reply.num_remaining); - break; - case CRT_ST_STATUS_TEST_COMPLETE: - ms_endpts[m_idx].test_completed = 1; - break; - default: - D_ERROR("Detected test failure on %u:%u -" - " ret = %d\n", - ms_endpts[m_idx].endpt.ep_rank, - ms_endpts[m_idx].endpt.ep_tag, - ms_endpts[m_idx].reply.status); - ms_endpts[m_idx].test_failed = 1; - ms_endpts[m_idx].test_completed = 1; - complete_count++; - } - } - } while (complete_count < num_ms_endpts); - - /* - * TODO: - * In the future, probably want to return the latencies here - * before they are processed for display to the user. - */ - - /* Print the results for this size */ - printf("##################################################\n"); - printf("Results for message size (%d-%s %d-%s)" - " (max_inflight_rpcs = %d):\n\n", - test_params->send_size, - crt_st_msg_type_str[test_params->send_type], - test_params->reply_size, - crt_st_msg_type_str[test_params->reply_type], - test_params->max_inflight); - - for (m_idx = 0; m_idx < num_ms_endpts; m_idx++) { - int print_count; - - /* Skip endpoints that failed */ - if (ms_endpts[m_idx].test_failed != 0) - continue; - - /* Print a header for this endpoint and store number of chars */ - printf("Master Endpoint %u:%u%n\n", - ms_endpts[m_idx].endpt.ep_rank, - ms_endpts[m_idx].endpt.ep_tag, - &print_count); - - /* Print a nice line under the header of the right length */ - for (; print_count > 0; print_count--) - printf("-"); - printf("\n"); - - print_results(latencies[m_idx], test_params, - ms_endpts[m_idx].reply.test_duration_ns, - output_megabits); - } - - return 0; -} - -static void -randomize_endpts(struct st_endpoint *endpts, uint32_t num_endpts) -{ - struct st_endpoint tmp; - int r_index; - int i; - int k; - - srand(time(NULL)); - - printf("Randomizing order of endpoints\n"); - /* Shuffle endpoints few times */ - - for (k = 0; k < 10; k++) - for (i = 0; i < num_endpts; i++) { - r_index = rand() % num_endpts; - - tmp = endpts[i]; - endpts[i] = endpts[r_index]; - endpts[r_index] = tmp; - } - - printf("New order:\n"); - for (i = 0; i < num_endpts; i++) { - printf("%d:%d ", endpts[i].rank, endpts[i].tag); - } - printf("\n"); -} - -static int -run_self_test(struct st_size_params all_params[], int num_msg_sizes, int rep_count, - int max_inflight, char *dest_name, struct st_endpoint *ms_endpts_in, - uint32_t num_ms_endpts_in, struct st_endpoint *endpts, uint32_t num_endpts, - int output_megabits, int16_t buf_alignment, char *attach_info_path, bool use_agent, - bool no_sync) -{ - crt_context_t crt_ctx; - crt_group_t *srv_grp; - pthread_t tid; - - int size_idx; - uint32_t m_idx; - - int ret; - int cleanup_ret; - - struct st_master_endpt *ms_endpts = NULL; - uint32_t num_ms_endpts = 0; - - struct st_latency **latencies = NULL; - d_iov_t *latencies_iov = NULL; - d_sg_list_t *latencies_sg_list = NULL; - crt_bulk_t *latencies_bulk_hdl = CRT_BULK_NULL; - bool listen = false; - - crt_endpoint_t self_endpt; - - /* Sanity checks that would indicate bugs */ - D_ASSERT(endpts != NULL && num_endpts > 0); - D_ASSERT((ms_endpts_in == NULL && num_ms_endpts_in == 0) || - (ms_endpts_in != NULL && num_ms_endpts_in > 0)); - - /* will send TEST_START RPC to self, so listen for incoming requests */ - if (ms_endpts_in == NULL) - listen = true; - /* Initialize CART */ - ret = self_test_init(dest_name, &crt_ctx, &srv_grp, &tid, attach_info_path, - listen /* run as server */, use_agent, no_sync); - if (ret != 0) { - D_ERROR("self_test_init failed; ret = %d\n", ret); - D_GOTO(cleanup_nothread, ret); - } - - /* Get the group/rank/tag for this application (self_endpt) */ - ret = crt_group_rank(NULL, &self_endpt.ep_rank); - if (ret != 0) { - D_ERROR("crt_group_rank failed; ret = %d\n", ret); - D_GOTO(cleanup, ret); - } - self_endpt.ep_grp = crt_group_lookup(CRT_SELF_TEST_GROUP_NAME); - if (self_endpt.ep_grp == NULL) { - D_ERROR("crt_group_lookup failed for group %s\n", - CRT_SELF_TEST_GROUP_NAME); - D_GOTO(cleanup, ret = -DER_NONEXIST); - } - self_endpt.ep_tag = 0; - - /* - * Allocate a new list of unique master endpoints, each with a - * crt_endpoint_t and additional metadata - */ - if (ms_endpts_in == NULL) { - /* - * If no master endpoints were specified, allocate just one and - * set it to self_endpt - */ - num_ms_endpts = 1; - D_ALLOC_PTR(ms_endpts); - if (ms_endpts == NULL) - D_GOTO(cleanup, ret = -DER_NOMEM); - ms_endpts[0].endpt.ep_rank = self_endpt.ep_rank; - ms_endpts[0].endpt.ep_tag = self_endpt.ep_tag; - ms_endpts[0].endpt.ep_grp = self_endpt.ep_grp; - } else { - /* - * If master endpoints were specified, initially allocate enough - * space to hold all of them, but only unique master endpoints - * to the new list - */ - D_ALLOC_ARRAY(ms_endpts, num_ms_endpts_in); - if (ms_endpts == NULL) - D_GOTO(cleanup, ret = -DER_NOMEM); - - /* - * Sort the supplied endpoints to make it faster to identify - * duplicates - */ - qsort(ms_endpts_in, num_ms_endpts_in, - sizeof(ms_endpts_in[0]), st_compare_endpts); - - /* Add the first element to the new list */ - ms_endpts[0].endpt.ep_rank = ms_endpts_in[0].rank; - ms_endpts[0].endpt.ep_tag = ms_endpts_in[0].tag; - /* - * TODO: This isn't right - it should be self_endpt.ep_grp. - * However, this requires changes elsewhere - this is tracked - * by CART-187. - * - * As implemented here, rank 0 tag 0 in the client group will - * be used as the master endpoint by default - */ - ms_endpts[0].endpt.ep_grp = srv_grp; - num_ms_endpts = 1; - - /* - * Add unique elements to the new list - */ - for (m_idx = 1; m_idx < num_ms_endpts_in; m_idx++) - if ((ms_endpts_in[m_idx].rank != - ms_endpts[num_ms_endpts - 1].endpt.ep_rank) || - (ms_endpts_in[m_idx].tag != - ms_endpts[num_ms_endpts - 1].endpt.ep_tag)) { - ms_endpts[num_ms_endpts].endpt.ep_rank = - ms_endpts_in[m_idx].rank; - ms_endpts[num_ms_endpts].endpt.ep_tag = - ms_endpts_in[m_idx].tag; - ms_endpts[num_ms_endpts].endpt.ep_grp = - srv_grp; - num_ms_endpts++; - } - - /* - * If the counts don't match up, some were duplicates - resize - * the resulting smaller array which contains only unique - * entries - */ - if (num_ms_endpts != num_ms_endpts_in) { - struct st_master_endpt *realloc_ptr; - - D_REALLOC_ARRAY(realloc_ptr, ms_endpts, - num_ms_endpts_in, num_ms_endpts); - if (realloc_ptr == NULL) - D_GOTO(cleanup, ret = -DER_NOMEM); - ms_endpts = realloc_ptr; - } - } - - /* Allocate latency lists for each 1:many session */ - D_ALLOC_ARRAY(latencies, num_ms_endpts); - if (latencies == NULL) - D_GOTO(cleanup, ret = -DER_NOMEM); - D_ALLOC_ARRAY(latencies_iov, num_ms_endpts); - if (latencies_iov == NULL) - D_GOTO(cleanup, ret = -DER_NOMEM); - D_ALLOC_ARRAY(latencies_sg_list, num_ms_endpts); - if (latencies_sg_list == NULL) - D_GOTO(cleanup, ret = -DER_NOMEM); - D_ALLOC_ARRAY(latencies_bulk_hdl, num_ms_endpts); - if (latencies_bulk_hdl == NULL) - D_GOTO(cleanup, ret = -DER_NOMEM); - - /* - * For each 1:many session, allocate an array for latency results. - * Map that array to an IOV, and create a bulk handle that will be used - * to transfer latency results back into that buffer - */ - for (m_idx = 0; m_idx < num_ms_endpts; m_idx++) { - D_ALLOC_ARRAY(latencies[m_idx], rep_count); - if (latencies[m_idx] == NULL) - D_GOTO(cleanup, ret = -DER_NOMEM); - d_iov_set(&latencies_iov[m_idx], latencies[m_idx], - rep_count * sizeof(**latencies)); - latencies_sg_list[m_idx].sg_iovs = - &latencies_iov[m_idx]; - latencies_sg_list[m_idx].sg_nr = 1; - - ret = crt_bulk_create(crt_ctx, &latencies_sg_list[m_idx], - CRT_BULK_RW, - &latencies_bulk_hdl[m_idx]); - if (ret != 0) { - D_ERROR("Failed to allocate latencies bulk handle;" - " ret = %d\n", ret); - D_GOTO(cleanup, ret); - } - D_ASSERT(latencies_bulk_hdl != CRT_BULK_NULL); - } - - if (g_randomize_endpoints) { - randomize_endpts(endpts, num_endpts); - } - - for (size_idx = 0; size_idx < num_msg_sizes; size_idx++) { - struct crt_st_start_params test_params = { 0 }; - - /* Set test parameters to send to the test node */ - d_iov_set(&test_params.endpts, endpts, - num_endpts * sizeof(*endpts)); - test_params.rep_count = rep_count; - test_params.max_inflight = max_inflight; - test_params.send_size = all_params[size_idx].send_size; - test_params.reply_size = all_params[size_idx].reply_size; - test_params.send_type = all_params[size_idx].send_type; - test_params.reply_type = all_params[size_idx].reply_type; - test_params.buf_alignment = buf_alignment; - test_params.srv_grp = dest_name; - - ret = test_msg_size(crt_ctx, ms_endpts, num_ms_endpts, - &test_params, latencies, latencies_bulk_hdl, - output_megabits); - if (ret != 0) { - D_ERROR("Testing message size (%d-%s %d-%s) failed;" - " ret = %d\n", - test_params.send_size, - crt_st_msg_type_str[test_params.send_type], - test_params.reply_size, - crt_st_msg_type_str[test_params.reply_type], - ret); - D_GOTO(cleanup, ret); - } - } - -cleanup: - /* Tell the progress thread to abort and exit */ - g_shutdown_flag = 1; - - if (pthread_join(tid, NULL)) { - D_ERROR("Could not join progress thread\n"); - ret = -1; - } - -cleanup_nothread: - if (latencies_bulk_hdl != NULL) { - for (m_idx = 0; m_idx < num_ms_endpts; m_idx++) - if (latencies_bulk_hdl[m_idx] != CRT_BULK_NULL) - crt_bulk_free(latencies_bulk_hdl[m_idx]); - D_FREE(latencies_bulk_hdl); - } - if (latencies_sg_list != NULL) - D_FREE(latencies_sg_list); - if (latencies_iov != NULL) - D_FREE(latencies_iov); - if (ms_endpts != NULL) - D_FREE(ms_endpts); - if (latencies != NULL) { - for (m_idx = 0; m_idx < num_ms_endpts; m_idx++) - if (latencies[m_idx] != NULL) - D_FREE(latencies[m_idx]); - D_FREE(latencies); - } - - if (srv_grp != NULL && g_group_inited) { - cleanup_ret = crt_group_detach(srv_grp); - if (cleanup_ret != 0) - D_ERROR("crt_group_detach failed; ret = %d\n", - cleanup_ret); - /* Make sure first error is returned, if applicable */ - ret = ((ret == 0) ? cleanup_ret : ret); - } - - cleanup_ret = 0; - if (g_context_created) { - cleanup_ret = crt_context_destroy(crt_ctx, 0); - if (cleanup_ret != 0) - D_ERROR("crt_context_destroy failed; ret = %d\n", cleanup_ret); - } - - /* Make sure first error is returned, if applicable */ - ret = ((ret == 0) ? cleanup_ret : ret); - - cleanup_ret = 0; - if (g_cart_inited) { - cleanup_ret = crt_finalize(); - if (cleanup_ret != 0) - D_ERROR("crt_finalize failed; ret = %d\n", cleanup_ret); - } - /* Make sure first error is returned, if applicable */ - ret = ((ret == 0) ? cleanup_ret : ret); - return ret; -} - static void print_usage(const char *prog_name, const char *msg_sizes_str, int rep_count, int max_inflight) { @@ -1702,6 +859,41 @@ int parse_message_sizes_string(const char *pch, return 0; } +static void +print_size_results(struct crt_st_start_params *test_params, struct st_master_endpt *ms_endpts, + uint32_t num_ms_endpts, struct st_latency **latencies, int output_megabits) +{ + int m_idx; + + /* Print the results for this size */ + printf("##################################################\n"); + printf("Results for message size (%d-%s %d-%s)" + " (max_inflight_rpcs = %d):\n\n", + test_params->send_size, crt_st_msg_type_str[test_params->send_type], + test_params->reply_size, crt_st_msg_type_str[test_params->reply_type], + test_params->max_inflight); + + for (m_idx = 0; m_idx < num_ms_endpts; m_idx++) { + int print_count; + + /* Skip endpoints that failed */ + if (ms_endpts[m_idx].test_failed != 0) + continue; + + /* Print a header for this endpoint and store number of chars */ + printf("Master Endpoint %u:%u%n\n", ms_endpts[m_idx].endpt.ep_rank, + ms_endpts[m_idx].endpt.ep_tag, &print_count); + + /* Print a nice line under the header of the right length */ + for (; print_count > 0; print_count--) + printf("-"); + printf("\n"); + + print_results(latencies[m_idx], test_params, + ms_endpts[m_idx].reply.test_duration_ns, output_megabits); + } +} + int main(int argc, char *argv[]) { /* Default parameters */ @@ -1717,20 +909,24 @@ int main(int argc, char *argv[]) struct st_size_params *all_params = NULL; char *sizes_ptr = NULL; char *pch = NULL; - int num_msg_sizes; + int num_msg_sizes = 0; int num_tokens; int c; - int j; + int j; int ret = 0; - struct st_endpoint *endpts = NULL; - struct st_endpoint *ms_endpts = NULL; - uint32_t num_endpts = 0; - uint32_t num_ms_endpts = 0; - int output_megabits = 0; - int16_t buf_alignment = CRT_ST_BUF_ALIGN_DEFAULT; - char *attach_info_path = NULL; - bool use_agent = false; - bool no_sync = false; + struct st_endpoint *tgt_endpts = NULL; + struct st_endpoint *ms_endpts_opt = NULL; + struct st_master_endpt *ms_endpts = NULL; + uint32_t num_tgt_endpts = 0; + uint32_t num_ms_endpts_opt = 0; + uint32_t num_ms_endpts = 0; + struct st_latency ***size_latencies = NULL; + int output_megabits = 0; + int16_t buf_alignment = CRT_ST_BUF_ALIGN_DEFAULT; + char *attach_info_path = NULL; + bool randomize_eps = false; + bool use_agent = false; + bool no_sync = false; ret = d_log_init(); if (ret != 0) { @@ -1764,10 +960,10 @@ int main(int argc, char *argv[]) dest_name = optarg; break; case 'm': - parse_endpoint_string(optarg, &ms_endpts, &num_ms_endpts); + parse_endpoint_string(optarg, &ms_endpts_opt, &num_ms_endpts_opt); break; case 'e': - parse_endpoint_string(optarg, &endpts, &num_endpts); + parse_endpoint_string(optarg, &tgt_endpts, &num_tgt_endpts); break; case 's': msg_sizes_str = optarg; @@ -1814,7 +1010,7 @@ int main(int argc, char *argv[]) no_sync = true; break; case 'q': - g_randomize_endpoints = true; + randomize_eps = true; break; case 'h': @@ -1941,11 +1137,11 @@ int main(int argc, char *argv[]) D_GOTO(cleanup, ret = -DER_INVAL); } - if (ms_endpts == NULL) + if (ms_endpts_opt == NULL) printf("Warning: No --master-endpoint specified; using this" " command line application as the master endpoint\n"); - if (endpts == NULL || num_endpts == 0) { + if (tgt_endpts == NULL || num_tgt_endpts == 0) { int tag = 0; /* use context 0 as a default one for non-daos case */ /* In case of the DAOS engines first 2 contexts are reserved */ @@ -1953,14 +1149,18 @@ int main(int argc, char *argv[]) tag = 2; printf("Warning: No --endpoint specified; using 0:%d default\n", tag); - num_endpts = 1; - D_ALLOC_ARRAY(endpts, 1); - endpts[0].rank = 0; - endpts[0].tag = tag; + num_tgt_endpts = 1; + D_ALLOC_ARRAY(tgt_endpts, 1); + tgt_endpts[0].rank = 0; + tgt_endpts[0].tag = tag; + } + + if (randomize_eps) { + randomize_endpoints(tgt_endpts, num_tgt_endpts); } /* repeat rep_count for each endpoint */ - rep_count = rep_count * num_endpts; + rep_count = rep_count * num_tgt_endpts; if ((rep_count <= 0) || (rep_count > SELF_TEST_MAX_REPETITIONS)) { printf("Invalid --repetitions-per-size argument\n" @@ -1985,7 +1185,8 @@ int main(int argc, char *argv[]) printf("Self Test Parameters:\n" " Group name to test against: %s\n" " # endpoints: %u\n" - " Message sizes: [", dest_name, num_endpts); + " Message sizes: [", + dest_name, num_tgt_endpts); for (j = 0; j < num_msg_sizes; j++) { if (j > 0) printf(", "); @@ -2005,13 +1206,38 @@ int main(int argc, char *argv[]) /********************* Run the self test *********************/ ret = run_self_test(all_params, num_msg_sizes, rep_count, max_inflight, dest_name, - ms_endpts, num_ms_endpts, endpts, num_endpts, output_megabits, - buf_alignment, attach_info_path, use_agent, no_sync); + ms_endpts_opt, num_ms_endpts_opt, tgt_endpts, num_tgt_endpts, + &ms_endpts, &num_ms_endpts, &size_latencies, buf_alignment, + attach_info_path, use_agent, no_sync); + if (ret != 0) { + DL_ERROR(ret, "run_self_test() failed"); + D_GOTO(cleanup, ret); + } + /********************* Print the results *********************/ + for (j = 0; j < num_msg_sizes; j++) { + struct crt_st_start_params test_params = {0}; + + /* Set test parameters for display */ + test_params.rep_count = rep_count; + test_params.max_inflight = max_inflight; + test_params.send_size = all_params[j].send_size; + test_params.reply_size = all_params[j].reply_size; + test_params.send_type = all_params[j].send_type; + test_params.reply_type = all_params[j].reply_type; + test_params.buf_alignment = buf_alignment; + test_params.srv_grp = dest_name; + + D_ASSERT(size_latencies[j] != NULL); + print_size_results(&test_params, ms_endpts, num_ms_endpts, size_latencies[j], + output_megabits); + } /********************* Clean up *********************/ cleanup: + free_size_latencies(size_latencies, num_msg_sizes, num_ms_endpts); D_FREE(ms_endpts); - D_FREE(endpts); + D_FREE(ms_endpts_opt); + D_FREE(tgt_endpts); D_FREE(all_params); if (use_agent) diff --git a/src/utils/self_test/self_test_lib.c b/src/utils/self_test/self_test_lib.c new file mode 100644 index 00000000000..beb8e54cef2 --- /dev/null +++ b/src/utils/self_test/self_test_lib.c @@ -0,0 +1,803 @@ +/* + * (C) Copyright 2016-2024 Intel Corporation. + * + * SPDX-License-Identifier: BSD-2-Clause-Patent + */ + +#define D_LOGFAC DD_FAC(st) + +#include +#include +#include +#include +#include +#include + +#include "self_test_lib.h" +#include "crt_utils.h" +#include + +#include +#include + +/* Global shutdown flag, used to terminate the progress thread */ +static int g_shutdown_flag; +static bool g_group_inited; +static bool g_context_created; +static bool g_cart_inited; + +static void * +progress_fn(void *arg) +{ + int ret; + crt_context_t *crt_ctx = NULL; + + crt_ctx = (crt_context_t *)arg; + D_ASSERT(crt_ctx != NULL); + D_ASSERT(*crt_ctx != NULL); + + while (!g_shutdown_flag) { + ret = crt_progress(*crt_ctx, 1); + if (ret != 0 && ret != -DER_TIMEDOUT) { + D_ERROR("crt_progress failed; ret = %d\n", ret); + break; + } + }; + + pthread_exit(NULL); +} + +static int +self_test_init(char *dest_name, crt_context_t *crt_ctx, crt_group_t **srv_grp, pthread_t *tid, + char *attach_info_path, bool listen, bool use_agent, bool no_sync) +{ + uint32_t init_flags = 0; + uint32_t grp_size; + d_rank_list_t *rank_list = NULL; + int attach_retries = 40; + int i; + d_rank_t max_rank = 0; + int ret; + crt_init_options_t opt = {0}; + crt_init_options_t *init_opt; + + /* rank, num_attach_retries, is_server, assert_on_error */ + crtu_test_init(0, attach_retries, false, false); + + if (use_agent) { + ret = dc_agent_init(); + if (ret != 0) { + fprintf(stderr, "dc_agent_init() failed. ret: %d\n", ret); + return ret; + } + ret = crtu_dc_mgmt_net_cfg_setenv(dest_name, &opt); + if (ret != 0) { + D_ERROR("crtu_dc_mgmt_net_cfg_setenv() failed; ret = %d\n", ret); + return ret; + } + + init_opt = &opt; + } else { + init_opt = NULL; + } + + if (listen) + init_flags |= (CRT_FLAG_BIT_SERVER | CRT_FLAG_BIT_AUTO_SWIM_DISABLE); + + ret = crt_init_opt(CRT_SELF_TEST_GROUP_NAME, init_flags, init_opt); + if (ret != 0) + return ret; + + D_FREE(opt.cio_provider); + D_FREE(opt.cio_interface); + D_FREE(opt.cio_domain); + g_cart_inited = true; + + if (attach_info_path) { + ret = crt_group_config_path_set(attach_info_path); + D_ASSERTF(ret == 0, "crt_group_config_path_set failed, ret = %d\n", ret); + } + + ret = crt_context_create(crt_ctx); + if (ret != 0) { + D_ERROR("crt_context_create failed; ret = %d\n", ret); + return ret; + } + g_context_created = true; + + if (use_agent) { + ret = crt_group_view_create(dest_name, srv_grp); + if (!*srv_grp || ret != 0) { + D_ERROR("Failed to create group view; ret=%d\n", ret); + assert(0); + } + + ret = crtu_dc_mgmt_net_cfg_rank_add(dest_name, *srv_grp, *crt_ctx); + if (ret != 0) { + fprintf(stderr, "crtu_dc_mgmt_net_cfg_rank_add() failed. ret: %d\n", ret); + return ret; + } + } else { + /* DAOS-8839: Do not limit retries, instead rely on global test timeout */ + while (1) { + ret = crt_group_attach(dest_name, srv_grp); + if (ret == 0) + break; + sleep(1); + } + } + + if (ret != 0) { + D_ERROR("crt_group_attach failed; ret = %d\n", ret); + return ret; + } + + g_group_inited = true; + + D_ASSERTF(*srv_grp != NULL, "crt_group_attach succeeded but returned group is NULL\n"); + + g_shutdown_flag = 0; + + ret = pthread_create(tid, NULL, progress_fn, crt_ctx); + if (ret != 0) { + D_ERROR("failed to create progress thread: %s\n", strerror(errno)); + return -DER_MISC; + } + + ret = crt_group_size(*srv_grp, &grp_size); + D_ASSERTF(ret == 0, "crt_group_size() failed; rc=%d\n", ret); + + ret = crt_group_ranks_get(*srv_grp, &rank_list); + D_ASSERTF(ret == 0, "crt_group_ranks_get() failed; rc=%d\n", ret); + + D_ASSERTF(rank_list != NULL, "Rank list is NULL\n"); + + D_ASSERTF(rank_list->rl_nr == grp_size, "rank_list differs in size. expected %d got %d\n", + grp_size, rank_list->rl_nr); + + ret = crt_group_psr_set(*srv_grp, rank_list->rl_ranks[0]); + D_ASSERTF(ret == 0, "crt_group_psr_set() failed; rc=%d\n", ret); + + /* waiting to sync with the following parameters + * 0 - tag 0 + * 1 - total ctx + * 60 - ping timeout + * 120 - total timeout + */ + /* Only ping ranks if not using agent, and user didn't ask for no-sync */ + if (!use_agent && !no_sync) { + ret = crtu_wait_for_ranks(*crt_ctx, *srv_grp, rank_list, 0, 1, 60, 120); + D_ASSERTF(ret == 0, "wait_for_ranks() failed; ret=%d\n", ret); + } + + max_rank = rank_list->rl_ranks[0]; + for (i = 1; i < rank_list->rl_nr; i++) { + if (rank_list->rl_ranks[i] > max_rank) + max_rank = rank_list->rl_ranks[i]; + } + + d_rank_list_free(rank_list); + + ret = crt_rank_self_set(max_rank + 1, 1 /* group_version_min */); + if (ret != 0) { + D_ERROR("crt_rank_self_set failed; ret = %d\n", ret); + return ret; + } + + return 0; +} + +int +st_compare_endpts(const void *a_in, const void *b_in) +{ + struct st_endpoint *a = (struct st_endpoint *)a_in; + struct st_endpoint *b = (struct st_endpoint *)b_in; + + if (a->rank != b->rank) + return a->rank > b->rank; + return a->tag > b->tag; +} + +int +st_compare_latencies_by_vals(const void *a_in, const void *b_in) +{ + struct st_latency *a = (struct st_latency *)a_in; + struct st_latency *b = (struct st_latency *)b_in; + + if (a->val != b->val) + return a->val > b->val; + return a->cci_rc > b->cci_rc; +} + +int +st_compare_latencies_by_ranks(const void *a_in, const void *b_in) +{ + struct st_latency *a = (struct st_latency *)a_in; + struct st_latency *b = (struct st_latency *)b_in; + + if (a->rank != b->rank) + return a->rank > b->rank; + if (a->tag != b->tag) + return a->tag > b->tag; + if (a->val != b->val) + return a->val > b->val; + return a->cci_rc > b->cci_rc; +} + +static void +start_test_cb(const struct crt_cb_info *cb_info) +{ + /* Result returned to main thread */ + int32_t *return_status = cb_info->cci_arg; + + /* Status retrieved from the RPC result payload */ + int32_t *reply_status; + + /* Check the status of the RPC transport itself */ + if (cb_info->cci_rc != 0) { + *return_status = cb_info->cci_rc; + return; + } + + /* Get the status from the payload */ + reply_status = (int32_t *)crt_reply_get(cb_info->cci_rpc); + D_ASSERT(reply_status != NULL); + + /* Return whatever result we got to the main thread */ + *return_status = *reply_status; +} + +static void +status_req_cb(const struct crt_cb_info *cb_info) +{ + /* Result returned to main thread */ + struct crt_st_status_req_out *return_status = cb_info->cci_arg; + + /* Status retrieved from the RPC result payload */ + struct crt_st_status_req_out *reply_status; + + /* Check the status of the RPC transport itself */ + if (cb_info->cci_rc != 0) { + return_status->status = cb_info->cci_rc; + return; + } + + /* Get the status from the payload */ + reply_status = crt_reply_get(cb_info->cci_rpc); + D_ASSERT(reply_status != NULL); + + /* + * Return whatever result we got to the main thread + * + * Write these in specific order so we can avoid locking + * TODO: This assumes int32 writes are atomic + * (they are on x86 if 4-byte aligned) + */ + return_status->test_duration_ns = reply_status->test_duration_ns; + return_status->num_remaining = reply_status->num_remaining; + return_status->status = reply_status->status; +} + +static int +test_msg_size(crt_context_t crt_ctx, struct st_master_endpt *ms_endpts, uint32_t num_ms_endpts, + struct crt_st_start_params *test_params, struct st_latency **latencies, + crt_bulk_t *latencies_bulk_hdl) +{ + int ret; + int done; + uint32_t failed_count; + uint32_t complete_count; + crt_rpc_t *new_rpc; + struct crt_st_start_params *start_args; + uint32_t m_idx; + + /* + * Launch self-test 1:many sessions on each master endpoint + * as simultaneously as possible (don't wait for acknowledgment) + */ + for (m_idx = 0; m_idx < num_ms_endpts; m_idx++) { + crt_endpoint_t *endpt = &ms_endpts[m_idx].endpt; + + /* Create and send a new RPC starting the test */ + ret = crt_req_create(crt_ctx, endpt, CRT_OPC_SELF_TEST_START, &new_rpc); + if (ret != 0) { + D_ERROR("Creating start RPC failed to endpoint" + " %u:%u; ret = %d\n", + endpt->ep_rank, endpt->ep_tag, ret); + ms_endpts[m_idx].test_failed = 1; + ms_endpts[m_idx].test_completed = 1; + continue; + } + + start_args = (struct crt_st_start_params *)crt_req_get(new_rpc); + D_ASSERTF(start_args != NULL, "crt_req_get returned NULL\n"); + memcpy(start_args, test_params, sizeof(*test_params)); + start_args->srv_grp = test_params->srv_grp; + + /* Set the launch status to a known impossible value */ + ms_endpts[m_idx].reply.status = INT32_MAX; + + ret = crt_req_send(new_rpc, start_test_cb, &ms_endpts[m_idx].reply.status); + if (ret != 0) { + D_ERROR("Failed to send start RPC to endpoint %u:%u; " + "ret = %d\n", + endpt->ep_rank, endpt->ep_tag, ret); + ms_endpts[m_idx].test_failed = 1; + ms_endpts[m_idx].test_completed = 1; + continue; + } + } + + /* + * Wait for each node to report whether or not the test launched + * successfully + */ + do { + /* Flag indicating all test launches have returned a status */ + done = 1; + + /* Wait a bit for tests to finish launching */ + sched_yield(); + + for (m_idx = 0; m_idx < num_ms_endpts; m_idx++) + if (ms_endpts[m_idx].reply.status == INT32_MAX) { + /* No response yet... */ + done = 0; + break; + } + } while (done != 1); + + /* Print a warning for any 1:many sessions that failed to launch */ + failed_count = 0; + for (m_idx = 0; m_idx < num_ms_endpts; m_idx++) + if (ms_endpts[m_idx].reply.status != 0) { + D_ERROR("Failed to launch self-test 1:many session on" + " %u:%u; ret = %d\n", + ms_endpts[m_idx].endpt.ep_rank, ms_endpts[m_idx].endpt.ep_tag, + ms_endpts[m_idx].reply.status); + ms_endpts[m_idx].test_failed = 1; + ms_endpts[m_idx].test_completed = 1; + failed_count++; + } else if (ms_endpts[m_idx].test_failed != 0) { + ms_endpts[m_idx].test_failed = 1; + ms_endpts[m_idx].test_completed = 1; + failed_count++; + } else { + ms_endpts[m_idx].test_failed = 0; + ms_endpts[m_idx].test_completed = 0; + } + + /* Check to make sure that at least one 1:many session was started */ + if (failed_count >= num_ms_endpts) { + D_ERROR("Failed to launch any 1:many test sessions\n"); + return ms_endpts[0].reply.status; + } + + /* + * Poll the master nodes until all tests complete + * (either successfully or by returning an error) + */ + do { + /* Wait a small amount of time for tests to progress */ + sleep(1); + + /* Send status requests to every non-finished node */ + for (m_idx = 0; m_idx < num_ms_endpts; m_idx++) { + /* Skip endpoints that have finished */ + if (ms_endpts[m_idx].test_completed != 0) + continue; + + /* Set result status to impossible guard value */ + ms_endpts[m_idx].reply.status = INT32_MAX; + + /* Create a new RPC to check the status */ + ret = crt_req_create(crt_ctx, &ms_endpts[m_idx].endpt, + CRT_OPC_SELF_TEST_STATUS_REQ, &new_rpc); + if (ret != 0) { + D_ERROR("Creating status request RPC to" + " endpoint %u:%u; ret = %d\n", + ms_endpts[m_idx].endpt.ep_rank, + ms_endpts[m_idx].endpt.ep_tag, ret); + ms_endpts[m_idx].test_failed = 1; + ms_endpts[m_idx].test_completed = 1; + continue; + } + + /* + * Sent data is the bulk handle where results should + * be written + */ + *((crt_bulk_t *)crt_req_get(new_rpc)) = latencies_bulk_hdl[m_idx]; + + /* Send the status request */ + ret = crt_req_send(new_rpc, status_req_cb, &ms_endpts[m_idx].reply); + if (ret != 0) { + D_ERROR("Failed to send status RPC to endpoint" + " %u:%u; ret = %d\n", + ms_endpts[m_idx].endpt.ep_rank, + ms_endpts[m_idx].endpt.ep_tag, ret); + ms_endpts[m_idx].test_failed = 1; + ms_endpts[m_idx].test_completed = 1; + continue; + } + } + + /* Wait for all status request results to come back */ + do { + /* Flag indicating all status requests have returned */ + done = 1; + + /* Wait a bit for status requests to be handled */ + sched_yield(); + + for (m_idx = 0; m_idx < num_ms_endpts; m_idx++) + if (ms_endpts[m_idx].reply.status == INT32_MAX && + ms_endpts[m_idx].test_completed == 0) { + /* No response yet... */ + done = 0; + break; + } + } while (done != 1); + + complete_count = 0; + for (m_idx = 0; m_idx < num_ms_endpts; m_idx++) { + /* Skip endpoints that have already finished */ + if (ms_endpts[m_idx].test_completed != 0) { + complete_count++; + continue; + } + + switch (ms_endpts[m_idx].reply.status) { + case CRT_ST_STATUS_TEST_IN_PROGRESS: + D_DEBUG(DB_TEST, + "Test still processing on " + "%u:%u - # RPCs remaining: %u\n", + ms_endpts[m_idx].endpt.ep_rank, + ms_endpts[m_idx].endpt.ep_tag, + ms_endpts[m_idx].reply.num_remaining); + break; + case CRT_ST_STATUS_TEST_COMPLETE: + ms_endpts[m_idx].test_completed = 1; + break; + default: + D_ERROR("Detected test failure on %u:%u -" + " ret = %d\n", + ms_endpts[m_idx].endpt.ep_rank, + ms_endpts[m_idx].endpt.ep_tag, + ms_endpts[m_idx].reply.status); + ms_endpts[m_idx].test_failed = 1; + ms_endpts[m_idx].test_completed = 1; + complete_count++; + } + } + } while (complete_count < num_ms_endpts); + + return 0; +} + +void +randomize_endpoints(struct st_endpoint *endpts, uint32_t num_endpts) +{ + struct st_endpoint tmp; + int r_index; + int i; + int k; + + srand(time(NULL)); + + printf("Randomizing order of endpoints\n"); + /* Shuffle endpoints few times */ + + for (k = 0; k < 10; k++) + for (i = 0; i < num_endpts; i++) { + r_index = rand() % num_endpts; + + tmp = endpts[i]; + endpts[i] = endpts[r_index]; + endpts[r_index] = tmp; + } + + printf("New order:\n"); + for (i = 0; i < num_endpts; i++) { + printf("%d:%d ", endpts[i].rank, endpts[i].tag); + } + printf("\n"); +} + +void +free_size_latencies(struct st_latency ***size_latencies, uint32_t num_msg_sizes, + uint32_t num_ms_endpts) +{ + int size_idx; + int m_idx; + + if (size_latencies == NULL) + return; + + for (size_idx = 0; size_idx < num_msg_sizes; size_idx++) { + struct st_latency **sess_latencies = size_latencies[size_idx]; + + if (sess_latencies == NULL) + continue; + + for (m_idx = 0; m_idx < num_ms_endpts; m_idx++) + if (sess_latencies[m_idx] != NULL) + D_FREE(sess_latencies[m_idx]); + + D_FREE(size_latencies[size_idx]); + } + D_FREE(size_latencies); +} + +int +run_self_test(struct st_size_params all_params[], int num_msg_sizes, int rep_count, + int max_inflight, char *dest_name, struct st_endpoint *ms_endpts_in, + uint32_t num_ms_endpts_in, struct st_endpoint *endpts, uint32_t num_endpts, + struct st_master_endpt **ms_endpts_out, uint32_t *num_ms_endpts_out, + struct st_latency ****size_latencies_out, int16_t buf_alignment, + char *attach_info_path, bool use_agent, bool no_sync) +{ + crt_context_t crt_ctx; + crt_group_t *srv_grp; + pthread_t tid; + + int size_idx; + uint32_t m_idx; + + int ret; + int cleanup_ret; + struct st_master_endpt *ms_endpts = NULL; + uint32_t num_ms_endpts = 0; + + struct st_latency ***size_latencies = NULL; + d_iov_t *latencies_iov = NULL; + d_sg_list_t *latencies_sg_list = NULL; + crt_bulk_t *latencies_bulk_hdl = CRT_BULK_NULL; + bool listen = false; + + crt_endpoint_t self_endpt; + + /* Sanity checks that would indicate bugs */ + D_ASSERT(endpts != NULL && num_endpts > 0); + D_ASSERT((ms_endpts_in == NULL && num_ms_endpts_in == 0) || + (ms_endpts_in != NULL && num_ms_endpts_in > 0)); + D_ASSERT(ms_endpts_out != NULL && num_ms_endpts_out != NULL); + D_ASSERT(size_latencies_out != NULL); + + /* will send TEST_START RPC to self, so listen for incoming requests */ + if (ms_endpts_in == NULL) + listen = true; + /* Initialize CART */ + ret = self_test_init(dest_name, &crt_ctx, &srv_grp, &tid, attach_info_path, + listen /* run as server */, use_agent, no_sync); + if (ret != 0) { + D_ERROR("self_test_init failed; ret = %d\n", ret); + D_GOTO(cleanup_nothread, ret); + } + + /* Get the group/rank/tag for this application (self_endpt) */ + ret = crt_group_rank(NULL, &self_endpt.ep_rank); + if (ret != 0) { + D_ERROR("crt_group_rank failed; ret = %d\n", ret); + D_GOTO(cleanup, ret); + } + self_endpt.ep_grp = crt_group_lookup(CRT_SELF_TEST_GROUP_NAME); + if (self_endpt.ep_grp == NULL) { + D_ERROR("crt_group_lookup failed for group %s\n", CRT_SELF_TEST_GROUP_NAME); + D_GOTO(cleanup, ret = -DER_NONEXIST); + } + self_endpt.ep_tag = 0; + + /* + * Allocate a new list of unique master endpoints, each with a + * crt_endpoint_t and additional metadata + */ + if (ms_endpts_in == NULL) { + /* + * If no master endpoints were specified, allocate just one and + * set it to self_endpt + */ + num_ms_endpts = 1; + D_ALLOC_PTR(ms_endpts); + if (ms_endpts == NULL) + D_GOTO(cleanup, ret = -DER_NOMEM); + ms_endpts[0].endpt.ep_rank = self_endpt.ep_rank; + ms_endpts[0].endpt.ep_tag = self_endpt.ep_tag; + ms_endpts[0].endpt.ep_grp = self_endpt.ep_grp; + } else { + /* + * If master endpoints were specified, initially allocate enough + * space to hold all of them, but only unique master endpoints + * to the new list + */ + D_ALLOC_ARRAY(ms_endpts, num_ms_endpts_in); + if (ms_endpts == NULL) + D_GOTO(cleanup, ret = -DER_NOMEM); + + /* + * Sort the supplied endpoints to make it faster to identify + * duplicates + */ + qsort(ms_endpts_in, num_ms_endpts_in, sizeof(ms_endpts_in[0]), st_compare_endpts); + + /* Add the first element to the new list */ + ms_endpts[0].endpt.ep_rank = ms_endpts_in[0].rank; + ms_endpts[0].endpt.ep_tag = ms_endpts_in[0].tag; + /* + * TODO: This isn't right - it should be self_endpt.ep_grp. + * However, this requires changes elsewhere - this is tracked + * by CART-187. + * + * As implemented here, rank 0 tag 0 in the client group will + * be used as the master endpoint by default + */ + ms_endpts[0].endpt.ep_grp = srv_grp; + num_ms_endpts = 1; + + /* + * Add unique elements to the new list + */ + for (m_idx = 1; m_idx < num_ms_endpts_in; m_idx++) + if ((ms_endpts_in[m_idx].rank != + ms_endpts[num_ms_endpts - 1].endpt.ep_rank) || + (ms_endpts_in[m_idx].tag != + ms_endpts[num_ms_endpts - 1].endpt.ep_tag)) { + ms_endpts[num_ms_endpts].endpt.ep_rank = ms_endpts_in[m_idx].rank; + ms_endpts[num_ms_endpts].endpt.ep_tag = ms_endpts_in[m_idx].tag; + ms_endpts[num_ms_endpts].endpt.ep_grp = srv_grp; + num_ms_endpts++; + } + + /* + * If the counts don't match up, some were duplicates - resize + * the resulting smaller array which contains only unique + * entries + */ + if (num_ms_endpts != num_ms_endpts_in) { + struct st_master_endpt *realloc_ptr; + + D_REALLOC_ARRAY(realloc_ptr, ms_endpts, num_ms_endpts_in, num_ms_endpts); + if (realloc_ptr == NULL) + D_GOTO(cleanup, ret = -DER_NOMEM); + ms_endpts = realloc_ptr; + } + } + + /* Allocate latency lists for each size */ + D_ALLOC_ARRAY(size_latencies, num_msg_sizes); + if (size_latencies == NULL) + D_GOTO(cleanup, ret = -DER_NOMEM); + + /* Allocate latency lists for each 1:many session per size */ + for (size_idx = 0; size_idx < num_msg_sizes; size_idx++) { + D_ALLOC_ARRAY(size_latencies[size_idx], num_ms_endpts); + if (size_latencies[size_idx] == NULL) + D_GOTO(cleanup, ret = -DER_NOMEM); + } + D_ALLOC_ARRAY(latencies_iov, num_ms_endpts); + if (latencies_iov == NULL) + D_GOTO(cleanup, ret = -DER_NOMEM); + D_ALLOC_ARRAY(latencies_sg_list, num_ms_endpts); + if (latencies_sg_list == NULL) + D_GOTO(cleanup, ret = -DER_NOMEM); + D_ALLOC_ARRAY(latencies_bulk_hdl, num_ms_endpts); + if (latencies_bulk_hdl == NULL) + D_GOTO(cleanup, ret = -DER_NOMEM); + + for (size_idx = 0; size_idx < num_msg_sizes; size_idx++) { + struct crt_st_start_params test_params = {0}; + struct st_latency **latencies = size_latencies[size_idx]; + + D_ASSERT(latencies != NULL); + + /* + * For each 1:many session, allocate an array for latency results. + * Map that array to an IOV, and create a bulk handle that will be used + * to transfer latency results back into that buffer + */ + for (m_idx = 0; m_idx < num_ms_endpts; m_idx++) { + D_ALLOC_ARRAY(latencies[m_idx], rep_count); + if (latencies[m_idx] == NULL) + D_GOTO(cleanup, ret = -DER_NOMEM); + d_iov_set(&latencies_iov[m_idx], latencies[m_idx], + rep_count * sizeof(**latencies)); + latencies_sg_list[m_idx].sg_iovs = &latencies_iov[m_idx]; + latencies_sg_list[m_idx].sg_nr = 1; + + ret = crt_bulk_create(crt_ctx, &latencies_sg_list[m_idx], CRT_BULK_RW, + &latencies_bulk_hdl[m_idx]); + if (ret != 0) { + D_ERROR("Failed to allocate latencies bulk handle;" + " ret = %d\n", + ret); + D_GOTO(cleanup, ret); + } + D_ASSERT(latencies_bulk_hdl != CRT_BULK_NULL); + } + + /* Set test parameters to send to the test node */ + d_iov_set(&test_params.endpts, endpts, num_endpts * sizeof(*endpts)); + test_params.rep_count = rep_count; + test_params.max_inflight = max_inflight; + test_params.send_size = all_params[size_idx].send_size; + test_params.reply_size = all_params[size_idx].reply_size; + test_params.send_type = all_params[size_idx].send_type; + test_params.reply_type = all_params[size_idx].reply_type; + test_params.buf_alignment = buf_alignment; + test_params.srv_grp = dest_name; + + ret = test_msg_size(crt_ctx, ms_endpts, num_ms_endpts, &test_params, latencies, + latencies_bulk_hdl); + if (ret != 0) { + D_ERROR("Testing message size (%d-%s %d-%s) failed;" + " ret = %d\n", + test_params.send_size, crt_st_msg_type_str[test_params.send_type], + test_params.reply_size, crt_st_msg_type_str[test_params.reply_type], + ret); + D_GOTO(cleanup, ret); + } + + /* Clean up this size iteration's handles */ + for (m_idx = 0; m_idx < num_ms_endpts; m_idx++) + if (latencies_bulk_hdl[m_idx] != CRT_BULK_NULL) + crt_bulk_free(latencies_bulk_hdl[m_idx]); + } + +cleanup: + /* Tell the progress thread to abort and exit */ + g_shutdown_flag = 1; + + if (pthread_join(tid, NULL)) { + D_ERROR("Could not join progress thread\n"); + ret = -1; + } + +cleanup_nothread: + if (latencies_bulk_hdl != NULL) { + for (m_idx = 0; m_idx < num_ms_endpts; m_idx++) + if (latencies_bulk_hdl[m_idx] != CRT_BULK_NULL) + crt_bulk_free(latencies_bulk_hdl[m_idx]); + D_FREE(latencies_bulk_hdl); + } + if (latencies_sg_list != NULL) + D_FREE(latencies_sg_list); + if (latencies_iov != NULL) + D_FREE(latencies_iov); + if (ret != 0) { + if (ms_endpts != NULL) + D_FREE(ms_endpts); + free_size_latencies(size_latencies, num_msg_sizes, num_ms_endpts); + } else { + *size_latencies_out = size_latencies; + *ms_endpts_out = ms_endpts; + *num_ms_endpts_out = num_ms_endpts; + } + + if (srv_grp != NULL && g_group_inited) { + cleanup_ret = crt_group_detach(srv_grp); + if (cleanup_ret != 0) + D_ERROR("crt_group_detach failed; ret = %d\n", cleanup_ret); + /* Make sure first error is returned, if applicable */ + ret = ((ret == 0) ? cleanup_ret : ret); + } + + cleanup_ret = 0; + if (g_context_created) { + cleanup_ret = crt_context_destroy(crt_ctx, 0); + if (cleanup_ret != 0) + D_ERROR("crt_context_destroy failed; ret = %d\n", cleanup_ret); + } + + /* Make sure first error is returned, if applicable */ + ret = ((ret == 0) ? cleanup_ret : ret); + + cleanup_ret = 0; + if (g_cart_inited) { + cleanup_ret = crt_finalize(); + if (cleanup_ret != 0) + D_ERROR("crt_finalize failed; ret = %d\n", cleanup_ret); + } + /* Make sure first error is returned, if applicable */ + ret = ((ret == 0) ? cleanup_ret : ret); + return ret; +} \ No newline at end of file diff --git a/src/utils/self_test/self_test_lib.h b/src/utils/self_test/self_test_lib.h new file mode 100644 index 00000000000..38dcc57f5c1 --- /dev/null +++ b/src/utils/self_test/self_test_lib.h @@ -0,0 +1,61 @@ +/** + * (C) Copyright 2016-2024 Intel Corporation. + * + * SPDX-License-Identifier: BSD-2-Clause-Patent + */ + +#ifndef __SELF_TEST_LIB_H__ +#define __SELF_TEST_LIB_H__ + +#include + +#include "crt_utils.h" + +#define CRT_SELF_TEST_AUTO_BULK_THRESH (1 << 20) +#define CRT_SELF_TEST_GROUP_NAME ("crt_self_test") + +struct st_size_params { + uint32_t send_size; + uint32_t reply_size; + union { + struct { + enum crt_st_msg_type send_type : 2; + enum crt_st_msg_type reply_type : 2; + }; + uint32_t flags; + }; +}; + +struct st_endpoint { + uint32_t rank; + uint32_t tag; +}; + +struct st_master_endpt { + crt_endpoint_t endpt; + struct crt_st_status_req_out reply; + int32_t test_failed; + int32_t test_completed; +}; + +static const char *const crt_st_msg_type_str[] = {"EMPTY", "IOV", "BULK_PUT", "BULK_GET"}; + +void +randomize_endpoints(struct st_endpoint *endpts, uint32_t num_endpts); +int +run_self_test(struct st_size_params all_params[], int num_msg_sizes, int rep_count, + int max_inflight, char *dest_name, struct st_endpoint *ms_endpts_in, + uint32_t num_ms_endpts_in, struct st_endpoint *endpts, uint32_t num_endpts, + struct st_master_endpt **ms_endpts_out, uint32_t *num_ms_endpts_out, + struct st_latency ****size_latencies, int16_t buf_alignment, char *attach_info_path, + bool use_agent, bool no_sync); +int +st_compare_endpts(const void *a_in, const void *b_in); +int +st_compare_latencies_by_vals(const void *a_in, const void *b_in); +int +st_compare_latencies_by_ranks(const void *a_in, const void *b_in); +void +free_size_latencies(struct st_latency ***latencies, uint32_t num_msg_sizes, uint32_t num_ms_endpts); + +#endif /* __SELF_TEST_LIB_H__ */ \ No newline at end of file diff --git a/src/vos/vos_common.c b/src/vos/vos_common.c index 28e2ac86757..fb8461e2931 100644 --- a/src/vos/vos_common.c +++ b/src/vos/vos_common.c @@ -269,13 +269,15 @@ vos_tx_end(struct vos_container *cont, struct dtx_handle *dth_in, struct umem_rsrvd_act **rsrvd_scmp, d_list_t *nvme_exts, bool started, struct bio_desc *biod, int err) { - struct vos_pool *pool; - struct dtx_handle *dth = dth_in; - struct vos_dtx_act_ent *dae; - struct dtx_rsrvd_uint *dru; - struct vos_dtx_cmt_ent *dce = NULL; - struct dtx_handle tmp = {0}; - int rc; + struct vos_pool *pool; + struct umem_instance *umm; + struct dtx_handle *dth = dth_in; + struct vos_dtx_act_ent *dae; + struct vos_dtx_act_ent_df *dae_df; + struct dtx_rsrvd_uint *dru; + struct vos_dtx_cmt_ent *dce = NULL; + struct dtx_handle tmp = {0}; + int rc = 0; if (!dtx_is_valid_handle(dth)) { /** Created a dummy dth handle for publishing extents */ @@ -287,11 +289,11 @@ vos_tx_end(struct vos_container *cont, struct dtx_handle *dth_in, D_INIT_LIST_HEAD(&tmp.dth_deferred_nvme); } - if (dth->dth_local) { + if (dth->dth_local) pool = vos_hdl2pool(dth_in->dth_poh); - } else { + else pool = cont->vc_pool; - } + umm = vos_pool2umm(pool); if (rsrvd_scmp != NULL) { D_ASSERT(nvme_exts != NULL); @@ -300,7 +302,7 @@ vos_tx_end(struct vos_container *cont, struct dtx_handle *dth_in, * Just do your best to release the SCM reservation. Can't handle another * error while handling one already anyway. */ - (void)vos_publish_scm(vos_pool2umm(pool), *rsrvd_scmp, false /* publish */); + (void)vos_publish_scm(umm, *rsrvd_scmp, false /* publish */); D_FREE(*rsrvd_scmp); *rsrvd_scmp = NULL; err = -DER_NOMEM; @@ -341,9 +343,9 @@ vos_tx_end(struct vos_container *cont, struct dtx_handle *dth_in, vos_dth_set(NULL, pool->vp_sysdb); if (bio_nvme_configured(SMD_DEV_TYPE_META) && biod != NULL) - err = umem_tx_end_ex(vos_pool2umm(pool), err, biod); + err = umem_tx_end_ex(umm, err, biod); else - err = umem_tx_end(vos_pool2umm(pool), err); + err = umem_tx_end(umm, err); cancel: if (dtx_is_valid_handle(dth_in)) { @@ -409,8 +411,11 @@ vos_tx_end(struct vos_container *cont, struct dtx_handle *dth_in, vos_dtx_post_handle(cont, &dae, &dce, 1, false, err != 0); } else { D_ASSERT(dce == NULL); - if (err == 0) + if (err == 0) { dae->dae_prepared = 1; + dae_df = umem_off2ptr(umm, dae->dae_df_off); + D_ASSERT(!(dae_df->dae_flags & DTE_INVALID)); + } } } } diff --git a/src/vos/vos_dtx.c b/src/vos/vos_dtx.c index 27a41ce5776..0e70133629f 100644 --- a/src/vos/vos_dtx.c +++ b/src/vos/vos_dtx.c @@ -639,17 +639,25 @@ dtx_rec_release(struct vos_container *cont, struct vos_dtx_act_ent *dae, return 0; /* In spite of for commit or abort, the DTX must be local preparing/prepared. */ - D_ASSERTF(vos_dae_is_prepare(dae), "Unexpected DTX "DF_DTI" status for %s\n", - DP_DTI(&DAE_XID(dae)), abort ? "abort" : "commit"); + D_ASSERTF(vos_dae_is_prepare(dae), + "Unexpected DTX "DF_DTI" status for %s in pool "DF_UUID" cont "DF_UUID"\n", + DP_DTI(&DAE_XID(dae)), abort ? "abort" : "commit", + DP_UUID(cont->vc_pool->vp_id), DP_UUID(cont->vc_id)); dbd = dae->dae_dbd; dae_df = umem_off2ptr(umm, dae->dae_df_off); - D_ASSERTF(dae_df != NULL, "Hit invalid DTX entry "DF_DTI" when release for %s\n", - DP_DTI(&DAE_XID(dae)), abort ? "abort" : "commit"); + D_ASSERTF(dae_df != NULL, "Hit invalid DTX entry "DF_DTI" when release for %s in pool " + DF_UUID" cont "DF_UUID"\n", DP_DTI(&DAE_XID(dae)), abort ? "abort" : "commit", + DP_UUID(cont->vc_pool->vp_id), DP_UUID(cont->vc_id)); D_ASSERTF(dbd->dbd_magic == DTX_ACT_BLOB_MAGIC, - "Invalid blob %p magic %x for "DF_DTI" (lid %x)\n", - dbd, dbd->dbd_magic, DP_DTI(&DAE_XID(dae)), DAE_LID(dae)); + "Bad blob %p magic %x for "DF_DTI" (lid %x) in pool "DF_UUID" cont "DF_UUID"\n", + dbd, dbd->dbd_magic, DP_DTI(&DAE_XID(dae)), DAE_LID(dae), + DP_UUID(cont->vc_pool->vp_id), DP_UUID(cont->vc_id)); + D_ASSERTF(dbd->dbd_index > 0, + "%s DTX "DF_DTI" against new DTX blob %p in pool "DF_UUID" cont "DF_UUID"\n", + abort ? "abort" : "commit", DP_DTI(&DAE_XID(dae)), dbd, + DP_UUID(cont->vc_pool->vp_id), DP_UUID(cont->vc_id)); if (!UMOFF_IS_NULL(dae_df->dae_mbs_off)) { /* dae_mbs_off will be invalid via flag DTE_INVALID. */ @@ -688,19 +696,16 @@ dtx_rec_release(struct vos_container *cont, struct vos_dtx_act_ent *dae, } if (dbd->dbd_count > 1 || dbd->dbd_index < dbd->dbd_cap) { - rc = umem_tx_add_ptr(umm, &dae_df->dae_flags, - sizeof(dae_df->dae_flags)); + rc = umem_tx_add_ptr(umm, &dae_df->dae_flags, sizeof(dae_df->dae_flags)); if (rc != 0) return rc; - /* Mark the DTX entry as invalid in SCM. */ - dae_df->dae_flags = DTE_INVALID; - - rc = umem_tx_add_ptr(umm, &dbd->dbd_count, - sizeof(dbd->dbd_count)); + rc = umem_tx_add_ptr(umm, &dbd->dbd_count, sizeof(dbd->dbd_count)); if (rc != 0) return rc; + /* Mark the DTX entry as invalid persistently. */ + dae_df->dae_flags = DTE_INVALID; dbd->dbd_count--; } else { struct vos_cont_df *cont_df = cont->vc_cont_df; @@ -922,6 +927,8 @@ vos_dtx_extend_act_table(struct vos_container *cont) dbd->dbd_magic = DTX_ACT_BLOB_MAGIC; dbd->dbd_cap = (DTX_BLOB_SIZE - sizeof(struct vos_dtx_blob_df)) / sizeof(struct vos_dtx_act_ent_df); + dbd->dbd_count = 0; + dbd->dbd_index = 0; tmp = umem_off2ptr(umm, cont_df->cd_dtx_active_tail); if (tmp == NULL) { @@ -932,14 +939,14 @@ vos_dtx_extend_act_table(struct vos_container *cont) sizeof(cont_df->cd_dtx_active_head) + sizeof(cont_df->cd_dtx_active_tail)); if (rc != 0) - return rc; + goto out; cont_df->cd_dtx_active_head = dbd_off; } else { rc = umem_tx_add_ptr(umm, &tmp->dbd_next, sizeof(tmp->dbd_next)); if (rc != 0) - return rc; + goto out; tmp->dbd_next = dbd_off; @@ -947,19 +954,20 @@ vos_dtx_extend_act_table(struct vos_container *cont) rc = umem_tx_add_ptr(umm, &cont_df->cd_dtx_active_tail, sizeof(cont_df->cd_dtx_active_tail)); if (rc != 0) - return rc; + goto out; } cont_df->cd_dtx_active_tail = dbd_off; - D_DEBUG(DB_IO, "Allocated DTX active blob %p ("UMOFF_PF") for cont "DF_UUID"\n", - dbd, UMOFF_P(dbd_off), DP_UUID(cont->vc_id)); - - return 0; +out: + DL_CDEBUG(rc == 0, DB_IO, DLOG_ERR, rc, + "Allocated DTX active blob %p ("UMOFF_PF") for cont "DF_UUID, + dbd, UMOFF_P(dbd_off), DP_UUID(cont->vc_id)); + return rc; } static int -vos_dtx_alloc(struct umem_instance *umm, struct vos_dtx_blob_df *dbd, struct dtx_handle *dth) +vos_dtx_alloc(struct umem_instance *umm, struct dtx_handle *dth) { struct vos_dtx_act_ent *dae = NULL; struct vos_container *cont; @@ -1005,21 +1013,12 @@ vos_dtx_alloc(struct umem_instance *umm, struct vos_dtx_blob_df *dbd, struct dtx DAE_MBS_FLAGS(dae) = 0; } - if (dbd != NULL) { - D_ASSERT(dbd->dbd_magic == DTX_ACT_BLOB_MAGIC); - - dae->dae_df_off = umem_ptr2off(umm, dbd) + - offsetof(struct vos_dtx_blob_df, dbd_active_data) + - sizeof(struct vos_dtx_act_ent_df) * dbd->dbd_index; - } - /* Will be set as dbd::dbd_index via vos_dtx_prepared(). */ DAE_INDEX(dae) = DTX_INDEX_INVAL; - dae->dae_dbd = dbd; dae->dae_dth = dth; - D_DEBUG(DB_IO, "Allocated new lid DTX: "DF_DTI" lid=%lx, dae=%p, dae_dbd=%p\n", - DP_DTI(&dth->dth_xid), DAE_LID(dae) & DTX_LID_SOLO_MASK, dae, dbd); + D_DEBUG(DB_IO, "Allocated new lid DTX: "DF_DTI" lid=%lx, dae=%p\n", + DP_DTI(&dth->dth_xid), DAE_LID(dae) & DTX_LID_SOLO_MASK, dae); d_iov_set(&kiov, &DAE_XID(dae), sizeof(DAE_XID(dae))); d_iov_set(&riov, dae, sizeof(*dae)); @@ -1445,46 +1444,6 @@ vos_dtx_validation(struct dtx_handle *dth) return rc; } -static int -vos_dtx_active(struct dtx_handle *dth) -{ - struct vos_dtx_act_ent *dae = dth->dth_ent; - struct vos_container *cont; - struct vos_cont_df *cont_df; - struct umem_instance *umm; - struct vos_dtx_blob_df *dbd; - int rc = 0; - - if (dae->dae_dbd != NULL) - goto out; - - cont = vos_hdl2cont(dth->dth_coh); - cont_df = cont->vc_cont_df; - umm = vos_cont2umm(cont); - dbd = umem_off2ptr(umm, cont_df->cd_dtx_active_tail); - - if (dbd == NULL || dbd->dbd_index >= dbd->dbd_cap) { - rc = vos_dtx_extend_act_table(cont); - if (rc != 0) - goto out; - - dbd = umem_off2ptr(umm, cont_df->cd_dtx_active_tail); - } - - D_ASSERT(dbd->dbd_magic == DTX_ACT_BLOB_MAGIC); - - dae->dae_df_off = umem_ptr2off(umm, dbd) + - offsetof(struct vos_dtx_blob_df, dbd_active_data) + - sizeof(struct vos_dtx_act_ent_df) * dbd->dbd_index; - dae->dae_dbd = dbd; - -out: - if (rc == 0) - dth->dth_active = 1; - - return rc; -} - /* The caller has started local transaction. */ int vos_dtx_register_record(struct umem_instance *umm, umem_off_t record, @@ -1552,15 +1511,10 @@ vos_dtx_register_record(struct umem_instance *umm, umem_off_t record, return 0; } - if (!dth->dth_active) { - rc = vos_dtx_active(dth); - if (rc != 0) - goto out; - } - rc = vos_dtx_append(dth, record, type); if (rc == 0) { /* Incarnation log entry implies a share */ + dth->dth_active = 1; *tx_id = DAE_LID(dae); if (type == DTX_RT_ILOG) dth->dth_modify_shared = 1; @@ -1577,20 +1531,18 @@ vos_dtx_register_record(struct umem_instance *umm, umem_off_t record, } /* The caller has started local transaction. */ -void +int vos_dtx_deregister_record(struct umem_instance *umm, daos_handle_t coh, uint32_t entry, daos_epoch_t epoch, umem_off_t record) { struct vos_container *cont; struct vos_dtx_act_ent *dae; - struct vos_dtx_act_ent_df *dae_df; - umem_off_t *rec_df; bool found; int count; int i; if (!vos_dtx_is_normal_entry(entry)) - return; + return 0; D_ASSERT(entry >= DTX_LID_RESERVED); @@ -1600,20 +1552,24 @@ vos_dtx_deregister_record(struct umem_instance *umm, daos_handle_t coh, * The on-disk entry will be destroyed soon. */ if (cont == NULL) - return; + return 0; found = lrua_lookupx(cont->vc_dtx_array, entry - DTX_LID_RESERVED, epoch, &dae); if (!found) { D_WARN("Could not find active DTX record for lid=%d, epoch=" DF_U64"\n", entry, epoch); - return; + return 0; } - dae_df = umem_off2ptr(umm, dae->dae_df_off); - if (daos_is_zero_dti(&dae_df->dae_xid) || - dae_df->dae_flags & DTE_INVALID) - return; + /* + * NOTE: If the record to be deregistered (for free or overwrite, and so on) is referenced + * by another prepared (but non-committed) DTX, then do not allow current transaction + * to modify it. Because if current transaction is aborted or failed for some reason, + * there is no efficient way to recover such former non-committed DTX. + */ + if (dae->dae_dbd != NULL) + return dtx_inprogress(dae, vos_dth_get(cont->vc_pool->vp_sysdb), false, false, 8); if (DAE_REC_CNT(dae) > DTX_INLINE_REC_CNT) count = DTX_INLINE_REC_CNT; @@ -1623,46 +1579,18 @@ vos_dtx_deregister_record(struct umem_instance *umm, daos_handle_t coh, for (i = 0; i < count; i++) { if (record == umem_off2offset(DAE_REC_INLINE(dae)[i])) { DAE_REC_INLINE(dae)[i] = UMOFF_NULL; - goto handle_df; + return 0; } } for (i = 0; i < DAE_REC_CNT(dae) - DTX_INLINE_REC_CNT; i++) { if (record == umem_off2offset(dae->dae_records[i])) { dae->dae_records[i] = UMOFF_NULL; - goto handle_df; - } - } - - /* Not found */ - return; - -handle_df: - if (dae_df->dae_rec_cnt > DTX_INLINE_REC_CNT) - count = DTX_INLINE_REC_CNT; - else - count = dae_df->dae_rec_cnt; - - rec_df = dae_df->dae_rec_inline; - for (i = 0; i < count; i++) { - if (umem_off2offset(rec_df[i]) == record) { - rec_df[i] = UMOFF_NULL; - return; + return 0; } } - rec_df = umem_off2ptr(umm, dae_df->dae_rec_off); - - /* Not found */ - if (rec_df == NULL) - return; - - for (i = 0; i < dae_df->dae_rec_cnt - DTX_INLINE_REC_CNT; i++) { - if (umem_off2offset(rec_df[i]) == record) { - rec_df[i] = UMOFF_NULL; - return; - } - } + return 0; } int @@ -1670,6 +1598,8 @@ vos_dtx_prepared(struct dtx_handle *dth, struct vos_dtx_cmt_ent **dce_p) { struct vos_dtx_act_ent *dae = dth->dth_ent; struct vos_container *cont = vos_hdl2cont(dth->dth_coh); + struct vos_dtx_act_ent_df *dae_df; + struct vos_cont_df *cont_df; struct umem_instance *umm; struct vos_dtx_blob_df *dbd; umem_off_t rec_off; @@ -1705,9 +1635,26 @@ vos_dtx_prepared(struct dtx_handle *dth, struct vos_dtx_cmt_ent **dce_p) return rc; } + D_ASSERT(dae->dae_dbd == NULL); + + cont_df = cont->vc_cont_df; umm = vos_cont2umm(cont); - dbd = dae->dae_dbd; - D_ASSERT(dbd != NULL); + dbd = umem_off2ptr(umm, cont_df->cd_dtx_active_tail); + if (dbd == NULL || dbd->dbd_index >= dbd->dbd_cap) { + rc = vos_dtx_extend_act_table(cont); + if (rc != 0) + return rc; + + dbd = umem_off2ptr(umm, cont_df->cd_dtx_active_tail); + } + + D_ASSERT(dbd->dbd_magic == DTX_ACT_BLOB_MAGIC); + + dae->dae_dbd = dbd; + dae->dae_df_off = umem_ptr2off(umm, dbd) + + offsetof(struct vos_dtx_blob_df, dbd_active_data) + + sizeof(struct vos_dtx_act_ent_df) * dbd->dbd_index; + dae_df = umem_off2ptr(umm, dae->dae_df_off); /* Use the dkey_hash for the last modification as the dkey_hash * for the whole transaction. It will used as the index for DTX @@ -1784,27 +1731,30 @@ vos_dtx_prepared(struct dtx_handle *dth, struct vos_dtx_cmt_ent **dce_p) DAE_INDEX(dae) = dbd->dbd_index; if (DAE_INDEX(dae) > 0) { - rc = umem_tx_xadd_ptr(umm, umem_off2ptr(umm, dae->dae_df_off), - sizeof(struct vos_dtx_act_ent_df), UMEM_XADD_NO_SNAPSHOT); + rc = umem_tx_xadd_ptr(umm, dae_df, sizeof(*dae_df), UMEM_XADD_NO_SNAPSHOT); if (rc != 0) - return rc; + goto out; /* dbd_index is next to dbd_count */ rc = umem_tx_add_ptr(umm, &dbd->dbd_count, sizeof(dbd->dbd_count) + sizeof(dbd->dbd_index)); if (rc != 0) - return rc; + goto out; } - memcpy(umem_off2ptr(umm, dae->dae_df_off), - &dae->dae_base, sizeof(struct vos_dtx_act_ent_df)); + memcpy(dae_df, &dae->dae_base, sizeof(*dae_df)); dbd->dbd_count++; dbd->dbd_index++; dae->dae_preparing = 1; dae->dae_need_release = 1; - return 0; +out: + DL_CDEBUG(rc != 0, DLOG_ERR, DB_IO, rc, + "Preparing DTX "DF_DTI" in dbd "UMOFF_PF" at index %u, count %u, cap %u", + DP_DTI(&DAE_XID(dae)), UMOFF_P(cont_df->cd_dtx_active_tail), + dbd->dbd_index, dbd->dbd_count, dbd->dbd_cap); + return rc; } static struct dtx_memberships * @@ -2827,12 +2777,16 @@ vos_dtx_act_reindex(struct vos_container *cont) dbd_count++; } - D_ASSERTF(dbd_count == dbd->dbd_count, - "Unmatched active DTX count %d/%d, cap %d, idx %d for blob %p (" - UMOFF_PF"), head "UMOFF_PF", tail "UMOFF_PF"\n", - dbd_count, dbd->dbd_count, dbd->dbd_cap, dbd->dbd_index, dbd, - UMOFF_P(dbd_off), UMOFF_P(cont_df->cd_dtx_active_head), - UMOFF_P(cont_df->cd_dtx_active_tail)); + if (unlikely(dbd_count != dbd->dbd_count)) { + D_ERROR("Unmatched active DTX count %d/%d, cap %d, idx %d for blob %p (" + UMOFF_PF"), head "UMOFF_PF", tail "UMOFF_PF" in pool " + DF_UUID" cont "DF_UUID"\n", dbd_count, dbd->dbd_count, dbd->dbd_cap, + dbd->dbd_index, dbd, UMOFF_P(dbd_off), + UMOFF_P(cont_df->cd_dtx_active_head), + UMOFF_P(cont_df->cd_dtx_active_tail), DP_UUID(cont->vc_pool->vp_id), + DP_UUID(cont->vc_id)); + D_GOTO(out, rc = -DER_IO); + } dbd_off = dbd->dbd_next; } @@ -3012,13 +2966,12 @@ vos_dtx_attach(struct dtx_handle *dth, bool persistent, bool exist) { struct vos_container *cont; struct umem_instance *umm = NULL; - struct vos_dtx_blob_df *dbd = NULL; struct vos_dtx_cmt_ent *dce = NULL; - struct vos_cont_df *cont_df = NULL; struct vos_dtx_act_ent *dae; d_iov_t kiov; d_iov_t riov; int rc = 0; + bool tx = false; if (!dtx_is_valid_handle(dth)) return 0; @@ -3055,31 +3008,11 @@ vos_dtx_attach(struct dtx_handle *dth, bool persistent, bool exist) if (rc != 0) goto out; - cont_df = cont->vc_cont_df; - dbd = umem_off2ptr(umm, cont_df->cd_dtx_active_tail); - if (dbd == NULL || dbd->dbd_index >= dbd->dbd_cap) { - rc = vos_dtx_extend_act_table(cont); - if (rc != 0) - goto out; - - dbd = umem_off2ptr(umm, cont_df->cd_dtx_active_tail); - } + tx = true; } - if (dth->dth_ent == NULL) { - rc = vos_dtx_alloc(umm, dbd, dth); - } else if (persistent) { - D_ASSERT(dbd != NULL); - D_ASSERT(dbd->dbd_magic == DTX_ACT_BLOB_MAGIC); - - dae = dth->dth_ent; - D_ASSERT(dae->dae_dbd == NULL); - - dae->dae_df_off = umem_ptr2off(umm, dbd) + - offsetof(struct vos_dtx_blob_df, dbd_active_data) + - sizeof(struct vos_dtx_act_ent_df) * dbd->dbd_index; - dae->dae_dbd = dbd; - } + if (dth->dth_ent == NULL) + rc = vos_dtx_alloc(umm, dth); out: if (rc == 0) { @@ -3094,7 +3027,7 @@ vos_dtx_attach(struct dtx_handle *dth, bool persistent, bool exist) } if (persistent) { - if (cont_df != NULL) { + if (tx) { if (rc == 0) { rc = umem_tx_commit(umm); D_ASSERTF(rc == 0, "local TX commit failure %d\n", rc); diff --git a/src/vos/vos_ilog.c b/src/vos/vos_ilog.c index ae67f9a00ac..54abf2f407f 100644 --- a/src/vos/vos_ilog.c +++ b/src/vos/vos_ilog.c @@ -82,8 +82,7 @@ vos_ilog_del(struct umem_instance *umm, umem_off_t ilog_off, uint32_t tx_id, return 0; coh.cookie = (unsigned long)args; - vos_dtx_deregister_record(umm, coh, tx_id, epoch, ilog_off); - return 0; + return vos_dtx_deregister_record(umm, coh, tx_id, epoch, ilog_off); } void diff --git a/src/vos/vos_internal.h b/src/vos/vos_internal.h index 99d49c46fa7..7d4dd3ac166 100644 --- a/src/vos/vos_internal.h +++ b/src/vos/vos_internal.h @@ -747,8 +747,10 @@ vos_dtx_get(bool standalone); * \param epoch [IN] Epoch for the DTX. * \param record [IN] Address (offset) of the record to be * deregistered. + * + * \return 0 on success and negative on failure. */ -void +int vos_dtx_deregister_record(struct umem_instance *umm, daos_handle_t coh, uint32_t entry, daos_epoch_t epoch, umem_off_t record); @@ -1766,12 +1768,13 @@ vos_flush_wal_header(struct vos_pool *vp) * Check if the NVMe context of a VOS target is healthy. * * \param[in] coh VOS container + * \param[in] update The check is for an update operation or not * * \return 0 : VOS target is healthy * -DER_NVME_IO : VOS target is faulty */ static inline int -vos_tgt_health_check(struct vos_container *cont) +vos_tgt_health_check(struct vos_container *cont, bool update) { D_ASSERT(cont != NULL); D_ASSERT(cont->vc_pool != NULL); @@ -1779,7 +1782,7 @@ vos_tgt_health_check(struct vos_container *cont) if (cont->vc_pool->vp_sysdb) return 0; - return bio_xsctxt_health_check(vos_xsctxt_get()); + return bio_xsctxt_health_check(vos_xsctxt_get(), true, update); } int diff --git a/src/vos/vos_io.c b/src/vos/vos_io.c index 31b6aa323d6..663e1c2c4d0 100644 --- a/src/vos/vos_io.c +++ b/src/vos/vos_io.c @@ -1506,7 +1506,7 @@ vos_fetch_end(daos_handle_t ioh, daos_size_t *size, int err) D_ASSERT(!ioc->ic_update); if (err == 0) { - err = vos_tgt_health_check(ioc->ic_cont); + err = vos_tgt_health_check(ioc->ic_cont, false); if (err) DL_ERROR(err, "Fail fetch due to faulty NVMe."); } @@ -1546,7 +1546,7 @@ vos_fetch_begin(daos_handle_t coh, daos_unit_oid_t oid, daos_epoch_t epoch, D_DEBUG(DB_TRACE, "Fetch "DF_UOID", desc_nr %d, epoch "DF_X64"\n", DP_UOID(oid), iod_nr, epoch); - rc = vos_tgt_health_check(vos_hdl2cont(coh)); + rc = vos_tgt_health_check(vos_hdl2cont(coh), false); if (rc) { DL_ERROR(rc, DF_UOID": Reject fetch due to faulty NVMe.", DP_UOID(oid)); return rc; @@ -2543,7 +2543,7 @@ vos_update_end(daos_handle_t ioh, uint32_t pm_ver, daos_key_t *dkey, int err, vos_space_unhold(vos_cont2pool(ioc->ic_cont), &ioc->ic_space_held[0]); if (err == 0) { - err = vos_tgt_health_check(ioc->ic_cont); + err = vos_tgt_health_check(ioc->ic_cont, true); if (err) DL_ERROR(err, "Fail update due to faulty NVMe."); } @@ -2590,7 +2590,7 @@ vos_update_begin(daos_handle_t coh, daos_unit_oid_t oid, daos_epoch_t epoch, "Prepare IOC for " DF_UOID ", iod_nr %d, epc " DF_X64 ", flags=" DF_X64 "\n", DP_UOID(oid), iod_nr, (dtx_is_real_handle(dth) ? dth->dth_epoch : epoch), flags); - rc = vos_tgt_health_check(vos_hdl2cont(coh)); + rc = vos_tgt_health_check(vos_hdl2cont(coh), true); if (rc) { DL_ERROR(rc, DF_UOID": Reject update due to faulty NVMe.", DP_UOID(oid)); return rc; diff --git a/src/vos/vos_obj.c b/src/vos/vos_obj.c index 55698af0d74..00219138bd4 100644 --- a/src/vos/vos_obj.c +++ b/src/vos/vos_obj.c @@ -455,7 +455,7 @@ vos_obj_punch(daos_handle_t coh, daos_unit_oid_t oid, daos_epoch_t epoch, D_DEBUG(DB_IO, "Punch "DF_UOID", epoch "DF_X64"\n", DP_UOID(oid), epr.epr_hi); - rc = vos_tgt_health_check(cont); + rc = vos_tgt_health_check(cont, true); if (rc) { DL_ERROR(rc, DF_UOID": Reject punch due to faulty NVMe.", DP_UOID(oid)); return rc; @@ -592,7 +592,7 @@ vos_obj_punch(daos_handle_t coh, daos_unit_oid_t oid, daos_epoch_t epoch, vos_ts_set_free(ts_set); if (rc == 0) { - rc = vos_tgt_health_check(cont); + rc = vos_tgt_health_check(cont, true); if (rc) DL_ERROR(rc, "Fail punch due to faulty NVMe."); } diff --git a/src/vos/vos_pool.c b/src/vos/vos_pool.c index b33698a571f..af958eafd5d 100644 --- a/src/vos/vos_pool.c +++ b/src/vos/vos_pool.c @@ -1479,7 +1479,7 @@ vos_pool_open_metrics(const char *path, uuid_t uuid, unsigned int flags, void *m } } - rc = bio_xsctxt_health_check(vos_xsctxt_get()); + rc = bio_xsctxt_health_check(vos_xsctxt_get(), false, false); if (rc) { DL_WARN(rc, DF_UUID": Skip pool open due to faulty NVMe.", DP_UUID(uuid)); return rc; diff --git a/src/vos/vos_tree.c b/src/vos/vos_tree.c index e48467f9364..c36fcaa88c5 100644 --- a/src/vos/vos_tree.c +++ b/src/vos/vos_tree.c @@ -601,6 +601,7 @@ svt_rec_free_internal(struct btr_instance *tins, struct btr_record *rec, struct dtx_handle *dth = NULL; struct umem_rsrvd_act *rsrvd_scm; struct vos_container *cont = vos_hdl2cont(tins->ti_coh); + int rc; if (UMOFF_IS_NULL(rec->rec_off)) return 0; @@ -611,12 +612,12 @@ svt_rec_free_internal(struct btr_instance *tins, struct btr_record *rec, return -DER_NO_PERM; /* Not allowed */ } - vos_dtx_deregister_record(&tins->ti_umm, tins->ti_coh, - irec->ir_dtx, *epc, rec->rec_off); + rc = vos_dtx_deregister_record(&tins->ti_umm, tins->ti_coh, + irec->ir_dtx, *epc, rec->rec_off); + if (rc != 0) + return rc; if (!overwrite) { - int rc; - /* SCM value is stored together with vos_irec_df */ if (addr->ba_type == DAOS_MEDIA_NVME) { struct vos_pool *pool = tins->ti_priv; @@ -796,9 +797,8 @@ evt_dop_log_del(struct umem_instance *umm, daos_epoch_t epoch, daos_handle_t coh; coh.cookie = (unsigned long)args; - vos_dtx_deregister_record(umm, coh, desc->dc_dtx, epoch, - umem_ptr2off(umm, desc)); - return 0; + return vos_dtx_deregister_record(umm, coh, desc->dc_dtx, epoch, + umem_ptr2off(umm, desc)); } void diff --git a/utils/rpms/daos.rpmlintrc b/utils/rpms/daos.rpmlintrc index 09022a74e59..aff4db9b7e6 100644 --- a/utils/rpms/daos.rpmlintrc +++ b/utils/rpms/daos.rpmlintrc @@ -44,7 +44,7 @@ addFilter("E: static-library-without-debuginfo \/usr\/lib64\/lib(dfuse|ioil)\.a" # these need to be fixed: # https://daosio.atlassian.net/browse/DAOS-11539 -addFilter("W: no-soname \/usr\/lib64\/lib(ds3|daos_(common|cmd_hdlrs|tests|serialize|common_pmem)|dfs|dfuse|duns|ioil|pil4dfs|dpar(|_mpi)).so") +addFilter("W: no-soname \/usr\/lib64\/lib(ds3|daos_(common|cmd_hdlrs|self_test|tests|serialize|common_pmem)|dfs|dfuse|duns|ioil|pil4dfs|dpar(|_mpi)).so") # Tests rpm needs to be able to build daos from source so pulls in build deps and is expected. addFilter("daos-client-tests.x86_64: E: devel-dependency protobuf-c-devel") diff --git a/utils/rpms/daos.spec b/utils/rpms/daos.spec index fc670934909..f90e6d7010f 100644 --- a/utils/rpms/daos.spec +++ b/utils/rpms/daos.spec @@ -15,7 +15,7 @@ Name: daos Version: 2.7.100 -Release: 4%{?relval}%{?dist} +Release: 5%{?relval}%{?dist} Summary: DAOS Storage Engine License: BSD-2-Clause-Patent @@ -485,6 +485,7 @@ getent passwd daos_agent >/dev/null || useradd -s /sbin/nologin -r -g daos_agent %{_bindir}/dfuse %{_bindir}/daos %{_libdir}/libdaos_cmd_hdlrs.so +%{_libdir}/libdaos_self_test.so %{_libdir}/libdfs.so %{_libdir}/libds3.so %{_libdir}/%{name}/API_VERSION @@ -591,6 +592,9 @@ getent passwd daos_agent >/dev/null || useradd -s /sbin/nologin -r -g daos_agent # No files in a shim package %changelog +* Thu Aug 15 2024 Michael MacDonald 2.7.100-5 +- Add libdaos_self_test.so to client RPM + * Mon Aug 05 2024 Jerome Soumagne 2.7.100-4 - Bump mercury version to 2.4.0rc4