Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimized put with signal implementation as shmemx routines #815

Merged
merged 11 commits into from
Jan 7, 2019
Merged
33 changes: 12 additions & 21 deletions src/data_c.c4
Original file line number Diff line number Diff line change
Expand Up @@ -478,12 +478,10 @@ SHMEM_PROF_DEF_CTX_PUT_N_SIGNAL_NBI(`mem')
SHMEM_ERR_CHECK_PE(pe); \
SHMEM_ERR_CHECK_SYMMETRIC(target, sizeof(TYPE) * nelems); \
SHMEM_ERR_CHECK_NULL(source, nelems); \
shmem_internal_put_nb(ctx, target, source, \
sizeof(TYPE) * nelems, pe, \
&completion); \
shmem_internal_put_signal(ctx, target, source, \
sizeof(TYPE) * nelems, sig_addr, \
signal, pe, &completion); \
shmem_internal_put_wait(ctx, &completion); \
shmem_internal_put_scalar(ctx, sig_addr, &signal, \
sizeof(uint64_t), pe); \
}


Expand All @@ -497,28 +495,24 @@ SHMEM_PROF_DEF_CTX_PUT_N_SIGNAL_NBI(`mem')
SHMEM_ERR_CHECK_PE(pe); \
SHMEM_ERR_CHECK_SYMMETRIC(target, (SIZE) * nelems); \
SHMEM_ERR_CHECK_NULL(source, nelems); \
shmem_internal_put_nb(ctx, target, source, (SIZE) * nelems, \
pe, &completion); \
shmem_internal_put_signal(ctx, target, source, (SIZE) * nelems, \
sig_addr, signal, pe, &completion); \
shmem_internal_put_wait(ctx, &completion); \
shmem_internal_put_scalar(ctx, sig_addr, &signal, \
sizeof(uint64_t), pe); \
}

#define SHMEM_DEF_PUT_SIGNAL_NBI(STYPE,TYPE) \
void SHMEM_FUNCTION_ATTRIBUTES \
SHMEMX_FUNC_PROTOTYPE(STYPE##_put_signal_nbi, TYPE *target, \
const TYPE *source, size_t nelems, \
uint64_t *sig_addr, uint64_t signal, int pe) \
long completion = -1; \
SHMEM_ERR_CHECK_INITIALIZED(); \
SHMEM_ERR_CHECK_PE(pe); \
SHMEM_ERR_CHECK_SYMMETRIC(target, sizeof(TYPE) * nelems); \
SHMEM_ERR_CHECK_NULL(source, nelems); \
shmem_internal_put_nbi(ctx, target, source, \
sizeof(TYPE) * nelems, pe); \
/* TODO: Uncertainity in terms of the memory model; Need to be fixed later. */ \
shmem_internal_fence(ctx); \
shmem_internal_put_nbi(ctx, sig_addr, &signal, \
sizeof(uint64_t), pe); \
shmem_internal_put_signal(ctx, target, source, \
sizeof(TYPE) * nelems, sig_addr, \
signal, pe, &completion); \
}


