
Commit

Merge branch 'master' of https://github.com/openucx/ucx into topic/doc-fix-to-conventions
bbenton committed Dec 4, 2019
2 parents 5447b3d + 99c7a73 commit 5d2068e
Showing 8 changed files with 240 additions and 51 deletions.
5 changes: 3 additions & 2 deletions src/ucp/tag/offload.c
@@ -302,8 +302,9 @@ ucp_tag_offload_do_post(ucp_request_t *req)
req->recv.tag.tag_mask, &iov, 1,
&req->recv.uct_ctx);
if (status != UCS_OK) {
ucs_assert((status == UCS_ERR_NO_RESOURCE) ||
(status == UCS_ERR_EXCEEDS_LIMIT));
ucs_assert((status == UCS_ERR_NO_RESOURCE) ||
(status == UCS_ERR_EXCEEDS_LIMIT) ||
(status == UCS_ERR_ALREADY_EXISTS));
/* No more matching entries in the transport.
* TODO keep registration in case SW RNDV protocol will be used */
ucp_tag_offload_release_buf(req, 1);
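
The assertion above now also tolerates UCS_ERR_ALREADY_EXISTS, which the mlx5 transport returns when the same receive buffer is posted twice (see the rc_mlx5.inl change below). A minimal sketch of that caller-side check follows; the helper name is hypothetical and this is not UCX's actual control flow:

```c
/* Sketch only: classify a failed HW tag-offload post as benign, assuming the
 * UCS status codes from <ucs/type/status.h>.  A benign failure means the
 * request simply stays on the SW tag-matching path. */
#include <ucs/type/status.h>

static int example_post_error_is_benign(ucs_status_t status)
{
    return (status == UCS_ERR_NO_RESOURCE)   || /* no free HW matching entry  */
           (status == UCS_ERR_EXCEEDS_LIMIT) || /* HW matching list is full   */
           (status == UCS_ERR_ALREADY_EXISTS);  /* buffer posted once already */
}
```
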
104 changes: 102 additions & 2 deletions src/uct/ib/base/ib_md.h
@@ -141,36 +141,136 @@ typedef struct uct_ib_md_config {
unsigned devx_objs; /**< Objects to be created by DevX */
} uct_ib_md_config_t;


/**
* Memory domain constructor.
*
* @param [in] ibv_device IB device.
*
* @param [in] md_config Memory domain configuration parameters.
*
* @param [out] md_p Handle to memory domain.
*
* @return UCS_OK on success or error code in case of failure.
*/
typedef ucs_status_t (*uct_ib_md_open_func_t)(struct ibv_device *ibv_device,
const uct_ib_md_config_t *md_config,
struct uct_ib_md **p_md);
struct uct_ib_md **md_p);

/**
* Memory domain destructor.
*
* @param [in] md Memory domain.
*/
typedef void (*uct_ib_md_cleanup_func_t)(struct uct_ib_md *);

/**
* Memory domain method to register memory area.
*
* @param [in] md Memory domain.
*
* @param [in] address Memory area start address.
*
* @param [in] length Memory area length.
*
* @param [in] access IB verbs registration access flags.
*
* @param [in] memh Memory region handle.
* Method should initialize lkey & rkey.
*
* @return UCS_OK on success or error code in case of failure.
*/
typedef ucs_status_t (*uct_ib_md_reg_key_func_t)(struct uct_ib_md *md,
void *address, size_t length,
uint64_t access,
uct_ib_mem_t *memh);

/**
* Memory domain method to deregister memory area.
*
* @param [in] md Memory domain.
*
* @param [in] memh Memory region handle registered with
* uct_ib_md_reg_key_func_t.
*
* @return UCS_OK on success or error code in case of failure.
*/
typedef ucs_status_t (*uct_ib_md_dereg_key_func_t)(struct uct_ib_md *md,
uct_ib_mem_t *memh);

/**
* Memory domain method to register memory area optimized for atomic ops.
*
* @param [in] md Memory domain.
*
* @param [in] memh Memory region handle registered for regular ops.
* Method should initialize atomic_rkey.
*
* @return UCS_OK on success or error code in case of failure.
*/
typedef ucs_status_t (*uct_ib_md_reg_atomic_key_func_t)(struct uct_ib_md *md,
uct_ib_mem_t *memh);

/**
* Memory domain method to release resources registered for atomic ops.
*
* @param [in] md Memory domain.
*
* @param [in] memh Memory region handle registered with
* uct_ib_md_reg_atomic_key_func_t.
*
* @return UCS_OK on success or error code in case of failure.
*/
typedef ucs_status_t (*uct_ib_md_dereg_atomic_key_func_t)(struct uct_ib_md *md,
uct_ib_mem_t *memh);

/**
* Memory domain method to register memory area using multiple threads.
*
* @param [in] md Memory domain.
*
* @param [in] address Memory area start address.
*
* @param [in] length Memory area length.
*
* @param [in] access IB verbs registration access flags.
*
* @param [in] memh Memory region handle.
* Method should initialize lkey & rkey.
*
* @return UCS_OK on success or error code in case of failure.
*/
typedef ucs_status_t (*uct_ib_md_reg_multithreaded_func_t)(uct_ib_md_t *md,
void *address,
size_t length,
uint64_t access,
uct_ib_mem_t *memh);

/**
* Memory domain method to deregister memory area registered using multiple threads.
*
* @param [in] md Memory domain.
*
* @param [in] memh Memory region handle registered with
* uct_ib_md_reg_multithreaded_func_t.
*
* @return UCS_OK on success or error code in case of failure.
*/
typedef ucs_status_t (*uct_ib_md_dereg_multithreaded_func_t)(uct_ib_md_t *md,
uct_ib_mem_t *memh);

/**
* Memory domain method to prefetch physical memory for virtual memory area.
*
* @param [in] md Memory domain.
*
* @param [in] memh Memory region handle.
*
* @param [in] address Memory area start address.
*
* @param [in] length Memory area length.
*
* @return UCS_OK on success or error code in case of failure.
*/
typedef ucs_status_t (*uct_ib_md_mem_prefetch_func_t)(uct_ib_md_t *md,
uct_ib_mem_t *memh,
void *addr, size_t length);
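
Taken together, the typedefs above describe the provider-specific hooks of an IB memory domain. The sketch below shows how such callbacks could be gathered into a dispatch table and invoked through it; the struct and function names are illustrative (UCX's real ops structure is not shown in this diff), and the typedefs from ib_md.h are assumed to be in scope:

```c
/* Illustrative only: a dispatch table built from the callback typedefs above.
 * Assumes the declarations from ib_md.h (and thus ucs_status_t) are visible. */
typedef struct example_ib_md_ops {
    uct_ib_md_open_func_t                open;
    uct_ib_md_cleanup_func_t             cleanup;
    uct_ib_md_reg_key_func_t             reg_key;
    uct_ib_md_dereg_key_func_t           dereg_key;
    uct_ib_md_reg_atomic_key_func_t      reg_atomic_key;
    uct_ib_md_dereg_atomic_key_func_t    dereg_atomic_key;
    uct_ib_md_reg_multithreaded_func_t   reg_multithreaded;
    uct_ib_md_dereg_multithreaded_func_t dereg_multithreaded;
    uct_ib_md_mem_prefetch_func_t        mem_prefetch;
} example_ib_md_ops_t;

/* Register a buffer, then prefetch the pages behind it through the table */
static ucs_status_t example_reg_and_prefetch(const example_ib_md_ops_t *ops,
                                             uct_ib_md_t *md, void *address,
                                             size_t length, uint64_t access,
                                             uct_ib_mem_t *memh)
{
    ucs_status_t status;

    status = ops->reg_key(md, address, length, access, memh);
    if (status != UCS_OK) {
        return status;
    }

    return ops->mem_prefetch(md, memh, address, length);
}
```
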
23 changes: 23 additions & 0 deletions src/uct/ib/rc/accel/rc_mlx5.inl
@@ -1016,10 +1016,19 @@ uct_rc_mlx5_iface_common_tag_recv(uct_rc_mlx5_iface_common_t *iface,
uct_rc_mlx5_tag_entry_t *tag_entry;
uint16_t next_idx;
unsigned ctrl_size;
int ret;

UCT_CHECK_IOV_SIZE(iovcnt, 1ul, "uct_rc_mlx5_iface_common_tag_recv");
UCT_RC_MLX5_CHECK_TAG(iface);

kh_put(uct_rc_mlx5_tag_addrs, &iface->tm.tag_addrs, iov->buffer, &ret);
if (ucs_unlikely(ret == 0)) {
/* Do not post the same buffer more than once (even with different tags)
* to avoid memory corruption. */
return UCS_ERR_ALREADY_EXISTS;
}
ucs_assert(ret > 0);

ctrl_size = sizeof(struct mlx5_wqe_ctrl_seg) +
sizeof(uct_rc_mlx5_wqe_tm_seg_t);
tag_entry = iface->tm.head;
@@ -1053,6 +1062,17 @@ uct_rc_mlx5_iface_common_tag_recv(uct_rc_mlx5_iface_common_t *iface,
return UCS_OK;
}

static UCS_F_ALWAYS_INLINE void
uct_rc_mlx5_iface_tag_del_from_hash(uct_rc_mlx5_iface_common_t *iface,
void *buffer)
{
khiter_t iter;

iter = kh_get(uct_rc_mlx5_tag_addrs, &iface->tm.tag_addrs, buffer);
ucs_assert(iter != kh_end(&iface->tm.tag_addrs));
kh_del(uct_rc_mlx5_tag_addrs, &iface->tm.tag_addrs, iter);
}

static UCS_F_ALWAYS_INLINE ucs_status_t
uct_rc_mlx5_iface_common_tag_recv_cancel(uct_rc_mlx5_iface_common_t *iface,
uct_tag_context_t *ctx, int force)
@@ -1067,6 +1087,7 @@ uct_rc_mlx5_iface_common_tag_recv_cancel(uct_rc_mlx5_iface_common_t *iface,
if (ucs_likely(force)) {
flags = UCT_RC_MLX5_SRQ_FLAG_TM_SW_CNT;
uct_rc_mlx5_release_tag_entry(iface, tag_entry);
uct_rc_mlx5_iface_tag_del_from_hash(iface, priv->buffer);
} else {
flags = UCT_RC_MLX5_SRQ_FLAG_TM_CQE_REQ | UCT_RC_MLX5_SRQ_FLAG_TM_SW_CNT;
uct_rc_mlx5_add_cmd_wq_op(iface, tag_entry);
@@ -1097,6 +1118,7 @@ uct_rc_mlx5_iface_handle_tm_list_op(uct_rc_mlx5_iface_common_t *iface, int opcod
if (opcode == UCT_RC_MLX5_CQE_APP_OP_TM_REMOVE) {
ctx = op->tag->ctx;
priv = uct_rc_mlx5_ctx_priv(ctx);
uct_rc_mlx5_iface_tag_del_from_hash(iface, priv->buffer);
ctx->completed_cb(ctx, priv->tag, 0, priv->length, UCS_ERR_CANCELED);
}
}
@@ -1142,6 +1164,7 @@ uct_rc_mlx5_iface_handle_expected(uct_rc_mlx5_iface_common_t *iface, struct mlx5
byte_len = ntohl(cqe->byte_cnt);

uct_rc_mlx5_release_tag_entry(iface, tag_entry);
uct_rc_mlx5_iface_tag_del_from_hash(iface, priv->buffer);

if (cqe->op_own & MLX5_INLINE_SCATTER_64) {
ucs_assert(byte_len <= priv->length);
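
The kh_put() call at the top of uct_rc_mlx5_iface_common_tag_recv() is the entire duplicate-post guard: khash reports ret == 0 when the key is already present, and uct_rc_mlx5_iface_tag_del_from_hash() removes the address again once the tag entry is consumed or cancelled. Below is a standalone sketch of the same pattern using UCX's bundled khash.h; the hash name and helpers are illustrative, not UCX's:

```c
/* Sketch of an address-dedup set built on UCX's khash (same KHASH_INIT shape
 * as the tag_addrs hash in this commit). */
#include <stdio.h>
#include <stdint.h>
#include <ucs/datastruct/khash.h>   /* path as used inside the UCX tree */

#define example_addr_hash(_ptr) kh_int64_hash_func((uintptr_t)(_ptr))
KHASH_INIT(example_addrs, void*, char, 0, example_addr_hash,
           kh_int64_hash_equal)

static khash_t(example_addrs) addrs;

/* Returns 0 when the buffer is newly recorded, -1 when it is already posted */
static int example_record_buffer(void *buffer)
{
    int ret;

    kh_put(example_addrs, &addrs, buffer, &ret);
    return (ret == 0) ? -1 : 0;   /* ret == 0 means the key already existed */
}

static void example_forget_buffer(void *buffer)
{
    khiter_t iter = kh_get(example_addrs, &addrs, buffer);

    if (iter != kh_end(&addrs)) {
        kh_del(example_addrs, &addrs, iter);
    }
}

int main(void)
{
    char buf[64];

    kh_init_inplace(example_addrs, &addrs);
    printf("first post : %d\n", example_record_buffer(buf)); /* 0  */
    printf("second post: %d\n", example_record_buffer(buf)); /* -1 */
    example_forget_buffer(buf);
    printf("repost     : %d\n", example_record_buffer(buf)); /* 0  */
    kh_destroy_inplace(example_addrs, &addrs);
    return 0;
}
```
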
12 changes: 12 additions & 0 deletions src/uct/ib/rc/accel/rc_mlx5_common.c
@@ -414,6 +414,7 @@ void uct_rc_mlx5_iface_common_tag_cleanup(uct_rc_mlx5_iface_common_t *iface)
{
uct_rc_mlx5_mp_hash_key_t key_gid;
uint64_t key_lid;
void *recv_buffer;

if (!UCT_RC_MLX5_TM_ENABLED(iface)) {
return;
@@ -425,6 +426,12 @@ void uct_rc_mlx5_iface_common_tag_cleanup(uct_rc_mlx5_iface_common_t *iface)
ucs_free(iface->tm.cmd_wq.ops);
uct_rc_mlx5_tag_cleanup(iface);

kh_foreach_key(&iface->tm.tag_addrs, recv_buffer, {
ucs_debug("destroying iface %p, with recv buffer %p offloaded to the HW",
iface, recv_buffer);
});
kh_destroy_inplace(uct_rc_mlx5_tag_addrs, &iface->tm.tag_addrs);

if (!UCT_RC_MLX5_MP_ENABLED(iface)) {
return;
}
@@ -440,6 +447,7 @@ void uct_rc_mlx5_iface_common_tag_cleanup(uct_rc_mlx5_iface_common_t *iface)
iface, key_gid.guid, key_gid.qp_num);
});
kh_destroy_inplace(uct_rc_mlx5_mp_hash_gid, &iface->tm.mp.hash_gid);

ucs_mpool_cleanup(&iface->tm.mp.tx_mp, 1);
}

@@ -734,6 +742,10 @@ void uct_rc_mlx5_init_rx_tm_common(uct_rc_mlx5_iface_common_t *iface,
* ptr_array is used as operation ID and is passed in "app_context"
* of TM header. */
ucs_ptr_array_init(&iface->tm.rndv_comps, 0, "rm_rndv_completions");

/* Set of addresses posted to the HW. Used to avoid posting of the same
* address more than once. */
kh_init_inplace(uct_rc_mlx5_tag_addrs, &iface->tm.tag_addrs);
}

ucs_status_t uct_rc_mlx5_init_rx_tm(uct_rc_mlx5_iface_common_t *iface,
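
The lifecycle added in this file pairs kh_init_inplace() in uct_rc_mlx5_init_rx_tm_common() with kh_destroy_inplace() in the cleanup path, warning about any receive buffer still offloaded to the HW. Continuing the illustrative example_addrs sketch from above (kh_foreach_key() is the iteration helper provided by UCX's khash):

```c
/* Sketch only: drain the tracking set at teardown and warn about buffers that
 * are still posted, mirroring the ucs_debug() loop in the cleanup hunk. */
static void example_addrs_cleanup(khash_t(example_addrs) *tracked)
{
    void *recv_buffer;

    kh_foreach_key(tracked, recv_buffer, {
        fprintf(stderr, "recv buffer %p still offloaded to the HW at cleanup\n",
                recv_buffer);
    });
    kh_destroy_inplace(example_addrs, tracked);
}
```
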
85 changes: 45 additions & 40 deletions src/uct/ib/rc/accel/rc_mlx5_common.h
@@ -354,68 +354,73 @@ typedef union uct_rc_mlx5_dm_copy_data {
} UCS_S_PACKED uct_rc_mlx5_dm_copy_data_t;
#endif

#define uct_rc_mlx5_tag_addr_hash(_ptr) kh_int64_hash_func((uintptr_t)(_ptr))
KHASH_INIT(uct_rc_mlx5_tag_addrs, void*, char, 0, uct_rc_mlx5_tag_addr_hash,
kh_int64_hash_equal)

typedef struct uct_rc_mlx5_iface_common {
uct_rc_iface_t super;
uct_rc_iface_t super;
struct {
ucs_mpool_t atomic_desc_mp;
uct_ib_mlx5_mmio_mode_t mmio_mode;
uint16_t bb_max; /* limit number of outstanding WQE BBs */
ucs_mpool_t atomic_desc_mp;
uct_ib_mlx5_mmio_mode_t mmio_mode;
uint16_t bb_max; /* limit number of outstanding WQE BBs */
} tx;
struct {
uct_ib_mlx5_srq_t srq;
void *pref_ptr;
uct_ib_mlx5_srq_t srq;
void *pref_ptr;
} rx;
uct_ib_mlx5_cq_t cq[UCT_IB_DIR_NUM];
uct_ib_mlx5_cq_t cq[UCT_IB_DIR_NUM];
struct {
uct_rc_mlx5_cmd_wq_t cmd_wq;
uct_rc_mlx5_tag_entry_t *head;
uct_rc_mlx5_tag_entry_t *tail;
uct_rc_mlx5_tag_entry_t *list;
ucs_mpool_t *bcopy_mp;

ucs_ptr_array_t rndv_comps;
size_t max_bcopy;
size_t max_zcopy;
unsigned num_tags;
unsigned num_outstanding;
unsigned max_rndv_data;
uint16_t unexpected_cnt;
uint16_t cmd_qp_len;
uint8_t enabled;
uct_rc_mlx5_cmd_wq_t cmd_wq;
uct_rc_mlx5_tag_entry_t *head;
uct_rc_mlx5_tag_entry_t *tail;
uct_rc_mlx5_tag_entry_t *list;
ucs_mpool_t *bcopy_mp;
khash_t(uct_rc_mlx5_tag_addrs) tag_addrs;

ucs_ptr_array_t rndv_comps;
size_t max_bcopy;
size_t max_zcopy;
unsigned num_tags;
unsigned num_outstanding;
unsigned max_rndv_data;
uint16_t unexpected_cnt;
uint16_t cmd_qp_len;
uint8_t enabled;
struct {
uint8_t num_strides;
ucs_mpool_t tx_mp;
uct_rc_mlx5_mp_context_t last_frag_ctx;
uint8_t num_strides;
ucs_mpool_t tx_mp;
uct_rc_mlx5_mp_context_t last_frag_ctx;
khash_t(uct_rc_mlx5_mp_hash_lid) hash_lid;
khash_t(uct_rc_mlx5_mp_hash_gid) hash_gid;
} mp;
struct {
void *arg; /* User defined arg */
uct_tag_unexp_eager_cb_t cb; /* Callback for unexpected eager messages */
void *arg; /* User defined arg */
uct_tag_unexp_eager_cb_t cb; /* Callback for unexpected eager messages */
} eager_unexp;

struct {
void *arg; /* User defined arg */
uct_tag_unexp_rndv_cb_t cb; /* Callback for unexpected rndv messages */
void *arg; /* User defined arg */
uct_tag_unexp_rndv_cb_t cb; /* Callback for unexpected rndv messages */
} rndv_unexp;
uct_rc_mlx5_release_desc_t eager_desc;
uct_rc_mlx5_release_desc_t rndv_desc;
uct_rc_mlx5_release_desc_t am_desc;
uct_rc_mlx5_release_desc_t eager_desc;
uct_rc_mlx5_release_desc_t rndv_desc;
uct_rc_mlx5_release_desc_t am_desc;
UCS_STATS_NODE_DECLARE(stats)
} tm;
#if HAVE_IBV_DM
struct {
uct_mlx5_dm_data_t *dm;
size_t seg_len; /* cached value to avoid double-pointer access */
ucs_status_t (*am_short)(uct_ep_h tl_ep, uint8_t id, uint64_t hdr,
const void *payload, unsigned length);
ucs_status_t (*tag_short)(uct_ep_h tl_ep, uct_tag_t tag,
const void *data, size_t length);
uct_mlx5_dm_data_t *dm;
size_t seg_len; /* cached value to avoid double-pointer access */
ucs_status_t (*am_short)(uct_ep_h tl_ep, uint8_t id, uint64_t hdr,
const void *payload, unsigned length);
ucs_status_t (*tag_short)(uct_ep_h tl_ep, uct_tag_t tag,
const void *data, size_t length);
} dm;
#endif
struct {
uint8_t atomic_fence_flag;
uint8_t put_fence_flag;
uint8_t atomic_fence_flag;
uint8_t put_fence_flag;
} config;
UCS_STATS_NODE_DECLARE(stats)
} uct_rc_mlx5_iface_common_t;
2 changes: 1 addition & 1 deletion src/uct/sm/mm/posix/mm_posix.c
@@ -494,7 +494,7 @@ uct_posix_mem_alloc(uct_md_h tl_md, size_t *length_p, void **address_p,

err_close:
close(fd);
if (!posix_config->use_proc_link) {
if (!(seg->seg_id & UCT_POSIX_SEG_FLAG_PROCFS)) {
uct_posix_unlink(md, seg->seg_id);
}
err_free_seg:
4 changes: 3 additions & 1 deletion test/gtest/ucp/test_ucp_tag_offload.cc
@@ -493,7 +493,9 @@ class test_ucp_tag_offload_stats : public test_ucp_tag_offload_multi {
UCP_TAG_MASK_FULL);

// Post and cancel another receive to make sure the first one was offloaded
request *req2 = recv_nb_and_check(buffer, count, DATATYPE, tag,
size_t size = receiver().worker()->context->config.ext.tm_thresh + 1;
std::vector<char> tbuf(size, 0);
request *req2 = recv_nb_and_check(&tbuf[0], size, DATATYPE, tag,
UCP_TAG_MASK_FULL);
req_cancel(receiver(), req2);
