diff --git a/src/bio/bio_buffer.c b/src/bio/bio_buffer.c index 551c13db40e..1f6baae521b 100644 --- a/src/bio/bio_buffer.c +++ b/src/bio/bio_buffer.c @@ -34,11 +34,6 @@ dma_alloc_chunk(unsigned int cnt) D_ASSERT(bytes > 0); - if (DAOS_FAIL_CHECK(DAOS_NVME_ALLOCBUF_ERR)) { - D_ERROR("Injected DMA buffer allocation error.\n"); - return NULL; - } - D_ALLOC_PTR(chunk); if (chunk == NULL) { return NULL; @@ -848,6 +843,7 @@ dma_map_one(struct bio_desc *biod, struct bio_iov *biov, void *arg) bio_iov_set_raw_buf(biov, NULL); return 0; } + D_ASSERT(!BIO_ADDR_IS_GANG(&biov->bi_addr)); if (direct_scm_access(biod, biov)) { struct umem_instance *umem = biod->bd_umem; diff --git a/src/bio/bio_bulk.c b/src/bio/bio_bulk.c index 05d3c5624c8..059401460ee 100644 --- a/src/bio/bio_bulk.c +++ b/src/bio/bio_bulk.c @@ -1,5 +1,5 @@ /** - * (C) Copyright 2021-2022 Intel Corporation. + * (C) Copyright 2021-2024 Intel Corporation. * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -640,6 +640,7 @@ bulk_map_one(struct bio_desc *biod, struct bio_iov *biov, void *data) goto done; } D_ASSERT(!BIO_ADDR_IS_DEDUP(&biov->bi_addr)); + D_ASSERT(!BIO_ADDR_IS_GANG(&biov->bi_addr)); hdl = bulk_get_hdl(biod, biov, roundup_pgs(pg_cnt), pg_off, arg); if (hdl == NULL) { diff --git a/src/bio/bio_xstream.c b/src/bio/bio_xstream.c index 4bba7359a7e..a5a1868e811 100644 --- a/src/bio/bio_xstream.c +++ b/src/bio/bio_xstream.c @@ -30,7 +30,6 @@ /* SPDK blob parameters */ #define DAOS_BS_CLUSTER_SZ (1ULL << 25) /* 32MB */ /* DMA buffer parameters */ -#define DAOS_DMA_CHUNK_MB 8 /* 8MB DMA chunks */ #define DAOS_DMA_CHUNK_CNT_INIT 24 /* Per-xstream init chunks, 192MB */ #define DAOS_DMA_CHUNK_CNT_MAX 128 /* Per-xstream max chunks, 1GB */ #define DAOS_DMA_CHUNK_CNT_MIN 32 /* Per-xstream min chunks, 256MB */ @@ -207,7 +206,7 @@ bio_nvme_init(const char *nvme_conf, int numa_node, unsigned int mem_size, { char *env; int rc, fd; - unsigned int size_mb = DAOS_DMA_CHUNK_MB; + unsigned int size_mb = BIO_DMA_CHUNK_MB; if (tgt_nr <= 0) { D_ERROR("tgt_nr: %u should be > 0\n", tgt_nr); diff --git a/src/include/daos/common.h b/src/include/daos/common.h index f3e7c172f6a..6bad86f91b8 100644 --- a/src/include/daos/common.h +++ b/src/include/daos/common.h @@ -851,7 +851,7 @@ enum { #define DAOS_NVME_FAULTY (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0x50) #define DAOS_NVME_WRITE_ERR (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0x51) #define DAOS_NVME_READ_ERR (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0x52) -#define DAOS_NVME_ALLOCBUF_ERR (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0x53) +#define DAOS_NVME_ALLOCBUF_ERR (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0x53) /* deprecated */ #define DAOS_NVME_WAL_TX_LOST (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0x54) #define DAOS_POOL_CREATE_FAIL_CORPC (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0x60) diff --git a/src/include/daos_srv/bio.h b/src/include/daos_srv/bio.h index a69f456e232..c32202a1b19 100644 --- a/src/include/daos_srv/bio.h +++ b/src/include/daos_srv/bio.h @@ -29,30 +29,47 @@ ((addr)->ba_flags &= ~(BIO_FLAG_DEDUP_BUF)) #define BIO_ADDR_IS_CORRUPTED(addr) ((addr)->ba_flags & BIO_FLAG_CORRUPTED) #define BIO_ADDR_SET_CORRUPTED(addr) ((addr)->ba_flags |= BIO_FLAG_CORRUPTED) +#define BIO_ADDR_IS_GANG(addr) ((addr)->ba_flags & BIO_FLAG_GANG) +#define BIO_ADDR_SET_GANG(addr) ((addr)->ba_flags |= BIO_FLAG_GANG) /* Can support up to 16 flags for a BIO address */ enum BIO_FLAG { /* The address is a hole */ BIO_FLAG_HOLE = (1 << 0), - /* The address is a deduped extent */ + /* The address is a deduped extent, transient only flag */ BIO_FLAG_DEDUP = (1 << 1), - /* The address is a buffer for dedup verify */ + /* The address is a buffer for dedup verify, transient only flag */ BIO_FLAG_DEDUP_BUF = (1 << 2), + /* The data located on the address is marked as corrupted */ BIO_FLAG_CORRUPTED = (1 << 3), + /* The address is a gang address */ + BIO_FLAG_GANG = (1 << 4), }; +#define BIO_DMA_CHUNK_MB 8 /* 8MB DMA chunks */ + +/** + * It's used to represent an address on SCM, or an address on NVMe, or a gang address. + * + * The gang address consists of N addresses from scattered allocations, the scattered + * allocations could have different size and media type, they are compactly stored on + * the SCM pointing by 'ba_off' as following: + * + * N 64bits offsets, N 32bits sizes, N 8bits media types + */ typedef struct { /* - * Byte offset within PMDK pmemobj pool for SCM; + * Byte offset within PMDK pmemobj pool for SCM or gang address; * Byte offset within SPDK blob for NVMe. */ uint64_t ba_off; /* DAOS_MEDIA_SCM or DAOS_MEDIA_NVME */ uint8_t ba_type; - uint8_t ba_pad1; + /* Number of addresses when BIO_FLAG_GANG is set */ + uint8_t ba_gang_nr; /* See BIO_FLAG enum */ uint16_t ba_flags; - uint32_t ba_pad2; + uint32_t ba_pad; } bio_addr_t; struct sys_db; @@ -127,8 +144,63 @@ enum bio_bs_state { BIO_BS_STATE_SETUP, }; +/* Size for storing N offset + size + metia_type */ +static inline unsigned int +bio_gaddr_size(uint8_t gang_nr) +{ + unsigned int size; + + if (gang_nr == 0) + return 0; + + size = sizeof(uint64_t) + sizeof(uint32_t) + sizeof(uint8_t); + return roundup(size * gang_nr, sizeof(uint64_t)); +} + +static inline void +bio_gaddr_set(struct umem_instance *umm, bio_addr_t *gaddr, int i, + uint8_t type, uint32_t len, uint64_t off) +{ + uint8_t *ptr; + unsigned int ptr_off; + + D_ASSERT(BIO_ADDR_IS_GANG(gaddr)); + D_ASSERT(i < gaddr->ba_gang_nr); + ptr = umem_off2ptr(umm, gaddr->ba_off); + + ptr_off = sizeof(uint64_t) * i; + *((uint64_t *)(ptr + ptr_off)) = off; + + ptr_off = sizeof(uint64_t) * gaddr->ba_gang_nr + sizeof(uint32_t) * i; + *((uint32_t *)(ptr + ptr_off)) = len; + + ptr_off = (sizeof(uint64_t) + sizeof(uint32_t)) * gaddr->ba_gang_nr + i; + *(ptr + ptr_off) = type; +} + +static inline void +bio_gaddr_get(struct umem_instance *umm, bio_addr_t *gaddr, int i, + uint8_t *type, uint32_t *len, uint64_t *off) +{ + uint8_t *ptr; + unsigned int ptr_off; + + D_ASSERT(BIO_ADDR_IS_GANG(gaddr)); + D_ASSERT(i < gaddr->ba_gang_nr); + ptr = umem_off2ptr(umm, gaddr->ba_off); + + ptr_off = sizeof(uint64_t) * i; + *off = *((uint64_t *)(ptr + ptr_off)); + + ptr_off = sizeof(uint64_t) * gaddr->ba_gang_nr + sizeof(uint32_t) * i; + *len = *((uint32_t *)(ptr + ptr_off)); + + ptr_off = (sizeof(uint64_t) + sizeof(uint32_t)) * gaddr->ba_gang_nr + i; + *type = *(ptr + ptr_off); +} + static inline void -bio_addr_set(bio_addr_t *addr, uint16_t type, uint64_t off) +bio_addr_set(bio_addr_t *addr, uint8_t type, uint64_t off) { addr->ba_type = type; addr->ba_off = umem_off2offset(off); diff --git a/src/include/daos_srv/vos_types.h b/src/include/daos_srv/vos_types.h index 194b2434c28..b57220f9a7c 100644 --- a/src/include/daos_srv/vos_types.h +++ b/src/include/daos_srv/vos_types.h @@ -21,6 +21,7 @@ #define VOS_POOL_DF_2_2 24 #define VOS_POOL_DF_2_4 25 #define VOS_POOL_DF_2_6 26 +#define VOS_POOL_DF_2_8 28 struct dtx_rsrvd_uint { void *dru_scm; @@ -299,6 +300,8 @@ enum { VOS_POOL_FEAT_EMBED_FIRST = (1ULL << 3), /** Flat DKEY support enabled */ VOS_POOL_FEAT_FLAT_DKEY = (1ULL << 4), + /** Gang address for SV support */ + VOS_POOL_FEAT_GANG_SV = (1ULL << 5), }; /** Mask for any conditionals passed to to the fetch */ diff --git a/src/object/srv_enum.c b/src/object/srv_enum.c index bb9c49d0566..e1513f02f7f 100644 --- a/src/object/srv_enum.c +++ b/src/object/srv_enum.c @@ -1,5 +1,5 @@ /* - * (C) Copyright 2018-2022 Intel Corporation. + * (C) Copyright 2018-2024 Intel Corporation. * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -617,7 +617,8 @@ fill_rec(daos_handle_t ih, vos_iter_entry_t *key_ent, struct ds_obj_enum_arg *ar * enum pack implementation doesn't support yield & re-probe. */ if (arg->inline_thres > 0 && data_size <= arg->inline_thres && - data_size > 0 && bio_iov2media(&key_ent->ie_biov) != DAOS_MEDIA_NVME) { + data_size > 0 && bio_iov2media(&key_ent->ie_biov) != DAOS_MEDIA_NVME && + !BIO_ADDR_IS_GANG(&key_ent->ie_biov.bi_addr)) { inline_data = true; size += data_size; } diff --git a/src/vos/tests/vts_io.c b/src/vos/tests/vts_io.c index 93bb20d4906..2f084a2d99d 100644 --- a/src/vos/tests/vts_io.c +++ b/src/vos/tests/vts_io.c @@ -2955,78 +2955,90 @@ io_query_key_negative(void **state) assert_rc_equal(rc, -DER_INVAL); } -static inline int -dummy_bulk_create(void *ctxt, d_sg_list_t *sgl, unsigned int perm, void **bulk_hdl) +static int +gang_sv_io(struct io_test_args *arg, daos_epoch_t epoch, char *dkey_buf, char *akey_buf, + char *update_buf, char *fetch_buf, daos_size_t rsize) { - return 0; -} + daos_iod_t iod = { 0 }; + daos_key_t dkey, akey; + d_iov_t val_iov; + d_sg_list_t sgl = { 0 }; + int rc; + + set_iov(&dkey, dkey_buf, is_daos_obj_type_set(arg->otype, DAOS_OT_DKEY_UINT64)); + set_iov(&akey, akey_buf, is_daos_obj_type_set(arg->otype, DAOS_OT_AKEY_UINT64)); + + iod.iod_name = akey; + iod.iod_type = DAOS_IOD_SINGLE; + iod.iod_size = rsize; + iod.iod_nr = 1; + + dts_buf_render(update_buf, rsize); + d_iov_set(&val_iov, update_buf, rsize); + sgl.sg_nr = 1; + sgl.sg_iovs = &val_iov; + + rc = io_test_obj_update(arg, epoch, 0, &dkey, &iod, &sgl, NULL, true); + if (rc) + return rc; + + memset(fetch_buf, 0, rsize); + d_iov_set(&val_iov, fetch_buf, rsize); + iod.iod_size = DAOS_REC_ANY; + + rc = io_test_obj_fetch(arg, epoch, 0, &dkey, &iod, &sgl, true); + if (rc) + return rc; + + /* Verify */ + assert_int_equal(iod.iod_size, rsize); + assert_memory_equal(update_buf, fetch_buf, rsize); -static inline int -dummy_bulk_free(void *bulk_hdl) -{ return 0; } -/* Verify the fix of DAOS-10748 */ static void -io_allocbuf_failure(void **state) +gang_sv_test(void **state) { struct io_test_args *arg = *state; - char dkey_buf[UPDATE_DKEY_SIZE] = { 0 }; - char akey_buf[UPDATE_AKEY_SIZE] = { 0 }; - daos_iod_t iod = { 0 }; - d_sg_list_t sgl = { 0 }; - daos_key_t dkey_iov, akey_iov; - daos_epoch_t epoch = 1; - char *buf; - daos_handle_t ioh; - int fake_ctxt; - daos_size_t buf_len = (40UL << 20); /* 40MB, larger than DMA chunk size */ - int rc; + char dkey_buf[UPDATE_DKEY_SIZE], akey_buf[UPDATE_AKEY_SIZE]; + char *update_buf, *fetch_buf; + daos_size_t rsize = (27UL << 20); /* 27MB */ + daos_epoch_t epoch = 1; + int rc; + + D_ALLOC(update_buf, rsize); + assert_non_null(update_buf); - FAULT_INJECTION_REQUIRED(); + D_ALLOC(fetch_buf, rsize); + assert_non_null(fetch_buf); vts_key_gen(&dkey_buf[0], arg->dkey_size, true, arg); vts_key_gen(&akey_buf[0], arg->akey_size, false, arg); - set_iov(&dkey_iov, &dkey_buf[0], is_daos_obj_type_set(arg->otype, DAOS_OT_DKEY_UINT64)); - set_iov(&akey_iov, &akey_buf[0], is_daos_obj_type_set(arg->otype, DAOS_OT_AKEY_UINT64)); - rc = d_sgl_init(&sgl, 1); + print_message("Gang SV update/fetch.\n"); + rc = gang_sv_io(arg, epoch, dkey_buf, akey_buf, update_buf, fetch_buf, rsize); assert_rc_equal(rc, 0); - D_ALLOC(buf, buf_len); - assert_non_null(buf); - - sgl.sg_iovs[0].iov_buf = buf; - sgl.sg_iovs[0].iov_buf_len = buf_len; - sgl.sg_iovs[0].iov_len = buf_len; - - iod.iod_name = akey_iov; - iod.iod_nr = 1; - iod.iod_type = DAOS_IOD_SINGLE; - iod.iod_size = buf_len; - iod.iod_recxs = NULL; - + print_message("Gang SV ZC update/fetch.\n"); + epoch++; arg->ta_flags |= TF_ZERO_COPY; - - bio_register_bulk_ops(dummy_bulk_create, dummy_bulk_free); - daos_fail_loc_set(DAOS_NVME_ALLOCBUF_ERR | DAOS_FAIL_ONCE); - - rc = vos_update_begin(arg->ctx.tc_co_hdl, arg->oid, epoch, 0, &dkey_iov, - 1, &iod, NULL, 0, &ioh, NULL); + rc = gang_sv_io(arg, epoch, dkey_buf, akey_buf, update_buf, fetch_buf, rsize); assert_rc_equal(rc, 0); - rc = bio_iod_prep(vos_ioh2desc(ioh), BIO_CHK_TYPE_IO, (void *)&fake_ctxt, 0); - assert_rc_equal(rc, -DER_NOMEM); - daos_fail_loc_set(0); - bio_register_bulk_ops(NULL, NULL); + print_message("Gang SV update/fetch with CSUM.\n"); + epoch++; + arg->ta_flags &= ~TF_ZERO_COPY; + arg->ta_flags |= TF_USE_CSUMS; + rc = gang_sv_io(arg, epoch, dkey_buf, akey_buf, update_buf, fetch_buf, rsize); + assert_rc_equal(rc, 0); - rc = vos_update_end(ioh, 0, &dkey_iov, rc, NULL, NULL); - assert_rc_equal(rc, -DER_NOMEM); + print_message("Gang SV overwrite with CSUM.\n"); + rc = gang_sv_io(arg, epoch, dkey_buf, akey_buf, update_buf, fetch_buf, rsize); + assert_rc_equal(rc, 0); - d_sgl_fini(&sgl, false); - D_FREE(buf); - arg->ta_flags &= ~TF_ZERO_COPY; + D_FREE(update_buf); + D_FREE(fetch_buf); } static const struct CMUnitTest iterator_tests[] = { @@ -3074,7 +3086,7 @@ static const struct CMUnitTest int_tests[] = { NULL}, {"VOS300.2: Key query test", io_query_key, NULL, NULL}, {"VOS300.3: Key query negative test", io_query_key_negative, NULL, NULL}, - {"VOS300.4: Return error on DMA buffer allocation failure", io_allocbuf_failure, NULL, NULL}, + {"VOS300.4: Gang SV update/fetch test", gang_sv_test, NULL, NULL}, }; static int diff --git a/src/vos/vos_common.c b/src/vos/vos_common.c index dbb8d28fd04..cd2f2a5a693 100644 --- a/src/vos/vos_common.c +++ b/src/vos/vos_common.c @@ -123,6 +123,7 @@ vos_bio_addr_free(struct vos_pool *pool, bio_addr_t *addr, daos_size_t nob) if (bio_addr_is_hole(addr)) return 0; + D_ASSERT(!BIO_ADDR_IS_GANG(addr)); if (addr->ba_type == DAOS_MEDIA_SCM) { rc = umem_free(&pool->vp_umm, addr->ba_off); } else { diff --git a/src/vos/vos_internal.h b/src/vos/vos_internal.h index f6a74fce7e6..9441ba45265 100644 --- a/src/vos/vos_internal.h +++ b/src/vos/vos_internal.h @@ -918,33 +918,31 @@ static inline void vos_irec_init_csum(struct vos_irec_df *irec, } } -/** Size of metadata without user payload */ -static inline uint64_t -vos_irec_msize(struct vos_rec_bundle *rbund) +#define VOS_GANG_SIZE_THRESH (BIO_DMA_CHUNK_MB << 20) /* 8MB */ + +static inline unsigned int +vos_irec_gang_nr(struct vos_pool *pool, daos_size_t rsize) { - uint64_t size = 0; + if (pool->vp_feats & VOS_POOL_FEAT_GANG_SV) { + if (rsize > VOS_GANG_SIZE_THRESH) + return (rsize + VOS_GANG_SIZE_THRESH - 1) / VOS_GANG_SIZE_THRESH; + } - if (rbund->rb_csum != NULL) - size = vos_size_round(rbund->rb_csum->cs_len); - return size + sizeof(struct vos_irec_df); + return 0; } +/** Size of metadata without user payload */ static inline uint64_t -vos_irec_size(struct vos_rec_bundle *rbund) +vos_irec_msize(struct vos_pool *pool, struct vos_rec_bundle *rbund) { - return vos_irec_msize(rbund) + rbund->rb_rsize; -} + uint64_t size = sizeof(struct vos_irec_df); -static inline bool -vos_irec_size_equal(struct vos_irec_df *irec, struct vos_rec_bundle *rbund) -{ - if (irec->ir_size != rbund->rb_rsize) - return false; + if (rbund->rb_csum != NULL) + size += vos_size_round(rbund->rb_csum->cs_len); - if (vos_irec2csum_size(irec) != rbund->rb_csum->cs_len) - return false; + size += bio_gaddr_size(vos_irec_gang_nr(pool, rbund->rb_rsize)); - return true; + return size; } static inline char * @@ -1300,9 +1298,6 @@ int key_tree_delete(struct vos_object *obj, daos_handle_t toh, d_iov_t *key_iov); /* vos_io.c */ -daos_size_t -vos_recx2irec_size(daos_size_t rsize, struct dcs_csum_info *csum); - int vos_dedup_init(struct vos_pool *pool); void diff --git a/src/vos/vos_io.c b/src/vos/vos_io.c index 4d452f50d6a..7aa3c897755 100644 --- a/src/vos/vos_io.c +++ b/src/vos/vos_io.c @@ -20,6 +20,11 @@ #include "evt_priv.h" #include +struct vos_sv_addr { + umem_off_t sa_umoff; /* SV record address */ + bio_addr_t sa_addr; /* SV payload address */ +}; + /** I/O context */ struct vos_io_context { EVT_ENT_ARRAY_LG_PTR(ic_ent_array); @@ -49,7 +54,10 @@ struct vos_io_context { /** reserved offsets for SCM update */ umem_off_t *ic_umoffs; unsigned int ic_umoffs_cnt; - unsigned int ic_umoffs_at; + /** reserved SV addresses */ + struct vos_sv_addr *ic_sv_addrs; + unsigned int ic_sv_addr_cnt; + unsigned int ic_sv_addr_at; /** reserved NVMe extents */ d_list_t ic_blk_exts; daos_size_t ic_space_held[DAOS_MEDIA_MAX]; @@ -518,6 +526,7 @@ vos_ioc_reserve_fini(struct vos_io_context *ioc) D_ASSERT(d_list_empty(&ioc->ic_blk_exts)); D_ASSERT(d_list_empty(&ioc->ic_dedup_entries)); D_FREE(ioc->ic_umoffs); + D_FREE(ioc->ic_sv_addrs); } static int @@ -525,6 +534,7 @@ vos_ioc_reserve_init(struct vos_io_context *ioc, struct dtx_handle *dth) { struct umem_rsrvd_act *scm; int total_acts = 0; + unsigned int gang_nr, sv_nr = 0; int i; if (!ioc->ic_update) @@ -533,9 +543,26 @@ vos_ioc_reserve_init(struct vos_io_context *ioc, struct dtx_handle *dth) for (i = 0; i < ioc->ic_iod_nr; i++) { daos_iod_t *iod = &ioc->ic_iods[i]; + if (iod->iod_type == DAOS_IOD_SINGLE) { + gang_nr = vos_irec_gang_nr(ioc->ic_cont->vc_pool, iod->iod_size); + if (gang_nr > UINT8_MAX) { + D_ERROR("Too large SV:"DF_U64", gang_nr:%u\n", + iod->iod_size, gang_nr); + return -DER_REC2BIG; + } + total_acts += gang_nr; + sv_nr++; + } total_acts += iod->iod_nr; } + if (sv_nr > 0) { + D_ALLOC_ARRAY(ioc->ic_sv_addrs, sv_nr); + if (ioc->ic_sv_addrs == NULL) + return -DER_NOMEM; + ioc->ic_sv_addr_cnt = sv_nr; + } + D_ALLOC_ARRAY(ioc->ic_umoffs, total_acts); if (ioc->ic_umoffs == NULL) return -DER_NOMEM; @@ -684,7 +711,7 @@ vos_ioc_create(daos_handle_t coh, daos_unit_oid_t oid, bool read_only, ioc->ic_remove = ((vos_flags & VOS_OF_REMOVE) != 0); ioc->ic_ec = ((vos_flags & VOS_OF_EC) != 0); ioc->ic_rebuild = ((vos_flags & VOS_OF_REBUILD) != 0); - ioc->ic_umoffs_cnt = ioc->ic_umoffs_at = 0; + ioc->ic_umoffs_cnt = 0; ioc->ic_iod_csums = iod_csums; vos_ilog_fetch_init(&ioc->ic_dkey_info); vos_ilog_fetch_init(&ioc->ic_akey_info); @@ -740,13 +767,26 @@ vos_ioc_create(daos_handle_t coh, daos_unit_oid_t oid, bool read_only, for (i = 0; i < iod_nr; i++) { int iov_nr = iods[i].iod_nr; + unsigned int gang_nr; struct bio_sglist *bsgl; - if ((iods[i].iod_type == DAOS_IOD_SINGLE && iov_nr != 1)) { - D_ERROR("Invalid iod_nr=%d, iod_type %d.\n", - iov_nr, iods[i].iod_type); - rc = -DER_IO_INVAL; - goto error; + if (iods[i].iod_type == DAOS_IOD_SINGLE) { + if (iov_nr != 1) { + D_ERROR("Invalid iod_nr=%d, iod_type %d.\n", + iov_nr, iods[i].iod_type); + rc = -DER_IO_INVAL; + goto error; + } + + gang_nr = vos_irec_gang_nr(cont->vc_pool, iods[i].iod_size); + if (gang_nr > UINT8_MAX) { + D_ERROR("Too large SV:"DF_U64", gang_nr:%u\n", + iods[i].iod_size, gang_nr); + rc = -DER_REC2BIG; + goto error; + } + if (gang_nr > 1) + iov_nr = gang_nr; } /* Don't bother to initialize SGLs for size fetch */ @@ -819,6 +859,55 @@ save_csum(struct vos_io_context *ioc, struct dcs_csum_info *csum_info, return dcs_csum_info_save(&ioc->ic_csum_list, &ci_duplicate); } +static int +iod_gang_fetch(struct vos_io_context *ioc, struct bio_iov *biov) +{ + struct bio_iov sub_iov = { 0 }; + uint64_t tot_len; + uint32_t data_len; + int i, rc = 0; + + if (ioc->ic_size_fetch) + return 0; + + if (biov->bi_addr.ba_gang_nr < 2) { + D_ERROR("Invalid gang address nr:%u\n", biov->bi_addr.ba_gang_nr); + return -DER_INVAL; + } + + tot_len = bio_iov2len(biov); + if (tot_len == 0) { + D_ERROR("Invalid gang addr, nr:%u, rsize:"DF_U64"\n", + biov->bi_addr.ba_gang_nr, bio_iov2len(biov)); + return -DER_INVAL; + } + + for (i = 0; i < biov->bi_addr.ba_gang_nr; i++) { + bio_gaddr_get(vos_ioc2umm(ioc), &biov->bi_addr, i, &sub_iov.bi_addr.ba_type, + &data_len, &sub_iov.bi_addr.ba_off); + + bio_iov_set_len(&sub_iov, data_len); + if (tot_len < data_len) { + D_ERROR("Invalid gang addr[%d], nr:%u, rsize:"DF_U64", len:"DF_U64"/%u\n", + i, biov->bi_addr.ba_gang_nr, bio_iov2len(biov), tot_len, data_len); + return -DER_INVAL; + } + tot_len -= data_len; + + rc = iod_fetch(ioc, &sub_iov); + if (rc) + return rc; + } + + if (tot_len != 0) { + D_ERROR("Invalid gang addr, nr:%u, rsize:"DF_U64", left:"DF_U64"\n", + biov->bi_addr.ba_gang_nr, bio_iov2len(biov), tot_len); + return -DER_INVAL; + } + + return 0; +} + /** Fetch the single value within the specified epoch range of an key */ static int akey_fetch_single(daos_handle_t toh, const daos_epoch_range_t *epr, @@ -873,7 +962,11 @@ akey_fetch_single(daos_handle_t toh, const daos_epoch_range_t *epr, return -DER_CSUM; } - rc = iod_fetch(ioc, &biov); + if (BIO_ADDR_IS_HOLE(&biov.bi_addr) || !BIO_ADDR_IS_GANG(&biov.bi_addr)) + rc = iod_fetch(ioc, &biov); + else + rc = iod_gang_fetch(ioc, &biov); + if (rc != 0) goto out; @@ -1612,21 +1705,6 @@ vos_fetch_begin(daos_handle_t coh, daos_unit_oid_t oid, daos_epoch_t epoch, return rc; } -static umem_off_t -iod_update_umoff(struct vos_io_context *ioc) -{ - umem_off_t umoff; - - D_ASSERTF(ioc->ic_umoffs_at < ioc->ic_umoffs_cnt, - "Invalid ioc_reserve at/cnt: %u/%u\n", - ioc->ic_umoffs_at, ioc->ic_umoffs_cnt); - - umoff = ioc->ic_umoffs[ioc->ic_umoffs_at]; - ioc->ic_umoffs_at++; - - return umoff; -} - static struct bio_iov * iod_update_biov(struct vos_io_context *ioc) { @@ -1643,6 +1721,20 @@ iod_update_biov(struct vos_io_context *ioc) return biov; } +static inline struct vos_sv_addr * +iod_get_sv_addr(struct vos_io_context *ioc) +{ + struct vos_sv_addr *sv_addr; + + D_ASSERTF(ioc->ic_sv_addr_at < ioc->ic_sv_addr_cnt, "sv_at:%u >= sv_cnt:%u\n", + ioc->ic_sv_addr_at, ioc->ic_sv_addr_cnt); + + sv_addr = &ioc->ic_sv_addrs[ioc->ic_sv_addr_at]; + ioc->ic_sv_addr_at++; + + return sv_addr; +} + static int akey_update_single(daos_handle_t toh, uint32_t pm_ver, daos_size_t rsize, daos_size_t gsize, struct vos_io_context *ioc, @@ -1652,22 +1744,22 @@ akey_update_single(daos_handle_t toh, uint32_t pm_ver, daos_size_t rsize, struct vos_rec_bundle rbund; struct dcs_csum_info csum; d_iov_t kiov, riov; - struct bio_iov *biov; + struct bio_iov biov; struct dcs_csum_info *value_csum; - umem_off_t umoff; + struct vos_sv_addr *sv_addr; daos_epoch_t epoch = ioc->ic_epr.epr_hi; int rc; + D_ASSERT(ioc->ic_iov_at == 0); + ci_set_null(&csum); d_iov_set(&kiov, &key, sizeof(key)); key.sk_epoch = epoch; key.sk_minor_epc = minor_epc; - umoff = iod_update_umoff(ioc); - D_ASSERT(!UMOFF_IS_NULL(umoff)); - - D_ASSERT(ioc->ic_iov_at == 0); - biov = iod_update_biov(ioc); + sv_addr = iod_get_sv_addr(ioc); + D_ASSERT(!UMOFF_IS_NULL(sv_addr->sa_umoff)); + bio_iov_set(&biov, sv_addr->sa_addr, rsize); tree_rec_bundle2iov(&rbund, &riov); @@ -1678,10 +1770,10 @@ akey_update_single(daos_handle_t toh, uint32_t pm_ver, daos_size_t rsize, else rbund.rb_csum = &csum; - rbund.rb_biov = biov; + rbund.rb_biov = &biov; rbund.rb_rsize = rsize; rbund.rb_gsize = gsize; - rbund.rb_off = umoff; + rbund.rb_off = sv_addr->sa_umoff; rbund.rb_ver = pm_ver; rc = dbtree_update(toh, &kiov, &riov); @@ -1830,10 +1922,7 @@ update_value(struct vos_io_context *ioc, daos_iod_t *iod, struct dcs_csum_info * } for (i = 0; i < iod->iod_nr; i++) { - umem_off_t umoff = iod_update_umoff(ioc); - if (iod->iod_recxs[i].rx_nr == 0) { - D_ASSERT(UMOFF_IS_NULL(umoff)); D_DEBUG(DB_IO, "Skip empty write IOD at %d: idx %lu, nr %lu\n", i, (unsigned long)iod->iod_recxs[i].rx_idx, (unsigned long)iod->iod_recxs[i].rx_nr); @@ -1997,6 +2086,7 @@ dkey_update(struct vos_io_context *ioc, uint32_t pm_ver, daos_key_t *dkey, goto out; } + ioc->ic_sv_addr_at = 0; if (krec->kr_bmap & KREC_BF_NO_AKEY) { struct dcs_csum_info *iod_csums = vos_csum_at(ioc->ic_iod_csums, 0); iod_set_cursor(ioc, 0); @@ -2027,17 +2117,6 @@ dkey_update(struct vos_io_context *ioc, uint32_t pm_ver, daos_key_t *dkey, return rc; } -daos_size_t -vos_recx2irec_size(daos_size_t rsize, struct dcs_csum_info *csum) -{ - struct vos_rec_bundle rbund; - - rbund.rb_csum = csum; - rbund.rb_rsize = rsize; - - return vos_irec_size(&rbund); -} - umem_off_t vos_reserve_scm(struct vos_container *cont, struct umem_rsrvd_act *rsrvd_scm, daos_size_t size) @@ -2127,7 +2206,7 @@ reserve_space(struct vos_io_context *ioc, uint16_t media, daos_size_t size, return rc; } -static int +static void iod_reserve(struct vos_io_context *ioc, struct bio_iov *biov) { struct bio_sglist *bsgl; @@ -2144,37 +2223,102 @@ iod_reserve(struct vos_io_context *ioc, struct bio_iov *biov) D_DEBUG(DB_TRACE, "media %d offset "DF_X64" size %zd\n", biov->bi_addr.ba_type, biov->bi_addr.ba_off, bio_iov2len(biov)); +} + +static inline void +iod_set_sv_addr(struct vos_io_context *ioc, umem_off_t umoff, bio_addr_t *addr) +{ + struct vos_sv_addr *sv_addr; + + D_ASSERTF(ioc->ic_sv_addr_at < ioc->ic_sv_addr_cnt, "sv_at:%u >= sv_cnt:%u\n", + ioc->ic_sv_addr_at, ioc->ic_sv_addr_cnt); + + sv_addr = &ioc->ic_sv_addrs[ioc->ic_sv_addr_at]; + sv_addr->sa_umoff = umoff; + sv_addr->sa_addr = *addr; + ioc->ic_sv_addr_at++; +} + +static int +gang_reserve_sv(struct vos_io_context *ioc, uint16_t media, daos_size_t size, + umem_off_t umoff, unsigned int gang_nr) +{ + struct vos_irec_df *irec; + struct bio_iov biov = { 0 }; + bio_addr_t gaddr = { 0 }; + daos_size_t alloc_sz; + uint64_t off; + char *gaddr_ptr; + int i, rc; + + D_ASSERT(gang_nr > 1); + D_ASSERT(size > VOS_GANG_SIZE_THRESH); + + irec = (struct vos_irec_df *)umem_off2ptr(vos_ioc2umm(ioc), umoff); + gaddr_ptr = vos_irec2data(irec); + + bio_addr_set(&gaddr, DAOS_MEDIA_SCM, umem_ptr2off(vos_ioc2umm(ioc), gaddr_ptr)); + gaddr.ba_gang_nr = gang_nr; + BIO_ADDR_SET_GANG(&gaddr); + + iod_set_sv_addr(ioc, umoff, &gaddr); + + for (i = 0; i < gang_nr; i++) { + D_ASSERT(size > 0); + alloc_sz = min(size, VOS_GANG_SIZE_THRESH); + + rc = reserve_space(ioc, media, alloc_sz, &off); + if (rc) { + DL_ERROR(rc, "Reserve SV on %s failed.", + media == DAOS_MEDIA_SCM ? "SCM" : "NVMe"); + return rc; + } + + bio_addr_set(&biov.bi_addr, media, off); + bio_iov_set_len(&biov, alloc_sz); + iod_reserve(ioc, &biov); + + /* + * Update the SV record metadata on SCM, tx_add_range() will be called by + * svt_rec_alloc_common() later. + */ + bio_gaddr_set(vos_ioc2umm(ioc), &gaddr, i, media, alloc_sz, off); + + size -= alloc_sz; + } + D_ASSERT(size == 0); + return 0; } /* Reserve single value record on specified media */ static int -vos_reserve_single(struct vos_io_context *ioc, uint16_t media, - daos_size_t size) +vos_reserve_single(struct vos_io_context *ioc, uint16_t media, daos_size_t size) { struct vos_irec_df *irec; daos_size_t scm_size; umem_off_t umoff; struct bio_iov biov; uint64_t off = 0; - int rc; + struct vos_rec_bundle rbund = { 0 }; + int rc, gang_nr; struct dcs_csum_info *value_csum = vos_csum_at(ioc->ic_iod_csums, ioc->ic_sgl_at); - /* - * TODO: - * To eliminate internal fragmentaion, misaligned record (record size - * isn't aligned with 4K) on NVMe could be split into two parts, large - * aligned part will be stored on NVMe and being referenced by - * vos_irec_df->ir_ex_addr, small unaligned part will be stored on SCM - * along with vos_irec_df, being referenced by vos_irec_df->ir_body. - */ - scm_size = (media == DAOS_MEDIA_SCM) ? - vos_recx2irec_size(size, value_csum) : - vos_recx2irec_size(0, value_csum); + gang_nr = vos_irec_gang_nr(ioc->ic_cont->vc_pool, size); + D_ASSERT(gang_nr <= UINT8_MAX); + + rbund.rb_csum = value_csum; + rbund.rb_rsize = size; + scm_size = vos_irec_msize(ioc->ic_cont->vc_pool, &rbund); + /* Payload is allocated along with the SV meta record */ + if (media == DAOS_MEDIA_SCM && gang_nr == 0) + scm_size += size; + + /* Reserve SCM for SV meta record */ rc = reserve_space(ioc, DAOS_MEDIA_SCM, scm_size, &off); if (rc) { - D_ERROR("Reserve SCM for SV failed. "DF_RC"\n", DP_RC(rc)); + DL_ERROR(rc, "Reserve SCM for SV meta failed."); return rc; } @@ -2183,13 +2327,14 @@ vos_reserve_single(struct vos_io_context *ioc, uint16_t media, irec = (struct vos_irec_df *)umem_off2ptr(vos_ioc2umm(ioc), umoff); vos_irec_init_csum(irec, value_csum); + /* The SV is huge, turn to gang allocation */ + if (gang_nr > 0) + return gang_reserve_sv(ioc, media, size, umoff, gang_nr); + memset(&biov, 0, sizeof(biov)); if (size == 0) { /* punch */ bio_addr_set_hole(&biov.bi_addr, 1); - goto done; - } - - if (media == DAOS_MEDIA_SCM) { + } else if (media == DAOS_MEDIA_SCM) { char *payload_addr; /* Get the record payload offset */ @@ -2199,15 +2344,16 @@ vos_reserve_single(struct vos_io_context *ioc, uint16_t media, } else { rc = reserve_space(ioc, DAOS_MEDIA_NVME, size, &off); if (rc) { - D_ERROR("Reserve NVMe for SV failed. "DF_RC"\n", - DP_RC(rc)); + DL_ERROR(rc, "Reserve SV on NVMe failed."); return rc; } } -done: + bio_addr_set(&biov.bi_addr, media, off); bio_iov_set_len(&biov, size); - rc = iod_reserve(ioc, &biov); + iod_reserve(ioc, &biov); + + iod_set_sv_addr(ioc, umoff, &biov.bi_addr); return rc; } @@ -2218,38 +2364,25 @@ vos_reserve_recx(struct vos_io_context *ioc, uint16_t media, daos_size_t size, { struct bio_iov biov; uint64_t off = 0; - int rc; + int rc = 0; memset(&biov, 0, sizeof(biov)); /* recx punch */ - if (size == 0 || media != DAOS_MEDIA_SCM) { - ioc->ic_umoffs[ioc->ic_umoffs_cnt] = UMOFF_NULL; - ioc->ic_umoffs_cnt++; - if (size == 0) { - bio_addr_set_hole(&biov.bi_addr, 1); - goto done; - } + if (size == 0) { + bio_addr_set_hole(&biov.bi_addr, 1); + goto done; } if (ioc->ic_dedup && size >= ioc->ic_dedup_th && - vos_dedup_lookup(vos_cont2pool(ioc->ic_cont), csum, csum_len, - &biov)) { + vos_dedup_lookup(vos_cont2pool(ioc->ic_cont), csum, csum_len, &biov)) { if (biov.bi_data_len == size) { D_ASSERT(biov.bi_addr.ba_off != 0); - ioc->ic_umoffs[ioc->ic_umoffs_cnt] = - biov.bi_addr.ba_off; - ioc->ic_umoffs_cnt++; - return iod_reserve(ioc, &biov); + iod_reserve(ioc, &biov); + return 0; } memset(&biov, 0, sizeof(biov)); } - /* - * TODO: - * To eliminate internal fragmentaion, misaligned recx (total recx size - * isn't aligned with 4K) on NVMe could be split into two evtree rects, - * larger rect will be stored on NVMe and small reminder on SCM. - */ rc = reserve_space(ioc, media, size, &off); if (rc) { D_ERROR("Reserve recx failed. "DF_RC"\n", DP_RC(rc)); @@ -2258,7 +2391,7 @@ vos_reserve_recx(struct vos_io_context *ioc, uint16_t media, daos_size_t size, done: bio_addr_set(&biov.bi_addr, media, off); bio_iov_set_len(&biov, size); - rc = iod_reserve(ioc, &biov); + iod_reserve(ioc, &biov); return rc; } diff --git a/src/vos/vos_layout.h b/src/vos/vos_layout.h index 72459544c27..902cb064e26 100644 --- a/src/vos/vos_layout.h +++ b/src/vos/vos_layout.h @@ -91,7 +91,7 @@ enum vos_gc_type { */ /** Current durable format version */ -#define POOL_DF_VERSION VOS_POOL_DF_2_6 +#define POOL_DF_VERSION VOS_POOL_DF_2_8 /** 2.2 features. Until we have an upgrade path for RDB, we need to support more than one old * version. @@ -104,6 +104,9 @@ enum vos_gc_type { /** 2.6 features */ #define VOS_POOL_FEAT_2_6 (VOS_POOL_FEAT_FLAT_DKEY | VOS_POOL_FEAT_EMBED_FIRST) +/** 2.8 features */ +#define VOS_POOL_FEAT_2_8 (VOS_POOL_FEAT_GANG_SV) + /** * Durable format for VOS pool */ diff --git a/src/vos/vos_obj.c b/src/vos/vos_obj.c index 77cb041711f..cc72575f608 100644 --- a/src/vos/vos_obj.c +++ b/src/vos/vos_obj.c @@ -1652,6 +1652,8 @@ recx_iter_copy(struct vos_obj_iter *oiter, vos_iter_entry_t *it_entry, /* Skip copy and return success for a punched record */ if (bio_addr_is_hole(&biov->bi_addr)) return 0; + else if (BIO_ADDR_IS_GANG(&biov->bi_addr)) + return -DER_NOTSUPPORTED; else if (iov_out->iov_buf_len < bio_iov2len(biov)) return -DER_OVERFLOW; diff --git a/src/vos/vos_pool.c b/src/vos/vos_pool.c index af958eafd5d..6c2e0120842 100644 --- a/src/vos/vos_pool.c +++ b/src/vos/vos_pool.c @@ -1416,6 +1416,8 @@ pool_open(void *ph, struct vos_pool_df *pool_df, unsigned int flags, void *metri pool->vp_feats |= VOS_POOL_FEAT_2_4; if (pool_df->pd_version >= VOS_POOL_DF_2_6) pool->vp_feats |= VOS_POOL_FEAT_2_6; + if (pool_df->pd_version >= VOS_POOL_DF_2_8) + pool->vp_feats |= VOS_POOL_FEAT_2_8; if (pool->vp_vea_info == NULL) /** always store on SCM if no bdev */ @@ -1587,6 +1589,8 @@ vos_pool_upgrade(daos_handle_t poh, uint32_t version) pool->vp_feats |= VOS_POOL_FEAT_2_4; if (version >= VOS_POOL_DF_2_6) pool->vp_feats |= VOS_POOL_FEAT_2_6; + if (version >= VOS_POOL_DF_2_8) + pool->vp_feats |= VOS_POOL_FEAT_2_8; return 0; } diff --git a/src/vos/vos_space.c b/src/vos/vos_space.c index a677d061cb6..5763e3f8bac 100644 --- a/src/vos/vos_space.c +++ b/src/vos/vos_space.c @@ -1,5 +1,5 @@ /** - * (C) Copyright 2020-2023 Intel Corporation. + * (C) Copyright 2020-2024 Intel Corporation. * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -212,6 +212,7 @@ estimate_space(struct vos_pool *pool, daos_key_t *dkey, unsigned int iod_nr, struct dcs_csum_info *csums, *recx_csum; daos_iod_t *iod; daos_recx_t *recx; + struct vos_rec_bundle rbund = { 0 }; daos_size_t size, scm, nvme = 0 /* in blk */; int i, j; @@ -233,16 +234,16 @@ estimate_space(struct vos_pool *pool, daos_key_t *dkey, unsigned int iod_nr, /* Single value */ if (iod->iod_type == DAOS_IOD_SINGLE) { size = iod->iod_size; + rbund.rb_csum = csums; + rbund.rb_rsize = size; /* Single value record */ - if (vos_io_scm(pool, iod->iod_type, size, VOS_IOS_GENERIC)) { - /** store data on DAOS_MEDIA_SCM */ - scm += vos_recx2irec_size(size, csums); - } else { - scm += vos_recx2irec_size(0, csums); - if (iod->iod_size != 0) - nvme += vos_byte2blkcnt(iod->iod_size); - } + scm += vos_irec_msize(pool, &rbund); + if (vos_io_scm(pool, iod->iod_type, size, VOS_IOS_GENERIC)) + scm += size; + else + nvme += vos_byte2blkcnt(size); + /* Assume one more SV tree node created */ scm += 256; continue; diff --git a/src/vos/vos_tree.c b/src/vos/vos_tree.c index c36fcaa88c5..e9dd4e94436 100644 --- a/src/vos/vos_tree.c +++ b/src/vos/vos_tree.c @@ -523,10 +523,11 @@ svt_rec_alloc_common(struct btr_instance *tins, struct btr_record *rec, struct vos_svt_key *skey, struct vos_rec_bundle *rbund) { struct vos_irec_df *irec; + struct vos_pool *pool = (struct vos_pool *)tins->ti_priv; int rc; D_ASSERT(!UMOFF_IS_NULL(rbund->rb_off)); - rc = umem_tx_xadd(&tins->ti_umm, rbund->rb_off, vos_irec_msize(rbund), + rc = umem_tx_xadd(&tins->ti_umm, rbund->rb_off, vos_irec_msize(pool, rbund), UMEM_XADD_NO_SNAPSHOT); if (rc != 0) return rc; @@ -591,6 +592,57 @@ cancel_nvme_exts(bio_addr_t *addr, struct dtx_handle *dth) D_ASSERT(0); } +static int +svt_free_payload(struct vos_pool *pool, bio_addr_t *addr, uint64_t rsize) +{ + uint64_t tot_len = rsize; + uint32_t data_len; + bio_addr_t sub_addr = { 0 }; + int i, rc = 0; + + if (bio_addr_is_hole(addr)) + return 0; + + if (tot_len == 0) { + D_ERROR("Invalid 0 SV record size\n"); + return -DER_INVAL; + } + + if (BIO_ADDR_IS_GANG(addr)) { + for (i = 0; i < addr->ba_gang_nr; i++) { + bio_gaddr_get(vos_pool2umm(pool), addr, i, &sub_addr.ba_type, &data_len, + &sub_addr.ba_off); + if (tot_len < data_len) { + D_ERROR("Invalid gang addr[%d], nr:%u, rsize:"DF_U64", " + "len:"DF_U64"/%u\n", i, addr->ba_gang_nr, rsize, + tot_len, data_len); + return -DER_INVAL; + } + tot_len -= data_len; + + rc = vos_bio_addr_free(pool, &sub_addr, data_len); + if (rc) { + DL_ERROR(rc, "SV gang free %d on %s failed.", + i, addr->ba_type == DAOS_MEDIA_SCM ? "SCM" : "NVMe"); + return rc; + } + } + + if (tot_len != 0) { + D_ERROR("Invalid gang addr, nr:%u, rsize:"DF_U64", left"DF_U64"\n", + addr->ba_gang_nr, rsize, tot_len); + return -DER_INVAL; + } + } else if (addr->ba_type == DAOS_MEDIA_NVME) { + rc = vos_bio_addr_free(pool, addr, rsize); + if (rc) + DL_ERROR(rc, "Free SV payload on NVMe failed."); + } + /* Payload is allocated along with vos_iref_df when SV is stored on SCM */ + + return rc; +} + static int svt_rec_free_internal(struct btr_instance *tins, struct btr_record *rec, bool overwrite) @@ -608,7 +660,7 @@ svt_rec_free_internal(struct btr_instance *tins, struct btr_record *rec, if (overwrite) { dth = vos_dth_get(cont->vc_pool->vp_sysdb); - if (dth == NULL) + if (dth == NULL || BIO_ADDR_IS_GANG(addr)) return -DER_NO_PERM; /* Not allowed */ } @@ -618,15 +670,11 @@ svt_rec_free_internal(struct btr_instance *tins, struct btr_record *rec, return rc; if (!overwrite) { - /* SCM value is stored together with vos_irec_df */ - if (addr->ba_type == DAOS_MEDIA_NVME) { - struct vos_pool *pool = tins->ti_priv; + struct vos_pool *pool = tins->ti_priv; - D_ASSERT(pool != NULL); - rc = vos_bio_addr_free(pool, addr, irec->ir_size); - if (rc) - return rc; - } + rc = svt_free_payload(pool, addr, irec->ir_size); + if (rc) + return rc; return umem_free(&tins->ti_umm, rec->rec_off); }