diff --git a/ci/provisioning/post_provision_config.sh b/ci/provisioning/post_provision_config.sh index 11b308f513e..13f78358a83 100755 --- a/ci/provisioning/post_provision_config.sh +++ b/ci/provisioning/post_provision_config.sh @@ -20,7 +20,7 @@ source ci/provisioning/post_provision_config_common_functions.sh source ci/junit.sh -: "${MLNX_VER_NUM:=latest-5.8}" +: "${MLNX_VER_NUM:=24.04-0.6.6.0}" : "${DISTRO:=EL_7}" DSL_REPO_var="DAOS_STACK_${DISTRO}_LOCAL_REPO" diff --git a/docs/admin/administration.md b/docs/admin/administration.md index 444e4ff7674..1aeb7140305 100644 --- a/docs/admin/administration.md +++ b/docs/admin/administration.md @@ -44,10 +44,20 @@ severity, message, description, and cause. |Event|Event type|Severity|Message|Description|Cause| |:----|:----|:----|:----|:----|:----| +| device\_set\_faulty| INFO\_ONLY| NOTICE or ERROR| Device: set faulty / Device: set faulty failed: / Device: auto faulty detect / Device: auto faulty detect failed: | Indicates that a device has been either explicitly or automatically set as faulty. Device UUID specified in event data. | Either the DMG set nvme-faulty command was used to explicitly set the device as faulty, or an error threshold was reached on the device, triggering an auto faulty reaction. | +| device\_media\_error| INFO\_ONLY| ERROR| Device: error logged from tgt\_id: | Indicates that a device media error has been detected for a specific target. The error type could be unmap, write, read or checksum (csum). Device UUID and target ID specified in event data. | Media error occurred on backing device. | +| device\_unplugged| INFO\_ONLY| NOTICE| Device: unplugged | Indicates device was physically removed from host. | NVMe SSD physically removed from host. | +| device\_plugged| INFO\_ONLY| NOTICE| Detected hot plugged device: | Indicates device was physically inserted into host. | NVMe SSD physically added to host. | +| device\_replace| INFO\_ONLY| NOTICE or ERROR| Replaced device: with device: [failed: ] | Indicates that a faulty device was replaced with a new device, and whether the operation failed. The old and new device IDs as well as any non-zero return code are specified in the event data. | Device was replaced using the DMG nvme replace command. | +| device\_link\_speed\_changed| INFO\_ONLY| NOTICE or WARNING| NVMe PCIe device at port-: link speed changed to (max )| Indicates that an NVMe device link speed has changed. The negotiated and maximum device link speeds are indicated in the event message field and the severity is set to warning if the negotiated speed is not at maximum capability (and notice level severity if at maximum). No other specific information is included in the event data.| Either device link speed was previously downgraded and has returned to maximum, or link speed has downgraded to a value that is less than its maximum capability.| +| device\_link\_width\_changed| INFO\_ONLY| NOTICE or WARNING| NVMe PCIe device at port-: link width changed to (max )| Indicates that an NVMe device link width has changed. The negotiated and maximum device link widths are indicated in the event message field and the severity is set to warning if the negotiated width is not at maximum capability (and notice level severity if at maximum).
No other specific information is included in the event data.| Either device link width was previously downgraded and has returned to maximum, or link width has downgraded to a value that is less than its maximum capability.| | engine\_format\_required|INFO\_ONLY|NOTICE|DAOS engine requires a format|Indicates the engine is waiting for allocated storage to be formatted on the instance with the dmg tool. The storage type can be either SCM or Metadata.|DAOS server attempts to bring up an engine that has unformatted storage.| | engine\_died| STATE\_CHANGE| ERROR| DAOS engine exited unexpectedly: | Indicates an engine instance exited unexpectedly. The message describes the exit state returned from the exited daos\_engine process.| N/A | -| engine\_asserted| STATE\_CHANGE| ERROR| TBD| Indicates engine instance threw a runtime assertion, causing a crash. | An unexpected internal state resulted in assert failure. | +| engine\_asserted| STATE\_CHANGE| ERROR| TBD| Indicates engine instance threw a runtime assertion, causing a crash. | An unexpected internal state resulted in assert failure. | | engine\_clock\_drift| INFO\_ONLY | ERROR| clock drift detected| Indicates the CART comms layer has detected clock skew between engines.| NTP may not be syncing clocks across the DAOS system. | +| engine\_join\_failed| INFO\_ONLY| ERROR | DAOS engine (rank ) was not allowed to join the system | Join operation failed for the given engine instance ID and rank (if assigned). | Reason should be provided in the extended info field of the event data. | +| pool\_corruption\_detected| INFO\_ONLY| ERROR | Data corruption detected| Indicates a corruption in pool data has been detected. The event fields will contain pool and container UUIDs. | A corruption was found by the checksum scrubber. | +| pool\_destroy\_deferred| INFO\_ONLY| WARNING | pool: destroy is deferred| Indicates a destroy operation has been deferred. | Pool destroy in progress but not complete. | | pool\_rebuild\_started| INFO\_ONLY| NOTICE | Pool rebuild started.| Indicates a pool rebuild has started. The event data field contains the pool map version and pool operation identifier. | When a pool rank becomes unavailable, a rebuild will be triggered. | | pool\_rebuild\_finished| INFO\_ONLY| NOTICE| Pool rebuild finished.| Indicates a pool rebuild has finished successfully. The event data field includes the pool map version and pool operation identifier. | N/A| | pool\_rebuild\_failed| INFO\_ONLY| ERROR| Pool rebuild failed: .| Indicates a pool rebuild has failed. The event data field includes the pool map version, pool operation identifier, and a string representation of the DER code.| N/A | @@ -59,7 +69,7 @@ severity, message, description, and cause. | swim\_rank\_dead| STATE\_CHANGE| NOTICE| SWIM rank marked as dead.| The SWIM protocol has detected the specified rank is unresponsive.| A remote DAOS engine has become unresponsive.| | system\_start\_failed| INFO\_ONLY| ERROR| System startup failed, | Indicates that a user-initiated controlled startup failed. The event data shows which ranks failed.| Ranks failed to start.| | system\_stop\_failed| INFO\_ONLY| ERROR| System shutdown failed during action, | Indicates that a user-initiated controlled shutdown failed. The event data identifies the failing shutdown action and shows which ranks failed.| Ranks failed to stop.| - +| system\_fabric\_provider\_changed| INFO\_ONLY| NOTICE| System fabric provider has changed: -> | Indicates that the system-wide fabric provider has been updated.
No other specific information is included in event data.| A system-wide fabric provider change has been intentionally applied to all joined ranks.| ## System Logging diff --git a/src/bio/bio_monitor.c b/src/bio/bio_monitor.c index 18235c3efd1..e5197312112 100644 --- a/src/bio/bio_monitor.c +++ b/src/bio/bio_monitor.c @@ -221,15 +221,14 @@ bio_dev_set_faulty(struct bio_xs_context *xs, uuid_t dev_uuid) rc = dss_abterr2der(rc); if (rc == 0) - ras_notify_eventf(RAS_DEVICE_SET_FAULTY, RAS_TYPE_INFO, - RAS_SEV_NOTICE, NULL, NULL, NULL, - NULL, NULL, NULL, NULL, NULL, NULL, - "Dev: "DF_UUID" set faulty\n", DP_UUID(dev_uuid)); + ras_notify_eventf(RAS_DEVICE_SET_FAULTY, RAS_TYPE_INFO, RAS_SEV_NOTICE, NULL, NULL, + NULL, NULL, NULL, NULL, NULL, NULL, NULL, + "Device: " DF_UUID " set faulty\n", DP_UUID(dev_uuid)); else - ras_notify_eventf(RAS_DEVICE_SET_FAULTY, RAS_TYPE_INFO, - RAS_SEV_ERROR, NULL, NULL, NULL, - NULL, NULL, NULL, NULL, NULL, NULL, - "Dev: "DF_UUID" set faulty failed: %d\n", DP_UUID(dev_uuid), rc); + ras_notify_eventf(RAS_DEVICE_SET_FAULTY, RAS_TYPE_INFO, RAS_SEV_ERROR, NULL, NULL, + NULL, NULL, NULL, NULL, NULL, NULL, NULL, + "Device: " DF_UUID " set faulty failed: %d\n", DP_UUID(dev_uuid), + rc); return rc; } @@ -779,16 +778,14 @@ auto_faulty_detect(struct bio_blobstore *bbs) } if (rc == 0) - ras_notify_eventf(RAS_DEVICE_SET_FAULTY, RAS_TYPE_INFO, - RAS_SEV_NOTICE, NULL, NULL, NULL, - NULL, NULL, NULL, NULL, NULL, NULL, - "Dev: "DF_UUID" auto faulty detect\n", + ras_notify_eventf(RAS_DEVICE_SET_FAULTY, RAS_TYPE_INFO, RAS_SEV_NOTICE, NULL, NULL, + NULL, NULL, NULL, NULL, NULL, NULL, NULL, + "Device: " DF_UUID " auto faulty detect\n", DP_UUID(bbs->bb_dev->bb_uuid)); else - ras_notify_eventf(RAS_DEVICE_SET_FAULTY, RAS_TYPE_INFO, - RAS_SEV_ERROR, NULL, NULL, NULL, - NULL, NULL, NULL, NULL, NULL, NULL, - "Dev: "DF_UUID" auto faulty detect failed: %d\n", + ras_notify_eventf(RAS_DEVICE_SET_FAULTY, RAS_TYPE_INFO, RAS_SEV_ERROR, NULL, NULL, + NULL, NULL, NULL, NULL, NULL, NULL, NULL, + "Device: " DF_UUID " auto faulty detect failed: %d\n", DP_UUID(bbs->bb_dev->bb_uuid), rc); } diff --git a/src/bio/bio_xstream.c b/src/bio/bio_xstream.c index 051eb106bde..87e7ad40f3f 100644 --- a/src/bio/bio_xstream.c +++ b/src/bio/bio_xstream.c @@ -744,9 +744,8 @@ bio_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, D_ASSERT(d_bdev->bb_desc != NULL); d_bdev->bb_removed = 1; - ras_notify_eventf(RAS_DEVICE_UNPLUGGED, RAS_TYPE_INFO, - RAS_SEV_NOTICE, NULL, NULL, NULL, NULL, NULL, - NULL, NULL, NULL, NULL, "Dev: "DF_UUID" unplugged\n", + ras_notify_eventf(RAS_DEVICE_UNPLUGGED, RAS_TYPE_INFO, RAS_SEV_NOTICE, NULL, NULL, NULL, + NULL, NULL, NULL, NULL, NULL, NULL, "Device: " DF_UUID " unplugged\n", DP_UUID(d_bdev->bb_uuid)); /* The bio_bdev is still under construction */ diff --git a/src/cart/crt_hg.c b/src/cart/crt_hg.c index 8f5687e88b0..789d75ceb31 100644 --- a/src/cart/crt_hg.c +++ b/src/cart/crt_hg.c @@ -14,97 +14,52 @@ * List of supported CaRT providers. The table is terminated with the last entry * having nad_str = NULL. 
*/ -struct crt_na_dict crt_na_dict[] = { - { - .nad_type = CRT_PROV_SM, - .nad_str = "sm", - .nad_contig_eps = false, - .nad_port_bind = false, - }, { - .nad_type = CRT_PROV_OFI_VERBS_RXM, - .nad_str = "ofi+verbs;ofi_rxm", - .nad_alt_str = "ofi+verbs", - .nad_contig_eps = true, - .nad_port_bind = true, - }, { - .nad_type = CRT_PROV_OFI_TCP, - .nad_str = "ofi+tcp", - .nad_contig_eps = true, - .nad_port_bind = true, - }, { - .nad_type = CRT_PROV_OFI_TCP_RXM, - .nad_str = "ofi+tcp;ofi_rxm", - .nad_contig_eps = true, - .nad_port_bind = true, - }, { - .nad_type = CRT_PROV_OFI_CXI, - .nad_str = "ofi+cxi", - .nad_contig_eps = true, - .nad_port_bind = false, - }, { - .nad_type = CRT_PROV_OFI_OPX, - .nad_str = "ofi+opx", - .nad_contig_eps = false, - .nad_port_bind = true, - }, { - .nad_type = CRT_PROV_UCX_RC, - .nad_str = "ucx+rc_v", - .nad_contig_eps = true, - .nad_port_bind = true, - }, { - .nad_type = CRT_PROV_UCX_UD, - .nad_str = "ucx+ud_v", - .nad_contig_eps = true, - .nad_port_bind = true, - }, { - .nad_type = CRT_PROV_UCX_RC_UD, - .nad_str = "ucx+rc_v,ud_v", - .nad_contig_eps = true, - .nad_port_bind = true, - }, { - .nad_type = CRT_PROV_UCX_RC_O, - .nad_str = "ucx+rc", - .nad_contig_eps = true, - .nad_port_bind = true, - }, { - .nad_type = CRT_PROV_UCX_UD_O, - .nad_str = "ucx+ud", - .nad_contig_eps = true, - .nad_port_bind = true, - }, { - .nad_type = CRT_PROV_UCX_RC_UD_O, - .nad_str = "ucx+rc,ud", - .nad_contig_eps = true, - .nad_port_bind = true, - }, { - .nad_type = CRT_PROV_UCX_RC_X, - .nad_str = "ucx+rc_x", - .nad_contig_eps = true, - .nad_port_bind = true, - }, { - .nad_type = CRT_PROV_UCX_UD_X, - .nad_str = "ucx+ud_x", - .nad_contig_eps = true, - .nad_port_bind = true, - }, { - .nad_type = CRT_PROV_UCX_RC_UD_X, - .nad_str = "ucx+rc_x,ud_x", - .nad_contig_eps = true, - .nad_port_bind = true, - }, { - .nad_type = CRT_PROV_UCX_DC_X, - .nad_str = "ucx+dc_x", - .nad_contig_eps = true, - .nad_port_bind = true, - }, { - .nad_type = CRT_PROV_UCX_TCP, - .nad_str = "ucx+tcp", - .nad_contig_eps = true, - .nad_port_bind = true, - }, { - .nad_str = NULL, - } -}; +struct crt_na_dict crt_na_dict[] = {{ + .nad_type = CRT_PROV_SM, + .nad_str = "sm", + .nad_contig_eps = false, + .nad_port_bind = false, + }, + { + .nad_type = CRT_PROV_OFI_VERBS_RXM, + .nad_str = "ofi+verbs;ofi_rxm", + .nad_alt_str = "ofi+verbs", + .nad_contig_eps = true, + .nad_port_bind = true, + }, + { + .nad_type = CRT_PROV_OFI_TCP, + .nad_str = "ofi+tcp", + .nad_contig_eps = true, + .nad_port_bind = true, + }, + { + .nad_type = CRT_PROV_OFI_TCP_RXM, + .nad_str = "ofi+tcp;ofi_rxm", + .nad_contig_eps = true, + .nad_port_bind = true, + }, + { + .nad_type = CRT_PROV_OFI_CXI, + .nad_str = "ofi+cxi", + .nad_contig_eps = true, + .nad_port_bind = false, + }, + { + .nad_type = CRT_PROV_OFI_OPX, + .nad_str = "ofi+opx", + .nad_contig_eps = false, + .nad_port_bind = true, + }, + { + .nad_type = CRT_PROV_UCX, + .nad_str = "ucx+ud_x", + .nad_contig_eps = true, + .nad_port_bind = true, + }, + { + .nad_str = NULL, + }}; int crt_hg_parse_uri(const char *uri, crt_provider_t *prov, char *addr) @@ -717,6 +672,8 @@ crt_get_info_string(bool primary, crt_provider_t provider, int iface_idx, start_port = crt_provider_ctx0_port_get(primary, provider); domain_str = crt_provider_domain_str_get(primary, provider, iface_idx); + D_ASSERTF(provider_str != NULL, "String for provider=%d not found\n", provider); + /* CXI provider uses domain names for info string */ if (provider == CRT_PROV_OFI_CXI) iface_str = NULL; @@ -735,8 +692,7 @@ crt_get_info_string(bool 
primary, crt_provider_t provider, int iface_idx, D_GOTO(out, rc); } - if (provider_str) - size += strlen(provider_str); + size = strlen(provider_str); if (domain_str) size += strlen(domain_str); if (iface_str) diff --git a/src/cart/crt_hg.h b/src/cart/crt_hg.h index e9b4b6511e5..a72b9ae49af 100644 --- a/src/cart/crt_hg.h +++ b/src/cart/crt_hg.h @@ -32,6 +32,8 @@ #define CRT_HG_POST_INCR (512) #define CRT_HG_MRECV_BUF (16) +#define CRT_UCX_STR "ucx" + struct crt_rpc_priv; struct crt_common_hdr; struct crt_corpc_hdr; @@ -40,7 +42,7 @@ struct crt_corpc_hdr; * Enumeration specifying providers supported by the library */ typedef enum { - CRT_PROV_SM = 0, + CRT_PROV_SM = 0, CRT_PROV_OFI_SOCKETS, CRT_PROV_OFI_VERBS_RXM, CRT_PROV_OFI_GNI, @@ -48,19 +50,9 @@ typedef enum { CRT_PROV_OFI_TCP_RXM, CRT_PROV_OFI_CXI, CRT_PROV_OFI_OPX, - CRT_PROV_OFI_LAST = CRT_PROV_OFI_OPX, - CRT_PROV_UCX_RC, - CRT_PROV_UCX_UD, - CRT_PROV_UCX_RC_UD, - CRT_PROV_UCX_RC_O, - CRT_PROV_UCX_UD_O, - CRT_PROV_UCX_RC_UD_O, - CRT_PROV_UCX_RC_X, - CRT_PROV_UCX_UD_X, - CRT_PROV_UCX_RC_UD_X, - CRT_PROV_UCX_DC_X, - CRT_PROV_UCX_TCP, - CRT_PROV_UCX_LAST = CRT_PROV_UCX_TCP, + CRT_PROV_OFI_LAST = CRT_PROV_OFI_OPX, + CRT_PROV_UCX, + CRT_PROV_UCX_LAST = CRT_PROV_UCX, /* Note: This entry should be the last valid one in enum */ CRT_PROV_COUNT, CRT_PROV_UNKNOWN = -1, @@ -75,8 +67,7 @@ crt_hg_parse_uri(const char *uri, crt_provider_t *prov, char *addr); static inline bool crt_provider_is_ucx(crt_provider_t prov) { - return (prov >= CRT_PROV_UCX_RC) && - (prov <= CRT_PROV_UCX_LAST); + return (prov >= CRT_PROV_UCX) && (prov <= CRT_PROV_UCX_LAST); } static inline bool @@ -96,6 +87,8 @@ struct crt_na_dict { bool nad_port_bind; /** a flag to indicate if endpoints are contiguous */ bool nad_contig_eps; + /** a flag to indicate if nad_str is allocated on the heap */ + bool nad_str_alloc; }; extern struct crt_na_dict crt_na_dict[]; diff --git a/src/cart/crt_init.c b/src/cart/crt_init.c index bf208b1af4a..21fc184d446 100644 --- a/src/cart/crt_init.c +++ b/src/cart/crt_init.c @@ -444,13 +444,13 @@ crt_provider_t crt_str_to_provider(const char *str_provider) { crt_provider_t prov = CRT_PROV_UNKNOWN; - int i; + int i, len; + char *p = NULL; if (str_provider == NULL) return prov; for (i = 0; crt_na_dict[i].nad_str != NULL; i++) { - if (!strncmp(str_provider, crt_na_dict[i].nad_str, strlen(crt_na_dict[i].nad_str) + 1) || (crt_na_dict[i].nad_alt_str && @@ -459,6 +459,21 @@ crt_str_to_provider(const char *str_provider) prov = crt_na_dict[i].nad_type; break; } + if (crt_na_dict[i].nad_type == CRT_PROV_UCX && + !strncmp(str_provider, CRT_UCX_STR, strlen(CRT_UCX_STR))) { + len = strlen(str_provider); + if (len > strlen(CRT_UCX_STR) && strchr(str_provider, '+')) { + D_STRNDUP(p, str_provider, len); + if (!p) { + return prov; + } else { + crt_na_dict[i].nad_str = p; + crt_na_dict[i].nad_str_alloc = true; + } + } + prov = crt_na_dict[i].nad_type; + break; + } } return prov; @@ -964,6 +979,10 @@ crt_finalize(void) crt_na_config_fini(false, crt_gdata.cg_secondary_provs[i]); } + for (i = 0; crt_na_dict[i].nad_str != NULL; i++) + if (crt_na_dict[i].nad_str_alloc) + D_FREE(crt_na_dict[i].nad_str); + D_FREE(crt_gdata.cg_secondary_provs); D_FREE(crt_gdata.cg_prov_gdata_secondary); } else { diff --git a/src/client/dfuse/pil4dfs/int_dfs.c b/src/client/dfuse/pil4dfs/int_dfs.c index 8440c05206e..9f47d37255f 100644 --- a/src/client/dfuse/pil4dfs/int_dfs.c +++ b/src/client/dfuse/pil4dfs/int_dfs.c @@ -940,13 +940,6 @@ child_hdlr(void) DL_WARN(rc, "daos_eq_lib_init() failed in 
child process"); daos_dti_reset(); td_eqh = main_eqh = DAOS_HDL_INVAL; - if (d_eq_count_max > 0) { - rc = daos_eq_create(&td_eqh); - if (rc) - DL_WARN(rc, "daos_eq_create() failed"); - else - main_eqh = td_eqh; - } context_reset = true; } diff --git a/src/control/events/ras.go b/src/control/events/ras.go index 0baa7a7edcd..0e77cd45eac 100644 --- a/src/control/events/ras.go +++ b/src/control/events/ras.go @@ -1,5 +1,5 @@ // -// (C) Copyright 2020-2022 Intel Corporation. +// (C) Copyright 2020-2024 Intel Corporation. // // SPDX-License-Identifier: BSD-2-Clause-Patent // @@ -55,6 +55,8 @@ const ( RASSystemStopFailed RASID = C.RAS_SYSTEM_STOP_FAILED // error RASEngineJoinFailed RASID = C.RAS_ENGINE_JOIN_FAILED // error RASSystemFabricProvChanged RASID = C.RAS_SYSTEM_FABRIC_PROV_CHANGED // info + RASNVMeLinkSpeedChanged RASID = C.RAS_DEVICE_LINK_SPEED_CHANGED // warning|notice + RASNVMeLinkWidthChanged RASID = C.RAS_DEVICE_LINK_WIDTH_CHANGED // warning|notice ) func (id RASID) String() string { diff --git a/src/control/server/ctl_firmware_test.go b/src/control/server/ctl_firmware_test.go index 290bffb4aa1..cab959063ec 100644 --- a/src/control/server/ctl_firmware_test.go +++ b/src/control/server/ctl_firmware_test.go @@ -1,5 +1,5 @@ // -// (C) Copyright 2020-2022 Intel Corporation. +// (C) Copyright 2020-2024 Intel Corporation. // // SPDX-License-Identifier: BSD-2-Clause-Patent // @@ -806,7 +806,7 @@ func TestCtlSvc_FirmwareUpdate(t *testing.T) { rCfg := new(engine.TestRunnerConfig) rCfg.Running.Store(tc.enginesRunning) runner := engine.NewTestRunner(rCfg, engine.MockConfig()) - instance := NewEngineInstance(log, nil, nil, runner) + instance := NewEngineInstance(log, nil, nil, runner, nil) if !tc.noRankEngines { instance._superblock = &Superblock{} instance._superblock.ValidRank = true diff --git a/src/control/server/ctl_ranks_rpc_test.go b/src/control/server/ctl_ranks_rpc_test.go index 0d2c1849e4e..9355a04bce8 100644 --- a/src/control/server/ctl_ranks_rpc_test.go +++ b/src/control/server/ctl_ranks_rpc_test.go @@ -1,5 +1,5 @@ // -// (C) Copyright 2020-2023 Intel Corporation. +// (C) Copyright 2020-2024 Intel Corporation. 
// // SPDX-License-Identifier: BSD-2-Clause-Patent // @@ -345,8 +345,8 @@ func TestServer_CtlSvc_StopRanks(t *testing.T) { defer ps.Close() svc.events = ps - dispatched := &eventsDispatched{cancel: cancel} - svc.events.Subscribe(events.RASTypeStateChange, dispatched) + subscriber := newMockSubscriber(1) + svc.events.Subscribe(events.RASTypeStateChange, subscriber) for i, e := range svc.harness.instances { ei := e.(*EngineInstance) @@ -393,7 +393,7 @@ func TestServer_CtlSvc_StopRanks(t *testing.T) { if tc.timeout != time.Duration(0) { <-ctx.Done() } - test.AssertEqual(t, 0, len(dispatched.rx), "number of events published") + test.AssertEqual(t, 0, len(subscriber.getRx()), "number of events published") if diff := cmp.Diff(tc.expResults, gotResp.Results, defRankCmpOpts...); diff != "" { t.Fatalf("unexpected response (-want, +got)\n%s\n", diff) diff --git a/src/control/server/ctl_storage_rpc_test.go b/src/control/server/ctl_storage_rpc_test.go index 42d817802c9..bf2d7ee43b5 100644 --- a/src/control/server/ctl_storage_rpc_test.go +++ b/src/control/server/ctl_storage_rpc_test.go @@ -1717,7 +1717,7 @@ func TestServer_CtlSvc_StorageFormat(t *testing.T) { esp := storage.MockProvider(log, 0, &ec.Storage, sysProv, scmProv, ebp, nil) - ei := NewEngineInstance(log, esp, nil, runner) + ei := NewEngineInstance(log, esp, nil, runner, nil) ei.ready.Store(tc.instancesStarted) // if the instance is expected to have a valid superblock, create one diff --git a/src/control/server/ctl_svc_test.go b/src/control/server/ctl_svc_test.go index 98e07fdb515..179486e2670 100644 --- a/src/control/server/ctl_svc_test.go +++ b/src/control/server/ctl_svc_test.go @@ -73,7 +73,7 @@ func newMockControlServiceFromBackends(t *testing.T, log logging.Logger, cfg *co runner := engine.NewTestRunner(trc, ec) storProv := storage.MockProvider(log, 0, &ec.Storage, syp, sp, bp, nil) - ei := NewEngineInstance(log, storProv, nil, runner) + ei := NewEngineInstance(log, storProv, nil, runner, nil) ei.setSuperblock(&Superblock{ Rank: ranklist.NewRankPtr(uint32(idx)), }) diff --git a/src/control/server/harness.go b/src/control/server/harness.go index 343d5d5b01f..8ea494d956e 100644 --- a/src/control/server/harness.go +++ b/src/control/server/harness.go @@ -1,5 +1,5 @@ // -// (C) Copyright 2019-2023 Intel Corporation. +// (C) Copyright 2019-2024 Intel Corporation. // // SPDX-License-Identifier: BSD-2-Clause-Patent // @@ -18,6 +18,7 @@ import ( ctlpb "github.com/daos-stack/daos/src/control/common/proto/ctl" srvpb "github.com/daos-stack/daos/src/control/common/proto/srv" "github.com/daos-stack/daos/src/control/drpc" + "github.com/daos-stack/daos/src/control/events" "github.com/daos-stack/daos/src/control/lib/atm" "github.com/daos-stack/daos/src/control/lib/ranklist" "github.com/daos-stack/daos/src/control/logging" @@ -31,6 +32,8 @@ import ( // NB: This interface is way too big right now; need to refactor in order // to limit scope. type Engine interface { + events.Publisher + // These are definitely wrong... They indicate that too much internal // information is being leaked outside of the implementation. 
newCret(string, error) *ctlpb.NvmeControllerResult @@ -62,8 +65,10 @@ type Engine interface { OnReady(...onReadyFn) GetStorage() *storage.Provider SetCheckerMode(bool) - Debugf(format string, args ...interface{}) - Tracef(format string, args ...interface{}) + Debugf(string, ...interface{}) + Tracef(string, ...interface{}) + GetLastHealthStats(string) *ctlpb.BioHealthResp + SetLastHealthStats(string, *ctlpb.BioHealthResp) } // EngineHarness is responsible for managing Engine instances. diff --git a/src/control/server/harness_test.go b/src/control/server/harness_test.go index 0b0c6ad877d..ec6bd08f36a 100644 --- a/src/control/server/harness_test.go +++ b/src/control/server/harness_test.go @@ -1,5 +1,5 @@ // -// (C) Copyright 2019-2023 Intel Corporation. +// (C) Copyright 2019-2024 Intel Corporation. // // SPDX-License-Identifier: BSD-2-Clause-Patent // @@ -267,7 +267,7 @@ func TestServer_Harness_Start(t *testing.T) { }, nil } - ei := NewEngineInstance(log, provider, joinFn, runner) + ei := NewEngineInstance(log, provider, joinFn, runner, nil) var isAP bool if tc.isAP && i == 0 { // first instance will be AP & bootstrap MS isAP = true diff --git a/src/control/server/instance.go b/src/control/server/instance.go index 9c1e19cfa63..d1b37795e77 100644 --- a/src/control/server/instance.go +++ b/src/control/server/instance.go @@ -15,9 +15,11 @@ import ( "github.com/pkg/errors" "google.golang.org/protobuf/proto" + ctlpb "github.com/daos-stack/daos/src/control/common/proto/ctl" mgmtpb "github.com/daos-stack/daos/src/control/common/proto/mgmt" srvpb "github.com/daos-stack/daos/src/control/common/proto/srv" "github.com/daos-stack/daos/src/control/drpc" + "github.com/daos-stack/daos/src/control/events" "github.com/daos-stack/daos/src/control/lib/atm" "github.com/daos-stack/daos/src/control/lib/control" "github.com/daos-stack/daos/src/control/lib/ranklist" @@ -42,6 +44,8 @@ type ( // be used with EngineHarness to manage and monitor multiple instances // per node. type EngineInstance struct { + events.Publisher + log logging.Logger runner EngineRunner storage *storage.Provider @@ -63,23 +67,25 @@ type EngineInstance struct { sync.RWMutex // these must be protected by a mutex in order to // avoid racy access. - _drpcSocket string - _cancelCtx context.CancelFunc - _superblock *Superblock - _lastErr error // populated when harness receives signal + _drpcSocket string + _cancelCtx context.CancelFunc + _superblock *Superblock + _lastErr error // populated when harness receives signal + _lastHealthStats map[string]*ctlpb.BioHealthResp } -// NewEngineInstance returns an *EngineInstance initialized with -// its dependencies. -func NewEngineInstance(l logging.Logger, p *storage.Provider, jf systemJoinFn, r EngineRunner) *EngineInstance { +// NewEngineInstance returns an *EngineInstance initialized with its dependencies. 
+func NewEngineInstance(l logging.Logger, p *storage.Provider, jf systemJoinFn, r EngineRunner, ps *events.PubSub) *EngineInstance { return &EngineInstance{ - log: l, - runner: r, - storage: p, - joinSystem: jf, - drpcReady: make(chan *srvpb.NotifyReadyReq), - storageReady: make(chan bool), - startRequested: make(chan bool), + log: l, + runner: r, + storage: p, + joinSystem: jf, + drpcReady: make(chan *srvpb.NotifyReadyReq), + storageReady: make(chan bool), + startRequested: make(chan bool), + Publisher: ps, + _lastHealthStats: make(map[string]*ctlpb.BioHealthResp), } } @@ -391,3 +397,23 @@ func (ei *EngineInstance) Debugf(format string, args ...interface{}) { func (ei *EngineInstance) Tracef(format string, args ...interface{}) { ei.log.Tracef(format, args...) } + +func (ei *EngineInstance) GetLastHealthStats(pciAddr string) *ctlpb.BioHealthResp { + ei.RLock() + defer ei.RUnlock() + + if ei._lastHealthStats == nil { + ei._lastHealthStats = make(map[string]*ctlpb.BioHealthResp) + } + return ei._lastHealthStats[pciAddr] +} + +func (ei *EngineInstance) SetLastHealthStats(pciAddr string, bhr *ctlpb.BioHealthResp) { + ei.Lock() + defer ei.Unlock() + + if ei._lastHealthStats == nil { + ei._lastHealthStats = make(map[string]*ctlpb.BioHealthResp) + } + ei._lastHealthStats[pciAddr] = bhr +} diff --git a/src/control/server/instance_drpc_test.go b/src/control/server/instance_drpc_test.go index 6383ddcf236..4f7e0a6c36f 100644 --- a/src/control/server/instance_drpc_test.go +++ b/src/control/server/instance_drpc_test.go @@ -92,7 +92,7 @@ func TestEngineInstance_CallDrpc(t *testing.T) { trc := engine.TestRunnerConfig{} trc.Running.Store(!tc.notStarted) runner := engine.NewTestRunner(&trc, engine.MockConfig()) - instance := NewEngineInstance(log, nil, nil, runner) + instance := NewEngineInstance(log, nil, nil, runner, nil) instance.ready.Store(!tc.notReady) if !tc.noSocket { @@ -190,7 +190,7 @@ func TestEngineInstance_CallDrpc_Parallel(t *testing.T) { trc := engine.TestRunnerConfig{} trc.Running.Store(true) runner := engine.NewTestRunner(&trc, engine.MockConfig()) - instance := NewEngineInstance(log, nil, nil, runner) + instance := NewEngineInstance(log, nil, nil, runner, nil) instance.ready.Store(true) instance.getDrpcClientFn = func(s string) drpc.DomainSocketClient { diff --git a/src/control/server/instance_exec_test.go b/src/control/server/instance_exec_test.go index a65f7437b83..9830dcb18b0 100644 --- a/src/control/server/instance_exec_test.go +++ b/src/control/server/instance_exec_test.go @@ -1,5 +1,5 @@ // -// (C) Copyright 2021-2022 Intel Corporation. +// (C) Copyright 2021-2024 Intel Corporation. 
// // SPDX-License-Identifier: BSD-2-Clause-Patent // @@ -67,7 +67,7 @@ func TestIOEngineInstance_exit(t *testing.T) { runner := engine.NewTestRunner(tc.trc, &engine.Config{}) - engine := NewEngineInstance(log, nil, nil, runner) + engine := NewEngineInstance(log, nil, nil, runner, nil) engine.setIndex(tc.instanceIdx) if tc.rankInSuperblock { diff --git a/src/control/server/instance_storage.go b/src/control/server/instance_storage.go index 71dc0ca181e..d92e392f706 100644 --- a/src/control/server/instance_storage.go +++ b/src/control/server/instance_storage.go @@ -15,10 +15,8 @@ import ( "github.com/pkg/errors" "github.com/daos-stack/daos/src/control/build" - "github.com/daos-stack/daos/src/control/events" "github.com/daos-stack/daos/src/control/fault" "github.com/daos-stack/daos/src/control/fault/code" - "github.com/daos-stack/daos/src/control/lib/ranklist" "github.com/daos-stack/daos/src/control/server/storage" ) @@ -76,19 +74,6 @@ func (ei *EngineInstance) NotifyStorageReady() { }() } -// createPublishFormatRequiredFunc returns onAwaitFormatFn which will publish an -// event using the provided publish function to indicate that host is awaiting -// storage format. -func createPublishFormatRequiredFunc(publish func(*events.RASEvent), hostname string) onAwaitFormatFn { - return func(_ context.Context, engineIdx uint32, formatType string) error { - evt := events.NewEngineFormatRequiredEvent(hostname, engineIdx, formatType). - WithRank(uint32(ranklist.NilRank)) - publish(evt) - - return nil - } -} - func (ei *EngineInstance) checkScmNeedFormat() (bool, error) { msgIdx := fmt.Sprintf("instance %d", ei.Index()) diff --git a/src/control/server/instance_storage_rpc.go b/src/control/server/instance_storage_rpc.go index 792850d4b7a..fbca1e534a0 100644 --- a/src/control/server/instance_storage_rpc.go +++ b/src/control/server/instance_storage_rpc.go @@ -13,12 +13,14 @@ import ( "strings" "time" + "github.com/dustin/go-humanize" "github.com/pkg/errors" "golang.org/x/sys/unix" "github.com/daos-stack/daos/src/control/build" "github.com/daos-stack/daos/src/control/common/proto" ctlpb "github.com/daos-stack/daos/src/control/common/proto/ctl" + "github.com/daos-stack/daos/src/control/events" "github.com/daos-stack/daos/src/control/fault" "github.com/daos-stack/daos/src/control/lib/hardware" "github.com/daos-stack/daos/src/control/lib/hardware/hwprov" @@ -185,6 +187,62 @@ func addLinkInfoToHealthStats(prov hardware.PCIeLinkStatsProvider, pciCfg string return nil } +// Only raise events if speed or width state is: +// - Currently at maximum but was previously downgraded +// - Currently downgraded but was previously at maximum +// - Currently downgraded and was previously at a different downgraded value +func checkPublishEvent(engine Engine, id events.RASID, typ, pciAddr, maximum, negotiated, lastMax, lastNeg string, port uint32) { + + // Return early if previous and current stats are both in the expected state. + if lastNeg == lastMax && negotiated == maximum { + return + } + + // Return if stats have not changed since last seen. + if negotiated == lastNeg && maximum == lastMax { + return + } + + // Otherwise publish event indicating link state change.
+ + engine.Debugf("link %s changed on %s, was %s (max %s) now %s (max %s)", + typ, pciAddr, lastNeg, lastMax, negotiated, maximum) + msg := fmt.Sprintf("NVMe PCIe device at %q port-%d: link %s changed to %s "+ + "(max %s)", pciAddr, port, typ, negotiated, maximum) + + sev := events.RASSeverityWarning + if negotiated == maximum { + sev = events.RASSeverityNotice + } + + engine.Publish(events.NewGenericEvent(id, sev, msg, "")) +} + +// Evaluate PCIe link state on NVMe SSD and raise events when negotiated speed or width changes in +// relation to last recorded stats for the given PCI address. +func publishLinkStatEvents(engine Engine, pciAddr string, stats *ctlpb.BioHealthResp) { + lastStats := engine.GetLastHealthStats(pciAddr) + engine.SetLastHealthStats(pciAddr, stats) + + lastMaxSpeedStr, lastSpeedStr, lastMaxWidthStr, lastWidthStr := "-", "-", "-", "-" + if lastStats != nil { + lastMaxSpeedStr = humanize.SI(float64(lastStats.LinkMaxSpeed), "T/s") + lastSpeedStr = humanize.SI(float64(lastStats.LinkNegSpeed), "T/s") + lastMaxWidthStr = fmt.Sprintf("x%d", lastStats.LinkMaxWidth) + lastWidthStr = fmt.Sprintf("x%d", lastStats.LinkNegWidth) + } + + checkPublishEvent(engine, events.RASNVMeLinkSpeedChanged, "speed", pciAddr, + humanize.SI(float64(stats.LinkMaxSpeed), "T/s"), + humanize.SI(float64(stats.LinkNegSpeed), "T/s"), + lastMaxSpeedStr, lastSpeedStr, stats.LinkPortId) + + checkPublishEvent(engine, events.RASNVMeLinkWidthChanged, "width", pciAddr, + fmt.Sprintf("x%d", stats.LinkMaxWidth), + fmt.Sprintf("x%d", stats.LinkNegWidth), + lastMaxWidthStr, lastWidthStr, stats.LinkPortId) +} + func populateCtrlrHealth(ctx context.Context, engine Engine, req *ctlpb.BioHealthReq, ctrlr *ctlpb.NvmeController, prov hardware.PCIeLinkStatsProvider) (bool, error) { stateName := ctlpb.NvmeDevState_name[int32(ctrlr.DevState)] if !ctrlr.CanSupplyHealthStats() { @@ -203,6 +261,7 @@ func populateCtrlrHealth(ctx context.Context, engine Engine, req *ctlpb.BioHealt if err := addLinkInfoToHealthStats(prov, ctrlr.PciCfg, health); err != nil { return false, errors.Wrapf(err, "add link stats for %q", ctrlr) } + publishLinkStatEvents(engine, ctrlr.PciAddr, health) } else { engine.Debugf("no pcie config space received for %q, skip add link stats", ctrlr) } diff --git a/src/control/server/instance_storage_rpc_test.go b/src/control/server/instance_storage_rpc_test.go index 35a7e3f37f2..b199adb6b8d 100644 --- a/src/control/server/instance_storage_rpc_test.go +++ b/src/control/server/instance_storage_rpc_test.go @@ -8,14 +8,18 @@ package server import ( "context" + "sort" "testing" + "time" "github.com/google/go-cmp/cmp" "github.com/pkg/errors" + "google.golang.org/protobuf/testing/protocmp" "github.com/daos-stack/daos/src/control/common/proto" ctlpb "github.com/daos-stack/daos/src/control/common/proto/ctl" "github.com/daos-stack/daos/src/control/common/test" + "github.com/daos-stack/daos/src/control/events" "github.com/daos-stack/daos/src/control/lib/hardware" "github.com/daos-stack/daos/src/control/lib/ranklist" "github.com/daos-stack/daos/src/control/logging" @@ -40,90 +44,350 @@ func (mp *mockPCIeLinkStatsProvider) PCIeCapsFromConfig(cfgBytes []byte, dev *ha } func TestIOEngineInstance_populateCtrlrHealth(t *testing.T) { - healthWithoutLinkStats := func() *ctlpb.BioHealthResp { + healthWithLinkStats := func(maxSpd, spd float32, maxWdt, wdt uint32) *ctlpb.BioHealthResp { bhr := proto.MockNvmeHealth() - bhr.LinkPortId = 0 - bhr.LinkMaxSpeed = 0 - 
bhr.LinkNegSpeed = 0 - bhr.LinkMaxWidth = 0 - bhr.LinkNegWidth = 0 + bhr.LinkMaxSpeed = maxSpd + bhr.LinkNegSpeed = spd + bhr.LinkMaxWidth = maxWdt + bhr.LinkNegWidth = wdt return bhr } + pciAddr := test.MockPCIAddr(1) + lastStatsMap := func(bhr *ctlpb.BioHealthResp) map[string]*ctlpb.BioHealthResp { + return map[string]*ctlpb.BioHealthResp{pciAddr: bhr} + } for name, tc := range map[string]struct { - devState ctlpb.NvmeDevState - pciCfgSpc string - pciDev *hardware.PCIDevice - pciDevErr error - healthRes *ctlpb.BioHealthResp - healthErr error - expCtrlr *ctlpb.NvmeController - expUpdated bool - expErr error + badDevState bool + noPciCfgSpc bool + pciDev *hardware.PCIDevice + pciDevErr error + emptyHealthRes bool + healthErr error + lastStats map[string]*ctlpb.BioHealthResp + expCtrlr *ctlpb.NvmeController + expNotUpdated bool + expErr error + expDispatched []*events.RASEvent + expLastStats map[string]*ctlpb.BioHealthResp }{ "bad state; skip health": { - healthRes: healthWithoutLinkStats(), - expCtrlr: &ctlpb.NvmeController{}, + badDevState: true, + noPciCfgSpc: true, + expCtrlr: &ctlpb.NvmeController{ + PciAddr: pciAddr, + }, + expNotUpdated: true, }, "update health; add link stats skipped as empty pci config space": { - devState: ctlpb.NvmeDevState_NORMAL, - pciDev: &hardware.PCIDevice{ - LinkNegSpeed: 8e+9, + noPciCfgSpc: true, + expCtrlr: &ctlpb.NvmeController{ + PciAddr: pciAddr, + DevState: ctlpb.NvmeDevState_NORMAL, + HealthStats: healthWithLinkStats(0, 0, 0, 0), }, - healthRes: healthWithoutLinkStats(), + }, + "empty bio health response; empty link stats": { + emptyHealthRes: true, + pciDev: new(hardware.PCIDevice), expCtrlr: &ctlpb.NvmeController{ + PciAddr: pciAddr, DevState: ctlpb.NvmeDevState_NORMAL, - HealthStats: healthWithoutLinkStats(), + HealthStats: new(ctlpb.BioHealthResp), }, - expUpdated: true, + expLastStats: lastStatsMap(new(ctlpb.BioHealthResp)), + }, + "error retrieving bio health response": { + healthErr: errors.New("fail"), + expErr: errors.New("fail"), }, "update health; add link stats; pciutils lib error": { - devState: ctlpb.NvmeDevState_NORMAL, - pciCfgSpc: "ABCD", - pciDev: &hardware.PCIDevice{ - LinkNegSpeed: 8e+9, - }, pciDevErr: errors.New("fail"), expErr: errors.New("fail"), }, - "update health; add link stats": { - devState: ctlpb.NvmeDevState_NORMAL, - pciCfgSpc: "ABCD", + "update health; add link stats; normal link state; no event published": { + expCtrlr: &ctlpb.NvmeController{ + PciAddr: pciAddr, + DevState: ctlpb.NvmeDevState_NORMAL, + HealthStats: proto.MockNvmeHealth(), + }, + expLastStats: lastStatsMap(proto.MockNvmeHealth()), + }, + "update health; add link stats; speed downgraded; no last stats": { pciDev: &hardware.PCIDevice{ LinkPortID: 1, + LinkMaxSpeed: 2.5e+9, LinkNegSpeed: 1e+9, + LinkMaxWidth: 4, + LinkNegWidth: 4, + }, + // Stats only exist for different PCI address. 
+ lastStats: map[string]*ctlpb.BioHealthResp{ + test.MockPCIAddr(2): proto.MockNvmeHealth(), + }, + expCtrlr: &ctlpb.NvmeController{ + PciAddr: test.MockPCIAddr(1), + DevState: ctlpb.NvmeDevState_NORMAL, + HealthStats: healthWithLinkStats(2.5e+9, 1e+9, 4, 4), + }, + expDispatched: []*events.RASEvent{ + events.NewGenericEvent(events.RASNVMeLinkSpeedChanged, + events.RASSeverityWarning, "NVMe PCIe device at "+ + "\"0000:01:00.0\" port-1: link speed changed to "+ + "1 GT/s (max 2.5 GT/s)", ""), + }, + expLastStats: map[string]*ctlpb.BioHealthResp{ + pciAddr: healthWithLinkStats(2.5e+9, 1e+9, 4, 4), + test.MockPCIAddr(2): proto.MockNvmeHealth(), + }, + }, + "update health; add link stats; width downgraded; no last stats": { + pciDev: &hardware.PCIDevice{ + LinkPortID: 1, LinkMaxSpeed: 1e+9, + LinkNegSpeed: 1e+9, + LinkMaxWidth: 8, LinkNegWidth: 4, - LinkMaxWidth: 4, }, - healthRes: healthWithoutLinkStats(), expCtrlr: &ctlpb.NvmeController{ + PciAddr: test.MockPCIAddr(1), + DevState: ctlpb.NvmeDevState_NORMAL, + HealthStats: healthWithLinkStats(1e+9, 1e+9, 8, 4), + }, + expDispatched: []*events.RASEvent{ + events.NewGenericEvent(events.RASNVMeLinkWidthChanged, + events.RASSeverityWarning, "NVMe PCIe device at "+ + "\"0000:01:00.0\" port-1: link width changed to "+ + "x4 (max x8)", ""), + }, + expLastStats: lastStatsMap(healthWithLinkStats(1e+9, 1e+9, 8, 4)), + }, + "update health; add link stats; link state normal; identical last stats; no event": { + lastStats: lastStatsMap(proto.MockNvmeHealth()), + expCtrlr: &ctlpb.NvmeController{ + PciAddr: test.MockPCIAddr(1), + DevState: ctlpb.NvmeDevState_NORMAL, + HealthStats: proto.MockNvmeHealth(), + }, + expLastStats: lastStatsMap(proto.MockNvmeHealth()), + }, + "update health; add link stats; link state normal; speed degraded in last stats": { + lastStats: lastStatsMap(healthWithLinkStats(1e+9, 0.5e+9, 4, 4)), + expCtrlr: &ctlpb.NvmeController{ + PciAddr: pciAddr, + DevState: ctlpb.NvmeDevState_NORMAL, + HealthStats: proto.MockNvmeHealth(), + }, + expDispatched: []*events.RASEvent{ + events.NewGenericEvent(events.RASNVMeLinkSpeedChanged, + events.RASSeverityNotice, "NVMe PCIe device at "+ + "\"0000:01:00.0\" port-1: link speed changed to "+ + "1 GT/s (max 1 GT/s)", ""), + }, + expLastStats: lastStatsMap(proto.MockNvmeHealth()), + }, + "update health; add link stats; link state normal; width degraded in last stats": { + lastStats: lastStatsMap(healthWithLinkStats(1e+9, 1e+9, 4, 1)), + expCtrlr: &ctlpb.NvmeController{ + PciAddr: pciAddr, DevState: ctlpb.NvmeDevState_NORMAL, HealthStats: proto.MockNvmeHealth(), }, - expUpdated: true, + expDispatched: []*events.RASEvent{ + events.NewGenericEvent(events.RASNVMeLinkWidthChanged, + events.RASSeverityNotice, "NVMe PCIe device at "+ + "\"0000:01:00.0\" port-1: link width changed to "+ + "x4 (max x4)", ""), + }, + expLastStats: lastStatsMap(proto.MockNvmeHealth()), + }, + "update health; add link stats; speed degraded; speed degraded in last stats; no event": { + pciDev: &hardware.PCIDevice{ + LinkPortID: 1, + LinkMaxSpeed: 2.5e+9, + LinkNegSpeed: 1e+9, + LinkMaxWidth: 4, + LinkNegWidth: 4, + }, + lastStats: lastStatsMap(healthWithLinkStats(2.5e+9, 1e+9, 4, 4)), + expCtrlr: &ctlpb.NvmeController{ + PciAddr: test.MockPCIAddr(1), + DevState: ctlpb.NvmeDevState_NORMAL, + HealthStats: healthWithLinkStats(2.5e+9, 1e+9, 4, 4), + }, + expLastStats: lastStatsMap(healthWithLinkStats(2.5e+9, 1e+9, 4, 4)), + }, + "update health; add link stats; width degraded; width degraded in last stats; no event": { + pciDev: 
&hardware.PCIDevice{ + LinkPortID: 1, + LinkMaxSpeed: 1e+9, + LinkNegSpeed: 1e+9, + LinkMaxWidth: 8, + LinkNegWidth: 4, + }, + lastStats: lastStatsMap(healthWithLinkStats(1e+9, 1e+9, 8, 4)), + expCtrlr: &ctlpb.NvmeController{ + PciAddr: test.MockPCIAddr(1), + DevState: ctlpb.NvmeDevState_NORMAL, + HealthStats: healthWithLinkStats(1e+9, 1e+9, 8, 4), + }, + expLastStats: lastStatsMap(healthWithLinkStats(1e+9, 1e+9, 8, 4)), + }, + "update health; add link stats; speed degraded; width degraded in last stats": { + pciDev: &hardware.PCIDevice{ + LinkPortID: 1, + LinkMaxSpeed: 2.5e+9, + LinkNegSpeed: 1e+9, + LinkMaxWidth: 8, + LinkNegWidth: 8, + }, + lastStats: lastStatsMap(healthWithLinkStats(2.5e+9, 2.5e+9, 8, 4)), + expCtrlr: &ctlpb.NvmeController{ + PciAddr: test.MockPCIAddr(1), + DevState: ctlpb.NvmeDevState_NORMAL, + HealthStats: healthWithLinkStats(2.5e+9, 1e+9, 8, 8), + }, + expDispatched: []*events.RASEvent{ + events.NewGenericEvent(events.RASNVMeLinkSpeedChanged, + events.RASSeverityWarning, "NVMe PCIe device at "+ + "\"0000:01:00.0\" port-1: link speed changed to "+ + "1 GT/s (max 2.5 GT/s)", ""), + events.NewGenericEvent(events.RASNVMeLinkWidthChanged, + events.RASSeverityNotice, "NVMe PCIe device at "+ + "\"0000:01:00.0\" port-1: link width changed to "+ + "x8 (max x8)", ""), + }, + expLastStats: lastStatsMap(healthWithLinkStats(2.5e+9, 1e+9, 8, 8)), + }, + "update health; add link stats; width degraded; speed degraded in last stats": { + pciDev: &hardware.PCIDevice{ + LinkPortID: 1, + LinkMaxSpeed: 2.5e+9, + LinkNegSpeed: 2.5e+9, + LinkMaxWidth: 8, + LinkNegWidth: 4, + }, + lastStats: lastStatsMap(healthWithLinkStats(2.5e+9, 1e+9, 8, 8)), + expCtrlr: &ctlpb.NvmeController{ + PciAddr: test.MockPCIAddr(1), + DevState: ctlpb.NvmeDevState_NORMAL, + HealthStats: healthWithLinkStats(2.5e+9, 2.5e+9, 8, 4), + }, + expDispatched: []*events.RASEvent{ + events.NewGenericEvent(events.RASNVMeLinkSpeedChanged, + events.RASSeverityNotice, "NVMe PCIe device at "+ + "\"0000:01:00.0\" port-1: link speed changed to "+ + "2.5 GT/s (max 2.5 GT/s)", ""), + events.NewGenericEvent(events.RASNVMeLinkWidthChanged, + events.RASSeverityWarning, "NVMe PCIe device at "+ + "\"0000:01:00.0\" port-1: link width changed to "+ + "x4 (max x8)", ""), + }, + expLastStats: lastStatsMap(healthWithLinkStats(2.5e+9, 2.5e+9, 8, 4)), + }, + "update health; add link stats; speed degraded; speed diff degraded in last stats": { + pciDev: &hardware.PCIDevice{ + LinkPortID: 1, + LinkMaxSpeed: 8e+9, + LinkNegSpeed: 1e+9, + LinkMaxWidth: 4, + LinkNegWidth: 4, + }, + lastStats: lastStatsMap(healthWithLinkStats(8e+9, 2.5e+9, 4, 4)), + expCtrlr: &ctlpb.NvmeController{ + PciAddr: test.MockPCIAddr(1), + DevState: ctlpb.NvmeDevState_NORMAL, + HealthStats: healthWithLinkStats(8e+9, 1e+9, 4, 4), + }, + expDispatched: []*events.RASEvent{ + events.NewGenericEvent(events.RASNVMeLinkSpeedChanged, + events.RASSeverityWarning, "NVMe PCIe device at "+ + "\"0000:01:00.0\" port-1: link speed changed to "+ + "1 GT/s (max 8 GT/s)", ""), + }, + expLastStats: lastStatsMap(healthWithLinkStats(8e+9, 1e+9, 4, 4)), + }, + "update health; add link stats; width degraded; width diff degraded in last stats": { + pciDev: &hardware.PCIDevice{ + LinkPortID: 1, + LinkMaxSpeed: 1e+9, + LinkNegSpeed: 1e+9, + LinkMaxWidth: 16, + LinkNegWidth: 4, + }, + lastStats: lastStatsMap(healthWithLinkStats(1e+9, 1e+9, 16, 8)), + expCtrlr: &ctlpb.NvmeController{ + PciAddr: test.MockPCIAddr(1), + DevState: ctlpb.NvmeDevState_NORMAL, + HealthStats: healthWithLinkStats(1e+9, 1e+9, 
16, 4), + }, + expDispatched: []*events.RASEvent{ + events.NewGenericEvent(events.RASNVMeLinkWidthChanged, + events.RASSeverityWarning, "NVMe PCIe device at "+ + "\"0000:01:00.0\" port-1: link width changed to "+ + "x4 (max x16)", ""), + }, + expLastStats: lastStatsMap(healthWithLinkStats(1e+9, 1e+9, 16, 4)), }, } { t.Run(name, func(t *testing.T) { + log, buf := logging.NewTestLogger(t.Name()) + defer test.ShowBufferOnFailure(t, buf) + + healthRes := healthWithLinkStats(0, 0, 0, 0) + if tc.emptyHealthRes { + healthRes = new(ctlpb.BioHealthResp) + } getCtrlrHealth = func(_ context.Context, _ Engine, _ *ctlpb.BioHealthReq) (*ctlpb.BioHealthResp, error) { - return tc.healthRes, tc.healthErr + return healthRes, tc.healthErr } defer func() { getCtrlrHealth = getBioHealth }() + var devState ctlpb.NvmeDevState + if !tc.badDevState { + devState = ctlpb.NvmeDevState_NORMAL + } + var pciCfgSpc string + if !tc.noPciCfgSpc { + pciCfgSpc = "ABCD" + } + if tc.pciDev == nil { + tc.pciDev = &hardware.PCIDevice{ + LinkPortID: 1, + LinkNegSpeed: 1e+9, + LinkMaxSpeed: 1e+9, + LinkNegWidth: 4, + LinkMaxWidth: 4, + } + } + mockProv := &mockPCIeLinkStatsProvider{ pciDev: tc.pciDev, pciDevErr: tc.pciDevErr, } ctrlr := &ctlpb.NvmeController{ - PciCfg: tc.pciCfgSpc, - DevState: tc.devState, + PciAddr: pciAddr, + PciCfg: pciCfgSpc, + DevState: devState, } - upd, err := populateCtrlrHealth(test.Context(t), NewMockInstance(nil), + ctx, cancel := context.WithTimeout(test.Context(t), 200*time.Millisecond) + defer cancel() + + ps := events.NewPubSub(ctx, log) + defer ps.Close() + + ei := NewEngineInstance(log, nil, nil, nil, ps) + ei._lastHealthStats = tc.lastStats + + subscriber := newMockSubscriber(2) + ps.Subscribe(events.RASTypeInfoOnly, subscriber) + + upd, err := populateCtrlrHealth(test.Context(t), ei, &ctlpb.BioHealthReq{}, ctrlr, mockProv) test.CmpErr(t, tc.expErr, err) if err != nil { @@ -134,7 +398,25 @@ func TestIOEngineInstance_populateCtrlrHealth(t *testing.T) { defStorageScanCmpOpts...); diff != "" { t.Fatalf("unexpected controller output (-want, +got):\n%s\n", diff) } - test.AssertEqual(t, tc.expUpdated, upd, "") + test.AssertEqual(t, !tc.expNotUpdated, upd, "") + + <-ctx.Done() + + if diff := cmp.Diff(tc.expLastStats, ei._lastHealthStats, protocmp.Transform()); diff != "" { + t.Fatalf("unexpected last health stats (-want, +got)\n%s\n", diff) + } + + // Compare events received with expected, sort received first. + dispatched := subscriber.getRx() + sort.Strings(dispatched) + var expEvtStrs []string + for _, e := range tc.expDispatched { + e.Timestamp = "" // Remove TS before comparing. 
+ expEvtStrs = append(expEvtStrs, e.String()) + } + if diff := cmp.Diff(expEvtStrs, dispatched, defEvtCmpOpts...); diff != "" { + t.Fatalf("unexpected events dispatched (-want, +got)\n%s\n", diff) + } }) } } diff --git a/src/control/server/instance_storage_test.go b/src/control/server/instance_storage_test.go index 3685ae14d8c..3b4ebf82ae0 100644 --- a/src/control/server/instance_storage_test.go +++ b/src/control/server/instance_storage_test.go @@ -98,7 +98,7 @@ func TestIOEngineInstance_MountControlMetadata(t *testing.T) { sysProv := system.NewMockSysProvider(log, tc.sysCfg) provider := storage.MockProvider(log, 0, &mockRamCfg, sysProv, nil, nil, tc.meta) - instance := NewEngineInstance(log, provider, nil, runner) + instance := NewEngineInstance(log, provider, nil, runner, nil) gotErr := instance.MountMetadata() test.CmpErr(t, tc.expErr, gotErr) @@ -204,7 +204,7 @@ func TestIOEngineInstance_MountScmDevice(t *testing.T) { sys := system.NewMockSysProvider(log, tc.msCfg) scm := scm.NewMockProvider(log, nil, tc.msCfg) provider := storage.MockProvider(log, 0, tc.cfg, sys, scm, nil, nil) - instance := NewEngineInstance(log, provider, nil, runner) + instance := NewEngineInstance(log, provider, nil, runner, nil) gotErr := instance.MountScm() test.CmpErr(t, tc.expErr, gotErr) @@ -325,7 +325,7 @@ func TestEngineInstance_NeedsScmFormat(t *testing.T) { system.NewMockSysProvider(log, tc.msCfg), scm.NewMockProvider(log, tc.mbCfg, tc.msCfg), nil, nil) - instance := NewEngineInstance(log, mp, nil, runner) + instance := NewEngineInstance(log, mp, nil, runner, nil) gotNeedsFormat, gotErr := instance.GetStorage().ScmNeedsFormat() test.CmpErr(t, tc.expErr, gotErr) @@ -520,7 +520,7 @@ func TestIOEngineInstance_awaitStorageReady(t *testing.T) { system.NewMockSysProvider(log, &msc), scm.NewMockProvider(log, &smbc, &msc), nil, mmp) - engine := NewEngineInstance(log, mp, nil, runner) + engine := NewEngineInstance(log, mp, nil, runner, nil) engine.setIndex(tc.engineIndex) diff --git a/src/control/server/instance_superblock_test.go b/src/control/server/instance_superblock_test.go index 4dee3eceefb..04ce8079f77 100644 --- a/src/control/server/instance_superblock_test.go +++ b/src/control/server/instance_superblock_test.go @@ -50,7 +50,7 @@ func TestServer_Instance_createSuperblock(t *testing.T) { mp := storage.NewProvider(log, 0, &cfg.Storage, sysprov.NewMockSysProvider(log, msc), scm.NewMockProvider(log, mbc, msc), nil, nil) - ei := NewEngineInstance(log, mp, nil, r). + ei := NewEngineInstance(log, mp, nil, r, nil). 
WithHostFaultDomain(system.MustCreateFaultDomainFromString("/host1")) ei.fsRoot = testDir if err := h.AddInstance(ei); err != nil { diff --git a/src/control/server/instance_test.go b/src/control/server/instance_test.go index d4eb86cce75..2d15fd7269b 100644 --- a/src/control/server/instance_test.go +++ b/src/control/server/instance_test.go @@ -22,6 +22,7 @@ import ( srvpb "github.com/daos-stack/daos/src/control/common/proto/srv" "github.com/daos-stack/daos/src/control/common/test" "github.com/daos-stack/daos/src/control/drpc" + "github.com/daos-stack/daos/src/control/events" "github.com/daos-stack/daos/src/control/lib/atm" "github.com/daos-stack/daos/src/control/lib/ranklist" "github.com/daos-stack/daos/src/control/logging" @@ -46,7 +47,7 @@ func getTestEngineInstance(log logging.Logger) *EngineInstance { ) runner := engine.NewRunner(log, cfg) storage := storage.MockProvider(log, 0, &cfg.Storage, nil, nil, nil, nil) - return NewEngineInstance(log, storage, nil, runner) + return NewEngineInstance(log, storage, nil, runner, nil) } func TestServer_Instance_WithHostFaultDomain(t *testing.T) { @@ -128,7 +129,7 @@ func TestServer_Instance_updateFaultDomainInSuperblock(t *testing.T) { storage := storage.MockProvider(log, 0, &cfg.Storage, sysProv, scmProv, nil, nil) - ei := NewEngineInstance(log, storage, nil, runner). + ei := NewEngineInstance(log, storage, nil, runner, nil). WithHostFaultDomain(tc.newDomain) ei.fsRoot = testDir ei._superblock = tc.superblock @@ -184,6 +185,7 @@ type ( StopErr error ScmTierConfig *storage.TierConfig ScanBdevTiersResult []storage.BdevTierScanResult + LastHealthStats map[string]*ctlpb.BioHealthResp } MockInstance struct { @@ -309,3 +311,15 @@ func (mi *MockInstance) Debugf(format string, args ...interface{}) { func (mi *MockInstance) Tracef(format string, args ...interface{}) { return } + +func (mi *MockInstance) Publish(event *events.RASEvent) { + return +} + +func (mi *MockInstance) GetLastHealthStats(pciAddr string) *ctlpb.BioHealthResp { + return mi.cfg.LastHealthStats[pciAddr] +} + +func (mi *MockInstance) SetLastHealthStats(pciAddr string, bhr *ctlpb.BioHealthResp) { + mi.cfg.LastHealthStats[pciAddr] = bhr +} diff --git a/src/control/server/mgmt_pool_test.go b/src/control/server/mgmt_pool_test.go index f0e7d85f4bd..85d29d9454f 100644 --- a/src/control/server/mgmt_pool_test.go +++ b/src/control/server/mgmt_pool_test.go @@ -569,7 +569,7 @@ func TestServer_MgmtSvc_PoolCreate(t *testing.T) { mp := storage.NewProvider(log, 0, &engineCfg.Storage, nil, nil, nil, nil) - srv := NewEngineInstance(log, mp, nil, r) + srv := NewEngineInstance(log, mp, nil, r, nil) srv.setDrpcSocket("/dontcare") srv.ready.SetTrue() diff --git a/src/control/server/mgmt_system.go b/src/control/server/mgmt_system.go index 5dcea6991a0..e85091b5529 100644 --- a/src/control/server/mgmt_system.go +++ b/src/control/server/mgmt_system.go @@ -391,9 +391,9 @@ func (svc *mgmtSvc) updateFabricProviders(provList []string, publisher events.Pu return nil } -func newFabricProvChangedEvent(old, new string) *events.RASEvent { +func newFabricProvChangedEvent(o, n string) *events.RASEvent { return events.NewGenericEvent(events.RASSystemFabricProvChanged, events.RASSeverityNotice, - fmt.Sprintf("system fabric provider has changed: %s -> %s", old, new), "") + fmt.Sprintf("system fabric provider has changed: %s -> %s", o, n), "") } // reqGroupUpdate requests a group update. 
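The thread running through the control-plane changes above is that `NewEngineInstance` now takes an `*events.PubSub` and `EngineInstance` embeds `events.Publisher`, so engine code can raise RAS events directly and tests can observe them through a subscription. A minimal sketch of that round trip, assuming it lives alongside the other server package tests (it reuses `test.Context`, `logging.NewTestLogger`, and the `newMockSubscriber` helper added in `mocks.go`); illustrative only, not part of this change:

```go
// Sketch only: publish a RAS event through an EngineInstance and observe
// it via a PubSub subscription, mirroring the patterns used in this diff.
func TestEngineInstance_PublishRoundTrip(t *testing.T) {
	log, buf := logging.NewTestLogger(t.Name())
	defer test.ShowBufferOnFailure(t, buf)

	ctx, cancel := context.WithTimeout(test.Context(t), 200*time.Millisecond)
	defer cancel()

	ps := events.NewPubSub(ctx, log)
	defer ps.Close()

	subscriber := newMockSubscriber(1)
	ps.Subscribe(events.RASTypeInfoOnly, subscriber)

	// EngineInstance embeds events.Publisher, so Publish is available
	// directly on the instance once a PubSub is supplied.
	ei := NewEngineInstance(log, nil, nil, nil, ps)
	ei.Publish(events.NewGenericEvent(events.RASSystemFabricProvChanged,
		events.RASSeverityNotice, "system fabric provider has changed: a -> b", ""))

	<-ctx.Done() // let the event loop deliver before asserting
	test.AssertEqual(t, 1, len(subscriber.getRx()), "number of events received")
}
```

The `expectedRx` count passed to `newMockSubscriber` closes its `finished` channel once that many events arrive, which is what lets tests wait deterministically rather than sleeping.
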
diff --git a/src/control/server/mgmt_system_test.go b/src/control/server/mgmt_system_test.go index 7a7d6ec0d1e..f482b1943a1 100644 --- a/src/control/server/mgmt_system_test.go +++ b/src/control/server/mgmt_system_test.go @@ -297,16 +297,6 @@ func TestServer_MgmtSvc_LeaderQuery(t *testing.T) { } } -type eventsDispatched struct { - rx []*events.RASEvent - cancel context.CancelFunc -} - -func (d *eventsDispatched) OnEvent(ctx context.Context, e *events.RASEvent) { - d.rx = append(d.rx, e) - d.cancel() -} - func TestServer_MgmtSvc_ClusterEvent(t *testing.T) { eventEngineDied := mockEvtEngineDied(t) @@ -315,7 +305,7 @@ func TestServer_MgmtSvc_ClusterEvent(t *testing.T) { zeroSeq bool event *events.RASEvent expResp *sharedpb.ClusterEventResp - expDispatched []*events.RASEvent + expDispatched []string expErr error }{ "nil request": { @@ -327,8 +317,12 @@ func TestServer_MgmtSvc_ClusterEvent(t *testing.T) { expResp: &sharedpb.ClusterEventResp{ Sequence: 1, }, - expDispatched: []*events.RASEvent{ - eventEngineDied.WithForwarded(true), + expDispatched: []string{ + func() string { + e := eventEngineDied.WithForwarded(true) + e.Timestamp = "" + return e.String() + }(), }, }, } { @@ -342,10 +336,12 @@ func TestServer_MgmtSvc_ClusterEvent(t *testing.T) { defer cancel() ps := events.NewPubSub(ctx, log) + defer ps.Close() + svc.events = ps - dispatched := &eventsDispatched{cancel: cancel} - svc.events.Subscribe(events.RASTypeStateChange, dispatched) + subscriber := newMockSubscriber(1) + svc.events.Subscribe(events.RASTypeStateChange, subscriber) var pbReq *sharedpb.ClusterEventReq switch { @@ -377,7 +373,7 @@ func TestServer_MgmtSvc_ClusterEvent(t *testing.T) { t.Fatalf("unexpected response (-want, +got)\n%s\n", diff) } - if diff := cmp.Diff(tc.expDispatched, dispatched.rx, defEvtCmpOpts...); diff != "" { + if diff := cmp.Diff(tc.expDispatched, subscriber.getRx(), defEvtCmpOpts...); diff != "" { t.Fatalf("unexpected events dispatched (-want, +got)\n%s\n", diff) } }) @@ -1261,8 +1257,8 @@ func TestServer_MgmtSvc_SystemQuery(t *testing.T) { defer ps.Close() svc.events = ps - dispatched := &eventsDispatched{cancel: cancel} - svc.events.Subscribe(events.RASTypeStateChange, dispatched) + subscriber := newMockSubscriber(1) + svc.events.Subscribe(events.RASTypeStateChange, subscriber) if !tc.emptyDb { for _, m := range defaultMembers { @@ -1309,8 +1305,10 @@ func TestServer_MgmtSvc_SystemStart(t *testing.T) { Message: &mgmtpb.SystemStartResp{Results: rrs}, } } - expEventsStartFail := []*events.RASEvent{ - newSystemStartFailedEvent("failed rank 0"), + expEventsStartFail := func(msgErr string) []string { + e := newSystemStartFailedEvent(msgErr) + e.Timestamp = "" + return []string{e.String()} } for name, tc := range map[string]struct { @@ -1322,7 +1320,7 @@ func TestServer_MgmtSvc_SystemStart(t *testing.T) { expAbsentRanks string expAbsentHosts string expAPIErr error - expDispatched []*events.RASEvent + expDispatched []string }{ "nil req": { req: (*mgmtpb.SystemStartReq)(nil), @@ -1358,7 +1356,7 @@ func TestServer_MgmtSvc_SystemStart(t *testing.T) { mockMember(t, 2, 2, "ready"), mockMember(t, 3, 2, "ready"), }, - expDispatched: expEventsStartFail, + expDispatched: expEventsStartFail("failed rank 0"), }, "filtered and oversubscribed ranks": { req: &mgmtpb.SystemStartReq{Ranks: "0-1,4-9"}, @@ -1382,7 +1380,7 @@ func TestServer_MgmtSvc_SystemStart(t *testing.T) { mockMember(t, 3, 2, "stopped"), }, expAbsentRanks: "4-9", - expDispatched: expEventsStartFail, + expDispatched: expEventsStartFail("failed rank 0"), }, 
"filtered and oversubscribed hosts": { req: &mgmtpb.SystemStartReq{Hosts: "10.0.0.[2-5]"}, @@ -1406,9 +1404,7 @@ func TestServer_MgmtSvc_SystemStart(t *testing.T) { mockMember(t, 3, 2, "ready"), }, expAbsentHosts: "10.0.0.[3-5]", - expDispatched: []*events.RASEvent{ - newSystemStartFailedEvent("failed rank 2"), - }, + expDispatched: expEventsStartFail("failed rank 2"), }, "filtered hosts": { req: &mgmtpb.SystemStartReq{Hosts: "10.0.0.[1-2]"}, @@ -1448,8 +1444,8 @@ func TestServer_MgmtSvc_SystemStart(t *testing.T) { ps := events.NewPubSub(ctx, log) svc.events = ps - dispatched := &eventsDispatched{cancel: cancel} - svc.events.Subscribe(events.RASTypeInfoOnly, dispatched) + subscriber := newMockSubscriber(1) + svc.events.Subscribe(events.RASTypeInfoOnly, subscriber) if tc.req != nil && tc.req.Sys == "" { tc.req.Sys = build.DefaultSystemName @@ -1467,7 +1463,7 @@ func TestServer_MgmtSvc_SystemStart(t *testing.T) { <-ctx.Done() - if diff := cmp.Diff(tc.expDispatched, dispatched.rx, defEvtCmpOpts...); diff != "" { + if diff := cmp.Diff(tc.expDispatched, subscriber.getRx(), defEvtCmpOpts...); diff != "" { t.Fatalf("unexpected events dispatched (-want, +got)\n%s\n", diff) } }) @@ -1475,17 +1471,14 @@ func TestServer_MgmtSvc_SystemStart(t *testing.T) { } func TestServer_MgmtSvc_SystemStop(t *testing.T) { - defaultMembers := system.Members{ - mockMember(t, 0, 1, "joined"), - mockMember(t, 1, 1, "joined"), - mockMember(t, 3, 2, "joined"), - } - emf := func(a string) system.Members { - return system.Members{ - // updated to err on prep fail if not forced - mockMember(t, 0, 1, "errored").WithInfo(a + " failed"), - mockMember(t, 1, 1, act2state(a)), - mockMember(t, 3, 2, "errored").WithInfo(a + " failed"), + emf := func(a string) func() system.Members { + return func() system.Members { + return system.Members{ + // updated to err on prep fail if not forced + mockMember(t, 0, 1, "errored").WithInfo(a + " failed"), + mockMember(t, 1, 1, act2state(a)), + mockMember(t, 3, 2, "errored").WithInfo(a + " failed"), + } } } expMembersPrepFail := emf("prep shutdown") @@ -1526,23 +1519,21 @@ func TestServer_MgmtSvc_SystemStop(t *testing.T) { rankResStopSuccess := []*sharedpb.RankResult{ mockRankSuccess("stop", 0, 1), mockRankSuccess("stop", 1, 1), mockRankSuccess("stop", 3, 2), } - expEventsPrepFail := []*events.RASEvent{ - newSystemStopFailedEvent("prep shutdown", "failed ranks 0,3"), - } - expEventsStopFail := []*events.RASEvent{ - newSystemStopFailedEvent("stop", "failed ranks 0,3"), + expEventsStopFail := func(msgErr string) []string { + e := newSystemStopFailedEvent(msgErr, "failed ranks 0,3") + e.Timestamp = "" + return []string{e.String()} } for name, tc := range map[string]struct { req *mgmtpb.SystemStopReq - members system.Members mResps [][]*control.HostResponse - expMembers system.Members + expMembers func() system.Members expResults []*sharedpb.RankResult expAbsentRanks string expAbsentHosts string expAPIErr error - expDispatched []*events.RASEvent + expDispatched []string expInvokeCount int }{ "nil req": { @@ -1557,67 +1548,69 @@ func TestServer_MgmtSvc_SystemStop(t *testing.T) { }, "prep fail": { req: &mgmtpb.SystemStopReq{}, - members: defaultMembers, mResps: hostRespFail, expResults: rankResPrepFail, expMembers: expMembersPrepFail, - expDispatched: expEventsPrepFail, + expDispatched: expEventsStopFail("prep shutdown"), expInvokeCount: 1, }, "prep success stop fail": { req: &mgmtpb.SystemStopReq{}, - members: defaultMembers, mResps: hostRespStopFail, expResults: rankResStopFail, expMembers: 
expMembersStopFail, - expDispatched: expEventsStopFail, + expDispatched: expEventsStopFail("stop"), expInvokeCount: 2, }, "stop some ranks": { req: &mgmtpb.SystemStopReq{Ranks: "0,1"}, - members: defaultMembers, mResps: [][]*control.HostResponse{{hr(1, mockRankSuccess("stop", 0), mockRankSuccess("stop", 1))}}, expResults: []*sharedpb.RankResult{mockRankSuccess("stop", 0, 1), mockRankSuccess("stop", 1, 1)}, - expMembers: system.Members{ - mockMember(t, 0, 1, "stopped"), - mockMember(t, 1, 1, "stopped"), - mockMember(t, 3, 2, "joined"), + expMembers: func() system.Members { + return system.Members{ + mockMember(t, 0, 1, "stopped"), + mockMember(t, 1, 1, "stopped"), + mockMember(t, 3, 2, "joined"), + } }, expInvokeCount: 1, // prep should not be called }, "stop with all ranks (same as full system stop)": { req: &mgmtpb.SystemStopReq{Ranks: "0,1,3"}, - members: defaultMembers, mResps: hostRespSuccess, expResults: rankResStopSuccess, - expMembers: system.Members{ - mockMember(t, 0, 1, "stopped"), - mockMember(t, 1, 1, "stopped"), - mockMember(t, 3, 2, "stopped"), + expMembers: func() system.Members { + return system.Members{ + mockMember(t, 0, 1, "stopped"), + mockMember(t, 1, 1, "stopped"), + mockMember(t, 3, 2, "stopped"), + } }, expInvokeCount: 2, // prep should be called }, "full system stop": { req: &mgmtpb.SystemStopReq{}, - members: defaultMembers, mResps: hostRespSuccess, expResults: rankResStopSuccess, - expMembers: system.Members{ - mockMember(t, 0, 1, "stopped"), - mockMember(t, 1, 1, "stopped"), - mockMember(t, 3, 2, "stopped"), + expMembers: func() system.Members { + return system.Members{ + mockMember(t, 0, 1, "stopped"), + mockMember(t, 1, 1, "stopped"), + mockMember(t, 3, 2, "stopped"), + } }, expInvokeCount: 2, // prep should be called }, "full system stop (forced)": { req: &mgmtpb.SystemStopReq{Force: true}, - members: defaultMembers, mResps: hostRespStopSuccess, expResults: rankResStopSuccess, - expMembers: system.Members{ - mockMember(t, 0, 1, "stopped"), - mockMember(t, 1, 1, "stopped"), - mockMember(t, 3, 2, "stopped"), + expMembers: func() system.Members { + return system.Members{ + mockMember(t, 0, 1, "stopped"), + mockMember(t, 1, 1, "stopped"), + mockMember(t, 3, 2, "stopped"), + } }, expInvokeCount: 1, // prep should not be called }, @@ -1629,7 +1622,12 @@ func TestServer_MgmtSvc_SystemStop(t *testing.T) { if tc.mResps == nil { tc.mResps = [][]*control.HostResponse{{}} } - svc := mgmtSystemTestSetup(t, log, tc.members, tc.mResps...) + members := system.Members{ + mockMember(t, 0, 1, "joined"), + mockMember(t, 1, 1, "joined"), + mockMember(t, 3, 2, "joined"), + } + svc := mgmtSystemTestSetup(t, log, members, tc.mResps...) 
ctx, cancel := context.WithTimeout(test.Context(t), 200*time.Millisecond) defer cancel() @@ -1637,8 +1635,8 @@ func TestServer_MgmtSvc_SystemStop(t *testing.T) { ps := events.NewPubSub(ctx, log) svc.events = ps - dispatched := &eventsDispatched{cancel: cancel} - svc.events.Subscribe(events.RASTypeInfoOnly, dispatched) + subscriber := newMockSubscriber(1) + svc.events.Subscribe(events.RASTypeInfoOnly, subscriber) if tc.req != nil && tc.req.Sys == "" { tc.req.Sys = build.DefaultSystemName @@ -1650,13 +1648,13 @@ func TestServer_MgmtSvc_SystemStop(t *testing.T) { } checkRankResults(t, tc.expResults, gotResp.Results) - checkMembers(t, tc.expMembers, svc.membership) + checkMembers(t, tc.expMembers(), svc.membership) test.AssertEqual(t, tc.expAbsentHosts, gotResp.Absenthosts, "absent hosts") test.AssertEqual(t, tc.expAbsentRanks, gotResp.Absentranks, "absent ranks") <-ctx.Done() - if diff := cmp.Diff(tc.expDispatched, dispatched.rx, defEvtCmpOpts...); diff != "" { + if diff := cmp.Diff(tc.expDispatched, subscriber.getRx(), defEvtCmpOpts...); diff != "" { t.Fatalf("unexpected events dispatched (-want, +got)\n%s\n", diff) } diff --git a/src/control/server/mocks.go b/src/control/server/mocks.go index e0c4e144afa..b3044c1326f 100644 --- a/src/control/server/mocks.go +++ b/src/control/server/mocks.go @@ -1,5 +1,5 @@ // -// (C) Copyright 2019-2023 Intel Corporation. +// (C) Copyright 2019-2024 Intel Corporation. // // SPDX-License-Identifier: BSD-2-Clause-Patent // @@ -7,6 +7,9 @@ package server import ( + "context" + "sync" + "github.com/dustin/go-humanize" "github.com/daos-stack/daos/src/control/common" @@ -62,6 +65,38 @@ type mockPublisher struct { published []*events.RASEvent } -func (m *mockPublisher) Publish(e *events.RASEvent) { - m.published = append(m.published, e) +func (mp *mockPublisher) Publish(e *events.RASEvent) { + mp.published = append(mp.published, e) +} + +func newMockSubscriber(expCount int) *mockSubscriber { + return &mockSubscriber{ + expectedRx: expCount, + finished: make(chan struct{}), + } +} + +type mockSubscriber struct { + sync.Mutex + finished chan struct{} + expectedRx int + rx []string +} + +func (ms *mockSubscriber) OnEvent(_ context.Context, evt *events.RASEvent) { + ms.Lock() + defer ms.Unlock() + + evt.Timestamp = "" // Remove TS for event comparison in unittests + ms.rx = append(ms.rx, evt.String()) + if len(ms.rx) == ms.expectedRx { + close(ms.finished) + } +} + +func (ms *mockSubscriber) getRx() []string { + ms.Lock() + defer ms.Unlock() + + return ms.rx } diff --git a/src/control/server/server.go b/src/control/server/server.go index fd353171a7d..4ac8ce5e04b 100644 --- a/src/control/server/server.go +++ b/src/control/server/server.go @@ -1,5 +1,5 @@ // -// (C) Copyright 2018-2023 Intel Corporation. +// (C) Copyright 2018-2024 Intel Corporation. // // SPDX-License-Identifier: BSD-2-Clause-Patent // @@ -324,7 +324,7 @@ func (srv *server) createEngine(ctx context.Context, idx int, cfg *engine.Config sp := storage.DefaultProvider(srv.log, idx, &cfg.Storage). WithVMDEnabled(srv.ctlSvc.storage.IsVMDEnabled()) - engine := NewEngineInstance(srv.log, sp, joinFn, engine.NewRunner(srv.log, cfg)). + engine := NewEngineInstance(srv.log, sp, joinFn, engine.NewRunner(srv.log, cfg), srv.pubSub). 
WithHostFaultDomain(srv.harness.faultDomain) if idx == 0 { diff --git a/src/control/server/server_utils.go b/src/control/server/server_utils.go index 0fbf88740d4..269a5201e30 100644 --- a/src/control/server/server_utils.go +++ b/src/control/server/server_utils.go @@ -545,6 +545,19 @@ func checkEngineTmpfsMem(srv *server, ei *EngineInstance, mi *common.MemInfo) er return nil } +// createPublishFormatRequiredFunc returns onAwaitFormatFn which will publish an +// event using the provided publish function to indicate that host is awaiting +// storage format. +func createPublishFormatRequiredFunc(publish func(*events.RASEvent), hostname string) onAwaitFormatFn { + return func(_ context.Context, engineIdx uint32, formatType string) error { + evt := events.NewEngineFormatRequiredEvent(hostname, engineIdx, formatType). + WithRank(uint32(ranklist.NilRank)) + publish(evt) + + return nil + } +} + func registerEngineEventCallbacks(srv *server, engine *EngineInstance, allStarted *sync.WaitGroup) { // Register callback to publish engine process exit events. engine.OnInstanceExit(createPublishInstanceExitFunc(srv.pubSub.Publish, srv.hostname)) diff --git a/src/control/server/server_utils_test.go b/src/control/server/server_utils_test.go index 5a992e3a9fa..1c8eadb8856 100644 --- a/src/control/server/server_utils_test.go +++ b/src/control/server/server_utils_test.go @@ -725,7 +725,7 @@ func TestServer_prepBdevStorage(t *testing.T) { } runner := engine.NewRunner(log, srv.cfg.Engines[0]) - ei := NewEngineInstance(log, srv.ctlSvc.storage, nil, runner) + ei := NewEngineInstance(log, srv.ctlSvc.storage, nil, runner, nil) mi.HugepagesFree = tc.hugepagesFree @@ -829,7 +829,7 @@ func TestServer_checkEngineTmpfsMem(t *testing.T) { sysMock := sysprov.NewMockSysProvider(log, sysMockCfg) scmMock := &storage.MockScmProvider{} provider := storage.MockProvider(log, 0, &ec.Storage, sysMock, scmMock, nil, nil) - instance := NewEngineInstance(log, provider, nil, runner) + instance := NewEngineInstance(log, provider, nil, runner, nil) srv, err := newServer(log, cfg, &system.FaultDomain{}) if err != nil { diff --git a/src/control/server/util_test.go b/src/control/server/util_test.go index adcee6585d0..ac6b42ed51c 100644 --- a/src/control/server/util_test.go +++ b/src/control/server/util_test.go @@ -214,7 +214,7 @@ func newTestEngine(log logging.Logger, isAP bool, provider *storage.Provider, en rCfg.Running.SetTrue() r := engine.NewTestRunner(rCfg, engineCfg[0]) - e := NewEngineInstance(log, provider, nil, r) + e := NewEngineInstance(log, provider, nil, r, nil) e.setDrpcSocket("/dontcare") e.setSuperblock(&Superblock{ Rank: ranklist.NewRankPtr(0), diff --git a/src/dtx/dtx_rpc.c b/src/dtx/dtx_rpc.c index cc18a1ac915..1c0e73c9640 100644 --- a/src/dtx/dtx_rpc.c +++ b/src/dtx/dtx_rpc.c @@ -1220,6 +1220,9 @@ dtx_refresh_internal(struct ds_cont_child *cont, int *check_count, d_list_t *che /* Handle the entries whose leaders are on current server. 
*/ d_list_for_each_entry_safe(dsp, tmp, &self, dsp_link) { struct dtx_entry dte; + struct dtx_entry *pdte = &dte; + struct dtx_cos_key dck; + d_list_del(&dsp->dsp_link); @@ -1228,13 +1231,31 @@ dtx_refresh_internal(struct ds_cont_child *cont, int *check_count, d_list_t *che dte.dte_refs = 1; dte.dte_mbs = dsp->dsp_mbs; + if (for_io) { + rc = vos_dtx_check(cont->sc_hdl, &dsp->dsp_xid, NULL, NULL, NULL, false); + switch(rc) { + case DTX_ST_COMMITTABLE: + dck.oid = dsp->dsp_oid; + dck.dkey_hash = dsp->dsp_dkey_hash; + rc = dtx_commit(cont, &pdte, &dck, 1); + if (rc < 0 && rc != -DER_NONEXIST && for_io) + d_list_add_tail(&dsp->dsp_link, cmt_list); + else + dtx_dsp_free(dsp); + continue; + case DTX_ST_COMMITTED: + case -DER_NONEXIST: /* Aborted */ + dtx_dsp_free(dsp); + continue; + default: + break; + } + } + rc = dtx_status_handle_one(cont, &dte, dsp->dsp_oid, dsp->dsp_dkey_hash, dsp->dsp_epoch, NULL, NULL); switch (rc) { case DSHR_NEED_COMMIT: { - struct dtx_entry *pdte = &dte; - struct dtx_cos_key dck; - dck.oid = dsp->dsp_oid; dck.dkey_hash = dsp->dsp_dkey_hash; rc = dtx_commit(cont, &pdte, &dck, 1); diff --git a/src/include/daos_srv/ras.h b/src/include/daos_srv/ras.h index 21c00bd42ca..5f22a75795f 100644 --- a/src/include/daos_srv/ras.h +++ b/src/include/daos_srv/ras.h @@ -1,5 +1,5 @@ /** - * (C) Copyright 2020-2023 Intel Corporation. + * (C) Copyright 2020-2024 Intel Corporation. * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -42,7 +42,7 @@ X(RAS_ENGINE_DIED, "engine_died") \ X(RAS_ENGINE_ASSERTED, "engine_asserted") \ X(RAS_ENGINE_CLOCK_DRIFT, "engine_clock_drift") \ - X(RAS_POOL_CORRUPTION_DETECTED, "corruption_detected") \ + X(RAS_POOL_CORRUPTION_DETECTED, "pool_corruption_detected") \ X(RAS_POOL_REBUILD_START, "pool_rebuild_started") \ X(RAS_POOL_REBUILD_END, "pool_rebuild_finished") \ X(RAS_POOL_REBUILD_FAILED, "pool_rebuild_failed") \ @@ -61,7 +61,9 @@ X(RAS_DEVICE_PLUGGED, "device_plugged") \ X(RAS_DEVICE_REPLACE, "device_replace") \ X(RAS_SYSTEM_FABRIC_PROV_CHANGED, "system_fabric_provider_changed") \ - X(RAS_ENGINE_JOIN_FAILED, "engine_join_failed") + X(RAS_ENGINE_JOIN_FAILED, "engine_join_failed") \ + X(RAS_DEVICE_LINK_SPEED_CHANGED, "device_link_speed_changed") \ + X(RAS_DEVICE_LINK_WIDTH_CHANGED, "device_link_width_changed") /** Define RAS event enum */ typedef enum { diff --git a/src/object/srv_obj.c b/src/object/srv_obj.c index a2cb1f41adb..0a246bebdca 100644 --- a/src/object/srv_obj.c +++ b/src/object/srv_obj.c @@ -60,6 +60,9 @@ obj_gen_dtx_mbs(uint32_t flags, uint32_t *tgt_cnt, struct daos_shard_tgt **p_tgt int i; int j; + if (*tgt_cnt == 0) + return 0; + D_ASSERT(tgts != NULL); if (!(flags & ORF_CONTAIN_LEADER)) { @@ -2652,11 +2655,9 @@ ds_obj_tgt_update_handler(crt_rpc_t *rpc) tgts = orw->orw_shard_tgts.ca_arrays; tgt_cnt = orw->orw_shard_tgts.ca_count; - if (!daos_is_zero_dti(&orw->orw_dti) && tgt_cnt != 0) { - rc = obj_gen_dtx_mbs(orw->orw_flags, &tgt_cnt, &tgts, &mbs); - if (rc != 0) - D_GOTO(out, rc); - } + rc = obj_gen_dtx_mbs(orw->orw_flags, &tgt_cnt, &tgts, &mbs); + if (rc != 0) + D_GOTO(out, rc); epoch.oe_value = orw->orw_epoch; epoch.oe_first = orw->orw_epoch_first; @@ -2880,11 +2881,9 @@ ds_obj_rw_handler(crt_rpc_t *rpc) tgts = orw->orw_shard_tgts.ca_arrays; tgt_cnt = orw->orw_shard_tgts.ca_count; - if (!daos_is_zero_dti(&orw->orw_dti) && tgt_cnt != 0) { - rc = obj_gen_dtx_mbs(orw->orw_flags, &tgt_cnt, &tgts, &mbs); - if (rc != 0) - D_GOTO(out, rc); - } + rc = obj_gen_dtx_mbs(orw->orw_flags, &tgt_cnt, &tgts, &mbs); + if (rc != 0) + D_GOTO(out, rc); 
version = orw->orw_map_ver; max_ver = orw->orw_map_ver; @@ -3004,9 +3003,6 @@ ds_obj_rw_handler(crt_rpc_t *rpc) goto again2; } - /* Standalone fetches do not get -DER_TX_RESTART. */ - D_ASSERT(!daos_is_zero_dti(&orw->orw_dti)); - break; case -DER_AGAIN: orw->orw_flags |= ORF_RESEND; @@ -3426,27 +3422,16 @@ obj_punch_complete(crt_rpc_t *rpc, int status, uint32_t map_version) } static int -obj_local_punch(struct obj_punch_in *opi, crt_opcode_t opc, - struct obj_io_context *ioc, struct dtx_handle *dth) +obj_punch_one(struct obj_punch_in *opi, crt_opcode_t opc, + struct obj_io_context *ioc, struct dtx_handle *dth) { struct ds_cont_child *cont = ioc->ioc_coc; - struct dtx_share_peer *dsp; - uint64_t sched_seq; - uint32_t retry = 0; - int rc = 0; - - if (daos_is_zero_dti(&opi->opi_dti)) { - D_DEBUG(DB_TRACE, "disable dtx\n"); - dth = NULL; - } + int rc; -again: rc = dtx_sub_init(dth, &opi->opi_oid, opi->opi_dkey_hash); if (rc != 0) goto out; - sched_seq = sched_cur_seq(); - switch (opc) { case DAOS_OBJ_RPC_PUNCH: case DAOS_OBJ_RPC_TGT_PUNCH: @@ -3478,7 +3463,32 @@ obj_local_punch(struct obj_punch_in *opi, crt_opcode_t opc, D_GOTO(out, rc = -DER_NOSYS); } - if (dth != NULL && obj_dtx_need_refresh(dth, rc)) { +out: + return rc; +} + +static int +obj_local_punch(struct obj_punch_in *opi, crt_opcode_t opc, uint32_t shard_nr, uint32_t *shards, + struct obj_io_context *ioc, struct dtx_handle *dth) +{ + struct dtx_share_peer *dsp; + uint64_t sched_seq; + uint32_t retry = 0; + int rc = 0; + int i; + +again: + sched_seq = sched_cur_seq(); + + /* There may be multiple shards reside on the same VOS target. */ + for (i = 0; i < shard_nr; i++) { + opi->opi_oid.id_shard = shards[i]; + rc = obj_punch_one(opi, opc, ioc, dth); + if (rc != 0) + break; + } + + if (obj_dtx_need_refresh(dth, rc)) { if (unlikely(++retry % 10 == 3)) { dsp = d_list_entry(dth->dth_share_tbd_list.next, struct dtx_share_peer, dsp_link); @@ -3559,7 +3569,6 @@ obj_tgt_punch(struct obj_tgt_punch_args *otpa, uint32_t *shards, uint32_t count) daos_epoch_t tmp; uint32_t dtx_flags = 0; int rc = 0; - int i; if (p_ioc == NULL) { p_ioc = &ioc; @@ -3616,17 +3625,11 @@ obj_tgt_punch(struct obj_tgt_punch_args *otpa, uint32_t *shards, uint32_t count) D_GOTO(out, rc = -DER_IO); exec: - /* There may be multiple shards reside on the same VOS target. 
*/ - for (i = 0; i < count; i++) { - opi->opi_oid.id_shard = shards[i]; - rc = obj_local_punch(opi, otpa->opc, p_ioc, dth); - if (rc != 0) { - DL_CDEBUG(rc == -DER_INPROGRESS || rc == -DER_TX_RESTART || - (rc == -DER_NONEXIST && (opi->opi_api_flags & DAOS_COND_PUNCH)), - DB_IO, DLOG_ERR, rc, DF_UOID, DP_UOID(opi->opi_oid)); - goto out; - } - } + rc = obj_local_punch(opi, otpa->opc, count, shards, p_ioc, dth); + if (rc != 0) + DL_CDEBUG(rc == -DER_INPROGRESS || rc == -DER_TX_RESTART || + (rc == -DER_NONEXIST && (opi->opi_api_flags & DAOS_COND_PUNCH)), + DB_IO, DLOG_ERR, rc, DF_UOID, DP_UOID(opi->opi_oid)); out: if (otpa->ver != NULL) @@ -3652,11 +3655,9 @@ ds_obj_tgt_punch_handler(crt_rpc_t *rpc) uint32_t version = 0; int rc; - if (!daos_is_zero_dti(&opi->opi_dti) && tgt_cnt != 0) { - rc = obj_gen_dtx_mbs(opi->opi_flags, &tgt_cnt, &tgts, &otpa.mbs); - if (rc != 0) - D_GOTO(out, rc); - } + rc = obj_gen_dtx_mbs(opi->opi_flags, &tgt_cnt, &tgts, &otpa.mbs); + if (rc != 0) + D_GOTO(out, rc); otpa.opc = opc_get(rpc->cr_opc); otpa.opi = opi; @@ -3732,7 +3733,8 @@ obj_tgt_punch_disp(struct dtx_leader_handle *dlh, void *arg, int idx, dtx_sub_co if (dlh->dlh_handle.dth_prepared) goto comp; - rc = obj_local_punch(opi, opc_get(rpc->cr_opc), exec_arg->ioc, &dlh->dlh_handle); + rc = obj_local_punch(opi, opc_get(rpc->cr_opc), 1, &opi->opi_oid.id_shard, + exec_arg->ioc, &dlh->dlh_handle); if (rc != 0) DL_CDEBUG(rc == -DER_INPROGRESS || rc == -DER_TX_RESTART || (rc == -DER_NONEXIST && (opi->opi_api_flags & DAOS_COND_PUNCH)), @@ -3809,11 +3811,9 @@ ds_obj_punch_handler(crt_rpc_t *rpc) tgts = opi->opi_shard_tgts.ca_arrays; tgt_cnt = opi->opi_shard_tgts.ca_count; - if (!daos_is_zero_dti(&opi->opi_dti) && tgt_cnt != 0) { - rc = obj_gen_dtx_mbs(opi->opi_flags, &tgt_cnt, &tgts, &mbs); - if (rc != 0) - D_GOTO(out, rc); - } + rc = obj_gen_dtx_mbs(opi->opi_flags, &tgt_cnt, &tgts, &mbs); + if (rc != 0) + D_GOTO(out, rc); if (tgt_cnt == 0) { if (!(opi->opi_api_flags & DAOS_COND_MASK)) @@ -5655,7 +5655,8 @@ ds_obj_coll_punch_handler(crt_rpc_t *rpc) 1 /* start, [0] is for current engine */, ocpi->ocpi_disp_width, &exec_arg.coll_cur); - rc = dtx_leader_begin(ioc.ioc_vos_coh, &odm->odm_xid, &epoch, 1, version, + rc = dtx_leader_begin(ioc.ioc_vos_coh, &odm->odm_xid, &epoch, + dcts[0].dct_shards[dmi->dmi_tgt_id].dcs_nr, version, &ocpi->ocpi_oid, NULL /* dti_cos */, 0 /* dti_cos_cnt */, NULL /* tgts */, exec_arg.coll_cur.grp_nr /* tgt_cnt */, dtx_flags, odm->odm_mbs, dce, &dlh); diff --git a/src/tests/ftest/dfuse/bash.py b/src/tests/ftest/dfuse/bash.py index 4754d70bfaf..964d0295954 100644 --- a/src/tests/ftest/dfuse/bash.py +++ b/src/tests/ftest/dfuse/bash.py @@ -128,7 +128,10 @@ def run_bashcmd(self, il_lib=None, compatible_mode=False): f"bzip2 -z {fuse_root_dir}/lib.a", f"chmod u-r {fuse_root_dir}/lib.a.bz2", 'fio --readwrite=randwrite --name=test --size="2M" --directory ' - f'{fuse_root_dir}/ --bs=1M --numjobs="4" --ioengine=psync ' + f'{fuse_root_dir}/ --bs=1M --numjobs="4" --ioengine=psync --thread=0 ' "--group_reporting --exitall_on_error --continue_on_error=none", + 'fio --readwrite=randwrite --name=test --size="2M" --directory ' + f'{fuse_root_dir}/ --bs=1M --numjobs="4" --ioengine=psync --thread=1 ' "--group_reporting --exitall_on_error --continue_on_error=none", 'fio --readwrite=randwrite --name=test --size="2M" --directory ' f'{fuse_root_dir}/ --bs=1M --numjobs="1" --ioengine=libaio --iodepth=16' "--group_reporting --exitall_on_error --continue_on_error=none", @@ -152,7 +155,7 @@ def test_bashcmd(self): :avocado: tags=all,daily_regression :avocado: tags=vm - :avocado: 
tags=dfuse,dfs + :avocado: tags=dfs,dfuse :avocado: tags=DfuseBashCmd,test_bashcmd """ self.run_bashcmd() @@ -167,7 +170,7 @@ def test_bashcmd_ioil(self): :avocado: tags=all,pr,daily_regression :avocado: tags=vm - :avocado: tags=dfuse,dfs,ioil + :avocado: tags=dfs,dfuse,ioil :avocado: tags=DfuseBashCmd,test_bashcmd_ioil """ self.run_bashcmd(il_lib="libioil.so") @@ -182,7 +185,7 @@ def test_bashcmd_pil4dfs(self): :avocado: tags=all,daily_regression :avocado: tags=vm - :avocado: tags=dfuse,dfs,pil4dfs + :avocado: tags=dfs,dfuse,pil4dfs :avocado: tags=DfuseBashCmd,test_bashcmd_pil4dfs """ self.run_bashcmd(il_lib="libpil4dfs.so") diff --git a/src/tests/ftest/dfuse/bash_dcache.py b/src/tests/ftest/dfuse/bash_dcache.py index 306228da283..3028cd453da 100644 --- a/src/tests/ftest/dfuse/bash_dcache.py +++ b/src/tests/ftest/dfuse/bash_dcache.py @@ -51,7 +51,7 @@ def test_bash_dcache_pil4dfs(self): :avocado: tags=all,full_regression :avocado: tags=vm - :avocado: tags=pil4dfs,dfs + :avocado: tags=dfs,dfuse,pil4dfs :avocado: tags=DFuseBashdcacheTest,test_bash_dcache_pil4dfs """ diff --git a/src/tests/ftest/dfuse/bash_fd.py b/src/tests/ftest/dfuse/bash_fd.py index 925e96e8a9c..069d9396f70 100644 --- a/src/tests/ftest/dfuse/bash_fd.py +++ b/src/tests/ftest/dfuse/bash_fd.py @@ -137,7 +137,7 @@ def test_bashfd(self): :avocado: tags=all,full_regression :avocado: tags=vm - :avocado: tags=dfuse,dfs + :avocado: tags=dfs,dfuse :avocado: tags=DFuseFdTest,test_bashfd """ self.run_bashfd() @@ -150,7 +150,7 @@ def test_bashfd_ioil(self): :avocado: tags=all,full_regression :avocado: tags=vm - :avocado: tags=dfuse,dfs,ioil + :avocado: tags=dfs,dfuse,ioil :avocado: tags=DFuseFdTest,test_bashfd_ioil """ self.run_bashfd(il_lib="libioil.so") @@ -163,7 +163,7 @@ def test_bashfd_pil4dfs(self): :avocado: tags=all,full_regression :avocado: tags=vm - :avocado: tags=dfuse,dfs,pil4dfs + :avocado: tags=dfs,dfuse,pil4dfs :avocado: tags=DFuseFdTest,test_bashfd_pil4dfs """ self.run_bashfd(il_lib="libpil4dfs.so") diff --git a/src/tests/ftest/dfuse/daos_build.py b/src/tests/ftest/dfuse/daos_build.py index c84125f20c3..49f125b2ada 100644 --- a/src/tests/ftest/dfuse/daos_build.py +++ b/src/tests/ftest/dfuse/daos_build.py @@ -64,7 +64,7 @@ def test_dfuse_daos_build_wt_il(self): :avocado: tags=all,full_regression :avocado: tags=vm - :avocado: tags=daosio,dfuse,il,dfs + :avocado: tags=daosio,dfs,dfuse,ioil :avocado: tags=DaosBuild,test_dfuse_daos_build_wt_il """ self.run_build_test("writethrough", il_lib='libioil.so', run_on_vms=True) @@ -80,7 +80,7 @@ def test_dfuse_daos_build_wt_pil4dfs(self): :avocado: tags=all,full_regression :avocado: tags=vm - :avocado: tags=daosio,dfuse,il,dfs,pil4dfs + :avocado: tags=daosio,dfs,dfuse,pil4dfs :avocado: tags=DaosBuild,test_dfuse_daos_build_wt_pil4dfs """ self.run_build_test("nocache", il_lib='libpil4dfs.so', run_on_vms=True) diff --git a/src/tests/ftest/dfuse/mu_perms.py b/src/tests/ftest/dfuse/mu_perms.py index a6fc558f157..d8716a2e094 100644 --- a/src/tests/ftest/dfuse/mu_perms.py +++ b/src/tests/ftest/dfuse/mu_perms.py @@ -416,7 +416,7 @@ def test_dfuse_mu_perms_ioil(self): """ :avocado: tags=all,daily_regression :avocado: tags=vm - :avocado: tags=dfuse,dfuse_mu,verify_perms + :avocado: tags=dfuse,dfuse_mu,ioil,verify_perms :avocado: tags=DfuseMUPerms,test_dfuse_mu_perms_ioil """ self.run_test_il(il_lib='libioil.so') @@ -426,7 +426,7 @@ def test_dfuse_mu_perms_pil4dfs(self): """ :avocado: tags=all,daily_regression :avocado: tags=vm - :avocado: tags=dfuse,dfuse_mu,verify_perms,pil4dfs + :avocado: 
tags=dfuse,dfuse_mu,pil4dfs,verify_perms :avocado: tags=DfuseMUPerms,test_dfuse_mu_perms_pil4dfs """ self.run_test_il(il_lib='libpil4dfs.so') diff --git a/src/tests/ftest/dfuse/pil4dfs_dcache.py b/src/tests/ftest/dfuse/pil4dfs_dcache.py index 683b95a0066..b6577fe69fd 100644 --- a/src/tests/ftest/dfuse/pil4dfs_dcache.py +++ b/src/tests/ftest/dfuse/pil4dfs_dcache.py @@ -448,7 +448,7 @@ def test_pil4dfs_dcache_enabled(self): :avocado: tags=all,daily_regression :avocado: tags=hw,medium - :avocado: tags=pil4dfs,dcache,dfuse + :avocado: tags=dcache,dfuse,pil4dfs :avocado: tags=Pil4dfsDcache,test_pil4dfs_dcache_enabled """ self.log_step("Mount a DFuse mount point") @@ -482,7 +482,7 @@ def test_pil4dfs_dcache_disabled(self): :avocado: tags=all,daily_regression :avocado: tags=hw,medium - :avocado: tags=pil4dfs,dcache,dfuse + :avocado: tags=dcache,dfuse,pil4dfs :avocado: tags=Pil4dfsDcache,test_pil4dfs_dcache_disabled """ self.log_step("Mount a DFuse mount point") @@ -517,7 +517,7 @@ def test_pil4dfs_dcache_gc_disabled(self): :avocado: tags=all,daily_regression :avocado: tags=hw,medium - :avocado: tags=pil4dfs,dcache,dfuse + :avocado: tags=dcache,dfuse,pil4dfs :avocado: tags=Pil4dfsDcache,test_pil4dfs_dcache_gc_disabled """ self.log_step("Mount a DFuse mount point") diff --git a/src/tests/ftest/dfuse/pil4dfs_fio.py b/src/tests/ftest/dfuse/pil4dfs_fio.py index 2aa3cd1b952..9b32ef39937 100644 --- a/src/tests/ftest/dfuse/pil4dfs_fio.py +++ b/src/tests/ftest/dfuse/pil4dfs_fio.py @@ -180,7 +180,7 @@ def test_pil4dfs_vs_dfs(self): :avocado: tags=all,daily_regression :avocado: tags=hw,medium - :avocado: tags=pil4dfs,dfuse,dfs,fio + :avocado: tags=dfs,dfuse,pil4dfs,fio :avocado: tags=Pil4dfsFio,test_pil4dfs_vs_dfs """ bw_deltas = {} diff --git a/src/tests/ftest/ior/intercept_messages.py b/src/tests/ftest/ior/intercept_messages.py index a431f2fe47b..0acdd8d1a0b 100644 --- a/src/tests/ftest/ior/intercept_messages.py +++ b/src/tests/ftest/ior/intercept_messages.py @@ -1,5 +1,5 @@ """ - (C) Copyright 2019-2022 Intel Corporation. + (C) Copyright 2019-2024 Intel Corporation. SPDX-License-Identifier: BSD-2-Clause-Patent """ @@ -32,7 +32,7 @@ def test_ior_intercept_messages(self): :avocado: tags=all,full_regression :avocado: tags=hw,medium - :avocado: tags=daosio,dfuse,il,ior,ior_intercept + :avocado: tags=daosio,dfuse,ioil,ior,ior_intercept :avocado: tags=IorInterceptMessages,test_ior_intercept_messages """ d_il_report_value = self.params.get("value", "/run/tests/D_IL_REPORT/*") diff --git a/src/tests/ftest/ior/intercept_messages_pil4dfs.py b/src/tests/ftest/ior/intercept_messages_pil4dfs.py index 854907c3f8a..3b9b9f949f1 100644 --- a/src/tests/ftest/ior/intercept_messages_pil4dfs.py +++ b/src/tests/ftest/ior/intercept_messages_pil4dfs.py @@ -1,5 +1,5 @@ """ - (C) Copyright 2019-2023 Intel Corporation. + (C) Copyright 2019-2024 Intel Corporation. 
SPDX-License-Identifier: BSD-2-Clause-Patent """ @@ -33,7 +33,7 @@ def test_ior_intercept_messages_pil4dfs(self): :avocado: tags=all,full_regression :avocado: tags=hw,medium - :avocado: tags=daosio,dfuse,il,ior,ior_intercept,pil4dfs + :avocado: tags=daosio,dfuse,pil4dfs,ior,ior_intercept :avocado: tags=IorInterceptMessagesPil4dfs,test_ior_intercept_messages_pil4dfs """ intercept = os.path.join(self.prefix, 'lib64', 'libpil4dfs.so') diff --git a/src/tests/ftest/ior/intercept_multi_client.py b/src/tests/ftest/ior/intercept_multi_client.py index 3a8254d5bc1..e9e6e331972 100644 --- a/src/tests/ftest/ior/intercept_multi_client.py +++ b/src/tests/ftest/ior/intercept_multi_client.py @@ -1,5 +1,5 @@ """ - (C) Copyright 2019-2023 Intel Corporation. + (C) Copyright 2019-2024 Intel Corporation. SPDX-License-Identifier: BSD-2-Clause-Patent """ @@ -27,7 +27,7 @@ def test_ior_intercept_libioil(self): :avocado: tags=all,full_regression :avocado: tags=hw,large - :avocado: tags=daosio,dfuse,il,ior,ior_intercept + :avocado: tags=daosio,dfuse,ioil,ior,ior_intercept :avocado: tags=IorInterceptMultiClient,test_ior_intercept_libioil """ self.run_il_perf_check('libioil.so') @@ -45,7 +45,7 @@ def test_ior_intercept_libpil4dfs(self): :avocado: tags=all,full_regression :avocado: tags=hw,large - :avocado: tags=daosio,dfuse,il,ior,ior_intercept,pil4dfs + :avocado: tags=daosio,dfuse,pil4dfs,ior,ior_intercept :avocado: tags=IorInterceptMultiClient,test_ior_intercept_libpil4dfs """ self.run_il_perf_check('libpil4dfs.so') diff --git a/src/tests/ftest/telemetry/wal_metrics.py b/src/tests/ftest/telemetry/wal_metrics.py index 96ac57e30a3..105015aaf29 100644 --- a/src/tests/ftest/telemetry/wal_metrics.py +++ b/src/tests/ftest/telemetry/wal_metrics.py @@ -100,8 +100,8 @@ def test_wal_replay_metrics(self): # Replay size should be > 0 after pool create for MD on SSD ranges[metric][label] = [1] elif metric.endswith('_replay_time'): - # Replay time should be 10 - 50,000 after pool create for MD on SSD - ranges[metric][label] = [10, 50000] + # Replay time should be 1 - 1,000,000 us after pool create for MD on SSD + ranges[metric][label] = [1, 1000000] elif metric.endswith('_replay_transactions'): # Replay transactions should be > 0 after pool create for MD on SSD ranges[metric][label] = [1] diff --git a/src/tests/ftest/util/file_count_test_base.py b/src/tests/ftest/util/file_count_test_base.py index ef6c83dd275..58b1a49f0a5 100644 --- a/src/tests/ftest/util/file_count_test_base.py +++ b/src/tests/ftest/util/file_count_test_base.py @@ -117,7 +117,7 @@ def run_file_count(self): try: self.processes = ior_np self.ppn = ior_ppn - if self.ior_cmd.api.value == 'HDF5-VOL': + if api == 'HDF5-VOL': self.ior_cmd.api.update('HDF5') self.run_ior_with_pool( create_pool=False, plugin_path=hdf5_plugin_path, mount_dir=mount_dir) diff --git a/src/tests/ftest/util/launch_utils.py b/src/tests/ftest/util/launch_utils.py index 2ed42ce7a9b..a86df01ea55 100644 --- a/src/tests/ftest/util/launch_utils.py +++ b/src/tests/ftest/util/launch_utils.py @@ -1015,8 +1015,8 @@ def update_test_yaml(self, logger, scm_size, scm_mount, extra_yaml, multiplier, if new_yaml_file: if verbose > 0: # Optionally display a diff of the yaml file - if not run_local(logger, f"diff -y {test.yaml_file} {new_yaml_file}").passed: - raise RunException(f"Error diff'ing {test.yaml_file}") + # diff returns rc=1 if the files are different, so ignore errors + run_local(logger, f"diff -y {test.yaml_file} {new_yaml_file}") test.yaml_file = new_yaml_file # Display the modified yaml file 
variants with debug diff --git a/src/utils/ctl/cart_ctl.c b/src/utils/ctl/cart_ctl.c index b251f5e9d79..3bdf65b2bbb 100644 --- a/src/utils/ctl/cart_ctl.c +++ b/src/utils/ctl/cart_ctl.c @@ -760,6 +760,13 @@ ctl_init() rc); } + /* Stop the progress thread before destroying the group */ + crtu_progress_stop(); + + rc = pthread_join(ctl_gdata.cg_tid, NULL); + if (rc != 0) + error_warn("Failed to join the threads; rc=%d\n", rc); + d_rank_list_free(rank_list); if (ctl_gdata.cg_save_cfg) { @@ -772,12 +779,6 @@ ctl_init() error_warn("Failed to destroy the view; rc=%d\n", rc); } - crtu_progress_stop(); - - rc = pthread_join(ctl_gdata.cg_tid, NULL); - if (rc != 0) - error_warn("Failed to join the threads; rc=%d\n", rc); - rc = sem_destroy(&ctl_gdata.cg_num_reply); if (rc != 0) error_warn("Failed to destroy a semaphore; rc=%d\n", rc); diff --git a/src/vos/vos_common.c b/src/vos/vos_common.c index 28e2ac86757..fb8461e2931 100644 --- a/src/vos/vos_common.c +++ b/src/vos/vos_common.c @@ -269,13 +269,15 @@ vos_tx_end(struct vos_container *cont, struct dtx_handle *dth_in, struct umem_rsrvd_act **rsrvd_scmp, d_list_t *nvme_exts, bool started, struct bio_desc *biod, int err) { - struct vos_pool *pool; - struct dtx_handle *dth = dth_in; - struct vos_dtx_act_ent *dae; - struct dtx_rsrvd_uint *dru; - struct vos_dtx_cmt_ent *dce = NULL; - struct dtx_handle tmp = {0}; - int rc; + struct vos_pool *pool; + struct umem_instance *umm; + struct dtx_handle *dth = dth_in; + struct vos_dtx_act_ent *dae; + struct vos_dtx_act_ent_df *dae_df; + struct dtx_rsrvd_uint *dru; + struct vos_dtx_cmt_ent *dce = NULL; + struct dtx_handle tmp = {0}; + int rc = 0; if (!dtx_is_valid_handle(dth)) { /** Created a dummy dth handle for publishing extents */ @@ -287,11 +289,11 @@ vos_tx_end(struct vos_container *cont, struct dtx_handle *dth_in, D_INIT_LIST_HEAD(&tmp.dth_deferred_nvme); } - if (dth->dth_local) { + if (dth->dth_local) pool = vos_hdl2pool(dth_in->dth_poh); - } else { + else pool = cont->vc_pool; - } + umm = vos_pool2umm(pool); if (rsrvd_scmp != NULL) { D_ASSERT(nvme_exts != NULL); @@ -300,7 +302,7 @@ vos_tx_end(struct vos_container *cont, struct dtx_handle *dth_in, * Just do your best to release the SCM reservation. Can't handle another * error while handling one already anyway. */ - (void)vos_publish_scm(vos_pool2umm(pool), *rsrvd_scmp, false /* publish */); + (void)vos_publish_scm(umm, *rsrvd_scmp, false /* publish */); D_FREE(*rsrvd_scmp); *rsrvd_scmp = NULL; err = -DER_NOMEM; @@ -341,9 +343,9 @@ vos_tx_end(struct vos_container *cont, struct dtx_handle *dth_in, vos_dth_set(NULL, pool->vp_sysdb); if (bio_nvme_configured(SMD_DEV_TYPE_META) && biod != NULL) - err = umem_tx_end_ex(vos_pool2umm(pool), err, biod); + err = umem_tx_end_ex(umm, err, biod); else - err = umem_tx_end(vos_pool2umm(pool), err); + err = umem_tx_end(umm, err); cancel: if (dtx_is_valid_handle(dth_in)) { @@ -409,8 +411,11 @@ vos_tx_end(struct vos_container *cont, struct dtx_handle *dth_in, vos_dtx_post_handle(cont, &dae, &dce, 1, false, err != 0); } else { D_ASSERT(dce == NULL); - if (err == 0) + if (err == 0) { dae->dae_prepared = 1; + dae_df = umem_off2ptr(umm, dae->dae_df_off); + D_ASSERT(!(dae_df->dae_flags & DTE_INVALID)); + } } } } diff --git a/src/vos/vos_dtx.c b/src/vos/vos_dtx.c index 27a41ce5776..0e70133629f 100644 --- a/src/vos/vos_dtx.c +++ b/src/vos/vos_dtx.c @@ -639,17 +639,25 @@ dtx_rec_release(struct vos_container *cont, struct vos_dtx_act_ent *dae, return 0; /* In spite of for commit or abort, the DTX must be local preparing/prepared. 
*/ - D_ASSERTF(vos_dae_is_prepare(dae), "Unexpected DTX "DF_DTI" status for %s\n", - DP_DTI(&DAE_XID(dae)), abort ? "abort" : "commit"); + D_ASSERTF(vos_dae_is_prepare(dae), + "Unexpected DTX "DF_DTI" status for %s in pool "DF_UUID" cont "DF_UUID"\n", + DP_DTI(&DAE_XID(dae)), abort ? "abort" : "commit", + DP_UUID(cont->vc_pool->vp_id), DP_UUID(cont->vc_id)); dbd = dae->dae_dbd; dae_df = umem_off2ptr(umm, dae->dae_df_off); - D_ASSERTF(dae_df != NULL, "Hit invalid DTX entry "DF_DTI" when release for %s\n", - DP_DTI(&DAE_XID(dae)), abort ? "abort" : "commit"); + D_ASSERTF(dae_df != NULL, "Hit invalid DTX entry "DF_DTI" when release for %s in pool " + DF_UUID" cont "DF_UUID"\n", DP_DTI(&DAE_XID(dae)), abort ? "abort" : "commit", + DP_UUID(cont->vc_pool->vp_id), DP_UUID(cont->vc_id)); D_ASSERTF(dbd->dbd_magic == DTX_ACT_BLOB_MAGIC, - "Invalid blob %p magic %x for "DF_DTI" (lid %x)\n", - dbd, dbd->dbd_magic, DP_DTI(&DAE_XID(dae)), DAE_LID(dae)); + "Bad blob %p magic %x for "DF_DTI" (lid %x) in pool "DF_UUID" cont "DF_UUID"\n", + dbd, dbd->dbd_magic, DP_DTI(&DAE_XID(dae)), DAE_LID(dae), + DP_UUID(cont->vc_pool->vp_id), DP_UUID(cont->vc_id)); + D_ASSERTF(dbd->dbd_index > 0, + "%s DTX "DF_DTI" against new DTX blob %p in pool "DF_UUID" cont "DF_UUID"\n", + abort ? "abort" : "commit", DP_DTI(&DAE_XID(dae)), dbd, + DP_UUID(cont->vc_pool->vp_id), DP_UUID(cont->vc_id)); if (!UMOFF_IS_NULL(dae_df->dae_mbs_off)) { /* dae_mbs_off will be invalid via flag DTE_INVALID. */ @@ -688,19 +696,16 @@ dtx_rec_release(struct vos_container *cont, struct vos_dtx_act_ent *dae, } if (dbd->dbd_count > 1 || dbd->dbd_index < dbd->dbd_cap) { - rc = umem_tx_add_ptr(umm, &dae_df->dae_flags, - sizeof(dae_df->dae_flags)); + rc = umem_tx_add_ptr(umm, &dae_df->dae_flags, sizeof(dae_df->dae_flags)); if (rc != 0) return rc; - /* Mark the DTX entry as invalid in SCM. */ - dae_df->dae_flags = DTE_INVALID; - - rc = umem_tx_add_ptr(umm, &dbd->dbd_count, - sizeof(dbd->dbd_count)); + rc = umem_tx_add_ptr(umm, &dbd->dbd_count, sizeof(dbd->dbd_count)); if (rc != 0) return rc; + /* Mark the DTX entry as invalid persistently. 
*/ + dae_df->dae_flags = DTE_INVALID; dbd->dbd_count--; } else { struct vos_cont_df *cont_df = cont->vc_cont_df; @@ -922,6 +927,8 @@ vos_dtx_extend_act_table(struct vos_container *cont) dbd->dbd_magic = DTX_ACT_BLOB_MAGIC; dbd->dbd_cap = (DTX_BLOB_SIZE - sizeof(struct vos_dtx_blob_df)) / sizeof(struct vos_dtx_act_ent_df); + dbd->dbd_count = 0; + dbd->dbd_index = 0; tmp = umem_off2ptr(umm, cont_df->cd_dtx_active_tail); if (tmp == NULL) { @@ -932,14 +939,14 @@ vos_dtx_extend_act_table(struct vos_container *cont) sizeof(cont_df->cd_dtx_active_head) + sizeof(cont_df->cd_dtx_active_tail)); if (rc != 0) - return rc; + goto out; cont_df->cd_dtx_active_head = dbd_off; } else { rc = umem_tx_add_ptr(umm, &tmp->dbd_next, sizeof(tmp->dbd_next)); if (rc != 0) - return rc; + goto out; tmp->dbd_next = dbd_off; @@ -947,19 +954,20 @@ vos_dtx_extend_act_table(struct vos_container *cont) rc = umem_tx_add_ptr(umm, &cont_df->cd_dtx_active_tail, sizeof(cont_df->cd_dtx_active_tail)); if (rc != 0) - return rc; + goto out; } cont_df->cd_dtx_active_tail = dbd_off; - D_DEBUG(DB_IO, "Allocated DTX active blob %p ("UMOFF_PF") for cont "DF_UUID"\n", - dbd, UMOFF_P(dbd_off), DP_UUID(cont->vc_id)); - - return 0; +out: + DL_CDEBUG(rc == 0, DB_IO, DLOG_ERR, rc, + "Allocated DTX active blob %p ("UMOFF_PF") for cont "DF_UUID, + dbd, UMOFF_P(dbd_off), DP_UUID(cont->vc_id)); + return rc; } static int -vos_dtx_alloc(struct umem_instance *umm, struct vos_dtx_blob_df *dbd, struct dtx_handle *dth) +vos_dtx_alloc(struct umem_instance *umm, struct dtx_handle *dth) { struct vos_dtx_act_ent *dae = NULL; struct vos_container *cont; @@ -1005,21 +1013,12 @@ vos_dtx_alloc(struct umem_instance *umm, struct vos_dtx_blob_df *dbd, struct dtx DAE_MBS_FLAGS(dae) = 0; } - if (dbd != NULL) { - D_ASSERT(dbd->dbd_magic == DTX_ACT_BLOB_MAGIC); - - dae->dae_df_off = umem_ptr2off(umm, dbd) + - offsetof(struct vos_dtx_blob_df, dbd_active_data) + - sizeof(struct vos_dtx_act_ent_df) * dbd->dbd_index; - } - /* Will be set as dbd::dbd_index via vos_dtx_prepared(). */ DAE_INDEX(dae) = DTX_INDEX_INVAL; - dae->dae_dbd = dbd; dae->dae_dth = dth; - D_DEBUG(DB_IO, "Allocated new lid DTX: "DF_DTI" lid=%lx, dae=%p, dae_dbd=%p\n", - DP_DTI(&dth->dth_xid), DAE_LID(dae) & DTX_LID_SOLO_MASK, dae, dbd); + D_DEBUG(DB_IO, "Allocated new lid DTX: "DF_DTI" lid=%lx, dae=%p\n", + DP_DTI(&dth->dth_xid), DAE_LID(dae) & DTX_LID_SOLO_MASK, dae); d_iov_set(&kiov, &DAE_XID(dae), sizeof(DAE_XID(dae))); d_iov_set(&riov, dae, sizeof(*dae)); @@ -1445,46 +1444,6 @@ vos_dtx_validation(struct dtx_handle *dth) return rc; } -static int -vos_dtx_active(struct dtx_handle *dth) -{ - struct vos_dtx_act_ent *dae = dth->dth_ent; - struct vos_container *cont; - struct vos_cont_df *cont_df; - struct umem_instance *umm; - struct vos_dtx_blob_df *dbd; - int rc = 0; - - if (dae->dae_dbd != NULL) - goto out; - - cont = vos_hdl2cont(dth->dth_coh); - cont_df = cont->vc_cont_df; - umm = vos_cont2umm(cont); - dbd = umem_off2ptr(umm, cont_df->cd_dtx_active_tail); - - if (dbd == NULL || dbd->dbd_index >= dbd->dbd_cap) { - rc = vos_dtx_extend_act_table(cont); - if (rc != 0) - goto out; - - dbd = umem_off2ptr(umm, cont_df->cd_dtx_active_tail); - } - - D_ASSERT(dbd->dbd_magic == DTX_ACT_BLOB_MAGIC); - - dae->dae_df_off = umem_ptr2off(umm, dbd) + - offsetof(struct vos_dtx_blob_df, dbd_active_data) + - sizeof(struct vos_dtx_act_ent_df) * dbd->dbd_index; - dae->dae_dbd = dbd; - -out: - if (rc == 0) - dth->dth_active = 1; - - return rc; -} - /* The caller has started local transaction. 
*/ int vos_dtx_register_record(struct umem_instance *umm, umem_off_t record, @@ -1552,15 +1511,10 @@ vos_dtx_register_record(struct umem_instance *umm, umem_off_t record, return 0; } - if (!dth->dth_active) { - rc = vos_dtx_active(dth); - if (rc != 0) - goto out; - } - rc = vos_dtx_append(dth, record, type); if (rc == 0) { /* Incarnation log entry implies a share */ + dth->dth_active = 1; *tx_id = DAE_LID(dae); if (type == DTX_RT_ILOG) dth->dth_modify_shared = 1; @@ -1577,20 +1531,18 @@ vos_dtx_register_record(struct umem_instance *umm, umem_off_t record, } /* The caller has started local transaction. */ -void +int vos_dtx_deregister_record(struct umem_instance *umm, daos_handle_t coh, uint32_t entry, daos_epoch_t epoch, umem_off_t record) { struct vos_container *cont; struct vos_dtx_act_ent *dae; - struct vos_dtx_act_ent_df *dae_df; - umem_off_t *rec_df; bool found; int count; int i; if (!vos_dtx_is_normal_entry(entry)) - return; + return 0; D_ASSERT(entry >= DTX_LID_RESERVED); @@ -1600,20 +1552,24 @@ vos_dtx_deregister_record(struct umem_instance *umm, daos_handle_t coh, * The on-disk entry will be destroyed soon. */ if (cont == NULL) - return; + return 0; found = lrua_lookupx(cont->vc_dtx_array, entry - DTX_LID_RESERVED, epoch, &dae); if (!found) { D_WARN("Could not find active DTX record for lid=%d, epoch=" DF_U64"\n", entry, epoch); - return; + return 0; } - dae_df = umem_off2ptr(umm, dae->dae_df_off); - if (daos_is_zero_dti(&dae_df->dae_xid) || - dae_df->dae_flags & DTE_INVALID) - return; + /* + * NOTE: If the record to be deregistered (for free or overwrite, and so on) is referenced + * by another prepared (but non-committed) DTX, then do not allow current transaction + * to modify it. Because if current transaction is aborted or failed for some reason, + * there is no efficient way to recover such former non-committed DTX. 
+ */ + if (dae->dae_dbd != NULL) + return dtx_inprogress(dae, vos_dth_get(cont->vc_pool->vp_sysdb), false, false, 8); if (DAE_REC_CNT(dae) > DTX_INLINE_REC_CNT) count = DTX_INLINE_REC_CNT; @@ -1623,46 +1579,18 @@ vos_dtx_deregister_record(struct umem_instance *umm, daos_handle_t coh, for (i = 0; i < count; i++) { if (record == umem_off2offset(DAE_REC_INLINE(dae)[i])) { DAE_REC_INLINE(dae)[i] = UMOFF_NULL; - goto handle_df; + return 0; } } for (i = 0; i < DAE_REC_CNT(dae) - DTX_INLINE_REC_CNT; i++) { if (record == umem_off2offset(dae->dae_records[i])) { dae->dae_records[i] = UMOFF_NULL; - goto handle_df; - } - } - - /* Not found */ - return; - -handle_df: - if (dae_df->dae_rec_cnt > DTX_INLINE_REC_CNT) - count = DTX_INLINE_REC_CNT; - else - count = dae_df->dae_rec_cnt; - - rec_df = dae_df->dae_rec_inline; - for (i = 0; i < count; i++) { - if (umem_off2offset(rec_df[i]) == record) { - rec_df[i] = UMOFF_NULL; - return; + return 0; } } - rec_df = umem_off2ptr(umm, dae_df->dae_rec_off); - - /* Not found */ - if (rec_df == NULL) - return; - - for (i = 0; i < dae_df->dae_rec_cnt - DTX_INLINE_REC_CNT; i++) { - if (umem_off2offset(rec_df[i]) == record) { - rec_df[i] = UMOFF_NULL; - return; - } - } + return 0; } int @@ -1670,6 +1598,8 @@ vos_dtx_prepared(struct dtx_handle *dth, struct vos_dtx_cmt_ent **dce_p) { struct vos_dtx_act_ent *dae = dth->dth_ent; struct vos_container *cont = vos_hdl2cont(dth->dth_coh); + struct vos_dtx_act_ent_df *dae_df; + struct vos_cont_df *cont_df; struct umem_instance *umm; struct vos_dtx_blob_df *dbd; umem_off_t rec_off; @@ -1705,9 +1635,26 @@ vos_dtx_prepared(struct dtx_handle *dth, struct vos_dtx_cmt_ent **dce_p) return rc; } + D_ASSERT(dae->dae_dbd == NULL); + + cont_df = cont->vc_cont_df; umm = vos_cont2umm(cont); - dbd = dae->dae_dbd; - D_ASSERT(dbd != NULL); + dbd = umem_off2ptr(umm, cont_df->cd_dtx_active_tail); + if (dbd == NULL || dbd->dbd_index >= dbd->dbd_cap) { + rc = vos_dtx_extend_act_table(cont); + if (rc != 0) + return rc; + + dbd = umem_off2ptr(umm, cont_df->cd_dtx_active_tail); + } + + D_ASSERT(dbd->dbd_magic == DTX_ACT_BLOB_MAGIC); + + dae->dae_dbd = dbd; + dae->dae_df_off = umem_ptr2off(umm, dbd) + + offsetof(struct vos_dtx_blob_df, dbd_active_data) + + sizeof(struct vos_dtx_act_ent_df) * dbd->dbd_index; + dae_df = umem_off2ptr(umm, dae->dae_df_off); /* Use the dkey_hash for the last modification as the dkey_hash * for the whole transaction. 
It will used as the index for DTX @@ -1784,27 +1731,30 @@ vos_dtx_prepared(struct dtx_handle *dth, struct vos_dtx_cmt_ent **dce_p) DAE_INDEX(dae) = dbd->dbd_index; if (DAE_INDEX(dae) > 0) { - rc = umem_tx_xadd_ptr(umm, umem_off2ptr(umm, dae->dae_df_off), - sizeof(struct vos_dtx_act_ent_df), UMEM_XADD_NO_SNAPSHOT); + rc = umem_tx_xadd_ptr(umm, dae_df, sizeof(*dae_df), UMEM_XADD_NO_SNAPSHOT); if (rc != 0) - return rc; + goto out; /* dbd_index is next to dbd_count */ rc = umem_tx_add_ptr(umm, &dbd->dbd_count, sizeof(dbd->dbd_count) + sizeof(dbd->dbd_index)); if (rc != 0) - return rc; + goto out; } - memcpy(umem_off2ptr(umm, dae->dae_df_off), - &dae->dae_base, sizeof(struct vos_dtx_act_ent_df)); + memcpy(dae_df, &dae->dae_base, sizeof(*dae_df)); dbd->dbd_count++; dbd->dbd_index++; dae->dae_preparing = 1; dae->dae_need_release = 1; - return 0; +out: + DL_CDEBUG(rc != 0, DLOG_ERR, DB_IO, rc, + "Preparing DTX "DF_DTI" in dbd "UMOFF_PF" at index %u, count %u, cap %u", + DP_DTI(&DAE_XID(dae)), UMOFF_P(cont_df->cd_dtx_active_tail), + dbd->dbd_index, dbd->dbd_count, dbd->dbd_cap); + return rc; } static struct dtx_memberships * @@ -2827,12 +2777,16 @@ vos_dtx_act_reindex(struct vos_container *cont) dbd_count++; } - D_ASSERTF(dbd_count == dbd->dbd_count, - "Unmatched active DTX count %d/%d, cap %d, idx %d for blob %p (" - UMOFF_PF"), head "UMOFF_PF", tail "UMOFF_PF"\n", - dbd_count, dbd->dbd_count, dbd->dbd_cap, dbd->dbd_index, dbd, - UMOFF_P(dbd_off), UMOFF_P(cont_df->cd_dtx_active_head), - UMOFF_P(cont_df->cd_dtx_active_tail)); + if (unlikely(dbd_count != dbd->dbd_count)) { + D_ERROR("Unmatched active DTX count %d/%d, cap %d, idx %d for blob %p (" + UMOFF_PF"), head "UMOFF_PF", tail "UMOFF_PF" in pool " + DF_UUID" cont "DF_UUID"\n", dbd_count, dbd->dbd_count, dbd->dbd_cap, + dbd->dbd_index, dbd, UMOFF_P(dbd_off), + UMOFF_P(cont_df->cd_dtx_active_head), + UMOFF_P(cont_df->cd_dtx_active_tail), DP_UUID(cont->vc_pool->vp_id), + DP_UUID(cont->vc_id)); + D_GOTO(out, rc = -DER_IO); + } dbd_off = dbd->dbd_next; } @@ -3012,13 +2966,12 @@ vos_dtx_attach(struct dtx_handle *dth, bool persistent, bool exist) { struct vos_container *cont; struct umem_instance *umm = NULL; - struct vos_dtx_blob_df *dbd = NULL; struct vos_dtx_cmt_ent *dce = NULL; - struct vos_cont_df *cont_df = NULL; struct vos_dtx_act_ent *dae; d_iov_t kiov; d_iov_t riov; int rc = 0; + bool tx = false; if (!dtx_is_valid_handle(dth)) return 0; @@ -3055,31 +3008,11 @@ vos_dtx_attach(struct dtx_handle *dth, bool persistent, bool exist) if (rc != 0) goto out; - cont_df = cont->vc_cont_df; - dbd = umem_off2ptr(umm, cont_df->cd_dtx_active_tail); - if (dbd == NULL || dbd->dbd_index >= dbd->dbd_cap) { - rc = vos_dtx_extend_act_table(cont); - if (rc != 0) - goto out; - - dbd = umem_off2ptr(umm, cont_df->cd_dtx_active_tail); - } + tx = true; } - if (dth->dth_ent == NULL) { - rc = vos_dtx_alloc(umm, dbd, dth); - } else if (persistent) { - D_ASSERT(dbd != NULL); - D_ASSERT(dbd->dbd_magic == DTX_ACT_BLOB_MAGIC); - - dae = dth->dth_ent; - D_ASSERT(dae->dae_dbd == NULL); - - dae->dae_df_off = umem_ptr2off(umm, dbd) + - offsetof(struct vos_dtx_blob_df, dbd_active_data) + - sizeof(struct vos_dtx_act_ent_df) * dbd->dbd_index; - dae->dae_dbd = dbd; - } + if (dth->dth_ent == NULL) + rc = vos_dtx_alloc(umm, dth); out: if (rc == 0) { @@ -3094,7 +3027,7 @@ vos_dtx_attach(struct dtx_handle *dth, bool persistent, bool exist) } if (persistent) { - if (cont_df != NULL) { + if (tx) { if (rc == 0) { rc = umem_tx_commit(umm); D_ASSERTF(rc == 0, "local TX commit failure %d\n", 
rc); diff --git a/src/vos/vos_ilog.c b/src/vos/vos_ilog.c index ae67f9a00ac..54abf2f407f 100644 --- a/src/vos/vos_ilog.c +++ b/src/vos/vos_ilog.c @@ -82,8 +82,7 @@ vos_ilog_del(struct umem_instance *umm, umem_off_t ilog_off, uint32_t tx_id, return 0; coh.cookie = (unsigned long)args; - vos_dtx_deregister_record(umm, coh, tx_id, epoch, ilog_off); - return 0; + return vos_dtx_deregister_record(umm, coh, tx_id, epoch, ilog_off); } void diff --git a/src/vos/vos_internal.h b/src/vos/vos_internal.h index a6c7fa9ac00..7d4dd3ac166 100644 --- a/src/vos/vos_internal.h +++ b/src/vos/vos_internal.h @@ -747,8 +747,10 @@ vos_dtx_get(bool standalone); * \param epoch [IN] Epoch for the DTX. * \param record [IN] Address (offset) of the record to be * deregistered. + * + * \return 0 on success and negative on failure. */ -void +int vos_dtx_deregister_record(struct umem_instance *umm, daos_handle_t coh, uint32_t entry, daos_epoch_t epoch, umem_off_t record); diff --git a/src/vos/vos_tree.c b/src/vos/vos_tree.c index e48467f9364..c36fcaa88c5 100644 --- a/src/vos/vos_tree.c +++ b/src/vos/vos_tree.c @@ -601,6 +601,7 @@ svt_rec_free_internal(struct btr_instance *tins, struct btr_record *rec, struct dtx_handle *dth = NULL; struct umem_rsrvd_act *rsrvd_scm; struct vos_container *cont = vos_hdl2cont(tins->ti_coh); + int rc; if (UMOFF_IS_NULL(rec->rec_off)) return 0; @@ -611,12 +612,12 @@ svt_rec_free_internal(struct btr_instance *tins, struct btr_record *rec, return -DER_NO_PERM; /* Not allowed */ } - vos_dtx_deregister_record(&tins->ti_umm, tins->ti_coh, - irec->ir_dtx, *epc, rec->rec_off); + rc = vos_dtx_deregister_record(&tins->ti_umm, tins->ti_coh, + irec->ir_dtx, *epc, rec->rec_off); + if (rc != 0) + return rc; if (!overwrite) { - int rc; - /* SCM value is stored together with vos_irec_df */ if (addr->ba_type == DAOS_MEDIA_NVME) { struct vos_pool *pool = tins->ti_priv; @@ -796,9 +797,8 @@ evt_dop_log_del(struct umem_instance *umm, daos_epoch_t epoch, daos_handle_t coh; coh.cookie = (unsigned long)args; - vos_dtx_deregister_record(umm, coh, desc->dc_dtx, epoch, - umem_ptr2off(umm, desc)); - return 0; + return vos_dtx_deregister_record(umm, coh, desc->dc_dtx, epoch, + umem_ptr2off(umm, desc)); } void
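
Note on the mgmt_system_test.go hunks above: the shared defaultMembers slice is replaced by per-case construction, and expMembers becomes a factory func. Mock members are pointers that the service mutates in place, so sharing one slice across table-driven cases lets one case's state leak into the next. A minimal sketch of that pattern, using a hypothetical member type rather than the DAOS system.Members API:

package main

import "fmt"

// member is a hypothetical stand-in for a mutable mock system member.
type member struct{ state string }

// freshMembers plays the role of the inlined members construction in the
// patch: every call returns new pointers, so no state leaks between cases.
func freshMembers() []*member {
	return []*member{{state: "joined"}, {state: "joined"}}
}

func main() {
	for _, tc := range []struct {
		name       string
		mutate     bool
		expMembers func() []*member // factory, not a shared slice
	}{
		{"full stop", true, func() []*member {
			return []*member{{state: "stopped"}, {state: "stopped"}}
		}},
		{"no-op", false, freshMembers},
	} {
		members := freshMembers() // fresh input for every case
		if tc.mutate {
			for _, m := range members {
				m.state = "stopped" // the service mutates members in place
			}
		}
		exp := tc.expMembers()
		fmt.Println(tc.name, "match:", members[0].state == exp[0].state)
	}
}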
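
The mockSubscriber added in src/control/server/mocks.go replaces the context-cancelling eventsDispatched helper: it records each event as a string with the volatile Timestamp field cleared, and closes a finished channel once the expected number of events has arrived, so tests can block deterministically and compare plain strings. A self-contained sketch of the same pattern, with a hypothetical Event type standing in for events.RASEvent:

package main

import (
	"fmt"
	"sync"
)

// Event is a hypothetical stand-in for events.RASEvent.
type Event struct {
	ID        string
	Timestamp string
}

// countingSubscriber mirrors the mockSubscriber in the patch: it stores
// stringified events and signals once the expected count has been seen.
type countingSubscriber struct {
	sync.Mutex
	finished chan struct{}
	expected int
	rx       []string
}

func newCountingSubscriber(expCount int) *countingSubscriber {
	return &countingSubscriber{expected: expCount, finished: make(chan struct{})}
}

func (cs *countingSubscriber) OnEvent(e *Event) {
	cs.Lock()
	defer cs.Unlock()

	e.Timestamp = "" // clear the volatile field so comparisons are stable
	cs.rx = append(cs.rx, fmt.Sprintf("%+v", *e))
	if len(cs.rx) == cs.expected {
		close(cs.finished) // unblock the waiting test exactly once
	}
}

func (cs *countingSubscriber) getRx() []string {
	cs.Lock()
	defer cs.Unlock()
	return cs.rx
}

func main() {
	sub := newCountingSubscriber(1)
	go sub.OnEvent(&Event{ID: "engine_died", Timestamp: "2024-01-01"})
	<-sub.finished // deterministic wait; no context-cancellation race
	fmt.Println(sub.getRx())
}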
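
createPublishFormatRequiredFunc in src/control/server/server_utils.go follows the closure style already used by createPublishInstanceExitFunc: capture the publish function and hostname, return a callback matching the hook signature. A sketch under simplified, assumed types (event, publishFn, and awaitFormatFn here are illustrative, not the DAOS signatures):

package main

import "fmt"

// event is a hypothetical, simplified stand-in for events.RASEvent.
type event struct{ msg string }

type publishFn func(*event)
type awaitFormatFn func(engineIdx uint32, formatType string) error

// newFormatRequiredHook captures publish and hostname so the returned
// callback can publish a "format required" event when an engine waits
// for its storage to be formatted.
func newFormatRequiredHook(publish publishFn, hostname string) awaitFormatFn {
	return func(engineIdx uint32, formatType string) error {
		publish(&event{
			msg: fmt.Sprintf("%s: engine %d awaiting %s format", hostname, engineIdx, formatType),
		})
		return nil
	}
}

func main() {
	var published []*event
	hook := newFormatRequiredHook(func(e *event) { published = append(published, e) }, "host1")
	_ = hook(0, "SCM")
	fmt.Println(published[0].msg)
}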
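
The launch_utils.py fix above rests on diff(1) exit-code semantics: status 0 means the files are identical, 1 means they differ, and 2 means trouble, so treating any non-zero rc as failure misreads a successful comparison. A Go sketch of exit-code-aware handling (the file paths are illustrative):

package main

import (
	"errors"
	"fmt"
	"os"
	"os/exec"
)

// runDiff treats diff's exit status 1 ("files differ") as a normal
// outcome; only status 2 ("trouble") or an exec failure is an error.
func runDiff(a, b string) (bool, error) {
	out, err := exec.Command("diff", "-y", a, b).Output()
	if err != nil {
		var ee *exec.ExitError
		if errors.As(err, &ee) && ee.ExitCode() == 1 {
			fmt.Print(string(out)) // side-by-side listing of the changes
			return true, nil
		}
		return false, err
	}
	return false, nil // identical files
}

func main() {
	// Illustrative temp files, not the real launch_utils.py yaml paths.
	_ = os.WriteFile("/tmp/a.yaml", []byte("scm_size: 16\n"), 0o644)
	_ = os.WriteFile("/tmp/b.yaml", []byte("scm_size: 32\n"), 0o644)
	differ, err := runDiff("/tmp/a.yaml", "/tmp/b.yaml")
	fmt.Println("differ:", differ, "err:", err)
}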