Skip to content

Commit

Permalink
Merge pull request #815 from wrrobin/pr/optimized-put-signal
Browse files Browse the repository at this point in the history
Optimized put with signal implementation as shmemx routines
  • Loading branch information
wrrobin authored Jan 7, 2019
2 parents fff5fac + 93e34bd commit 86f37da
Show file tree
Hide file tree
Showing 7 changed files with 209 additions and 12 deletions.
17 changes: 5 additions & 12 deletions src/data_c.c4
Original file line number Diff line number Diff line change
Expand Up @@ -513,12 +513,9 @@ SHMEM_PROF_DEF_CTX_PUT_N_SIGNAL_NBI(`mem')
SHMEM_ERR_CHECK_PE(pe); \
SHMEM_ERR_CHECK_SYMMETRIC(target, sizeof(TYPE) * nelems); \
SHMEM_ERR_CHECK_NULL(source, nelems); \
shmem_internal_put_nbi(ctx, target, source, \
sizeof(TYPE) * nelems, pe); \
/* TODO: Uncertainty in terms of the memory model; needs to be fixed later. */ \
shmem_internal_fence(ctx); \
shmem_internal_put_nbi(ctx, sig_addr, &signal, \
sizeof(uint64_t), pe); \
shmem_internal_put_signal_nbi(ctx, target, source, \
sizeof(TYPE) * nelems, sig_addr, \
signal, pe); \
}


Expand All @@ -531,12 +528,8 @@ SHMEM_PROF_DEF_CTX_PUT_N_SIGNAL_NBI(`mem')
SHMEM_ERR_CHECK_PE(pe); \
SHMEM_ERR_CHECK_SYMMETRIC(target, (SIZE) * nelems); \
SHMEM_ERR_CHECK_NULL(source, nelems); \
shmem_internal_put_nbi(ctx, target, source, (SIZE) * nelems, \
pe); \
/* TODO: Uncertainty in terms of the memory model; needs to be fixed later. */ \
shmem_internal_fence(ctx); \
shmem_internal_put_nbi(ctx, sig_addr, &signal, \
sizeof(uint64_t), pe); \
shmem_internal_put_signal_nbi(ctx, target, source, (SIZE) * nelems, \
sig_addr, signal, pe); \
}


Expand Down
38 changes: 38 additions & 0 deletions src/shmem_comm.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include "shmem.h"
#include "shmemx.h"

#include "shmem_atomic.h"

#ifdef USE_ON_NODE_COMMS
extern char *shmem_internal_location_array;
#define SHMEM_SET_RANK_SAME_NODE(pe, node_rank) \
Expand Down Expand Up @@ -101,6 +103,42 @@ shmem_internal_put_nb(shmem_ctx_t ctx, void *target, const void *source, size_t
}
}

/*
 * Non-blocking put-with-signal.
 *
 * Copies len bytes from source to target on PE pe, then writes signal to
 * sig_addr on the same PE.
 *
 *   ctx       communication context the operation is issued on
 *   target    destination address of the payload
 *   source    local payload buffer
 *   len       payload size in bytes; 0 means "signal only"
 *   sig_addr  address of the 64-bit signal word
 *   signal    value written to sig_addr
 *   pe        destination PE
 *
 * When SHMEM_GET_RANK_SAME_NODE() reports an on-node rank for pe, the
 * on-node mechanism selected at build time (memcpy, XPMEM or CMA) is used;
 * otherwise the request is handed to the network transport.  Every on-node
 * path places a store fence between the payload copy and the signal write
 * so the two are observed in that order.
 */
static inline
void
shmem_internal_put_signal_nbi(shmem_ctx_t ctx, void *target, const void *source, size_t len,
                              uint64_t *sig_addr, uint64_t signal, int pe)
{
    int node_rank;

    if (len == 0) {
        /* No payload to move: only the signal has to be delivered. */
        shmem_internal_put_scalar(ctx, sig_addr, &signal, sizeof(uint64_t), pe);
        return;
    }

    if (-1 != (node_rank = SHMEM_GET_RANK_SAME_NODE(pe))) {
#if USE_MEMCPY
        memcpy(target, source, len);
        /* Same ordering guarantee as the XPMEM and CMA paths below: the
         * payload stores must become visible before the signal store. */
        shmem_internal_membar_store();
        *sig_addr = signal;
#elif USE_XPMEM
        shmem_transport_xpmem_put(target, source, len, pe, node_rank);
        shmem_internal_membar_store(); /* Memory fence to ensure target PE observes stores in the correct order */
        shmem_transport_xpmem_put(sig_addr, &signal, sizeof(uint64_t), pe, node_rank);
#elif USE_CMA
        if (len > shmem_internal_params.CMA_PUT_MAX) {
            /* Payload exceeds the CMA put limit: use the network transport instead. */
            shmem_transport_put_signal_nbi((shmem_transport_ctx_t *)ctx, target, source, len, sig_addr, signal, pe);
        } else {
            shmem_transport_cma_put(target, source, len, pe, node_rank);
            shmem_internal_membar_store(); /* Memory fence to ensure target PE observes stores in the correct order */
            shmem_transport_cma_put(sig_addr, &signal, sizeof(uint64_t), pe, node_rank);
        }
#else
        RAISE_ERROR_STR("No path to peer");
#endif
    } else {
        /* Off-node peer: the transport implements payload + signal delivery. */
        shmem_transport_put_signal_nbi((shmem_transport_ctx_t *) ctx, target, source, len, sig_addr, signal, pe);
    }
}

