diff --git a/src/data_c.c4 b/src/data_c.c4 index 7a894cba8..ec56c8e74 100644 --- a/src/data_c.c4 +++ b/src/data_c.c4 @@ -513,12 +513,9 @@ SHMEM_PROF_DEF_CTX_PUT_N_SIGNAL_NBI(`mem') SHMEM_ERR_CHECK_PE(pe); \ SHMEM_ERR_CHECK_SYMMETRIC(target, sizeof(TYPE) * nelems); \ SHMEM_ERR_CHECK_NULL(source, nelems); \ - shmem_internal_put_nbi(ctx, target, source, \ - sizeof(TYPE) * nelems, pe); \ - /* TODO: Uncertainity in terms of the memory model; Need to be fixed later. */ \ - shmem_internal_fence(ctx); \ - shmem_internal_put_nbi(ctx, sig_addr, &signal, \ - sizeof(uint64_t), pe); \ + shmem_internal_put_signal_nbi(ctx, target, source, \ + sizeof(TYPE) * nelems, sig_addr, \ + signal, pe); \ } @@ -531,12 +528,8 @@ SHMEM_PROF_DEF_CTX_PUT_N_SIGNAL_NBI(`mem') SHMEM_ERR_CHECK_PE(pe); \ SHMEM_ERR_CHECK_SYMMETRIC(target, (SIZE) * nelems); \ SHMEM_ERR_CHECK_NULL(source, nelems); \ - shmem_internal_put_nbi(ctx, target, source, (SIZE) * nelems, \ - pe); \ - /* TODO: Uncertainity in terms of the memory model; Need to be fixed later. */ \ - shmem_internal_fence(ctx); \ - shmem_internal_put_nbi(ctx, sig_addr, &signal, \ - sizeof(uint64_t), pe); \ + shmem_internal_put_signal_nbi(ctx, target, source, (SIZE) * nelems, \ + sig_addr, signal, pe); \ } diff --git a/src/shmem_comm.h b/src/shmem_comm.h index 21623f1d7..72b46aae3 100644 --- a/src/shmem_comm.h +++ b/src/shmem_comm.h @@ -25,6 +25,8 @@ #include "shmem.h" #include "shmemx.h" +#include "shmem_atomic.h" + #ifdef USE_ON_NODE_COMMS extern char *shmem_internal_location_array; #define SHMEM_SET_RANK_SAME_NODE(pe, node_rank) \ @@ -101,6 +103,42 @@ shmem_internal_put_nb(shmem_ctx_t ctx, void *target, const void *source, size_t } } +static inline +void +shmem_internal_put_signal_nbi(shmem_ctx_t ctx, void *target, const void *source, size_t len, + uint64_t *sig_addr, uint64_t signal, int pe) +{ + int node_rank; + + if (len == 0) { + shmem_internal_put_scalar(ctx, sig_addr, &signal, sizeof(uint64_t), pe); + return; + } + + if (-1 != (node_rank = SHMEM_GET_RANK_SAME_NODE(pe))) { +#if USE_MEMCPY + memcpy(target, source, len); + *sig_addr = signal; +#elif USE_XPMEM + shmem_transport_xpmem_put(target, source, len, pe, node_rank); + shmem_internal_membar_store(); /* Memory fence to ensure target PE observes stores in the correct order */ + shmem_transport_xpmem_put(sig_addr, &signal, sizeof(uint64_t), pe, node_rank); +#elif USE_CMA + if (len > shmem_internal_params.CMA_PUT_MAX) { + shmem_transport_put_signal_nbi((shmem_transport_ctx_t *)ctx, target, source, len, sig_addr, signal, pe); + } else { + shmem_transport_cma_put(target, source, len, pe, node_rank); + shmem_internal_membar_store(); /* Memory fence to ensure target PE observes stores in the correct order */ + shmem_transport_cma_put(sig_addr, &signal, sizeof(uint64_t), pe, node_rank); + } +#else + RAISE_ERROR_STR("No path to peer"); +#endif + } else { + shmem_transport_put_signal_nbi((shmem_transport_ctx_t *) ctx, target, source, len, sig_addr, signal, pe); + } +} + static inline void shmem_internal_put_nbi(shmem_ctx_t ctx, void *target, const void *source, size_t len, int pe) diff --git a/src/transport_none.h b/src/transport_none.h index 87eb059f5..2d8aaa47b 100644 --- a/src/transport_none.h +++ b/src/transport_none.h @@ -134,6 +134,14 @@ shmem_transport_put_nb(shmem_transport_ctx_t* ctx, void *target, const void *sou RAISE_ERROR_STR("No path to peer"); } +static inline +void +shmem_transport_put_signal_nbi(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, + uint64_t *sig_addr, uint64_t signal, int pe) +{ + RAISE_ERROR_STR("No path to peer"); +} + static inline void shmem_transport_put_wait(shmem_transport_ctx_t* ctx, long *completion) diff --git a/src/transport_ofi.h b/src/transport_ofi.h index 0c6d7a414..93f4ac6b6 100644 --- a/src/transport_ofi.h +++ b/src/transport_ofi.h @@ -658,6 +658,141 @@ void shmem_transport_put_nb(shmem_transport_ctx_t* ctx, void *target, const void } } +static inline +void shmem_transport_put_signal_nbi(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, + uint64_t *sig_addr, uint64_t signal, int pe) +{ + int ret = 0; + uint64_t dst = (uint64_t) pe; + uint64_t polled = 0; + uint64_t key; + uint8_t *addr; + + shmem_transport_ofi_get_mr(target, pe, &addr, &key); + + if (len <= shmem_transport_ofi_max_buffered_send) { + uint8_t *src_buf = (uint8_t *) source; + + SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx); + SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr); + + const struct iovec msg_iov = { + .iov_base = src_buf, + .iov_len = len + }; + const struct fi_rma_iov rma_iov = { + .addr = (uint64_t) addr, + .len = len, + .key = key + }; + const struct fi_msg_rma msg = { + .msg_iov = &msg_iov, + .desc = NULL, + .iov_count = 1, + .addr = GET_DEST(dst), + .rma_iov = &rma_iov, + .rma_iov_count = 1, + .context = src_buf, + .data = 0 + }; + + do { + ret = fi_writemsg(ctx->ep, &msg, FI_DELIVERY_COMPLETE | FI_INJECT); + } while (try_again(ctx, ret, &polled)); + + SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx); + } else { + uint8_t *frag_source = (uint8_t *) source; + uint64_t frag_target = (uint64_t) addr; + size_t frag_len = len; + + struct iovec msg_iov = { + .iov_base = frag_source, + .iov_len = frag_len + }; + struct fi_rma_iov rma_iov = { + .addr = frag_target, + .len = frag_len, + .key = key + }; + struct fi_msg_rma msg = { + .msg_iov = &msg_iov, + .desc = NULL, + .iov_count = 1, + .addr = GET_DEST(dst), + .rma_iov = &rma_iov, + .rma_iov_count = 1, + .context = frag_source, + .data = 0 + }; + + SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx); + while (frag_source < (((uint8_t *) source) + len)) { + frag_len = MIN(shmem_transport_ofi_max_msg_size, + (size_t) (((uint8_t *) source) + len - frag_source)); + polled = 0; + + msg_iov.iov_base = frag_source; + msg_iov.iov_len = frag_len; + + rma_iov.addr = frag_target; + rma_iov.len = frag_len; + + msg.msg_iov = &msg_iov; + msg.rma_iov = &rma_iov; + msg.context = frag_source; + + SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr); + + do { + ret = fi_writemsg(ctx->ep, &msg, FI_DELIVERY_COMPLETE); + } while (try_again(ctx, ret, &polled)); + + frag_source += frag_len; + frag_target += frag_len; + } + SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx); + } + + /* Transmit the signal */ + shmem_transport_ofi_get_mr(sig_addr, pe, &addr, &key); + polled = 0; + ret = 0; + + SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx); + SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr); + + const struct fi_ioc msg_iov_signal = { + .addr = (uint8_t *) &signal, + .count = 1 + }; + const struct fi_rma_ioc rma_iov_signal = { + .addr = (uint64_t) addr, + .count = 1, + .key = key + }; + const struct fi_msg_atomic msg_signal = { + .msg_iov = &msg_iov_signal, + .desc = NULL, + .iov_count = 1, + .addr = GET_DEST(dst), + .rma_iov = &rma_iov_signal, + .rma_iov_count = 1, + .datatype = FI_UINT64, + .op = FI_ATOMIC_WRITE, + .context = (uint8_t *) &signal, + .data = 0 + }; + + do { + /* FI_FENCE assures completion of one or more (for fragmentation) prior puts through + * signal delivery */ + ret = fi_atomicmsg(ctx->ep, &msg_signal, FI_DELIVERY_COMPLETE | FI_FENCE | FI_INJECT); + } while (try_again(ctx, ret, &polled)); + + SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx); +} + /* compatibility with Portals transport */ static inline void shmem_transport_put_wait(shmem_transport_ctx_t* ctx, long *completion) { diff --git a/src/transport_portals4.h b/src/transport_portals4.h index b9bf20683..ed377d2cd 100644 --- a/src/transport_portals4.h +++ b/src/transport_portals4.h @@ -643,6 +643,17 @@ shmem_transport_put_ct_nb(shmem_transport_ct_t *ct, void *target, const void *so #endif } +static inline +void shmem_transport_put_signal_nbi(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, + uint64_t *sig_addr, uint64_t signal, int pe) +{ + /* FIXME: Need to optimize non-blocking put with signal for Portals. Current implementation below keeps + * the "fence" in between data and signal put */ + shmem_transport_put_nbi(ctx, target, source, len, pe); + shmem_transport_fence(ctx); + shmem_transport_put_scalar(ctx, sig_addr, &signal, sizeof(uint64_t), pe); +} + static inline void diff --git a/test/shmemx/put_signal.c b/test/shmemx/put_signal.c index 0655386f8..8c4791e8c 100644 --- a/test/shmemx/put_signal.c +++ b/test/shmemx/put_signal.c @@ -65,9 +65,17 @@ int main(int argc, char *argv[]) if (me == 0) { shmemx_long_put_signal(target, source, MSG_SZ, &sig_addr, 1, dest); +#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L shmem_wait_until(&sig_addr, SHMEM_CMP_EQ, 1); +#else + shmem_uint64_wait_until(&sig_addr, SHMEM_CMP_EQ, 1); +#endif } else { +#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L shmem_wait_until(&sig_addr, SHMEM_CMP_EQ, 1); +#else + shmem_uint64_wait_until(&sig_addr, SHMEM_CMP_EQ, 1); +#endif shmemx_long_put_signal(target, target, MSG_SZ, &sig_addr, 1, dest); } diff --git a/test/shmemx/put_signal_nbi.c b/test/shmemx/put_signal_nbi.c index 6d9abc1e7..4d3b94412 100644 --- a/test/shmemx/put_signal_nbi.c +++ b/test/shmemx/put_signal_nbi.c @@ -67,7 +67,11 @@ int main(int argc, char *argv[]) } } +#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L shmem_wait_until(&sig_addr, SHMEM_CMP_EQ, 1); +#else + shmem_uint64_wait_until(&sig_addr, SHMEM_CMP_EQ, 1); +#endif for (i = 0; i < MSG_SZ; i++) { if (target[i] != source[i]) {