diff --git a/src/object/srv_coll.c b/src/object/srv_coll.c index 9e4218108616..a63a11d574b7 100644 --- a/src/object/srv_coll.c +++ b/src/object/srv_coll.c @@ -183,7 +183,7 @@ obj_coll_punch_bulk(crt_rpc_t *rpc, d_iov_t *iov, crt_proc_t *p_proc, sgl.sg_iovs = iov; rc = obj_bulk_transfer(rpc, CRT_BULK_GET, false, &ocpi->ocpi_tgt_bulk, NULL, NULL, - DAOS_HDL_INVAL, &sgls, 1, NULL, NULL); + DAOS_HDL_INVAL, &sgls, 1, 1, NULL, NULL); if (rc != 0) { D_ERROR("Failed to prepare bulk transfer for coll_punch, size %u: "DF_RC"\n", ocpi->ocpi_bulk_tgt_sz, DP_RC(rc)); diff --git a/src/object/srv_internal.h b/src/object/srv_internal.h index 6f13e3f36dc2..16bde0287a21 100644 --- a/src/object/srv_internal.h +++ b/src/object/srv_internal.h @@ -279,8 +279,8 @@ void ds_obj_coll_punch_handler(crt_rpc_t *rpc); typedef int (*ds_iofw_cb_t)(crt_rpc_t *req, void *arg); int obj_bulk_transfer(crt_rpc_t *rpc, crt_bulk_op_t bulk_op, bool bulk_bind, - crt_bulk_t *remote_bulks, uint64_t *remote_offs, uint8_t *skips, - daos_handle_t ioh, d_sg_list_t **sgls, int sgl_nr, + crt_bulk_t *remote_bulks, uint64_t *remote_offs, uint64_t *skips, + daos_handle_t ioh, d_sg_list_t **sgls, int sgl_nr, int bulk_nr, struct obj_bulk_args *p_arg, struct ds_cont_hdl *coh); int obj_tgt_punch(struct obj_tgt_punch_args *otpa, uint32_t *shards, uint32_t count); int obj_tgt_query(struct obj_tgt_query_args *otqa, uuid_t po_uuid, uuid_t co_hdl, uuid_t co_uuid, diff --git a/src/object/srv_obj.c b/src/object/srv_obj.c index 0a246bebdca4..c2b82ae0bc76 100644 --- a/src/object/srv_obj.c +++ b/src/object/srv_obj.c @@ -487,23 +487,25 @@ bulk_transfer_sgl(daos_handle_t ioh, crt_rpc_t *rpc, crt_bulk_t remote_bulk, int obj_bulk_transfer(crt_rpc_t *rpc, crt_bulk_op_t bulk_op, bool bulk_bind, crt_bulk_t *remote_bulks, - uint64_t *remote_offs, uint8_t *skips, daos_handle_t ioh, d_sg_list_t **sgls, - int sgl_nr, struct obj_bulk_args *p_arg, struct ds_cont_hdl *coh) + uint64_t *remote_offs, uint64_t *skips, daos_handle_t ioh, d_sg_list_t **sgls, + int sgl_nr, int bulk_nr, struct obj_bulk_args *p_arg, struct ds_cont_hdl *coh) { - struct obj_rw_in *orw = crt_req_get(rpc); struct obj_bulk_args arg = { 0 }; int i, rc, *status, ret; int skip_nr = 0; - int bulk_nr; bool async = true; uint64_t time = daos_get_ntime(); + if (unlikely(sgl_nr > bulk_nr)) { + D_ERROR("Invalid sgl_nr vs bulk_nr: %d/%d\n", sgl_nr, bulk_nr); + return -DER_INVAL; + } + if (remote_bulks == NULL) { D_ERROR("No remote bulks provided\n"); return -DER_INVAL; } - bulk_nr = orw->orw_bulks.ca_count; if (p_arg == NULL) { p_arg = &arg; async = false; @@ -539,12 +541,12 @@ obj_bulk_transfer(crt_rpc_t *rpc, crt_bulk_op_t bulk_op, bool bulk_bind, crt_bul * because we always create bulk handle if EC object IO targeted for multiple * data shards. */ - while (skips != NULL && isset(skips, i + skip_nr)) + while (skips != NULL && isset64(skips, i + skip_nr)) skip_nr++; - if (bulk_nr > 0) - D_ASSERTF(i + skip_nr < bulk_nr, "i %d, skip_nr %d, bulk_nr %d\n", - i, skip_nr, bulk_nr); + D_ASSERTF(i + skip_nr < bulk_nr, "i %d, skip_nr %d, sgl_nr %d, bulk_nr %d\n", + i, skip_nr, sgl_nr, bulk_nr); + if (remote_bulks[i + skip_nr] == NULL) continue; @@ -574,6 +576,10 @@ obj_bulk_transfer(crt_rpc_t *rpc, crt_bulk_op_t bulk_op, bool bulk_bind, crt_bul break; } } + + D_ASSERTF(sgl_nr + skip_nr <= bulk_nr, "Unmatched sgl_nr %d, skip_nr %d, bulk_nr %d\n", + sgl_nr, skip_nr, bulk_nr); + done: if (--(p_arg->bulks_inflight) == 0) ABT_eventual_set(p_arg->eventual, &rc, sizeof(rc)); @@ -616,7 +622,7 @@ obj_bulk_transfer(crt_rpc_t *rpc, crt_bulk_op_t bulk_op, bool bulk_bind, crt_bul } static int -obj_set_reply_sizes(crt_rpc_t *rpc, daos_iod_t *iods, int iod_nr, uint8_t *skips) +obj_set_reply_sizes(crt_rpc_t *rpc, daos_iod_t *iods, int iod_nr, uint64_t *skips) { struct obj_rw_in *orw = crt_req_get(rpc); struct obj_rw_out *orwo = crt_reply_get(rpc); @@ -651,7 +657,7 @@ obj_set_reply_sizes(crt_rpc_t *rpc, daos_iod_t *iods, int iod_nr, uint8_t *skips } for (i = 0, idx = 0; i < orw_iod_nr; i++) { - if (skips != NULL && isset(skips, i)) { + if (skips != NULL && isset64(skips, i)) { sizes[i] = 0; continue; } @@ -688,7 +694,7 @@ obj_set_reply_sizes(crt_rpc_t *rpc, daos_iod_t *iods, int iod_nr, uint8_t *skips * it will pack the complete sgls inside the req/reply, see obj_shard_rw(). */ static int -obj_set_reply_nrs(crt_rpc_t *rpc, daos_handle_t ioh, d_sg_list_t *echo_sgl, uint8_t *skips) +obj_set_reply_nrs(crt_rpc_t *rpc, daos_handle_t ioh, d_sg_list_t *echo_sgl, uint64_t *skips) { struct obj_rw_in *orw = crt_req_get(rpc); struct obj_rw_out *orwo = crt_reply_get(rpc); @@ -722,7 +728,7 @@ obj_set_reply_nrs(crt_rpc_t *rpc, daos_handle_t ioh, d_sg_list_t *echo_sgl, uint nrs = orwo->orw_nrs.ca_arrays; data_sizes = orwo->orw_data_sizes.ca_arrays; for (i = 0, idx = 0; i < nrs_count; i++) { - if (skips != NULL && isset(skips, i)) { + if (skips != NULL && isset64(skips, i)) { nrs[i] = 0; data_sizes[i] = 0; continue; @@ -836,7 +842,7 @@ obj_echo_rw(crt_rpc_t *rpc, daos_iod_t *iod, uint64_t *off) /* Only support 1 iod now */ bulk_bind = orw->orw_flags & ORF_BULK_BIND; rc = obj_bulk_transfer(rpc, bulk_op, bulk_bind, orw->orw_bulks.ca_arrays, off, - NULL, DAOS_HDL_INVAL, &p_sgl, 1, NULL, NULL); + NULL, DAOS_HDL_INVAL, &p_sgl, 1, 1, NULL, NULL); out: orwo->orw_ret = rc; orwo->orw_map_version = orw->orw_map_ver; @@ -886,7 +892,7 @@ get_iod_csum(struct dcs_iod_csums *iod_csums, int i) static int csum_add2iods(daos_handle_t ioh, daos_iod_t *iods, uint32_t iods_nr, - uint8_t *skips, struct daos_csummer *csummer, + uint64_t *skips, struct daos_csummer *csummer, struct dcs_iod_csums *iod_csums, daos_unit_oid_t oid, daos_key_t *dkey) { @@ -900,7 +906,7 @@ csum_add2iods(daos_handle_t ioh, daos_iod_t *iods, uint32_t iods_nr, uint32_t csum_info_nr = vos_ioh2ci_nr(ioh); for (i = 0, idx = 0; i < iods_nr; i++) { - if (skips != NULL && isset(skips, i)) + if (skips != NULL && isset64(skips, i)) continue; if (biov_csums_idx >= csum_info_nr) break; /** no more csums to add */ @@ -1056,7 +1062,7 @@ obj_log_csum_err(void) /** create maps for actually written to extents. */ static int obj_fetch_create_maps(crt_rpc_t *rpc, struct bio_desc *biod, daos_iod_t *iods, uint32_t iods_nr, - uint8_t *skips) + uint64_t *skips) { struct obj_rw_in *orw = crt_req_get(rpc); struct obj_rw_out *orwo = crt_reply_get(rpc); @@ -1094,7 +1100,7 @@ obj_fetch_create_maps(crt_rpc_t *rpc, struct bio_desc *biod, daos_iod_t *iods, u return -DER_NOMEM; } for (i = 0, idx = 0; i < total_nr; i++) { - if (!isset(skips, i)) + if (!isset64(skips, i)) result_maps[i] = maps[idx++]; } D_ASSERTF(idx == iods_nr, "idx %d, iods_nr %d\n", idx, iods_nr); @@ -1288,7 +1294,7 @@ orf_to_dtx_epoch_flags(enum obj_rpc_flags orf_flags) } static int -obj_rw_recx_list_post(struct obj_rw_in *orw, struct obj_rw_out *orwo, uint8_t *skips, int rc) +obj_rw_recx_list_post(struct obj_rw_in *orw, struct obj_rw_out *orwo, uint64_t *skips, int rc) { struct daos_recx_ep_list *lists; struct daos_recx_ep_list *old_lists; @@ -1302,7 +1308,7 @@ obj_rw_recx_list_post(struct obj_rw_in *orw, struct obj_rw_out *orwo, uint8_t *s old_lists = orwo->orw_rels.ca_arrays; for (i = 0, idx = 0; i < orw->orw_nr; i++) { - if (isset(skips, i)) { + if (isset64(skips, i)) { lists[i].re_ep_valid = 1; continue; } @@ -1320,7 +1326,7 @@ obj_rw_recx_list_post(struct obj_rw_in *orw, struct obj_rw_out *orwo, uint8_t *s static int obj_local_rw_internal(crt_rpc_t *rpc, struct obj_io_context *ioc, daos_iod_t *iods, - struct dcs_iod_csums *iod_csums, uint64_t *offs, uint8_t *skips, + struct dcs_iod_csums *iod_csums, uint64_t *offs, uint64_t *skips, uint32_t iods_nr, struct dtx_handle *dth) { struct obj_rw_in *orw = crt_req_get(rpc); @@ -1597,7 +1603,7 @@ obj_local_rw_internal(crt_rpc_t *rpc, struct obj_io_context *ioc, daos_iod_t *io int i, j; for (i = 0, j = 0; i < orw->orw_iod_array.oia_iod_nr; i++) { - if (skips != NULL && isset(skips, i)) { + if (skips != NULL && isset64(skips, i)) { orw->orw_iod_array.oia_iods[i].iod_size = 0; continue; } @@ -1636,7 +1642,8 @@ obj_local_rw_internal(crt_rpc_t *rpc, struct obj_io_context *ioc, daos_iod_t *io if (rma) { bulk_bind = orw->orw_flags & ORF_BULK_BIND; rc = obj_bulk_transfer(rpc, bulk_op, bulk_bind, orw->orw_bulks.ca_arrays, offs, - skips, ioh, NULL, iods_nr, NULL, ioc->ioc_coh); + skips, ioh, NULL, iods_nr, orw->orw_bulks.ca_count, NULL, + ioc->ioc_coh); if (rc == 0) { bio_iod_flush(biod); @@ -1781,7 +1788,7 @@ static int obj_get_iods_offs_by_oid(daos_unit_oid_t uoid, struct obj_iod_array *iod_array, struct daos_oclass_attr *oca, uint64_t dkey_hash, uint32_t layout_ver, daos_iod_t **iods, uint64_t **offs, - uint8_t **skips, struct dcs_iod_csums **csums, uint32_t *nr) + uint64_t **skips, struct dcs_iod_csums **csums, uint32_t *nr) { struct obj_shard_iod *siod; uint32_t local_tgt; @@ -1809,7 +1816,7 @@ obj_get_iods_offs_by_oid(daos_unit_oid_t uoid, struct obj_iod_array *iod_array, } } if (oiod_nr > LOCAL_SKIP_BITS_NUM || *skips == NULL) { - D_ALLOC(*skips, roundup(oiod_nr / NBBY, 4)); + D_ALLOC(*skips, roundup(oiod_nr, sizeof(**skips) * NBBY) / NBBY); if (*skips == NULL) D_GOTO(out, rc = -DER_NOMEM); } @@ -1838,7 +1845,7 @@ obj_get_iods_offs_by_oid(daos_unit_oid_t uoid, struct obj_iod_array *iod_array, if (skip) { D_DEBUG(DB_IO, "akey[%d] "DF_KEY" array skipped.\n", i, DP_KEY(&iod_parent->iod_name)); - setbit(*skips, i); + setbit64(*skips, i); continue; } } @@ -1883,7 +1890,7 @@ obj_get_iods_offs_by_oid(daos_unit_oid_t uoid, struct obj_iod_array *iod_array, "tgt_off %d, data_tgt_nr %d.\n", i, DP_KEY(&iod_parent->iod_name), (*iods)[idx].iod_size, tgt_off, obj_ec_data_tgt_nr(oca)); - setbit(*skips, i); + setbit64(*skips, i); continue; } @@ -1925,7 +1932,7 @@ static int obj_get_iods_offs(daos_unit_oid_t uoid, struct obj_iod_array *iod_array, struct daos_oclass_attr *oca, uint64_t dkey_hash, uint32_t layout_ver, daos_iod_t **iods, - uint64_t **offs, uint8_t **skips, struct dcs_iod_csums **p_csums, + uint64_t **offs, uint64_t **skips, struct dcs_iod_csums **p_csums, struct dcs_csum_info *csum_info, uint32_t *nr) { int rc; @@ -1967,7 +1974,7 @@ obj_local_rw_internal_wrap(crt_rpc_t *rpc, struct obj_io_context *ioc, struct dt uint64_t off = 0; uint64_t *offs = &off; uint64_t local_skips = 0; - uint8_t *skips = (uint8_t *)&local_skips; + uint64_t *skips = &local_skips; uint32_t nr = 0; int rc; @@ -1990,7 +1997,7 @@ obj_local_rw_internal_wrap(crt_rpc_t *rpc, struct obj_io_context *ioc, struct dt D_FREE(iods); if (offs != NULL && offs != &off && offs != orw->orw_iod_array.oia_offs) D_FREE(offs); - if (skips != NULL && skips != (uint8_t *)&local_skips) + if (skips != NULL && skips != &local_skips) D_FREE(skips); return rc; @@ -2448,7 +2455,7 @@ ds_obj_ec_rep_handler(crt_rpc_t *rpc) goto end; } rc = obj_bulk_transfer(rpc, CRT_BULK_GET, false, &oer->er_bulk, NULL, NULL, - ioh, NULL, 1, NULL, ioc.ioc_coh); + ioh, NULL, 1, 1, NULL, ioc.ioc_coh); if (rc) D_ERROR(DF_UOID " bulk transfer failed: " DF_RC "\n", DP_UOID(oer->er_oid), DP_RC(rc)); @@ -2526,7 +2533,7 @@ ds_obj_ec_agg_handler(crt_rpc_t *rpc) goto end; } rc = obj_bulk_transfer(rpc, CRT_BULK_GET, false, &oea->ea_bulk, - NULL, NULL, ioh, NULL, 1, NULL, ioc.ioc_coh); + NULL, NULL, ioh, NULL, 1, 1, NULL, ioc.ioc_coh); if (rc) D_ERROR(DF_UOID " bulk transfer failed: " DF_RC "\n", DP_UOID(oea->ea_oid), DP_RC(rc)); @@ -3275,7 +3282,7 @@ obj_enum_reply_bulk(crt_rpc_t *rpc) return 0; rc = obj_bulk_transfer(rpc, CRT_BULK_PUT, false, bulks, NULL, NULL, - DAOS_HDL_INVAL, sgls, idx, NULL, NULL); + DAOS_HDL_INVAL, sgls, idx, idx, NULL, NULL); if (oei->oei_kds_bulk) { D_FREE(oeo->oeo_kds.ca_arrays); oeo->oeo_kds.ca_count = 0; @@ -4377,8 +4384,8 @@ ds_cpd_handle_one(crt_rpc_t *rpc, struct daos_cpd_sub_head *dcsh, struct daos_cp daos_iod_t *local_p_iods[LOCAL_STACK_NUM] = {0}; struct dcs_iod_csums *local_p_csums[LOCAL_STACK_NUM] = {0}; uint64_t *local_p_offs[LOCAL_STACK_NUM] = {0}; - uint8_t *local_p_skips[LOCAL_STACK_NUM] = {0}; - uint8_t **pskips = NULL; + uint64_t *local_p_skips[LOCAL_STACK_NUM] = {0}; + uint64_t **pskips = NULL; daos_iod_t **piods = NULL; uint32_t *piod_nrs = NULL; struct dcs_iod_csums **pcsums = NULL; @@ -4448,7 +4455,7 @@ ds_cpd_handle_one(crt_rpc_t *rpc, struct daos_cpd_sub_head *dcsh, struct daos_cp piods[i] = &local_iods[i]; pcsums[i] = &local_csums[i]; poffs[i] = &local_offs[i]; - pskips[i] = (uint8_t *)&local_skips[i]; + pskips[i] = &local_skips[i]; } } @@ -4560,7 +4567,7 @@ ds_cpd_handle_one(crt_rpc_t *rpc, struct daos_cpd_sub_head *dcsh, struct daos_cp rc = obj_bulk_transfer(rpc, CRT_BULK_GET, dcu->dcu_flags & ORF_BULK_BIND, dcu->dcu_bulks, poffs[i], pskips[i], iohs[i], NULL, - piod_nrs[i], &bulks[i], ioc->ioc_coh); + piod_nrs[i], dcsr->dcsr_nr, &bulks[i], ioc->ioc_coh); if (rc != 0) { D_ERROR("Bulk transfer failed for obj " DF_UOID", DTX "DF_DTI": "DF_RC"\n", @@ -4790,7 +4797,7 @@ ds_cpd_handle_one(crt_rpc_t *rpc, struct daos_cpd_sub_head *dcsh, struct daos_cp poffs[i] != dcu->dcu_iod_array.oia_offs) D_FREE(poffs[i]); - if (pskips != NULL && pskips[i] != NULL && pskips[i] != (uint8_t *)&local_skips[i]) + if (pskips != NULL && pskips[i] != NULL && pskips[i] != &local_skips[i]) D_FREE(pskips[i]); if (pcsums != NULL && pcsums[i] != NULL && pcsums[i] != &local_csums[i] && @@ -5276,7 +5283,7 @@ ds_obj_cpd_body_bulk(crt_rpc_t *rpc, struct obj_io_context *ioc, bool leader, } rc = obj_bulk_transfer(rpc, CRT_BULK_GET, ORF_BULK_BIND, bulks, NULL, NULL, - DAOS_HDL_INVAL, sgls, count, NULL, ioc->ioc_coh); + DAOS_HDL_INVAL, sgls, count, count, NULL, ioc->ioc_coh); if (rc != 0) goto out;