Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DAOS-16721 object: fix coll RPC for obj with sparse layout #15375

Merged
merged 1 commit into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/dtx/dtx_coll.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,12 @@ dtx_coll_prep_ult(void *arg)
DP_UUID(cont->sc_uuid), DP_RC(rc));
}

if (dcpa->dcpa_result != 0)
if (dcpa->dcpa_result != 0) {
if (dcpa->dcpa_result != -DER_INPROGRESS && dcpa->dcpa_result != -DER_NONEXIST)
D_ERROR("Failed to load mbs for "DF_DTI", opc %u: "DF_RC"\n",
DP_DTI(&dci->dci_xid), opc, DP_RC(rc));
goto out;
}

dcpa->dcpa_result = dtx_coll_prep(dci->dci_po_uuid, dcpa->dcpa_oid, &dci->dci_xid, mbs, -1,
dci->dci_version, cont->sc_pool->spc_map_version,
Expand Down
5 changes: 4 additions & 1 deletion src/dtx/dtx_rpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -974,8 +974,11 @@ dtx_refresh_internal(struct ds_cont_child *cont, int *check_count, d_list_t *che
if (dsp->dsp_mbs == NULL) {
rc = vos_dtx_load_mbs(cont->sc_hdl, &dsp->dsp_xid, NULL, &dsp->dsp_mbs);
if (rc != 0) {
if (rc < 0 && rc != -DER_NONEXIST && for_io)
if (rc < 0 && rc != -DER_NONEXIST && for_io) {
D_ERROR("Failed to load mbs for "DF_DTI": "DF_RC"\n",
DP_DTI(&dsp->dsp_xid), DP_RC(rc));
goto out;
}

drop = true;
goto next;
Expand Down
111 changes: 73 additions & 38 deletions src/object/cli_coll.c
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ obj_coll_oper_args_init(struct coll_oper_args *coa, struct dc_object *obj, bool
if (coa->coa_sparse == 0)
coa->coa_dct_cap = obj_ranks;
}

/* Save obj->cob_min_rank for verification during subsequent obj_coll_prep_one(). */
coa->coa_min_rank = obj->cob_min_rank;

D_RWLOCK_UNLOCK(&obj->cob_lock);

if (coa->coa_sparse) {
Expand Down Expand Up @@ -208,7 +212,6 @@ obj_coll_oper_args_init(struct coll_oper_args *coa, struct dc_object *obj, bool
coa->coa_dct_nr = -1;
}

coa->coa_max_dct_sz = 0;
coa->coa_max_shard_nr = 0;
coa->coa_max_bitmap_sz = 0;
coa->coa_target_nr = 0;
Expand Down Expand Up @@ -236,17 +239,55 @@ obj_coll_oper_args_fini(struct coll_oper_args *coa)
coa->coa_dct_nr = 0;
}

static void
obj_coll_collapse_one(struct coll_oper_args *coa, struct daos_coll_target *dct,
uint32_t *size, bool copy)
{
struct daos_coll_shard *dcs;
uint32_t dct_size;
int i;

/* The size may be over estimated, no matter. */
dct_size = sizeof(*dct) + dct->dct_bitmap_sz +
sizeof(dct->dct_shards[0]) * (dct->dct_max_shard + 1);

for (i = 0; i <= dct->dct_max_shard; i++) {
dcs = &dct->dct_shards[i];
if (dcs->dcs_nr > 1)
dct_size += sizeof(dcs->dcs_buf[0]) * dcs->dcs_nr;
}

if (coa->coa_for_modify)
dct_size += sizeof(dct->dct_tgt_ids[0]) * dct->dct_tgt_nr;

if (coa->coa_max_dct_sz < dct_size)
coa->coa_max_dct_sz = dct_size;

if (copy)
memcpy(&coa->coa_dcts[coa->coa_dct_nr], dct, sizeof(*dct));

coa->coa_dct_nr++;
*size += dct_size;
}

struct obj_coll_tree_args {
struct coll_oper_args *coa;
uint32_t *size;
};

static int
obj_coll_tree_cb(daos_handle_t ih, d_iov_t *key, d_iov_t *val, void *arg)
{
struct coll_oper_args *coa = arg;
struct daos_coll_target *dct = val->iov_buf;
struct obj_coll_tree_args *octa = arg;
struct coll_oper_args *coa = octa->coa;
struct daos_coll_target *dct = val->iov_buf;

D_ASSERTF(coa->coa_dct_nr < coa->coa_dct_cap,
"Too short pre-allcoated dct_array: %u vs %u\n",
coa->coa_dct_nr, coa->coa_dct_cap);
D_ASSERT(dct->dct_bitmap != NULL);

memcpy(&coa->coa_dcts[coa->coa_dct_nr++], dct, sizeof(*dct));
obj_coll_collapse_one(coa, dct, octa->size, true);

/* The following members have been migrated into coa->coa_dcts. */
dct->dct_bitmap = NULL;
Expand All @@ -259,6 +300,7 @@ obj_coll_tree_cb(daos_handle_t ih, d_iov_t *key, d_iov_t *val, void *arg)
static int
obj_coll_collapse_tree(struct coll_oper_args *coa, uint32_t *size)
{
struct obj_coll_tree_args octa;
struct coll_sparse_targets *tree = coa->coa_tree;
int rc = 0;

Expand All @@ -270,7 +312,14 @@ obj_coll_collapse_tree(struct coll_oper_args *coa, uint32_t *size)
D_GOTO(out, rc = -DER_NOMEM);

coa->coa_sparse = 0;
rc = dbtree_iterate(tree->cst_tree_hdl, DAOS_INTENT_DEFAULT, false, obj_coll_tree_cb, coa);
coa->coa_raw_sparse = 1;
coa->coa_dct_nr = 0;
coa->coa_max_dct_sz = 0;

octa.coa = coa;
octa.size = size;
rc = dbtree_iterate(tree->cst_tree_hdl, DAOS_INTENT_DEFAULT, false,
obj_coll_tree_cb, &octa);
if (rc == 0)
D_ASSERTF(coa->coa_dct_nr == coa->coa_dct_cap,
"Something is wrong when prepare coll target array: %u vs %u\n",
Expand All @@ -287,36 +336,13 @@ static int
obj_coll_collapse_array(struct coll_oper_args *coa, uint32_t *size)
{
struct daos_coll_target *dct;
struct daos_coll_shard *dcs;
uint32_t dct_size;
int i;
int j;

for (i = 0, *size = 0, coa->coa_dct_nr = 0; i < coa->coa_dct_cap; i++) {
for (i = 0, *size = 0, coa->coa_dct_nr = 0, coa->coa_max_dct_sz = 0;
i < coa->coa_dct_cap; i++) {
dct = &coa->coa_dcts[i];
if (dct->dct_bitmap != NULL) {
/* The size may be over estimated, no matter. */
dct_size = sizeof(*dct) + dct->dct_bitmap_sz +
sizeof(dct->dct_shards[0]) * (dct->dct_max_shard + 1);

for (j = 0; j <= dct->dct_max_shard; j++) {
dcs = &dct->dct_shards[j];
if (dcs->dcs_nr > 1)
dct_size += sizeof(dcs->dcs_buf[0]) * dcs->dcs_nr;
}

if (coa->coa_for_modify)
dct_size += sizeof(dct->dct_tgt_ids[0]) * dct->dct_tgt_nr;

if (coa->coa_max_dct_sz < dct_size)
coa->coa_max_dct_sz = dct_size;

if (coa->coa_dct_nr < i)
memcpy(&coa->coa_dcts[coa->coa_dct_nr], dct, sizeof(*dct));

coa->coa_dct_nr++;
*size += dct_size;
}
if (dct->dct_bitmap != NULL)
obj_coll_collapse_one(coa, dct, size, coa->coa_dct_nr < i);
}

/* Reset the other dct slots to avoid double free during cleanup. */
Expand Down Expand Up @@ -373,8 +399,9 @@ obj_coll_prep_one(struct coll_oper_args *coa, struct dc_object *obj,

D_RWLOCK_RDLOCK(&obj->cob_lock);

D_ASSERTF(shard->do_target_rank <= obj->cob_max_rank,
"Unexpected shard with rank %u > %u\n", shard->do_target_rank, obj->cob_max_rank);
D_ASSERTF(coa->coa_min_rank == obj->cob_min_rank,
"Object "DF_OID" layout has been changed unexpectedly %u => %u, idx %u, ver %u\n",
DP_OID(obj->cob_md.omd_id), coa->coa_min_rank, obj->cob_min_rank, idx, map_ver);
D_ASSERTF(shard->do_target_rank >= obj->cob_min_rank,
"Unexpected shard with rank %u < %u\n", shard->do_target_rank, obj->cob_min_rank);

Expand Down Expand Up @@ -669,7 +696,6 @@ dc_obj_coll_punch(tse_task_t *task, struct dc_object *obj, struct dtx_epoch *epo
uint32_t tgt_size = 0;
uint32_t mbs_max_size;
uint32_t inline_size;
uint32_t flags = ORF_LEADER;
uint32_t leader = -1;
uint32_t len;
int rc;
Expand Down Expand Up @@ -746,6 +772,12 @@ dc_obj_coll_punch(tse_task_t *task, struct dc_object *obj, struct dtx_epoch *epo
memcpy(dct, &tmp_tgt, sizeof(tmp_tgt));
}

/* 'shard' is on the leader target that is must be the coa->coa_dcts[0]. */
D_ASSERTF(shard->do_target_rank == coa->coa_dcts[0].dct_rank,
"Object "DF_OID" target array corrupted: rank %u vs %ur, nr %u\n",
DP_OID(obj->cob_md.omd_id), shard->do_target_rank,
coa->coa_dcts[0].dct_rank, coa->coa_dct_nr);

rc = dc_obj_coll_punch_mbs(coa, obj, shard->do_target_id, &mbs);
if (rc < 0)
goto out;
Expand All @@ -767,12 +799,14 @@ dc_obj_coll_punch(tse_task_t *task, struct dc_object *obj, struct dtx_epoch *epo
if (rc != 0)
goto out;

auxi->flags = ORF_LEADER;
if (auxi->io_retry) {
flags |= ORF_RESEND;
auxi->flags |= ORF_RESEND;
/* Reset @enqueue_id if resend to new leader. */
if (spa->pa_auxi.target != shard->do_target_id)
spa->pa_auxi.enqueue_id = 0;
} else {
auxi->flags &= ~ORF_RESEND;
spa->pa_auxi.obj_auxi = auxi;
daos_dti_gen(&spa->pa_dti, false);
}
Expand All @@ -781,14 +815,15 @@ dc_obj_coll_punch(tse_task_t *task, struct dc_object *obj, struct dtx_epoch *epo
spa->pa_auxi.shard = shard->do_shard_idx;

if (obj_is_ec(obj))
flags |= ORF_EC;
auxi->flags |= ORF_EC;

mbs_max_size = sizeof(*mbs) + mbs->dm_data_size +
sizeof(coa->coa_targets[0]) * coa->coa_max_shard_nr + coa->coa_max_bitmap_sz;

return dc_obj_shard_coll_punch(shard, spa, mbs, mbs_max_size, cpca.cpca_bulks, tgt_size,
coa->coa_dcts, coa->coa_dct_nr, coa->coa_max_dct_sz, epoch,
args->flags, flags, map_ver, &auxi->map_ver_reply, task);
args->flags, auxi->flags, map_ver,
&auxi->map_ver_reply, task);

out:
if (rc > 0)
Expand Down
7 changes: 4 additions & 3 deletions src/object/cli_shard.c
Original file line number Diff line number Diff line change
Expand Up @@ -1453,9 +1453,10 @@ obj_shard_coll_punch_cb(tse_task_t *task, void *data)

DL_CDEBUG(task->dt_result < 0, DLOG_ERR, DB_IO, task->dt_result,
"DAOS_OBJ_RPC_COLL_PUNCH RPC %p for "DF_UOID" with DTX "
DF_DTI" for task %p, map_ver %u/%u, flags %lx/%x", rpc, DP_UOID(ocpi->ocpi_oid),
DP_DTI(&ocpi->ocpi_xid), task, ocpi->ocpi_map_ver, *cb_args->cpca_ver,
(unsigned long)ocpi->ocpi_api_flags, ocpi->ocpi_flags);
DF_DTI" for task %p, map_ver %u/%u, flags %lx/%x, %s layout",
rpc, DP_UOID(ocpi->ocpi_oid), DP_DTI(&ocpi->ocpi_xid), task, ocpi->ocpi_map_ver,
*cb_args->cpca_ver, (unsigned long)ocpi->ocpi_api_flags, ocpi->ocpi_flags,
cb_args->cpca_shard_args->pa_coa.coa_raw_sparse ? "sparse" : "continuous");

crt_req_decref(rpc);

Expand Down
8 changes: 7 additions & 1 deletion src/object/obj_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -293,10 +293,16 @@ struct coll_oper_args {
struct shard_auxi_args coa_auxi;
int coa_dct_nr;
uint32_t coa_dct_cap;
uint32_t coa_max_dct_sz;
union {
/* Save obj->cob_min_rank for verification during obj_coll_prep_one(). */
uint32_t coa_min_rank;
/* Only can be used since obj_coll_oper_args_collapse() after object layout scan. */
uint32_t coa_max_dct_sz;
};
uint8_t coa_max_shard_nr;
uint8_t coa_max_bitmap_sz;
uint8_t coa_for_modify:1,
coa_raw_sparse:1,
coa_sparse:1;
uint8_t coa_target_nr;
/*
Expand Down
2 changes: 1 addition & 1 deletion src/object/obj_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ obj_coll_disp_dest(struct obj_coll_disp_cursor *ocdc, struct daos_coll_target *t
* use the "cur_pos" as the relay engine.
*/
pos = rand % (ocdc->tgt_nr - ocdc->cur_pos) + ocdc->cur_pos;
if (pos != ocdc->cur_pos && tgts[pos].dct_rank > dct->dct_rank) {
if (pos > ocdc->cur_pos && tgts[pos].dct_rank > dct->dct_rank) {
memcpy(&tmp, &tgts[pos], sizeof(tmp));
memcpy(&tgts[pos], dct, sizeof(tmp));
memcpy(dct, &tmp, sizeof(tmp));
Expand Down
12 changes: 9 additions & 3 deletions src/object/srv_coll.c
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,15 @@ obj_coll_punch_prep(struct obj_coll_punch_in *ocpi, struct daos_coll_target *dct
int i;
int j;

/* dcts[0] is for current engine. */
if (dcts[0].dct_bitmap == NULL || dcts[0].dct_bitmap_sz == 0 ||
dcts[0].dct_shards == NULL) {
/* dcts[0] must be for current engine. */
if (unlikely(dcts[0].dct_rank != dss_self_rank())) {
D_ERROR("Invalid targets array: rank %u vs %u, nr %u, flags %x\n",
dcts[0].dct_rank, dss_self_rank(), dct_nr, ocpi->ocpi_flags);
D_GOTO(out, rc = -DER_INVAL);
}

if (unlikely(dcts[0].dct_bitmap == NULL || dcts[0].dct_bitmap_sz == 0 ||
dcts[0].dct_shards == NULL)) {
D_ERROR("Invalid input for current engine: bitmap %s, bitmap_sz %u, shards %s\n",
dcts[0].dct_bitmap == NULL ? "empty" : "non-empty", dcts[0].dct_bitmap_sz,
dcts[0].dct_shards == NULL ? "empty" : "non-empty");
Expand Down
8 changes: 4 additions & 4 deletions src/object/srv_obj.c
Original file line number Diff line number Diff line change
Expand Up @@ -5605,12 +5605,12 @@ ds_obj_coll_punch_handler(crt_rpc_t *rpc)

D_DEBUG(DB_IO, "(%s) handling collective punch RPC %p for obj "
DF_UOID" on XS %u/%u epc "DF_X64" pmv %u, with dti "
DF_DTI", forward width %u, forward depth %u\n",
DF_DTI", forward width %u, forward depth %u, flags %x\n",
(ocpi->ocpi_flags & ORF_LEADER) ? "leader" :
(ocpi->ocpi_tgts.ca_count == 1 ? "non-leader" : "relay-engine"),
rpc, DP_UOID(ocpi->ocpi_oid), dmi->dmi_xs_id, dmi->dmi_tgt_id,
ocpi->ocpi_epoch, ocpi->ocpi_map_ver, DP_DTI(&ocpi->ocpi_xid),
ocpi->ocpi_disp_width, ocpi->ocpi_disp_depth);
ocpi->ocpi_disp_width, ocpi->ocpi_disp_depth, ocpi->ocpi_flags);

D_ASSERT(dmi->dmi_xs_id != 0);

Expand Down Expand Up @@ -5747,13 +5747,13 @@ ds_obj_coll_punch_handler(crt_rpc_t *rpc)
DL_CDEBUG(rc != 0 && rc != -DER_INPROGRESS && rc != -DER_TX_RESTART, DLOG_ERR, DB_IO, rc,
"(%s) handled collective punch RPC %p for obj "DF_UOID" on XS %u/%u epc "
DF_X64" pmv %u/%u, with dti "DF_DTI", bulk_tgt_sz %u, bulk_tgt_nr %u, "
"tgt_nr %u, forward width %u, forward depth %u",
"tgt_nr %u, forward width %u, forward depth %u, flags %x",
(ocpi->ocpi_flags & ORF_LEADER) ? "leader" :
(ocpi->ocpi_tgts.ca_count == 1 ? "non-leader" : "relay-engine"), rpc,
DP_UOID(ocpi->ocpi_oid), dmi->dmi_xs_id, dmi->dmi_tgt_id, ocpi->ocpi_epoch,
ocpi->ocpi_map_ver, max_ver, DP_DTI(&ocpi->ocpi_xid), ocpi->ocpi_bulk_tgt_sz,
ocpi->ocpi_bulk_tgt_nr, (unsigned int)ocpi->ocpi_tgts.ca_count,
ocpi->ocpi_disp_width, ocpi->ocpi_disp_depth);
ocpi->ocpi_disp_width, ocpi->ocpi_disp_depth, ocpi->ocpi_flags);

obj_punch_complete(rpc, rc, max_ver);

Expand Down
3 changes: 0 additions & 3 deletions src/vos/vos_dtx.c
Original file line number Diff line number Diff line change
Expand Up @@ -1972,9 +1972,6 @@ vos_dtx_load_mbs(daos_handle_t coh, struct dtx_id *dti, daos_unit_oid_t *oid,
rc = -DER_INPROGRESS;
}

if (rc < 0)
D_ERROR("Failed to load mbs for "DF_DTI": "DF_RC"\n", DP_DTI(dti), DP_RC(rc));

return rc;
}

Expand Down
Loading