diff --git a/src/container/srv_target.c b/src/container/srv_target.c index a6d3d3c5286..35ac3d4285c 100644 --- a/src/container/srv_target.c +++ b/src/container/srv_target.c @@ -669,6 +669,7 @@ cont_child_alloc_ref(void *co_uuid, unsigned int ksize, void *po_uuid, cont->sc_dtx_committable_coll_count = 0; D_INIT_LIST_HEAD(&cont->sc_dtx_cos_list); D_INIT_LIST_HEAD(&cont->sc_dtx_coll_list); + D_INIT_LIST_HEAD(&cont->sc_dtx_batched_list); *link = &cont->sc_list; return 0; diff --git a/src/dtx/dtx_common.c b/src/dtx/dtx_common.c index ff4f2dfe4ef..ecb156729ed 100644 --- a/src/dtx/dtx_common.c +++ b/src/dtx/dtx_common.c @@ -392,11 +392,11 @@ dtx_cleanup(void *arg) if (rc == 0) { D_ASSERT(dce != NULL); - rc = dtx_coll_commit(cont, dce, NULL); + rc = dtx_coll_commit(cont, dce, NULL, false); dtx_coll_entry_put(dce); } } else { - rc = dtx_commit(cont, &dte, NULL, 1); + rc = dtx_commit(cont, &dte, NULL, 1, false); } } @@ -620,17 +620,16 @@ dtx_batched_commit_one(void *arg) tls->dt_batched_ult_cnt++; /* dbca->dbca_reg_gen != cont->sc_dtx_batched_gen means someone reopen the container. */ - while (!dss_ult_exiting(dbca->dbca_commit_req) && + while (!dss_ult_exiting(dbca->dbca_commit_req) && dtx_cont_opened(cont) && dbca->dbca_reg_gen == cont->sc_dtx_batched_gen) { struct dtx_entry **dtes = NULL; - struct dtx_cos_key *dcks = NULL; struct dtx_coll_entry *dce = NULL; struct dtx_stat stat = { 0 }; int cnt; int rc; cnt = dtx_fetch_committable(cont, DTX_THRESHOLD_COUNT, NULL, - DAOS_EPOCH_MAX, false, &dtes, &dcks, &dce); + DAOS_EPOCH_MAX, false, &dtes, NULL, &dce); if (cnt == 0) break; @@ -644,11 +643,11 @@ dtx_batched_commit_one(void *arg) /* Currently, commit collective DTX one by one. */ D_ASSERT(cnt == 1); - rc = dtx_coll_commit(cont, dce, dcks); + rc = dtx_coll_commit(cont, dce, NULL, true); } else { - rc = dtx_commit(cont, dtes, dcks, cnt); + rc = dtx_commit(cont, dtes, NULL, cnt, true); } - dtx_free_committable(dtes, dcks, dce, cnt); + dtx_free_committable(dtes, NULL, dce, cnt); if (rc != 0) { D_WARN("Fail to batched commit %d entries for "DF_UUID": "DF_RC"\n", cnt, DP_UUID(cont->sc_uuid), DP_RC(rc)); @@ -1271,7 +1270,6 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul uint32_t flags; int status = -1; int rc = 0; - int i; bool aborted = false; bool unpin = false; @@ -1424,10 +1422,10 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul vos_dtx_mark_committable(dth); if (dlh->dlh_coll) { - rc = dtx_coll_commit(cont, dlh->dlh_coll_entry, NULL); + rc = dtx_coll_commit(cont, dlh->dlh_coll_entry, NULL, false); } else { dte = &dth->dth_dte; - rc = dtx_commit(cont, &dte, NULL, 1); + rc = dtx_commit(cont, &dte, NULL, 1, false); } if (rc != 0) @@ -1487,15 +1485,9 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul /* If piggyback DTX has been done everywhere, then need to handle CoS cache. * It is harmless to keep some partially committed DTX entries in CoS cache. */ - if (result == 0 && dth->dth_cos_done) { - for (i = 0; i < dth->dth_dti_cos_count; i++) - dtx_cos_del(cont, &dth->dth_dti_cos[i], - &dth->dth_leader_oid, dth->dth_dkey_hash); - } else { - for (i = 0; i < dth->dth_dti_cos_count; i++) - dtx_cos_put_piggyback(cont, &dth->dth_dti_cos[i], - &dth->dth_leader_oid, dth->dth_dkey_hash); - } + dtx_cos_put_piggyback(cont, &dth->dth_leader_oid, dth->dth_dkey_hash, dth->dth_dti_cos, + dth->dth_dti_cos_count, + (result == 0 && dth->dth_cos_done) ? true : false); D_DEBUG(DB_IO, "Stop the DTX "DF_DTI" ver %u, dkey %lu, %s, cos %d/%d: result "DF_RC"\n", DP_DTI(&dth->dth_xid), dth->dth_ver, (unsigned long)dth->dth_dkey_hash, @@ -1654,7 +1646,8 @@ dtx_flush_on_close(struct dss_module_info *dmi, struct dtx_batched_cont_args *db struct dtx_coll_entry *dce = NULL; cnt = dtx_fetch_committable(cont, DTX_THRESHOLD_COUNT, - NULL, DAOS_EPOCH_MAX, true, &dtes, &dcks, &dce); + NULL, DAOS_EPOCH_MAX, true, &dtes, + dbca->dbca_commit_req != NULL ? &dcks : NULL, &dce); if (cnt <= 0) D_GOTO(out, rc = cnt); @@ -1675,9 +1668,9 @@ dtx_flush_on_close(struct dss_module_info *dmi, struct dtx_batched_cont_args *db if (dce != NULL) { D_ASSERT(cnt == 1); - rc = dtx_coll_commit(cont, dce, dcks); + rc = dtx_coll_commit(cont, dce, dcks, true); } else { - rc = dtx_commit(cont, dtes, dcks, cnt); + rc = dtx_commit(cont, dtes, dcks, cnt, true); } dtx_free_committable(dtes, dcks, dce, cnt); } @@ -2365,9 +2358,9 @@ dtx_obj_sync(struct ds_cont_child *cont, daos_unit_oid_t *oid, if (dce != NULL) { D_ASSERT(cnt == 1); - rc = dtx_coll_commit(cont, dce, dcks); + rc = dtx_coll_commit(cont, dce, dcks, true); } else { - rc = dtx_commit(cont, dtes, dcks, cnt); + rc = dtx_commit(cont, dtes, dcks, cnt, true); } dtx_free_committable(dtes, dcks, dce, cnt); if (rc < 0) { diff --git a/src/dtx/dtx_cos.c b/src/dtx/dtx_cos.c index 6e1d042b82b..4c165f94d0c 100644 --- a/src/dtx/dtx_cos.c +++ b/src/dtx/dtx_cos.c @@ -54,6 +54,8 @@ struct dtx_cos_rec_child { d_list_t dcrc_gl_committable; /* Link into related dcr_{reg,prio}_list. */ d_list_t dcrc_lo_link; + /* Link into container::sc_dtx_batched_list. */ + d_list_t dcrc_batched_link; union { struct dtx_entry *dcrc_dte; struct dtx_coll_entry *dcrc_dce; @@ -61,8 +63,12 @@ struct dtx_cos_rec_child { /* The DTX epoch. */ daos_epoch_t dcrc_epoch; struct dtx_cos_rec *dcrc_ptr; + uint64_t dcrc_ready_time; uint32_t dcrc_piggyback_refs; - uint32_t dcrc_coll:1; /* For collective DTX. */ + uint32_t dcrc_expcmt:1, + dcrc_prio:1, + dcrc_reg:1, + dcrc_coll:1; /* For collective DTX. */ }; struct dtx_cos_rec_bundle { @@ -129,6 +135,8 @@ dtx_cos_rec_alloc(struct btr_instance *tins, d_iov_t *key_iov, return -DER_NOMEM; } + D_INIT_LIST_HEAD(&dcrc->dcrc_batched_link); + dcrc->dcrc_ready_time = daos_getmtime_coarse(); dcrc->dcrc_epoch = rbund->epoch; dcrc->dcrc_ptr = dcr; if (rbund->flags & DCF_COLL) { @@ -144,12 +152,15 @@ dtx_cos_rec_alloc(struct btr_instance *tins, d_iov_t *key_iov, d_tm_inc_gauge(tls->dt_committable, 1); if (rbund->flags & DCF_EXP_CMT) { + dcrc->dcrc_expcmt = 1; d_list_add_tail(&dcrc->dcrc_lo_link, &dcr->dcr_expcmt_list); dcr->dcr_expcmt_count = 1; } else if (rbund->flags & DCF_SHARED) { + dcrc->dcrc_prio = 1; d_list_add_tail(&dcrc->dcrc_lo_link, &dcr->dcr_prio_list); dcr->dcr_prio_count = 1; } else { + dcrc->dcrc_reg = 1; d_list_add_tail(&dcrc->dcrc_lo_link, &dcr->dcr_reg_list); dcr->dcr_reg_count = 1; } @@ -177,6 +188,7 @@ dtx_cos_rec_free(struct btr_instance *tins, struct btr_record *rec, void *args) dcrc_lo_link) { d_list_del(&dcrc->dcrc_lo_link); d_list_del(&dcrc->dcrc_gl_committable); + d_list_del(&dcrc->dcrc_batched_link); if (dcrc->dcrc_coll) { dtx_coll_entry_put(dcrc->dcrc_dce); coll++; @@ -190,6 +202,7 @@ dtx_cos_rec_free(struct btr_instance *tins, struct btr_record *rec, void *args) dcrc_lo_link) { d_list_del(&dcrc->dcrc_lo_link); d_list_del(&dcrc->dcrc_gl_committable); + d_list_del(&dcrc->dcrc_batched_link); if (dcrc->dcrc_coll) { dtx_coll_entry_put(dcrc->dcrc_dce); coll++; @@ -203,6 +216,7 @@ dtx_cos_rec_free(struct btr_instance *tins, struct btr_record *rec, void *args) dcrc_lo_link) { d_list_del(&dcrc->dcrc_lo_link); d_list_del(&dcrc->dcrc_gl_committable); + d_list_del(&dcrc->dcrc_batched_link); if (dcrc->dcrc_coll) { dtx_coll_entry_put(dcrc->dcrc_dce); coll++; @@ -256,6 +270,8 @@ dtx_cos_rec_update(struct btr_instance *tins, struct btr_record *rec, if (dcrc == NULL) return -DER_NOMEM; + D_INIT_LIST_HEAD(&dcrc->dcrc_batched_link); + dcrc->dcrc_ready_time = daos_getmtime_coarse(); dcrc->dcrc_epoch = rbund->epoch; dcrc->dcrc_ptr = dcr; if (rbund->flags & DCF_COLL) { @@ -271,12 +287,15 @@ dtx_cos_rec_update(struct btr_instance *tins, struct btr_record *rec, d_tm_inc_gauge(tls->dt_committable, 1); if (rbund->flags & DCF_EXP_CMT) { + dcrc->dcrc_expcmt = 1; d_list_add_tail(&dcrc->dcrc_lo_link, &dcr->dcr_expcmt_list); dcr->dcr_expcmt_count++; } else if (rbund->flags & DCF_SHARED) { + dcrc->dcrc_prio = 1; d_list_add_tail(&dcrc->dcrc_lo_link, &dcr->dcr_prio_list); dcr->dcr_prio_count++; } else { + dcrc->dcrc_reg = 1; d_list_add_tail(&dcrc->dcrc_lo_link, &dcr->dcr_reg_list); dcr->dcr_reg_count++; } @@ -294,6 +313,53 @@ btr_ops_t dtx_btr_cos_ops = { .to_rec_update = dtx_cos_rec_update, }; +static int +dtx_cos_del_one(struct ds_cont_child *cont, struct dtx_cos_rec_child *dcrc) +{ + struct dtx_cos_key key; + d_iov_t kiov; + struct dtx_cos_rec *dcr = dcrc->dcrc_ptr; + uint64_t time = daos_getmtime_coarse() - dcrc->dcrc_ready_time; + int rc = 0; + + d_list_del(&dcrc->dcrc_gl_committable); + d_list_del(&dcrc->dcrc_lo_link); + if (!d_list_empty(&dcrc->dcrc_batched_link)) + d_list_del_init(&dcrc->dcrc_batched_link); + + if (dcrc->dcrc_expcmt) + dcr->dcr_expcmt_count--; + else if (dcrc->dcrc_prio) + dcr->dcr_prio_count--; + else + dcr->dcr_reg_count--; + + if (dcrc->dcrc_coll) + cont->sc_dtx_committable_coll_count--; + cont->sc_dtx_committable_count--; + + d_tm_set_gauge(dtx_tls_get()->dt_async_cmt_lat, time); + + if (dcr->dcr_reg_count == 0 && dcr->dcr_prio_count == 0 && dcr->dcr_expcmt_count == 0) { + key.oid = dcr->dcr_oid; + key.dkey_hash = dcr->dcr_dkey_hash; + d_iov_set(&kiov, &key, sizeof(key)); + rc = dbtree_delete(cont->sc_dtx_cos_hdl, BTR_PROBE_EQ, &kiov, NULL); + } + + DL_CDEBUG(rc != 0, DLOG_ERR, DB_IO, rc, + "Remove DTX "DF_DTI" from CoS cache", DP_DTI(&dcrc->dcrc_dte->dte_xid)); + + if (dcrc->dcrc_coll) + dtx_coll_entry_put(dcrc->dcrc_dce); + else + dtx_entry_put(dcrc->dcrc_dte); + + D_FREE(dcrc); + + return rc; +} + int dtx_fetch_committable(struct ds_cont_child *cont, uint32_t max_cnt, daos_unit_oid_t *oid, daos_epoch_t epoch, bool force, @@ -306,18 +372,45 @@ dtx_fetch_committable(struct ds_cont_child *cont, uint32_t max_cnt, uint32_t count; uint32_t i = 0; + /* Last batched commit failed, let's re-commit them. */ + if (dcks == NULL && !d_list_empty(&cont->sc_dtx_batched_list)) { + dcrc = d_list_entry(cont->sc_dtx_batched_list.next, struct dtx_cos_rec_child, + dcrc_batched_link); + if (unlikely(dcrc->dcrc_coll)) { + *p_dce = dtx_coll_entry_get(dcrc->dcrc_dce); + return 1; + } + + D_ALLOC_ARRAY(dte_buf, max_cnt); + if (dte_buf == NULL) + return -DER_NOMEM; + + d_list_for_each_entry(dcrc, &cont->sc_dtx_batched_list, dcrc_batched_link) { + D_ASSERT(i < max_cnt); + dte_buf[i++] = dtx_entry_get(dcrc->dcrc_dte); + } + + *dtes = dte_buf; + return i; + } + /* Process collective DXT with higher priority. */ if (!d_list_empty(&cont->sc_dtx_coll_list) && oid == NULL) { d_list_for_each_entry(dcrc, &cont->sc_dtx_coll_list, dcrc_gl_committable) { if (epoch >= dcrc->dcrc_epoch && (dcrc->dcrc_piggyback_refs == 0 || force)) { - D_ALLOC_PTR(dck_buf); - if (dck_buf == NULL) - return -DER_NOMEM; - - dck_buf->oid = dcrc->dcrc_ptr->dcr_oid; - dck_buf->dkey_hash = dcrc->dcrc_ptr->dcr_dkey_hash; - *dcks = dck_buf; + if (dcks != NULL) { + D_ALLOC_PTR(dck_buf); + if (dck_buf == NULL) + return -DER_NOMEM; + + dck_buf->oid = dcrc->dcrc_ptr->dcr_oid; + dck_buf->dkey_hash = dcrc->dcrc_ptr->dcr_dkey_hash; + *dcks = dck_buf; + } else { + d_list_add_tail(&dcrc->dcrc_batched_link, + &cont->sc_dtx_batched_list); + } *p_dce = dtx_coll_entry_get(dcrc->dcrc_dce); return 1; @@ -326,19 +419,19 @@ dtx_fetch_committable(struct ds_cont_child *cont, uint32_t max_cnt, } count = min(cont->sc_dtx_committable_count, max_cnt); - if (count == 0) { - *dtes = NULL; + if (count == 0) return 0; - } D_ALLOC_ARRAY(dte_buf, count); if (dte_buf == NULL) return -DER_NOMEM; - D_ALLOC_ARRAY(dck_buf, count); - if (dck_buf == NULL) { - D_FREE(dte_buf); - return -DER_NOMEM; + if (dcks != NULL) { + D_ALLOC_ARRAY(dck_buf, count); + if (dck_buf == NULL) { + D_FREE(dte_buf); + return -DER_NOMEM; + } } d_list_for_each_entry(dcrc, &cont->sc_dtx_cos_list, dcrc_gl_committable) { @@ -353,17 +446,26 @@ dtx_fetch_committable(struct ds_cont_child *cont, uint32_t max_cnt, continue; D_FREE(dte_buf); - dck_buf[i].oid = dcrc->dcrc_ptr->dcr_oid; - dck_buf[i].dkey_hash = dcrc->dcrc_ptr->dcr_dkey_hash; - *dcks = dck_buf; + if (dcks != NULL) { + dck_buf[i].oid = dcrc->dcrc_ptr->dcr_oid; + dck_buf[i].dkey_hash = dcrc->dcrc_ptr->dcr_dkey_hash; + *dcks = dck_buf; + } else { + d_list_add_tail(&dcrc->dcrc_batched_link, + &cont->sc_dtx_batched_list); + } *p_dce = dtx_coll_entry_get(dcrc->dcrc_dce); return 1; } dte_buf[i] = dtx_entry_get(dcrc->dcrc_dte); - dck_buf[i].oid = dcrc->dcrc_ptr->dcr_oid; - dck_buf[i].dkey_hash = dcrc->dcrc_ptr->dcr_dkey_hash; + if (dcks != NULL) { + dck_buf[i].oid = dcrc->dcrc_ptr->dcr_oid; + dck_buf[i].dkey_hash = dcrc->dcrc_ptr->dcr_dkey_hash; + } else { + d_list_add_tail(&dcrc->dcrc_batched_link, &cont->sc_dtx_batched_list); + } if (++i >= count) break; @@ -372,10 +474,10 @@ dtx_fetch_committable(struct ds_cont_child *cont, uint32_t max_cnt, if (i == 0) { D_FREE(dte_buf); D_FREE(dck_buf); - *dtes = NULL; } else { *dtes = dte_buf; - *dcks = dck_buf; + if (dcks != NULL) + *dcks = dck_buf; } return i; @@ -436,32 +538,44 @@ dtx_cos_get_piggyback(struct ds_cont_child *cont, daos_unit_oid_t *oid, } void -dtx_cos_put_piggyback(struct ds_cont_child *cont, struct dtx_id *xid, - daos_unit_oid_t *oid, uint64_t dkey_hash) +dtx_cos_put_piggyback(struct ds_cont_child *cont, daos_unit_oid_t *oid, uint64_t dkey_hash, + struct dtx_id xid[], uint32_t count, bool rm) { struct dtx_cos_key key; d_iov_t kiov; d_iov_t riov; struct dtx_cos_rec *dcr; struct dtx_cos_rec_child *dcrc; + int del = 0; int rc; + int i; key.oid = *oid; key.dkey_hash = dkey_hash; d_iov_set(&kiov, &key, sizeof(key)); d_iov_set(&riov, NULL, 0); - /* It is normal that the DTX entry (to be put) in CoS has already been removed by race. */ - rc = dbtree_lookup(cont->sc_dtx_cos_hdl, &kiov, &riov); if (rc == 0) { dcr = (struct dtx_cos_rec *)riov.iov_buf; - d_list_for_each_entry(dcrc, &dcr->dcr_prio_list, dcrc_lo_link) { - if (memcmp(&dcrc->dcrc_dte->dte_xid, xid, sizeof(*xid)) == 0) { - dcrc->dcrc_piggyback_refs--; - return; + for (i = 0; i < count; i++) { + d_list_for_each_entry(dcrc, &dcr->dcr_prio_list, dcrc_lo_link) { + if (memcmp(&dcrc->dcrc_dte->dte_xid, &xid[i], + sizeof(struct dtx_id)) == 0) { + if (rm) { + rc = dtx_cos_del_one(cont, dcrc); + if (rc == 0) + del++; + } else { + dcrc->dcrc_piggyback_refs--; + } + break; + } } } + + if (del > 0) + d_tm_dec_gauge(dtx_tls_get()->dt_committable, del); } } @@ -493,12 +607,12 @@ dtx_cos_add(struct ds_cont_child *cont, void *entry, daos_unit_oid_t *oid, DAOS_INTENT_UPDATE, &kiov, &riov, NULL); if (flags & DCF_COLL) - D_CDEBUG(rc != 0, DLOG_ERR, DB_IO, "Insert coll DTX "DF_DTI" to CoS cache, " + D_CDEBUG(rc != 0, DLOG_ERR, DB_TRACE, "Insert coll DTX "DF_DTI" to CoS cache, " DF_UOID", key %lu, flags %x: "DF_RC"\n", DP_DTI(&((struct dtx_coll_entry *)entry)->dce_xid), DP_UOID(*oid), (unsigned long)dkey_hash, flags, DP_RC(rc)); else - D_CDEBUG(rc != 0, DLOG_ERR, DB_IO, "Insert reg DTX "DF_DTI" to CoS cache, " + D_CDEBUG(rc != 0, DLOG_ERR, DB_TRACE, "Insert reg DTX "DF_DTI" to CoS cache, " DF_UOID", key %lu, flags %x: "DF_RC"\n", DP_DTI(&((struct dtx_entry *)entry)->dte_xid), DP_UOID(*oid), (unsigned long)dkey_hash, flags, DP_RC(rc)); @@ -530,82 +644,36 @@ dtx_cos_del(struct ds_cont_child *cont, struct dtx_id *xid, dcr = (struct dtx_cos_rec *)riov.iov_buf; d_list_for_each_entry(dcrc, &dcr->dcr_prio_list, dcrc_lo_link) { - if (memcmp(&dcrc->dcrc_dte->dte_xid, xid, sizeof(*xid)) != 0) - continue; - - d_list_del(&dcrc->dcrc_gl_committable); - d_list_del(&dcrc->dcrc_lo_link); - if (dcrc->dcrc_coll) { - dtx_coll_entry_put(dcrc->dcrc_dce); - cont->sc_dtx_committable_coll_count--; - } else { - dtx_entry_put(dcrc->dcrc_dte); + if (memcmp(&dcrc->dcrc_dte->dte_xid, xid, sizeof(*xid)) == 0) { + rc = dtx_cos_del_one(cont, dcrc); + D_GOTO(out, found = 1); } - D_FREE(dcrc); - - cont->sc_dtx_committable_count--; - dcr->dcr_prio_count--; - - D_GOTO(out, found = 1); } d_list_for_each_entry(dcrc, &dcr->dcr_reg_list, dcrc_lo_link) { - if (memcmp(&dcrc->dcrc_dte->dte_xid, xid, sizeof(*xid)) != 0) - continue; - - d_list_del(&dcrc->dcrc_gl_committable); - d_list_del(&dcrc->dcrc_lo_link); - if (dcrc->dcrc_coll) { - dtx_coll_entry_put(dcrc->dcrc_dce); - cont->sc_dtx_committable_coll_count--; - } else { - dtx_entry_put(dcrc->dcrc_dte); + if (memcmp(&dcrc->dcrc_dte->dte_xid, xid, sizeof(*xid)) == 0) { + rc = dtx_cos_del_one(cont, dcrc); + D_GOTO(out, found = 2); } - D_FREE(dcrc); - - cont->sc_dtx_committable_count--; - dcr->dcr_reg_count--; - - D_GOTO(out, found = 2); } d_list_for_each_entry(dcrc, &dcr->dcr_expcmt_list, dcrc_lo_link) { - if (memcmp(&dcrc->dcrc_dte->dte_xid, xid, sizeof(*xid)) != 0) - continue; - - d_list_del(&dcrc->dcrc_gl_committable); - d_list_del(&dcrc->dcrc_lo_link); - if (dcrc->dcrc_coll) { - dtx_coll_entry_put(dcrc->dcrc_dce); - cont->sc_dtx_committable_coll_count--; - } else { - dtx_entry_put(dcrc->dcrc_dte); + if (memcmp(&dcrc->dcrc_dte->dte_xid, xid, sizeof(*xid)) == 0) { + rc = dtx_cos_del_one(cont, dcrc); + D_GOTO(out, found = 3); } - D_FREE(dcrc); - - cont->sc_dtx_committable_count--; - dcr->dcr_expcmt_count--; - - D_GOTO(out, found = 3); } out: - if (found > 0) { + if (found > 0) d_tm_dec_gauge(dtx_tls_get()->dt_committable, 1); - if (dcr->dcr_reg_count == 0 && dcr->dcr_prio_count == 0 && - dcr->dcr_expcmt_count == 0) - rc = dbtree_delete(cont->sc_dtx_cos_hdl, BTR_PROBE_EQ, &kiov, NULL); - } - if (rc == 0 && found == 0) rc = -DER_NONEXIST; - D_CDEBUG(rc != 0 && rc != -DER_NONEXIST, DLOG_ERR, DB_IO, - "Remove DTX "DF_DTI" from CoS " - "cache, "DF_UOID", key %lu, %s shared entry: rc = "DF_RC"\n", - DP_DTI(xid), DP_UOID(*oid), (unsigned long)dkey_hash, - found == 1 ? "has" : "has not", DP_RC(rc)); + DL_CDEBUG(rc != 0 && rc != -DER_NONEXIST, DLOG_ERR, DB_TRACE, rc, + "Remove DTX from CoS cache "DF_UOID", key %lu", + DP_UOID(*oid), (unsigned long)dkey_hash); return rc == -DER_NONEXIST ? 0 : rc; } @@ -624,6 +692,12 @@ dtx_cos_oldest(struct ds_cont_child *cont) return dcrc->dcrc_epoch; } +/* + * It is inefficient to search some item on a very long list. So let's skip + * the search if the length exceeds DTX_COS_SEARCH_MAX. That is not fatal. + */ +#define DTX_COS_SEARCH_MAX 32 + void dtx_cos_prio(struct ds_cont_child *cont, struct dtx_id *xid, daos_unit_oid_t *oid, uint64_t dkey_hash) @@ -647,8 +721,13 @@ dtx_cos_prio(struct ds_cont_child *cont, struct dtx_id *xid, dcr = (struct dtx_cos_rec *)riov.iov_buf; + if (dcr->dcr_reg_count > DTX_COS_SEARCH_MAX) + goto expcmt; + d_list_for_each_entry(dcrc, &dcr->dcr_reg_list, dcrc_lo_link) { if (memcmp(&dcrc->dcrc_dte->dte_xid, xid, sizeof(*xid)) == 0) { + dcrc->dcrc_reg = 0; + dcrc->dcrc_prio = 1; d_list_del(&dcrc->dcrc_lo_link); d_list_add(&dcrc->dcrc_lo_link, &dcr->dcr_prio_list); dcr->dcr_reg_count--; @@ -658,14 +737,9 @@ dtx_cos_prio(struct ds_cont_child *cont, struct dtx_id *xid, } } - d_list_for_each_entry(dcrc, &dcr->dcr_prio_list, dcrc_lo_link) { - if (memcmp(&dcrc->dcrc_dte->dte_xid, xid, sizeof(*xid)) == 0) { - d_list_del(&dcrc->dcrc_lo_link); - d_list_add(&dcrc->dcrc_lo_link, &dcr->dcr_prio_list); - - D_GOTO(out, found = true); - } - } +expcmt: + if (dcr->dcr_expcmt_count > DTX_COS_SEARCH_MAX) + goto out; d_list_for_each_entry(dcrc, &dcr->dcr_expcmt_list, dcrc_lo_link) { if (memcmp(&dcrc->dcrc_dte->dte_xid, xid, sizeof(*xid)) == 0) @@ -683,3 +757,39 @@ dtx_cos_prio(struct ds_cont_child *cont, struct dtx_id *xid, /* It is normal that the DTX entry (for priority) in CoS has been committed by race. */ } + +void +dtx_cos_batched_del(struct ds_cont_child *cont, struct dtx_id xid[], bool rm[], uint32_t count) +{ + struct dtx_cos_rec_child *dcrc; + int del = 0; + int rc; + int i = 0; + bool found; + + while ((dcrc = d_list_pop_entry(&cont->sc_dtx_batched_list, struct dtx_cos_rec_child, + dcrc_batched_link)) != NULL) { + for (found = false; i < count && !found; i++) { + /* + * Some entries in the sc_dtx_batched_list may have been committed by + * others by race. Since the entries order in the sc_dtx_batched_list + * will not be changed, let's compare with xid[i] via one cycle scan. + */ + if (memcmp(&dcrc->dcrc_dte->dte_xid, &xid[i], sizeof(struct dtx_id)) == 0) { + found = true; + + if (rm[i]) { + rc = dtx_cos_del_one(cont, dcrc); + if (rc == 0) + del++; + } + } + } + + /* There must be one in xid array that matches current dcrc. */ + D_ASSERT(found); + } + + if (del > 0) + d_tm_dec_gauge(dtx_tls_get()->dt_committable, del); +} diff --git a/src/dtx/dtx_internal.h b/src/dtx/dtx_internal.h index a42bcc1d7d6..06d0333dd77 100644 --- a/src/dtx/dtx_internal.h +++ b/src/dtx/dtx_internal.h @@ -213,6 +213,7 @@ struct dtx_pool_metrics { struct dtx_tls { struct d_tm_node_t *dt_committable; struct d_tm_node_t *dt_dtx_leader_total; + struct d_tm_node_t *dt_async_cmt_lat; uint64_t dt_agg_gen; uint32_t dt_batched_ult_cnt; }; @@ -263,6 +264,9 @@ uint64_t dtx_cos_oldest(struct ds_cont_child *cont); void dtx_cos_prio(struct ds_cont_child *cont, struct dtx_id *xid, daos_unit_oid_t *oid, uint64_t dkey_hash); +void dtx_cos_batched_del(struct ds_cont_child *cont, struct dtx_id xid[], bool rm[], + uint32_t count); + /* dtx_rpc.c */ int dtx_check(struct ds_cont_child *cont, struct dtx_entry *dte, daos_epoch_t epoch); diff --git a/src/dtx/dtx_resync.c b/src/dtx/dtx_resync.c index b98a6469954..756dfe0ca44 100644 --- a/src/dtx/dtx_resync.c +++ b/src/dtx/dtx_resync.c @@ -115,7 +115,7 @@ dtx_resync_commit(struct ds_cont_child *cont, } if (j > 0) { - rc = dtx_commit(cont, dtes, dcks, j); + rc = dtx_commit(cont, dtes, dcks, j, true); if (rc < 0) D_ERROR("Failed to commit the DTXs: rc = "DF_RC"\n", DP_RC(rc)); @@ -359,7 +359,7 @@ dtx_status_handle_one(struct ds_cont_child *cont, struct dtx_entry *dte, daos_un dck.oid = oid; dck.dkey_hash = dkey_hash; - rc = dtx_coll_commit(cont, dce, &dck); + rc = dtx_coll_commit(cont, dce, &dck, true); } dtx_coll_entry_put(dce); diff --git a/src/dtx/dtx_rpc.c b/src/dtx/dtx_rpc.c index e654047a621..5423f6c0cc8 100644 --- a/src/dtx/dtx_rpc.c +++ b/src/dtx/dtx_rpc.c @@ -225,9 +225,10 @@ dtx_req_cb(const struct crt_cb_info *cb_info) } out: - D_DEBUG(DB_TRACE, "DTX req for opc %x (req %p future %p) got reply from %d/%d: " - "epoch :"DF_X64", result %d\n", dra->dra_opc, req, dra->dra_future, - drr->drr_rank, drr->drr_tag, din != NULL ? din->di_epoch : 0, rc); + DL_CDEBUG(rc < 0 && rc != -DER_NONEXIST, DLOG_ERR, DB_TRACE, rc, + "DTX req for opc %x (req %p future %p) got reply from %d/%d: " + "epoch :"DF_X64, dra->dra_opc, req, dra->dra_future, + drr->drr_rank, drr->drr_tag, din != NULL ? din->di_epoch : 0); drr->drr_comp = 1; drr->drr_result = rc; @@ -397,19 +398,14 @@ dtx_req_list_send(struct dtx_common_args *dca, bool is_reentrance) if (unlikely(dra->dra_opc == DTX_COMMIT && dca->dca_i == 0 && DAOS_FAIL_CHECK(DAOS_DTX_FAIL_COMMIT))) - rc = dtx_req_send(dca->dca_drr, 1); + dtx_req_send(dca->dca_drr, 1); else - rc = dtx_req_send(dca->dca_drr, dca->dca_epoch); - if (rc != 0) { - /* If the first sub-RPC failed, then break, otherwise - * other remote replicas may have already received the - * RPC and executed it, so have to go ahead. - */ - if (dca->dca_i == 0) { - ABT_future_free(&dra->dra_future); - return rc; - } - } + dtx_req_send(dca->dca_drr, dca->dca_epoch); + /* + * Do not care dtx_req_send result, itself or its cb func will set dra->dra_future. + * Each RPC is independent from the others, let's go head to handle the other RPCs + * and set dra->dra_future that will avoid blocking the RPC sponsor - dtx_req_wait. + */ /* dca->dca_drr maybe not points to a real entry if all RPCs have been sent. */ dca->dca_drr = d_list_entry(dca->dca_drr->drr_link.next, @@ -616,12 +612,8 @@ dtx_rpc_helper(struct dss_chore *chore, bool is_reentrance) rc = dtx_req_list_send(dca, is_reentrance); if (rc == DSS_CHORE_YIELD) return DSS_CHORE_YIELD; - if (rc == DSS_CHORE_DONE) - rc = 0; - if (rc != 0) - dca->dca_dra.dra_result = rc; - D_CDEBUG(rc < 0, DLOG_ERR, DB_TRACE, "%p: DTX RPC chore for %u done: %d\n", chore, - dca->dca_dra.dra_opc, rc); + D_ASSERTF(rc == DSS_CHORE_DONE, "Unexpected helper return value for RPC %u: %d\n", + dca->dca_dra.dra_opc, rc); if (dca->dca_chore_eventual != ABT_EVENTUAL_NULL) { rc = ABT_eventual_set(dca->dca_chore_eventual, NULL, 0); D_ASSERTF(rc == ABT_SUCCESS, "ABT_eventual_set: %d\n", rc); @@ -737,8 +729,10 @@ dtx_rpc(struct ds_cont_child *cont,d_list_t *dti_list, struct dtx_entry **dtes, } rc = dss_chore_delegate(&dca->dca_chore, dtx_rpc_helper); - if (rc != 0) + if (rc != 0) { + ABT_eventual_free(&dca->dca_chore_eventual); goto out; + } rc = ABT_eventual_wait(dca->dca_chore_eventual, NULL); D_ASSERTF(rc == ABT_SUCCESS, "ABT_eventual_wait: %d\n", rc); @@ -809,7 +803,7 @@ dtx_rpc(struct ds_cont_child *cont,d_list_t *dti_list, struct dtx_entry **dtes, */ int dtx_commit(struct ds_cont_child *cont, struct dtx_entry **dtes, - struct dtx_cos_key *dcks, int count) + struct dtx_cos_key *dcks, int count, bool has_cos) { struct dtx_common_args dca; struct dtx_req_args *dra = &dca.dca_dra; @@ -842,7 +836,7 @@ dtx_commit(struct ds_cont_child *cont, struct dtx_entry **dtes, rc1 = vos_dtx_set_flags(cont->sc_hdl, dca.dca_dtis, count, DTE_PARTIAL_COMMITTED); } else { - if (dcks != NULL) { + if (has_cos) { if (count > 1) { D_ALLOC_ARRAY(rm_cos, count); if (rm_cos == NULL) @@ -862,12 +856,16 @@ dtx_commit(struct ds_cont_child *cont, struct dtx_entry **dtes, } if (rc1 == 0 && rm_cos != NULL) { - for (i = 0; i < count; i++) { - if (rm_cos[i]) { - D_ASSERT(!daos_oid_is_null(dcks[i].oid.id_pub)); - dtx_cos_del(cont, &dca.dca_dtis[i], &dcks[i].oid, - dcks[i].dkey_hash); + if (dcks != NULL) { + for (i = 0; i < count; i++) { + if (rm_cos[i]) { + D_ASSERT(!daos_oid_is_null(dcks[i].oid.id_pub)); + dtx_cos_del(cont, &dca.dca_dtis[i], &dcks[i].oid, + dcks[i].dkey_hash); + } } + } else { + dtx_cos_batched_del(cont, dca.dca_dtis, rm_cos, count); } } @@ -1237,7 +1235,7 @@ dtx_refresh_internal(struct ds_cont_child *cont, int *check_count, d_list_t *che case DTX_ST_COMMITTABLE: dck.oid = dsp->dsp_oid; dck.dkey_hash = dsp->dsp_dkey_hash; - rc = dtx_commit(cont, &pdte, &dck, 1); + rc = dtx_commit(cont, &pdte, &dck, 1, true); if (rc < 0 && rc != -DER_NONEXIST && for_io) d_list_add_tail(&dsp->dsp_link, cmt_list); else @@ -1258,7 +1256,7 @@ dtx_refresh_internal(struct ds_cont_child *cont, int *check_count, d_list_t *che case DSHR_NEED_COMMIT: { dck.oid = dsp->dsp_oid; dck.dkey_hash = dsp->dsp_dkey_hash; - rc = dtx_commit(cont, &pdte, &dck, 1); + rc = dtx_commit(cont, &pdte, &dck, 1, true); if (rc < 0 && rc != -DER_NONEXIST && for_io) d_list_add_tail(&dsp->dsp_link, cmt_list); else @@ -1571,17 +1569,20 @@ dtx_coll_rpc_post(struct dtx_coll_rpc_args *dcra, int ret) { int rc; - rc = ABT_future_wait(dcra->dcra_future); - D_CDEBUG(rc != ABT_SUCCESS, DLOG_ERR, DB_TRACE, - "Collective DTX wait req for opc %u, future %p done, rc %d, result %d\n", - dcra->dcra_opc, dcra->dcra_future, rc, dcra->dcra_result); - ABT_future_free(&dcra->dcra_future); + if (dcra->dcra_future != ABT_FUTURE_NULL) { + rc = ABT_future_wait(dcra->dcra_future); + D_CDEBUG(rc != ABT_SUCCESS, DLOG_ERR, DB_TRACE, + "Collective DTX wait req for opc %u, future %p done, rc %d, result %d\n", + dcra->dcra_opc, dcra->dcra_future, rc, dcra->dcra_result); + ABT_future_free(&dcra->dcra_future); + } return ret != 0 ? ret : dcra->dcra_result; } int -dtx_coll_commit(struct ds_cont_child *cont, struct dtx_coll_entry *dce, struct dtx_cos_key *dck) +dtx_coll_commit(struct ds_cont_child *cont, struct dtx_coll_entry *dce, struct dtx_cos_key *dck, + bool has_cos) { struct dtx_coll_rpc_args dcra = { 0 }; int *results = NULL; @@ -1591,11 +1592,22 @@ dtx_coll_commit(struct ds_cont_child *cont, struct dtx_coll_entry *dce, struct d int rc1 = 0; int rc2 = 0; int i; + bool cos = true; if (dce->dce_ranks != NULL) rc = dtx_coll_rpc_prep(cont, dce, DTX_COLL_COMMIT, 0, &dcra); + /* + * NOTE: Before committing the DTX on remote participants, we cannot remove the active + * DTX locally; otherwise, the local committed DTX entry may be removed via DTX + * aggregation before remote participants commit done. Under such case, if some + * remote DTX participant triggere DTX_REFRESH for such DTX during the interval, + * then it will get -DER_TX_UNCERTAIN, that may cause related application to be + * failed. So here, we let remote participants to commit firstly, if failed, we + * will ask the leader to retry the commit until all participants got committed. + */ if (dce->dce_bitmap != NULL) { + clrbit(dce->dce_bitmap, dss_get_module_info()->dmi_tgt_id); len = dtx_coll_local_exec(cont->sc_pool_uuid, cont->sc_uuid, &dce->dce_xid, 0, DTX_COLL_COMMIT, dce->dce_bitmap_sz, dce->dce_bitmap, &results); @@ -1634,8 +1646,12 @@ dtx_coll_commit(struct ds_cont_child *cont, struct dtx_coll_entry *dce, struct d * to remove the collective DTX entry from the CoS even if the commit failed remotely. * Otherwise, the batched commit ULT may be blocked by such "bad" entry. */ - if (rc2 == 0 && dck != NULL) - dtx_cos_del(cont, &dce->dce_xid, &dck->oid, dck->dkey_hash); + if (rc2 == 0 && has_cos) { + if (dck != NULL) + dtx_cos_del(cont, &dce->dce_xid, &dck->oid, dck->dkey_hash); + else + dtx_cos_batched_del(cont, &dce->dce_xid, &cos, 1); + } D_CDEBUG(rc != 0 || rc1 != 0 || rc2 != 0, DLOG_ERR, DB_TRACE, "Collectively commit DTX "DF_DTI": %d/%d/%d\n", @@ -1658,7 +1674,17 @@ dtx_coll_abort(struct ds_cont_child *cont, struct dtx_coll_entry *dce, daos_epoc if (dce->dce_ranks != NULL) rc = dtx_coll_rpc_prep(cont, dce, DTX_COLL_ABORT, epoch, &dcra); + /* + * NOTE: The DTX abort maybe triggered by dtx_leader_end() for timeout on some DTX + * participant(s). Under such case, the client side RPC sponsor may also hit + * the RPC timeout and resends related RPC to the leader. Here, to avoid DTX + * abort and resend RPC forwarding being executed in parallel, we will abort + * local DTX after remote done, before that the logic of handling resent RPC + * on server will find the local pinned DTX entry then notify related client + * to resend sometime later. + */ if (dce->dce_bitmap != NULL) { + clrbit(dce->dce_bitmap, dss_get_module_info()->dmi_tgt_id); len = dtx_coll_local_exec(cont->sc_pool_uuid, cont->sc_uuid, &dce->dce_xid, epoch, DTX_COLL_ABORT, dce->dce_bitmap_sz, dce->dce_bitmap, &results); diff --git a/src/dtx/dtx_srv.c b/src/dtx/dtx_srv.c index 93cc6744a2d..9e78100c451 100644 --- a/src/dtx/dtx_srv.c +++ b/src/dtx/dtx_srv.c @@ -34,8 +34,8 @@ dtx_tls_init(int tags, int xs_id, int tgt_id) tls->dt_agg_gen = 1; rc = d_tm_add_metric(&tls->dt_committable, D_TM_STATS_GAUGE, - "total number of committable DTX entries", - "entries", "io/dtx/committable/tgt_%u", tgt_id); + "total number of committable DTX entries", "entry", + "io/dtx/committable/tgt_%u", tgt_id); if (rc != DER_SUCCESS) D_WARN("Failed to create DTX committable metric: " DF_RC"\n", DP_RC(rc)); @@ -48,6 +48,13 @@ dtx_tls_init(int tags, int xs_id, int tgt_id) D_WARN("Failed to create DTX leader metric: " DF_RC"\n", DP_RC(rc)); + rc = d_tm_add_metric(&tls->dt_async_cmt_lat, D_TM_STATS_GAUGE, + "DTX async commit latency", "ms", + "io/dtx/async_cmt_lat/tgt_%u", tgt_id); + if (rc != DER_SUCCESS) + D_WARN("Failed to create DTX async commit latency metric: " DF_RC"\n", + DP_RC(rc)); + return tls; } @@ -117,7 +124,7 @@ dtx_metrics_alloc(const char *path, int tgt_id) rc = d_tm_add_metric(&metrics->dpm_total[DTX_PROTO_SRV_RPC_COUNT], D_TM_COUNTER, "total number of processed sync DTX_COMMIT", "ops", - "%s/ops/sync_dtx_commit/tgt_%u", path, tgt_id); + "%s/ops/dtx_sync_commit/tgt_%u", path, tgt_id); if (rc != DER_SUCCESS) D_WARN("Failed to create sync DTX_COMMIT RPC cnt metric: "DF_RC"\n", DP_RC(rc)); diff --git a/src/engine/ult.c b/src/engine/ult.c index 94a2a0f9390..fbeb3f538fa 100644 --- a/src/engine/ult.c +++ b/src/engine/ult.c @@ -458,7 +458,7 @@ sched_ult2xs(int xs_type, int tgt_id) break; case DSS_XS_OFFLOAD: if (dss_numa_nr > 1) - xs_id = sched_ult2xs_multisocket(xs_type, tgt_id); + return sched_ult2xs_multisocket(xs_type, tgt_id); if (!dss_helper_pool) { if (dss_tgt_offload_xs_nr > 0) xs_id = DSS_MAIN_XS_ID(tgt_id) + dss_tgt_offload_xs_nr / dss_tgt_nr; diff --git a/src/include/daos_srv/container.h b/src/include/daos_srv/container.h index 793d585c07a..f99f4df14e3 100644 --- a/src/include/daos_srv/container.h +++ b/src/include/daos_srv/container.h @@ -134,6 +134,8 @@ struct ds_cont_child { d_list_t sc_dtx_cos_list; /* The global list for committable collective DTXs. */ d_list_t sc_dtx_coll_list; + /* The list for current DTX batched commit. */ + d_list_t sc_dtx_batched_list; /* the pool map version of updating DAOS_PROP_CO_STATUS prop */ uint32_t sc_status_pm_ver; }; diff --git a/src/include/daos_srv/dtx_srv.h b/src/include/daos_srv/dtx_srv.h index 7b3f7e9ee57..7c60d2deaa0 100644 --- a/src/include/daos_srv/dtx_srv.h +++ b/src/include/daos_srv/dtx_srv.h @@ -318,8 +318,8 @@ int dtx_cos_get_piggyback(struct ds_cont_child *cont, daos_unit_oid_t *oid, uint64_t dkey_hash, int max, struct dtx_id **dtis); void -dtx_cos_put_piggyback(struct ds_cont_child *cont, struct dtx_id *xid, - daos_unit_oid_t *oid, uint64_t dkey_hash); +dtx_cos_put_piggyback(struct ds_cont_child *cont, daos_unit_oid_t *oid, uint64_t dkey_hash, + struct dtx_id xid[], uint32_t count, bool rm); int dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func, dtx_agg_cb_t agg_cb, int allow_failure, void *func_arg); @@ -338,14 +338,15 @@ int dtx_obj_sync(struct ds_cont_child *cont, daos_unit_oid_t *oid, daos_epoch_t epoch); int dtx_commit(struct ds_cont_child *cont, struct dtx_entry **dtes, - struct dtx_cos_key *dcks, int count); + struct dtx_cos_key *dcks, int count, bool has_cos); int dtx_abort(struct ds_cont_child *cont, struct dtx_entry *dte, daos_epoch_t epoch); int dtx_refresh(struct dtx_handle *dth, struct ds_cont_child *cont); int -dtx_coll_commit(struct ds_cont_child *cont, struct dtx_coll_entry *dce, struct dtx_cos_key *dck); +dtx_coll_commit(struct ds_cont_child *cont, struct dtx_coll_entry *dce, struct dtx_cos_key *dck, + bool has_cos); int dtx_coll_abort(struct ds_cont_child *cont, struct dtx_coll_entry *dce, daos_epoch_t epoch); diff --git a/src/tests/ftest/util/telemetry_utils.py b/src/tests/ftest/util/telemetry_utils.py index db424b6de68..17f1753b100 100644 --- a/src/tests/ftest/util/telemetry_utils.py +++ b/src/tests/ftest/util/telemetry_utils.py @@ -90,6 +90,7 @@ class TelemetryUtils(): "engine_pool_ops_dtx_coll_commit", "engine_pool_ops_dtx_commit", "engine_pool_ops_dtx_refresh", + "engine_pool_ops_dtx_sync_commit", "engine_pool_ops_ec_agg", "engine_pool_ops_ec_rep", "engine_pool_ops_fetch", @@ -200,6 +201,8 @@ class TelemetryUtils(): "engine_dmabuff_queued_reqs", "engine_dmabuff_grab_errs", *_gen_stats_metrics("engine_dmabuff_grab_retries")] + ENGINE_IO_DTX_ASYNC_CMT_LAT_METRICS = \ + _gen_stats_metrics("engine_io_dtx_async_cmt_lat") ENGINE_IO_DTX_COMMITTABLE_METRICS = \ _gen_stats_metrics("engine_io_dtx_committable") ENGINE_IO_DTX_COMMITTED_METRICS = \ @@ -304,7 +307,8 @@ class TelemetryUtils(): _gen_stats_metrics("engine_io_ops_tgt_update_active") ENGINE_IO_OPS_UPDATE_ACTIVE_METRICS = \ _gen_stats_metrics("engine_io_ops_update_active") - ENGINE_IO_METRICS = ENGINE_IO_DTX_COMMITTABLE_METRICS +\ + ENGINE_IO_METRICS = ENGINE_IO_DTX_ASYNC_CMT_LAT_METRICS +\ + ENGINE_IO_DTX_COMMITTABLE_METRICS +\ ENGINE_IO_DTX_COMMITTED_METRICS +\ ENGINE_IO_LATENCY_FETCH_METRICS +\ ENGINE_IO_LATENCY_BULK_FETCH_METRICS +\