Skip to content

Commit

Permalink
UCP: Strided (Non-contiguous) memory send/recv support (openucx#7/7)
Browse files Browse the repository at this point in the history
  • Loading branch information
alex-ma committed Jul 6, 2017
1 parent 14b5ace commit 3a4c78e
Show file tree
Hide file tree
Showing 15 changed files with 498 additions and 58 deletions.
4 changes: 4 additions & 0 deletions src/ucp/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ noinst_HEADERS = \
dt/dt.h \
dt/dt_contig.h \
dt/dt_iov.h \
dt/dt_stride.h \
dt/dt_reusable.h \
dt/dt_generic.h \
proto/proto.h \
proto/proto_am.inl \
Expand All @@ -57,6 +59,8 @@ libucp_la_SOURCES = \
core/ucp_worker.c \
dt/dt_contig.c \
dt/dt_iov.c \
dt/dt_stride.c \
dt/dt_reusable.c \
dt/dt_generic.c \
dt/dt.c \
proto/proto_am.c \
Expand Down
6 changes: 3 additions & 3 deletions src/ucp/api/ucp.h
Original file line number Diff line number Diff line change
Expand Up @@ -240,9 +240,9 @@ enum ucp_worker_attr_field {
enum ucp_dt_type {
UCP_DATATYPE_CONTIG = 0, /**< Contiguous datatype */
UCP_DATATYPE_IOV = 2, /**< Scatter-gather list with multiple pointers */
//UCP_DATATYPE_IOV_R = 3, /**< Same as IOV, but reusable */
//UCP_DATATYPE_STRIDE = 4, /**< Interleaving a pointers to strided data */
//UCP_DATATYPE_STRIDE_R = 5, /**< Strided datatype */
UCP_DATATYPE_IOV_R = 3, /**< Same as IOV, but reusable */
UCP_DATATYPE_STRIDE = 4, /**< Interleaving a pointers to strided data */
UCP_DATATYPE_STRIDE_R = 5, /**< Strided datatype */
UCP_DATATYPE_GENERIC = 7, /**< Generic datatype with
user-defined pack/unpack routines */
UCP_DATATYPE_SHIFT = 3, /**< Number of bits defining
Expand Down
56 changes: 55 additions & 1 deletion src/ucp/core/ucp_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -176,18 +176,61 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_request_memory_reg,
{
ucp_rsc_index_t mdi = context->tl_rscs[rsc_index].md_index;
uct_md_h uct_md = context->tl_mds[mdi].md;
ucp_dt_reusable_t *reusable;
uct_md_attr_t *uct_md_attr;
size_t iov_it, iovcnt;
const ucp_dt_iov_t *iov;
uct_mem_h *memh;
ucs_status_t status;
size_t extent;

status = UCS_OK;
switch (datatype & UCP_DATATYPE_CLASS_MASK) {
case UCP_DATATYPE_CONTIG:
status = uct_md_mem_reg(uct_md, buffer, length, 0, &state->dt.contig.memh);
break;

case UCP_DATATYPE_STRIDE:
extent = ucp_dt_extent(datatype, state->dt.stride.count, NULL, NULL);
status = uct_md_mem_reg(uct_md, buffer, extent, 0, &state->dt.stride.memh);
break;

case UCP_DATATYPE_STRIDE_R:
/* If this is not the first time - just update the pointers and GO */
reusable = UCP_DT_GET_REUSABLE(datatype);
if (reusable->stride_memh != UCT_MEM_HANDLE_NULL) {
if (ucs_unlikely(reusable->nc_status != UCS_OK)) {
return status;
}

uct_md_attr = &context->tl_mds[mdi].attr;
if (uct_md_attr->cap.flags & UCT_MD_FLAG_REG_NC) {
status = ucp_dt_reusable_update(ep, buffer, length, datatype, state);
state->dt.stride.contig_memh = reusable->nc_memh;
}
state->dt.stride.memh = reusable->stride_memh;
break;
}

/* Map the entire extent of buffers potentially sent */
extent = ucp_dt_extent(datatype, state->dt.stride.count, NULL, NULL);
status = uct_md_mem_reg(uct_md, buffer, extent, 0, &state->dt.stride.memh);

/* If non-contiguous bind is not supported - use the existing mapping */
uct_md_attr = &context->tl_mds[mdi].attr;
if (!(uct_md_attr->cap.flags & UCT_MD_FLAG_REG_NC)) {
break;
}

/* make sure the call to uct_md_mem_reg() succeeded */
if (status != UCS_OK) {
break;
}

status = ucp_dt_reusable_create(ep, buffer, length, datatype, state);
state->dt.stride.contig_memh = reusable->nc_memh;
break;

case UCP_DATATYPE_IOV:
iovcnt = state->dt.iov.iovcnt;
iov = buffer;
Expand All @@ -205,7 +248,7 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_request_memory_reg,
0, &memh[iov_it]);
if (status != UCS_OK) {
/* unregister previously registered memory */
ucp_multiple_memh_dereg(uct_md, memh, iov_it);
ucp_iov_buffer_memh_dereg(uct_md, memh, iov_it);
ucs_free(memh);
goto err;
}
Expand All @@ -222,6 +265,7 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_request_memory_reg,
}

status = ucp_dt_reusable_create(ep, buffer, length, datatype, state);
state->dt.iov.contig_memh = reusable->nc_memh;
break;

default:
Expand Down Expand Up @@ -256,6 +300,12 @@ UCS_PROFILE_FUNC_VOID(ucp_request_memory_dereg,
}
break;

case UCP_DATATYPE_STRIDE:
if (state->dt.stride.memh != UCT_MEM_HANDLE_NULL) {
uct_md_mem_dereg(uct_md, state->dt.stride.memh);
}
break;

case UCP_DATATYPE_IOV:
memh = state->dt.iov.memh;
for (iov_it = 0; iov_it < state->dt.iov.iovcnt; ++iov_it) {
Expand All @@ -266,6 +316,10 @@ UCS_PROFILE_FUNC_VOID(ucp_request_memory_dereg,
ucs_free(state->dt.iov.memh);
break;

case UCP_DATATYPE_IOV_R:
case UCP_DATATYPE_STRIDE_R:
break;

default:
ucs_error("Invalid data type");
}
Expand Down
27 changes: 26 additions & 1 deletion src/ucp/dt/dt.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ size_t ucp_dt_pack(ucp_datatype_t datatype, void *dest, const void *src,
state->offset, dest, length);
break;

case UCP_DATATYPE_STRIDE_R:
case UCP_DATATYPE_STRIDE:
dt_ex = ucp_dt_ptr(datatype);
return dt_ex->stride.dim_cnt *
ucp_dt_count_uct_iov(dt_ex->stride.dt, 1, NULL, NULL);

default:
ucs_error("Invalid data type");
}
Expand All @@ -69,7 +75,13 @@ ucp_datatype_t ucp_dt_create(enum ucp_dt_type type, ...)

memset(dt, 0, sizeof(*dt));

if (type == UCP_DATATYPE_GENERIC) {
if ((type == UCP_DATATYPE_STRIDE) ||
(type == UCP_DATATYPE_STRIDE_R)) {
va_list ap;
va_start(ap, type);
ucp_dt_stride_create(&dt->stride, ap);
va_end(ap);
} else if (type == UCP_DATATYPE_GENERIC) {
ucp_generic_dt_ops_t *ops;
void *context;
va_list ap;
Expand Down Expand Up @@ -107,6 +119,19 @@ void ucp_dt_destroy(ucp_datatype_t datatype)
ucs_free(dt_ex);
break;

case UCP_DATATYPE_STRIDE:
dt_ex = ucp_dt_ptr(datatype);
ucs_free(dt_ex);
break;

case UCP_DATATYPE_IOV_R:
case UCP_DATATYPE_STRIDE_R:
dt_ex = ucp_dt_ptr(datatype);
if (dt_ex->reusable.nc_memh != UCT_MEM_HANDLE_NULL) {
ucp_dt_reusable_destroy(&dt_ex->reusable);
}
break;

default:
break;
}
Expand Down
31 changes: 29 additions & 2 deletions src/ucp/dt/dt.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@

#include "dt_contig.h"
#include "dt_iov.h"
#include "dt_stride.h"
#include "dt_generic.h"
#include "dt_reusable.h"

#include <uct/api/uct.h>
#include <ucs/debug/profile.h>
Expand All @@ -24,8 +26,11 @@
* Datatype content, when requiring additional memory allocation.
*/
typedef struct ucp_dt_extended {
/* NOTE(review): `generic` is declared here and again inside the union below;
 * this listing looks like it shows both the old and the new member layout -
 * confirm which declaration the real header keeps. */
ucp_dt_generic_t generic;
/* Temporarily useless - will be extended in future patches */
ucp_dt_reusable_t reusable; /* Must be first, for reusable optimization */
/* Class-specific description; the members below share the same storage */
union {
ucp_dt_stride_t stride; /* Strided datatype description */
ucp_dt_generic_t generic; /* Generic datatype description */
};
} ucp_dt_extended_t;

/**
Expand All @@ -44,6 +49,13 @@ typedef struct ucp_dt_state {
uct_mem_h *memh; /* Pointer to IOV memh[iovcnt] */
uct_mem_h contig_memh; /* For contiguous read/write */
} iov;
struct {
size_t item_offset; /* Offset within a single item */
size_t count; /* Count total strided objects */
size_t dim_index[UCP_DT_STRIDE_MAX_DIMS];
uct_mem_h memh; /* Pointer to inclusive memh */
uct_mem_h contig_memh; /* For contiguous read/write */
} stride;
struct {
void *state;
} generic;
Expand All @@ -67,6 +79,11 @@ static size_t ucp_dt_length_recursive(ucp_datatype_t datatype, size_t count,
case UCP_DATATYPE_CONTIG:
return ucp_contig_dt_length(datatype, count);

case UCP_DATATYPE_IOV_R:
dt_ex = ucp_dt_ptr(datatype);
if (dt_ex->reusable.iov_memh != UCT_MEM_HANDLE_NULL) {
return dt_ex->reusable.length;
}
case UCP_DATATYPE_IOV:
total = 0;
for (iov_it = 0; iov_it < count; ++iov_it) {
Expand All @@ -75,6 +92,16 @@ static size_t ucp_dt_length_recursive(ucp_datatype_t datatype, size_t count,
}
return total;

case UCP_DATATYPE_STRIDE_R:
dt_ex = ucp_dt_ptr(datatype);
if (dt_ex->reusable.stride_memh != UCT_MEM_HANDLE_NULL) {
return dt_ex->reusable.length;
}
case UCP_DATATYPE_STRIDE:
dt_ex = ucp_dt_ptr(datatype);
return count * (is_extent ? dt_ex->stride.total_extent :
dt_ex->stride.total_length);

case UCP_DATATYPE_GENERIC:
dt_ex = ucp_dt_ptr(datatype);
ucs_assert(NULL != state);
Expand Down
42 changes: 0 additions & 42 deletions src/ucp/dt/dt_generic.c

This file was deleted.

9 changes: 5 additions & 4 deletions src/ucp/dt/dt_generic.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ static inline ucp_dt_generic_t* ucp_dt_generic(ucp_datatype_t datatype)
return (ucp_dt_generic_t*)(void*)(datatype & ~UCP_DATATYPE_CLASS_MASK);
}

/**
 * Recover the generic datatype descriptor encoded in a datatype identifier.
 *
 * The bits selected by UCP_DATATYPE_CLASS_MASK are cleared and whatever is
 * left is reinterpreted as a pointer to the descriptor.
 *
 * @param datatype  Datatype identifier carrying the descriptor address.
 *
 * @return The identifier with its class bits cleared, as a descriptor pointer.
 */
static inline ucp_dt_generic_t* ucp_dt_generic(ucp_datatype_t datatype)
{
    ucp_datatype_t descriptor_bits = datatype & ~UCP_DATATYPE_CLASS_MASK;

    return (ucp_dt_generic_t*)(void*)descriptor_bits;
}

/* Non-zero when the class bits of _datatype select UCP_DATATYPE_GENERIC */
#define UCP_DT_IS_GENERIC(_datatype) \
(((_datatype) & UCP_DATATYPE_CLASS_MASK) == UCP_DATATYPE_GENERIC)

/**
 * Set up a generic datatype descriptor.
 *
 * NOTE(review): the definition is not visible from this header; presumably it
 * records @a ops and @a context inside @a dt - confirm against the
 * implementation.
 *
 * @param dt       Descriptor to set up.
 * @param ops      Generic datatype operations supplied by the caller.
 * @param context  Opaque pointer supplied by the caller.
 */
void ucp_dt_generic_create(ucp_dt_generic_t *dt,
const ucp_generic_dt_ops_t *ops,
void *context);

#endif
20 changes: 20 additions & 0 deletions src/ucp/dt/dt_reusable.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/**
* Copyright (C) Mellanox Technologies Ltd. 2001-2015. ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/

#include "dt_reusable.h"

#include <ucs/sys/compiler.h>


/**
 * Completion callback for the non-contiguous registration of a reusable
 * datatype.
 *
 * @param self    Completion object; it must be the nc_comp member embedded in
 *                a ucp_dt_reusable_t, since the owner is located from it.
 * @param status  Status reported for the operation; stored in the owner's
 *                nc_status field.
 */
void ucp_dt_reusable_completion(uct_completion_t *self, ucs_status_t status)
{
    /* Step back from the embedded completion to its owner and record the result */
    ucs_container_of(self, ucp_dt_reusable_t, nc_comp)->nc_status = status;
}

/**
 * Release the non-contiguous registration held by a reusable datatype.
 *
 * Deregisters nc_memh from the memory domain stored in nc_md. The structure
 * itself is neither freed nor modified, and the deregistration status is not
 * reported to the caller.
 *
 * @param reusable  Reusable datatype state whose nc_memh is deregistered.
 */
void ucp_dt_reusable_destroy(ucp_dt_reusable_t *reusable)
{
    /* The result of the deregistration is intentionally discarded */
    (void)uct_md_mem_dereg(reusable->nc_md, reusable->nc_memh);
}
39 changes: 39 additions & 0 deletions src/ucp/dt/dt_reusable.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/**
* Copyright (C) Mellanox Technologies Ltd. 2001-2015. ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/


#ifndef UCP_DT_REUSABLE_H_
#define UCP_DT_REUSABLE_H_

#include <ucp/api/ucp.h>
#include <uct/api/uct.h>

/*
 * Non-zero when the class bits of _datatype select one of the reusable
 * classes (UCP_DATATYPE_IOV_R or UCP_DATATYPE_STRIDE_R).
 * Note: _datatype is expanded twice, so avoid arguments with side effects.
 */
#define UCP_DT_IS_REUSABLE(_datatype) \
((((_datatype) & UCP_DATATYPE_CLASS_MASK) == UCP_DATATYPE_IOV_R) || \
(((_datatype) & UCP_DATATYPE_CLASS_MASK) == UCP_DATATYPE_STRIDE_R))

/* Address of the `reusable` member of the descriptor returned by ucp_dt_ptr() */
#define UCP_DT_GET_REUSABLE(_datatype) (&ucp_dt_ptr(_datatype)->reusable)

/**
 * State cached inside a reusable datatype (UCP_DATATYPE_IOV_R or
 * UCP_DATATYPE_STRIDE_R) so that it can be kept between uses.
 */
typedef struct ucp_dt_reusable {
size_t length; /* Optimization: cache the length of the datatype */
/* Only one of the two handles below is meaningful for a given datatype */
union {
uct_mem_h* iov_memh; /* Array of (UCP-)IOV memory handles */
uct_mem_h stride_memh;/* Handle for the full extent of the stride */
};

uct_mem_h nc_memh; /* Non-contiguous registration - single handle */
uct_md_t *nc_md; /* Memory domain nc_memh is deregistered from on destroy */
uct_completion_t nc_comp; /* Non-contiguous registration completion;
                             ucp_dt_reusable_completion() locates this
                             structure from it */
ucs_status_t nc_status; /* Non-contiguous registration status, written by
                           ucp_dt_reusable_completion() */
uct_iov_t *nc_iov; /* Optimization: cache the registration layout */
size_t nc_iovcnt; /* - and the registration layout length */
} ucp_dt_reusable_t;

/**
 * Deregister the non-contiguous memory handle (nc_memh) of a reusable
 * datatype from its memory domain (nc_md). The structure is not freed.
 *
 * @param dt  Reusable datatype state holding the handle to release.
 */
void ucp_dt_reusable_destroy(ucp_dt_reusable_t *dt);

/**
 * Completion callback that stores the reported status in the nc_status field
 * of the ucp_dt_reusable_t that embeds the completion as nc_comp.
 *
 * @param self    The nc_comp member of a ucp_dt_reusable_t.
 * @param status  Status to record.
 */
void ucp_dt_reusable_completion(uct_completion_t *self, ucs_status_t status);

#endif
Loading

0 comments on commit 3a4c78e

Please sign in to comment.