Expand All @@ -527,16 +521,13 @@ SHMEM_PROF_DEF_CTX_PUT_N_SIGNAL_NBI(`mem')
SHMEMX_FUNC_PROTOTYPE(put##NAME##_signal_nbi, void *target, \
const void *source, size_t nelems, \
uint64_t *sig_addr, uint64_t signal, int pe) \
long completion = -1; \
SHMEM_ERR_CHECK_INITIALIZED(); \
SHMEM_ERR_CHECK_PE(pe); \
SHMEM_ERR_CHECK_SYMMETRIC(target, (SIZE) * nelems); \
SHMEM_ERR_CHECK_NULL(source, nelems); \
shmem_internal_put_nbi(ctx, target, source, (SIZE) * nelems, \
pe); \
/* TODO: Uncertainity in terms of the memory model; Need to be fixed later. */ \
shmem_internal_fence(ctx); \
shmem_internal_put_nbi(ctx, sig_addr, &signal, \
sizeof(uint64_t), pe); \
shmem_internal_put_signal(ctx, target, source, (SIZE) * nelems, \
sig_addr, signal, pe, &completion); \
}


Expand Down
34 changes: 34 additions & 0 deletions src/shmem_comm.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,40 @@ shmem_internal_put_nb(shmem_ctx_t ctx, void *target, const void *source, size_t
}
}

static inline
void
shmem_internal_put_signal(shmem_ctx_t ctx, void *target, const void *source, size_t len,
uint64_t *sig_addr, uint64_t signal, int pe, long *completion)
jdinan marked this conversation as resolved.
Show resolved Hide resolved
{
int node_rank;

if (len == 0) {
shmem_internal_put_scalar(ctx, sig_addr, &signal, sizeof(uint64_t), pe);
return;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code looks good. Just wanted to suggest double checking the proposal that this is the right way to handle the len == 0 case.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no specific text added in the proposal for this case. However, in the discussion, it is mentioned that sending the signal is the expected behavior.


if (-1 != (node_rank = SHMEM_GET_RANK_SAME_NODE(pe))) {
#if USE_MEMCPY
memcpy(target, source, len);
shmem_internal_put_scalar(ctx, sig_addr, &signal, sizeof(uint64_t), pe);
#elif USE_XPMEM
shmem_transport_xpmem_put(target, source, len, pe, node_rank);
shmem_internal_put_scalar(ctx, sig_addr, &signal, sizeof(uint64_t), pe);
#elif USE_CMA
if (len > shmem_internal_params.CMA_PUT_MAX) {
shmem_transport_put_signal((shmem_transport_ctx_t *)ctx, target, source, len, sig_addr, signal, pe, completion);
} else {
shmem_transport_cma_put(target, source, len, pe, node_rank);
shmem_internal_put_scalar(ctx, sig_addr, &signal, sizeof(uint64_t), pe);
jdinan marked this conversation as resolved.
Show resolved Hide resolved
}
#else
RAISE_ERROR_STR("No path to peer");
#endif
} else {
shmem_transport_put_signal((shmem_transport_ctx_t *) ctx, target, source, len, sig_addr, signal, pe, completion);
}
}

static inline
void
shmem_internal_put_nbi(shmem_ctx_t ctx, void *target, const void *source, size_t len, int pe)
Expand Down
8 changes: 8 additions & 0 deletions src/transport_none.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,14 @@ shmem_transport_put_nb(shmem_transport_ctx_t* ctx, void *target, const void *sou
RAISE_ERROR_STR("No path to peer");
}

static inline
void
shmem_transport_put_signal(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len,
uint64_t *sig_addr, uint64_t signal, int pe, long *completion)
{
RAISE_ERROR_STR("No path to peer");
}

static inline
void
shmem_transport_put_wait(shmem_transport_ctx_t* ctx, long *completion)
Expand Down
132 changes: 132 additions & 0 deletions src/transport_ofi.h
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,138 @@ void shmem_transport_put_nb(shmem_transport_ctx_t* ctx, void *target, const void
}
}

static inline
void shmem_transport_put_signal(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len,
uint64_t *sig_addr, uint64_t signal, int pe, long *completion)
{
int ret = 0;
uint64_t dst = (uint64_t) pe;
uint64_t polled = 0;
uint64_t key;
uint8_t *addr;

shmem_transport_ofi_get_mr(target, pe, &addr, &key);

if (len <= shmem_transport_ofi_max_buffered_send) {
uint8_t *src_buf = (uint8_t *) source;

SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx);
SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr);

const struct iovec msg_iov = {
.iov_base = src_buf,
.iov_len = len
};
const struct fi_rma_iov rma_iov = {
.addr = (uint64_t) addr,
.len = len,
.key = key
};
const struct fi_msg_rma msg = {
.msg_iov = &msg_iov,
.desc = NULL,
.iov_count = 1,
.addr = GET_DEST(dst),
.rma_iov = &rma_iov,
.rma_iov_count = 1,
.context = src_buf,
.data = 0
};

do {
ret = fi_writemsg(ctx->ep, &msg, FI_DELIVERY_COMPLETE);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean to add the FI_INJECT flag here (this is the buffered send case)?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, FI_INJECT should be added here. Thanks.

} while (try_again(ctx, ret, &polled));

SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx);
} else {
uint8_t *frag_source = (uint8_t *) source;
uint64_t frag_target = (uint64_t) addr;
size_t frag_len = len;

struct iovec msg_iov = {
.iov_base = frag_source,
.iov_len = frag_len
};
struct fi_rma_iov rma_iov = {
.addr = frag_target,
.len = frag_len,
.key = key
};
struct fi_msg_rma msg = {
.msg_iov = &msg_iov,
.desc = NULL,
.iov_count = 1,
.addr = GET_DEST(dst),
.rma_iov = &rma_iov,
.rma_iov_count = 1,
.context = frag_source,
.data = 0
};

SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx);
while (frag_source < (((uint8_t *) source) + len)) {
frag_len = MIN(shmem_transport_ofi_max_msg_size,
(size_t) (((uint8_t *) source) + len - frag_source));
polled = 0;

msg_iov.iov_base = frag_source;
msg_iov.iov_len = frag_len;

rma_iov.addr = frag_target;
rma_iov.len = frag_len;

msg.msg_iov = &msg_iov;
msg.rma_iov = &rma_iov;
msg.context = frag_source;

SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr);

do {
ret = fi_writemsg(ctx->ep, &msg, FI_DELIVERY_COMPLETE);
} while (try_again(ctx, ret, &polled));

frag_source += frag_len;
frag_target += frag_len;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just making an observation that when fragmentation occurs, the signal will be signaling completion of multiple prior puts. I re-read the FI_FENCE text and I think this is ok. It might not hurt to toss a comment somewhere to the effect that multiple prior operations may be fenced by the signal put when fragmentation occurs.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx);
if ((*completion) != -1) (*completion)++;
}

/* Transmit the signal */
shmem_transport_ofi_get_mr(sig_addr, pe, &addr, &key);
polled = 0;
ret = 0;

SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx);
SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr);

const struct iovec msg_iov_signal = {
.iov_base = (uint8_t *) &signal,
.iov_len = sizeof(uint64_t)
};
const struct fi_rma_iov rma_iov_signal = {
.addr = (uint64_t) addr,
.len = sizeof(uint64_t),
.key = key
};
const struct fi_msg_rma msg_signal = {
.msg_iov = &msg_iov_signal,
.desc = NULL,
.iov_count = 1,
.addr = GET_DEST(dst),
.rma_iov = &rma_iov_signal,
.rma_iov_count = 1,
.context = (uint8_t *) &signal,
.data = 0
};

do {
ret = fi_writemsg(ctx->ep, &msg_signal, FI_DELIVERY_COMPLETE | FI_FENCE | FI_INJECT);
jdinan marked this conversation as resolved.
Show resolved Hide resolved
jdinan marked this conversation as resolved.
Show resolved Hide resolved
} while (try_again(ctx, ret, &polled));

SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx);
}

/* compatibility with Portals transport */
static inline
void shmem_transport_put_wait(shmem_transport_ctx_t* ctx, long *completion) {
Expand Down
24 changes: 24 additions & 0 deletions src/transport_portals4.h
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,30 @@ shmem_transport_put_ct_nb(shmem_transport_ct_t *ct, void *target, const void *so
#endif
}

static inline
void shmem_transport_put_signal(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len,
uint64_t *sig_addr, uint64_t signal, int pe, long *completion)
{
if ((*completion) != -1) {
#ifdef ENABLE_REMOTE_VIRTUAL_ADDRESSING
shmem_transport_portals4_put_nb_internal(ctx, target, source, len, pe,
completion,
shmem_transport_portals4_pt,
-1);
#else
shmem_transport_portals4_put_nb_internal(ctx, target, source, len, pe,
completion,
shmem_transport_portals4_data_pt,
shmem_transport_portals4_heap_pt);
#endif
shmem_transport_put_scalar(ctx, sig_addr, &signal, sizeof(uint64_t), pe);
} else {
shmem_transport_put_nbi(ctx, target, source, len, pe);
shmem_transport_fence(ctx);
shmem_transport_put_scalar(ctx, sig_addr, &signal, sizeof(uint64_t), pe);
}
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a "FIXME" comment and/or file an issue to go back and optimize nonblocking put-with-signal in the Portals transport?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


static inline
void
Expand Down
8 changes: 8 additions & 0 deletions test/shmemx/put_signal.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,17 @@ int main(int argc, char *argv[])

if (me == 0) {
shmemx_long_put_signal(target, source, MSG_SZ, &sig_addr, 1, dest);
#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
shmem_wait_until(&sig_addr, SHMEM_CMP_EQ, 1);
#else
shmem_uint64_wait_until(&sig_addr, SHMEM_CMP_EQ, 1);
#endif
} else {
#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
shmem_wait_until(&sig_addr, SHMEM_CMP_EQ, 1);
#else
shmem_uint64_wait_until(&sig_addr, SHMEM_CMP_EQ, 1);
#endif
shmemx_long_put_signal(target, target, MSG_SZ, &sig_addr, 1, dest);
}

Expand Down
4 changes: 4 additions & 0 deletions test/shmemx/put_signal_nbi.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,11 @@ int main(int argc, char *argv[])
}
}

#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
shmem_wait_until(&sig_addr, SHMEM_CMP_EQ, 1);
#else
shmem_uint64_wait_until(&sig_addr, SHMEM_CMP_EQ, 1);
#endif

for (i = 0; i < MSG_SZ; i++) {
if (target[i] != source[i]) {
Expand Down