From 65015fbc8dc0cae18ea14f7a9bcf9f312cea1c66 Mon Sep 17 00:00:00 2001
From: Mikhail Brinskii
Date: Mon, 25 Nov 2019 17:01:40 +0200
Subject: [PATCH 1/2] UCT/TAG: Do not post the same buffer more than once

---
 src/ucp/tag/offload.c                  |  5 +-
 src/uct/ib/rc/accel/rc_mlx5.inl        | 23 +++++++
 src/uct/ib/rc/accel/rc_mlx5_common.c   | 12 ++++
 src/uct/ib/rc/accel/rc_mlx5_common.h   | 85 ++++++++++++++------------
 test/gtest/ucp/test_ucp_tag_offload.cc |  4 +-
 test/gtest/uct/test_tag.cc             | 56 +++++++++++++++--
 6 files changed, 137 insertions(+), 48 deletions(-)

diff --git a/src/ucp/tag/offload.c b/src/ucp/tag/offload.c
index 72f33fa9d63..23afde931e1 100644
--- a/src/ucp/tag/offload.c
+++ b/src/ucp/tag/offload.c
@@ -302,8 +302,9 @@ ucp_tag_offload_do_post(ucp_request_t *req)
                                        req->recv.tag.tag_mask, &iov, 1,
                                        &req->recv.uct_ctx);
     if (status != UCS_OK) {
-        ucs_assert((status == UCS_ERR_NO_RESOURCE) ||
-                   (status == UCS_ERR_EXCEEDS_LIMIT));
+        ucs_assert((status == UCS_ERR_NO_RESOURCE) ||
+                   (status == UCS_ERR_EXCEEDS_LIMIT) ||
+                   (status == UCS_ERR_ALREADY_EXISTS));
         /* No more matching entries in the transport.
          * TODO keep registration in case SW RNDV protocol will be used */
         ucp_tag_offload_release_buf(req, 1);
diff --git a/src/uct/ib/rc/accel/rc_mlx5.inl b/src/uct/ib/rc/accel/rc_mlx5.inl
index e8eb9a160ee..2808a136276 100644
--- a/src/uct/ib/rc/accel/rc_mlx5.inl
+++ b/src/uct/ib/rc/accel/rc_mlx5.inl
@@ -1016,10 +1016,19 @@ uct_rc_mlx5_iface_common_tag_recv(uct_rc_mlx5_iface_common_t *iface,
     uct_rc_mlx5_tag_entry_t *tag_entry;
     uint16_t next_idx;
     unsigned ctrl_size;
+    int ret;
 
     UCT_CHECK_IOV_SIZE(iovcnt, 1ul, "uct_rc_mlx5_iface_common_tag_recv");
     UCT_RC_MLX5_CHECK_TAG(iface);
 
+    kh_put(uct_rc_mlx5_tag_addrs, &iface->tm.tag_addrs, iov->buffer, &ret);
+    if (ucs_unlikely(!ret)) {
+        /* Do not post the same buffer more than once (even with different tags)
+         * to avoid memory corruption. */
+        return UCS_ERR_ALREADY_EXISTS;
+    }
+    ucs_assert(ret > 0);
+
     ctrl_size = sizeof(struct mlx5_wqe_ctrl_seg) +
                 sizeof(uct_rc_mlx5_wqe_tm_seg_t);
     tag_entry = iface->tm.head;
@@ -1053,6 +1062,17 @@
     return UCS_OK;
 }
 
+static UCS_F_ALWAYS_INLINE void
+uct_rc_mlx5_iface_tag_del_from_hash(uct_rc_mlx5_iface_common_t *iface,
+                                    void *buffer)
+{
+    khiter_t iter;
+
+    iter = kh_get(uct_rc_mlx5_tag_addrs, &iface->tm.tag_addrs, buffer);
+    ucs_assert(iter != kh_end(&iface->tm.tag_addrs));
+    kh_del(uct_rc_mlx5_tag_addrs, &iface->tm.tag_addrs, iter);
+}
+
 static UCS_F_ALWAYS_INLINE ucs_status_t
 uct_rc_mlx5_iface_common_tag_recv_cancel(uct_rc_mlx5_iface_common_t *iface,
                                          uct_tag_context_t *ctx, int force)
@@ -1067,6 +1087,7 @@ uct_rc_mlx5_iface_common_tag_recv_cancel(uct_rc_mlx5_iface_common_t *iface,
     if (ucs_likely(force)) {
         flags = UCT_RC_MLX5_SRQ_FLAG_TM_SW_CNT;
         uct_rc_mlx5_release_tag_entry(iface, tag_entry);
+        uct_rc_mlx5_iface_tag_del_from_hash(iface, priv->buffer);
     } else {
         flags = UCT_RC_MLX5_SRQ_FLAG_TM_CQE_REQ | UCT_RC_MLX5_SRQ_FLAG_TM_SW_CNT;
         uct_rc_mlx5_add_cmd_wq_op(iface, tag_entry);
@@ -1097,6 +1118,7 @@ uct_rc_mlx5_iface_handle_tm_list_op(uct_rc_mlx5_iface_common_t *iface, int opcod
     if (opcode == UCT_RC_MLX5_CQE_APP_OP_TM_REMOVE) {
         ctx  = op->tag->ctx;
         priv = uct_rc_mlx5_ctx_priv(ctx);
+        uct_rc_mlx5_iface_tag_del_from_hash(iface, priv->buffer);
         ctx->completed_cb(ctx, priv->tag, 0, priv->length, UCS_ERR_CANCELED);
     }
 }
@@ -1142,6 +1164,7 @@ uct_rc_mlx5_iface_handle_expected(uct_rc_mlx5_iface_common_t *iface, struct mlx5
     byte_len = ntohl(cqe->byte_cnt);
 
     uct_rc_mlx5_release_tag_entry(iface, tag_entry);
+    uct_rc_mlx5_iface_tag_del_from_hash(iface, priv->buffer);
 
     if (cqe->op_own & MLX5_INLINE_SCATTER_64) {
         ucs_assert(byte_len <= priv->length);
diff --git a/src/uct/ib/rc/accel/rc_mlx5_common.c b/src/uct/ib/rc/accel/rc_mlx5_common.c
index a5ffb3f9364..a396328e12f 100644
--- a/src/uct/ib/rc/accel/rc_mlx5_common.c
+++ b/src/uct/ib/rc/accel/rc_mlx5_common.c
@@ -414,6 +414,7 @@ void uct_rc_mlx5_iface_common_tag_cleanup(uct_rc_mlx5_iface_common_t *iface)
 {
     uct_rc_mlx5_mp_hash_key_t key_gid;
     uint64_t key_lid;
+    void *tag_addr;
 
     if (!UCT_RC_MLX5_TM_ENABLED(iface)) {
         return;
@@ -425,6 +426,12 @@ void uct_rc_mlx5_iface_common_tag_cleanup(uct_rc_mlx5_iface_common_t *iface)
     ucs_free(iface->tm.cmd_wq.ops);
     uct_rc_mlx5_tag_cleanup(iface);
 
+    kh_foreach_key(&iface->tm.tag_addrs, tag_addr, {
+        ucs_debug("destroying iface %p, with %p offloaded to the HW",
+                  iface, tag_addr);
+    });
+    kh_destroy_inplace(uct_rc_mlx5_tag_addrs, &iface->tm.tag_addrs);
+
     if (!UCT_RC_MLX5_MP_ENABLED(iface)) {
         return;
     }
@@ -440,6 +447,7 @@ void uct_rc_mlx5_iface_common_tag_cleanup(uct_rc_mlx5_iface_common_t *iface)
                   iface, key_gid.guid, key_gid.qp_num);
     });
     kh_destroy_inplace(uct_rc_mlx5_mp_hash_gid, &iface->tm.mp.hash_gid);
+
     ucs_mpool_cleanup(&iface->tm.mp.tx_mp, 1);
 }
 
@@ -734,6 +742,10 @@ void uct_rc_mlx5_init_rx_tm_common(uct_rc_mlx5_iface_common_t *iface,
      * ptr_array is used as operation ID and is passed in "app_context"
      * of TM header. */
     ucs_ptr_array_init(&iface->tm.rndv_comps, 0, "rm_rndv_completions");
+
+    /* Set of addresses posted to the HW. Used to avoid posting of the same
+     * address more than once. */
+    kh_init_inplace(uct_rc_mlx5_tag_addrs, &iface->tm.tag_addrs);
 }
 
 ucs_status_t uct_rc_mlx5_init_rx_tm(uct_rc_mlx5_iface_common_t *iface,
diff --git a/src/uct/ib/rc/accel/rc_mlx5_common.h b/src/uct/ib/rc/accel/rc_mlx5_common.h
index e7bbe4d9f6d..7bcf27654ef 100644
--- a/src/uct/ib/rc/accel/rc_mlx5_common.h
+++ b/src/uct/ib/rc/accel/rc_mlx5_common.h
@@ -354,68 +354,73 @@ typedef union uct_rc_mlx5_dm_copy_data {
 } UCS_S_PACKED uct_rc_mlx5_dm_copy_data_t;
 #endif
 
+#define uct_rc_mlx5_tag_addr_hash(_ptr) kh_int64_hash_func((uintptr_t)(_ptr))
+KHASH_INIT(uct_rc_mlx5_tag_addrs, void*, char, 0, uct_rc_mlx5_tag_addr_hash,
+           kh_int64_hash_equal)
+
 typedef struct uct_rc_mlx5_iface_common {
-    uct_rc_iface_t                super;
+    uct_rc_iface_t                   super;
 
     struct {
-        ucs_mpool_t               atomic_desc_mp;
-        uct_ib_mlx5_mmio_mode_t   mmio_mode;
-        uint16_t                  bb_max; /* limit number of outstanding WQE BBs */
+        ucs_mpool_t                  atomic_desc_mp;
+        uct_ib_mlx5_mmio_mode_t      mmio_mode;
+        uint16_t                     bb_max; /* limit number of outstanding WQE BBs */
     } tx;
 
     struct {
-        uct_ib_mlx5_srq_t         srq;
-        void                      *pref_ptr;
+        uct_ib_mlx5_srq_t            srq;
+        void                         *pref_ptr;
     } rx;
 
-    uct_ib_mlx5_cq_t              cq[UCT_IB_DIR_NUM];
+    uct_ib_mlx5_cq_t                 cq[UCT_IB_DIR_NUM];
 
     struct {
-        uct_rc_mlx5_cmd_wq_t      cmd_wq;
-        uct_rc_mlx5_tag_entry_t   *head;
-        uct_rc_mlx5_tag_entry_t   *tail;
-        uct_rc_mlx5_tag_entry_t   *list;
-        ucs_mpool_t               *bcopy_mp;
-
-        ucs_ptr_array_t           rndv_comps;
-        size_t                    max_bcopy;
-        size_t                    max_zcopy;
-        unsigned                  num_tags;
-        unsigned                  num_outstanding;
-        unsigned                  max_rndv_data;
-        uint16_t                  unexpected_cnt;
-        uint16_t                  cmd_qp_len;
-        uint8_t                   enabled;
+        uct_rc_mlx5_cmd_wq_t         cmd_wq;
+        uct_rc_mlx5_tag_entry_t      *head;
+        uct_rc_mlx5_tag_entry_t      *tail;
+        uct_rc_mlx5_tag_entry_t      *list;
+        ucs_mpool_t                  *bcopy_mp;
+        khash_t(uct_rc_mlx5_tag_addrs) tag_addrs;
+
+        ucs_ptr_array_t              rndv_comps;
+        size_t                       max_bcopy;
+        size_t                       max_zcopy;
+        unsigned                     num_tags;
+        unsigned                     num_outstanding;
+        unsigned                     max_rndv_data;
+        uint16_t                     unexpected_cnt;
+        uint16_t                     cmd_qp_len;
+        uint8_t                      enabled;
         struct {
-            uint8_t               num_strides;
-            ucs_mpool_t           tx_mp;
-            uct_rc_mlx5_mp_context_t last_frag_ctx;
+            uint8_t                  num_strides;
+            ucs_mpool_t              tx_mp;
+            uct_rc_mlx5_mp_context_t last_frag_ctx;
             khash_t(uct_rc_mlx5_mp_hash_lid) hash_lid;
             khash_t(uct_rc_mlx5_mp_hash_gid) hash_gid;
         } mp;
         struct {
-            void                  *arg; /* User defined arg */
-            uct_tag_unexp_eager_cb_t cb; /* Callback for unexpected eager messages */
+            void                     *arg; /* User defined arg */
+            uct_tag_unexp_eager_cb_t cb;   /* Callback for unexpected eager messages */
         } eager_unexp;
         struct {
-            void                  *arg; /* User defined arg */
-            uct_tag_unexp_rndv_cb_t cb; /* Callback for unexpected rndv messages */
+            void                     *arg; /* User defined arg */
+            uct_tag_unexp_rndv_cb_t  cb;   /* Callback for unexpected rndv messages */
         } rndv_unexp;
-        uct_rc_mlx5_release_desc_t eager_desc;
-        uct_rc_mlx5_release_desc_t rndv_desc;
-        uct_rc_mlx5_release_desc_t am_desc;
+        uct_rc_mlx5_release_desc_t   eager_desc;
+        uct_rc_mlx5_release_desc_t   rndv_desc;
+        uct_rc_mlx5_release_desc_t   am_desc;
         UCS_STATS_NODE_DECLARE(stats)
     } tm;
 
 #if HAVE_IBV_DM
     struct {
-        uct_mlx5_dm_data_t        *dm;
-        size_t                    seg_len; /* cached value to avoid double-pointer access */
-        ucs_status_t (*am_short)(uct_ep_h tl_ep, uint8_t id, uint64_t hdr,
-                                 const void *payload, unsigned length);
-        ucs_status_t (*tag_short)(uct_ep_h tl_ep, uct_tag_t tag,
-                                  const void *data, size_t length);
+        uct_mlx5_dm_data_t           *dm;
+        size_t                       seg_len; /* cached value to avoid double-pointer access */
+        ucs_status_t                 (*am_short)(uct_ep_h tl_ep, uint8_t id, uint64_t hdr,
+                                                 const void *payload, unsigned length);
+        ucs_status_t                 (*tag_short)(uct_ep_h tl_ep, uct_tag_t tag,
+                                                  const void *data, size_t length);
     } dm;
 #endif
 
     struct {
-        uint8_t                   atomic_fence_flag;
-        uint8_t                   put_fence_flag;
+        uint8_t                      atomic_fence_flag;
+        uint8_t                      put_fence_flag;
     } config;
 
     UCS_STATS_NODE_DECLARE(stats)
 } uct_rc_mlx5_iface_common_t;
diff --git a/test/gtest/ucp/test_ucp_tag_offload.cc b/test/gtest/ucp/test_ucp_tag_offload.cc
index de8e59278a5..c3056979999 100644
--- a/test/gtest/ucp/test_ucp_tag_offload.cc
+++ b/test/gtest/ucp/test_ucp_tag_offload.cc
@@ -493,7 +493,9 @@ class test_ucp_tag_offload_stats : public test_ucp_tag_offload_multi {
                  UCP_TAG_MASK_FULL);
 
     // Post and cancel another receive to make sure the first one was offloaded
-    request *req2 = recv_nb_and_check(buffer, count, DATATYPE, tag,
+    size_t size = receiver().worker()->context->config.ext.tm_thresh + 1;
+    std::vector<char> tbuf(size, 0);
+    request *req2 = recv_nb_and_check(&tbuf[0], size, DATATYPE, tag,
                                       UCP_TAG_MASK_FULL);
 
     req_cancel(receiver(), req2);
diff --git a/test/gtest/uct/test_tag.cc b/test/gtest/uct/test_tag.cc
index 0ec9a521041..82695389860 100644
--- a/test/gtest/uct/test_tag.cc
+++ b/test/gtest/uct/test_tag.cc
@@ -333,6 +333,7 @@ class test_tag : public uct_test {
         // Message should be reported as unexpected and filled with
         // recv seed (unchanged), as the incoming tag does not match the expected
         check_rx_completion(r_ctx, false, RECV_SEED);
+        ASSERT_UCS_OK(tag_cancel(receiver(), r_ctx, 1));
         flush();
     }
 
@@ -650,16 +651,16 @@ UCS_TEST_SKIP_COND_P(test_tag, tag_limit,
                      !check_caps(UCT_IFACE_FLAG_TAG_EAGER_BCOPY))
 {
     const size_t length = 32;
-    mapped_buffer recvbuf(length, RECV_SEED, receiver());
     ucs::ptr_vector<recv_ctx> rctxs;
-    recv_ctx *rctx_p;
+    ucs::ptr_vector<mapped_buffer> rbufs;
     ucs_status_t status;
 
     do {
-        // Can use the same recv buffer, as no sends will be issued.
-        rctx_p = (new recv_ctx());
-        init_recv_ctx(*rctx_p, &recvbuf, 1);
+        recv_ctx *rctx_p     = new recv_ctx();
+        mapped_buffer *buf_p = new mapped_buffer(length, RECV_SEED, receiver());
+        init_recv_ctx(*rctx_p, buf_p, 1);
         rctxs.push_back(rctx_p);
+        rbufs.push_back(buf_p);
         status = tag_post(receiver(), *rctx_p);
         // Make sure send resources are acknowledged, as we are
         // waiting for tag space exhaustion.
@@ -678,6 +679,51 @@
         status = tag_post(receiver(), rctxs.at(0));
     } while ((ucs_get_time() < deadline) && (status == UCS_ERR_EXCEEDS_LIMIT));
     ASSERT_UCS_OK(status);
+
+    // remove posted tags from HW
+    for (ucs::ptr_vector<recv_ctx>::const_iterator iter = rctxs.begin();
+         iter != rctxs.end() - 1; ++iter) {
+        ASSERT_UCS_OK(tag_cancel(receiver(), **iter, 1));
+    }
+}
+
+UCS_TEST_SKIP_COND_P(test_tag, tag_post_same,
+                     !check_caps(UCT_IFACE_FLAG_TAG_EAGER_BCOPY))
+{
+    const size_t length = 128;
+    mapped_buffer recvbuf(length, RECV_SEED, receiver());
+    recv_ctx r_ctx;
+    init_recv_ctx(r_ctx, &recvbuf, 1);
+
+    ASSERT_UCS_OK(tag_post(receiver(), r_ctx));
+
+    // Can't post the same buffer until it is completed/cancelled
+    ucs_status_t status = tag_post(receiver(), r_ctx);
+    EXPECT_EQ(status, UCS_ERR_ALREADY_EXISTS);
+
+    // Cancel with force, should be able to re-post immediately
+    ASSERT_UCS_OK(tag_cancel(receiver(), r_ctx, 1));
+    ASSERT_UCS_OK(tag_post(receiver(), r_ctx));
+
+    // Cancel without force, can re-post only once the cancel completion arrives
+    ASSERT_UCS_OK(tag_cancel(receiver(), r_ctx, 0));
+    status = tag_post(receiver(), r_ctx);
+    EXPECT_EQ(status, UCS_ERR_ALREADY_EXISTS); // no completion yet
+
+    wait_for_flag(&r_ctx.comp); // cancel completed, should be able to post
+    ASSERT_UCS_OK(tag_post(receiver(), r_ctx));
+
+    // Now send something to trigger rx completion
+    init_recv_ctx(r_ctx, &recvbuf, 1); // reinit rx to clear completed states
+    mapped_buffer sendbuf(length, SEND_SEED, sender());
+    send_ctx s_ctx;
+    init_send_ctx(s_ctx, &sendbuf, 1, reinterpret_cast<uint64_t>(&r_ctx));
+    ASSERT_UCS_OK(tag_eager_bcopy(sender(), s_ctx));
+
+    wait_for_flag(&r_ctx.comp); // message consumed, should be able to post
+    ASSERT_UCS_OK(tag_post(receiver(), r_ctx));
+
+    ASSERT_UCS_OK(tag_cancel(receiver(), r_ctx, 1));
 }
 
 UCS_TEST_SKIP_COND_P(test_tag, sw_rndv_expected,

From 76adb9f8293ce0057db9d9cfcb2f813d19a938de Mon Sep 17 00:00:00 2001
From: Mikhail Brinskii
Date: Tue, 3 Dec 2019 14:58:50 +0200
Subject: [PATCH 2/2] UCT/TAG: Minor styling fixes

---
 src/uct/ib/rc/accel/rc_mlx5.inl      | 2 +-
 src/uct/ib/rc/accel/rc_mlx5_common.c | 8 ++++----
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/src/uct/ib/rc/accel/rc_mlx5.inl b/src/uct/ib/rc/accel/rc_mlx5.inl
index 2808a136276..1334ec195d8 100644
--- a/src/uct/ib/rc/accel/rc_mlx5.inl
+++ b/src/uct/ib/rc/accel/rc_mlx5.inl
@@ -1022,7 +1022,7 @@ uct_rc_mlx5_iface_common_tag_recv(uct_rc_mlx5_iface_common_t *iface,
     UCT_RC_MLX5_CHECK_TAG(iface);
 
     kh_put(uct_rc_mlx5_tag_addrs, &iface->tm.tag_addrs, iov->buffer, &ret);
-    if (ucs_unlikely(!ret)) {
+    if (ucs_unlikely(ret == 0)) {
         /* Do not post the same buffer more than once (even with different tags)
          * to avoid memory corruption. */
         return UCS_ERR_ALREADY_EXISTS;
diff --git a/src/uct/ib/rc/accel/rc_mlx5_common.c b/src/uct/ib/rc/accel/rc_mlx5_common.c
index a396328e12f..2a68736b371 100644
--- a/src/uct/ib/rc/accel/rc_mlx5_common.c
+++ b/src/uct/ib/rc/accel/rc_mlx5_common.c
@@ -414,7 +414,7 @@ void uct_rc_mlx5_iface_common_tag_cleanup(uct_rc_mlx5_iface_common_t *iface)
 {
     uct_rc_mlx5_mp_hash_key_t key_gid;
     uint64_t key_lid;
-    void *tag_addr;
+    void *recv_buffer;
 
     if (!UCT_RC_MLX5_TM_ENABLED(iface)) {
         return;
@@ -426,9 +426,9 @@ void uct_rc_mlx5_iface_common_tag_cleanup(uct_rc_mlx5_iface_common_t *iface)
     ucs_free(iface->tm.cmd_wq.ops);
     uct_rc_mlx5_tag_cleanup(iface);
 
-    kh_foreach_key(&iface->tm.tag_addrs, tag_addr, {
-        ucs_debug("destroying iface %p, with %p offloaded to the HW",
-                  iface, tag_addr);
+    kh_foreach_key(&iface->tm.tag_addrs, recv_buffer, {
+        ucs_debug("destroying iface %p, with recv buffer %p offloaded to the HW",
+                  iface, recv_buffer);
     });
     kh_destroy_inplace(uct_rc_mlx5_tag_addrs, &iface->tm.tag_addrs);
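
Editor's note on the mechanism: both patches hinge on the "ret" output of
kh_put() — 0 means the key (the receive buffer address) is already in the
set, so the post is rejected with UCS_ERR_ALREADY_EXISTS; a positive value
means it was newly inserted. kh_del() then removes the address whenever the
HW releases the buffer (expected completion, cancel, or list-op removal),
after which the same buffer may legally be posted again — exactly the
sequence the tag_post_same test exercises. The sketch below shows the
pattern in isolation. It is a minimal illustration assuming klib's khash.h
on the include path; tag_addr_set_t, tag_buf_post() and tag_buf_release()
are hypothetical names invented here, not UCX API.

    /*
     * Minimal sketch of the duplicate-post guard, assuming klib's khash.h.
     * tag_addr_set_t, tag_buf_post() and tag_buf_release() are hypothetical
     * stand-ins for the UCT internals.
     */
    #include <stdio.h>
    #include <stdint.h>
    #include "khash.h"

    /* Key-only hash set of posted buffer addresses (char is a dummy value type) */
    #define tag_addr_hash(_ptr) kh_int64_hash_func((uintptr_t)(_ptr))
    KHASH_INIT(tag_addrs, void*, char, 0, tag_addr_hash, kh_int64_hash_equal)

    typedef khash_t(tag_addrs) tag_addr_set_t;

    /* Record a buffer before posting it; fails if it is already posted */
    static int tag_buf_post(tag_addr_set_t *set, void *buffer)
    {
        int ret;

        kh_put(tag_addrs, set, buffer, &ret);
        if (ret == 0) {
            /* Key already present: this buffer is currently posted to the HW */
            return -1;
        }
        /* ... hand the buffer over to the HW here ... */
        return 0;
    }

    /* Forget a buffer once the HW is done with it (completion or cancel) */
    static void tag_buf_release(tag_addr_set_t *set, void *buffer)
    {
        khiter_t iter = kh_get(tag_addrs, set, buffer);

        if (iter != kh_end(set)) {
            kh_del(tag_addrs, set, iter);
        }
    }

    int main(void)
    {
        tag_addr_set_t *set = kh_init(tag_addrs);
        char buf[64];

        printf("first post:  %d\n", tag_buf_post(set, buf)); /* prints  0 */
        printf("second post: %d\n", tag_buf_post(set, buf)); /* prints -1 */
        tag_buf_release(set, buf);
        printf("re-post:     %d\n", tag_buf_post(set, buf)); /* prints  0 */

        kh_destroy(tag_addrs, set);
        return 0;
    }

Keying the set by buffer address alone (ignoring the tag) matches the
comment in the patch: posting one buffer twice, even under different tags,
would let the HW scatter two incoming messages into the same memory.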