static inline
void
shmem_internal_put_nbi(shmem_ctx_t ctx, void *target, const void *source, size_t len, int pe)
Expand Down
8 changes: 8 additions & 0 deletions src/transport_none.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,14 @@ shmem_transport_put_nb(shmem_transport_ctx_t* ctx, void *target, const void *sou
RAISE_ERROR_STR("No path to peer");
}

/*
 * Put-with-signal stub for the "none" transport.
 *
 * None of the arguments are used: the function unconditionally invokes
 * RAISE_ERROR_STR with "No path to peer".
 */
static inline void
shmem_transport_put_signal_nbi(shmem_transport_ctx_t *ctx,
                               void *target,
                               const void *source,
                               size_t len,
                               uint64_t *sig_addr,
                               uint64_t signal,
                               int pe)
{
    RAISE_ERROR_STR("No path to peer");
}

static inline
void
shmem_transport_put_wait(shmem_transport_ctx_t* ctx, long *completion)
Expand Down
135 changes: 135 additions & 0 deletions src/transport_ofi.h
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,141 @@ void shmem_transport_put_nb(shmem_transport_ctx_t* ctx, void *target, const void
}
}

/*
 * OFI implementation of non-blocking put-with-signal.
 *
 * Writes len bytes from source to target on PE pe using fi_writemsg(), then
 * writes the 64-bit signal value to sig_addr on the same PE using a fenced
 * fi_atomicmsg() (FI_ATOMIC_WRITE on FI_UINT64).
 *
 * ctx->pending_put_cntr is incremented once for every operation issued:
 * once per payload write (once per fragment when the payload is split) and
 * once for the signal.
 */
static inline
void shmem_transport_put_signal_nbi(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len,
uint64_t *sig_addr, uint64_t signal, int pe)
{
int ret = 0;
uint64_t dst = (uint64_t) pe;
uint64_t polled = 0;
uint64_t key;
uint8_t *addr;

/* Fills in the addr/key pair used for target in the RMA descriptors below */
shmem_transport_ofi_get_mr(target, pe, &addr, &key);

if (len <= shmem_transport_ofi_max_buffered_send) {
/* Small payload: issued as a single write carrying FI_INJECT */
uint8_t *src_buf = (uint8_t *) source;

SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx);
SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr);

const struct iovec msg_iov = {
.iov_base = src_buf,
.iov_len = len
};
const struct fi_rma_iov rma_iov = {
.addr = (uint64_t) addr,
.len = len,
.key = key
};
const struct fi_msg_rma msg = {
.msg_iov = &msg_iov,
.desc = NULL,
.iov_count = 1,
.addr = GET_DEST(dst),
.rma_iov = &rma_iov,
.rma_iov_count = 1,
.context = src_buf,
.data = 0
};

/* Re-issue the write for as long as try_again() asks for a retry */
do {
ret = fi_writemsg(ctx->ep, &msg, FI_DELIVERY_COMPLETE | FI_INJECT);
} while (try_again(ctx, ret, &polled));

SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx);
} else {
/* Larger payload: issued as one write per fragment, each fragment being at
 * most shmem_transport_ofi_max_msg_size bytes */
uint8_t *frag_source = (uint8_t *) source;
uint64_t frag_target = (uint64_t) addr;
size_t frag_len = len;

/* Descriptors are mutable here because they are updated for every fragment */
struct iovec msg_iov = {
.iov_base = frag_source,
.iov_len = frag_len
};
struct fi_rma_iov rma_iov = {
.addr = frag_target,
.len = frag_len,
.key = key
};
struct fi_msg_rma msg = {
.msg_iov = &msg_iov,
.desc = NULL,
.iov_count = 1,
.addr = GET_DEST(dst),
.rma_iov = &rma_iov,
.rma_iov_count = 1,
.context = frag_source,
.data = 0
};

/* The lock is held across the whole fragment loop */
SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx);
while (frag_source < (((uint8_t *) source) + len)) {
/* Bytes still to send, capped at the maximum message size */
frag_len = MIN(shmem_transport_ofi_max_msg_size,
(size_t) (((uint8_t *) source) + len - frag_source));
/* polled is handed to try_again(); it is reset for every fragment */
polled = 0;

msg_iov.iov_base = frag_source;
msg_iov.iov_len = frag_len;

rma_iov.addr = frag_target;
rma_iov.len = frag_len;

msg.msg_iov = &msg_iov;
msg.rma_iov = &rma_iov;
msg.context = frag_source;

/* One counter increment per fragment issued */
SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr);

do {
ret = fi_writemsg(ctx->ep, &msg, FI_DELIVERY_COMPLETE);
} while (try_again(ctx, ret, &polled));

/* Advance the local and remote cursors past the fragment just issued */
frag_source += frag_len;
frag_target += frag_len;
}
SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx);
}

/* Transmit the signal: addr/key are looked up again, this time for sig_addr */
shmem_transport_ofi_get_mr(sig_addr, pe, &addr, &key);
polled = 0;
ret = 0;

/* The signal is issued in its own lock/unlock section, separate from the
 * payload section above */
SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx);
SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr);

/* A single FI_UINT64 element, read from the local by-value signal argument */
const struct fi_ioc msg_iov_signal = {
.addr = (uint8_t *) &signal,
.count = 1
};
const struct fi_rma_ioc rma_iov_signal = {
.addr = (uint64_t) addr,
.count = 1,
.key = key
};
const struct fi_msg_atomic msg_signal = {
.msg_iov = &msg_iov_signal,
.desc = NULL,
.iov_count = 1,
.addr = GET_DEST(dst),
.rma_iov = &rma_iov_signal,
.rma_iov_count = 1,
.datatype = FI_UINT64,
.op = FI_ATOMIC_WRITE,
.context = (uint8_t *) &signal,
.data = 0
};

do {
/* FI_FENCE assures completion of the prior payload put(s) (more than one when
 * the payload was fragmented) before the signal is delivered */
ret = fi_atomicmsg(ctx->ep, &msg_signal, FI_DELIVERY_COMPLETE | FI_FENCE | FI_INJECT);
} while (try_again(ctx, ret, &polled));

SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx);
}

/* compatibility with Portals transport */
static inline
void shmem_transport_put_wait(shmem_transport_ctx_t* ctx, long *completion) {
Expand Down
11 changes: 11 additions & 0 deletions src/transport_portals4.h
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,17 @@ shmem_transport_put_ct_nb(shmem_transport_ct_t *ct, void *target, const void *so
#endif
}

/*
 * Portals 4 implementation of non-blocking put-with-signal.
 *
 * FIXME: not yet optimized for Portals.  The payload put and the signal put
 * are still issued as two operations separated by an explicit fence.
 */
static inline void
shmem_transport_put_signal_nbi(shmem_transport_ctx_t *ctx,
                               void *target,
                               const void *source,
                               size_t len,
                               uint64_t *sig_addr,
                               uint64_t signal,
                               int pe)
{
    /* Step 1: start the payload transfer. */
    shmem_transport_put_nbi(ctx, target, source, len, pe);

    /* Step 2: fence between the payload put and the signal put. */
    shmem_transport_fence(ctx);

    /* Step 3: write the 64-bit signal value to sig_addr on the target PE. */
    shmem_transport_put_scalar(ctx, sig_addr, &signal, sizeof signal, pe);
}


static inline
void
Expand Down
8 changes: 8 additions & 0 deletions test/shmemx/put_signal.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,17 @@ int main(int argc, char *argv[])

if (me == 0) {
shmemx_long_put_signal(target, source, MSG_SZ, &sig_addr, 1, dest);
#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
shmem_wait_until(&sig_addr, SHMEM_CMP_EQ, 1);
#else
shmem_uint64_wait_until(&sig_addr, SHMEM_CMP_EQ, 1);
#endif
} else {
#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
shmem_wait_until(&sig_addr, SHMEM_CMP_EQ, 1);
#else
shmem_uint64_wait_until(&sig_addr, SHMEM_CMP_EQ, 1);
#endif
shmemx_long_put_signal(target, target, MSG_SZ, &sig_addr, 1, dest);
}

Expand Down
4 changes: 4 additions & 0 deletions test/shmemx/put_signal_nbi.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,11 @@ int main(int argc, char *argv[])
}
}

#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
shmem_wait_until(&sig_addr, SHMEM_CMP_EQ, 1);
#else
shmem_uint64_wait_until(&sig_addr, SHMEM_CMP_EQ, 1);
#endif

for (i = 0; i < MSG_SZ; i++) {
if (target[i] != source[i]) {
Expand Down

0 comments on commit 86f37da

Please sign in to comment.