Skip to content

Commit

Permalink
change openucx#16
Browse files Browse the repository at this point in the history
  • Loading branch information
andypauloramirez authored Aug 17, 2022
1 parent f4d507d commit 5a315d6
Showing 1 changed file with 1 addition and 239 deletions.
240 changes: 1 addition & 239 deletions examples/ucp_client_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,21 +81,6 @@ typedef struct test_req {
int complete;
} test_req_t;


/**
* Descriptor of the data received with AM API.
*/
/**
*static struct {
* volatile int complete;
* int is_rndv;
* void *desc;
* void *recv_buf;
} **/

/* am_data_desc = {0, 0, NULL, NULL}; ----------MODIFIED AM------------*/


/**
* Print this application's usage help message.
*/
Expand Down Expand Up @@ -160,32 +145,6 @@ static void tag_recv_cb(void *request, ucs_status_t status,
common_cb(user_data, "tag_recv_cb");
}

/**
* The callback on the receiving side, which is invoked upon receiving the
* stream message.
*/
/**--------------------------MODIFIED-----------------------------------------------
*static void stream_recv_cb(void *request, ucs_status_t status, size_t length,
* void *user_data)
*{
* common_cb(user_data, "stream_recv_cb");
*}
**/

/*-------------------------MODIFIED------------------------------------------------*/

/**
* The callback on the receiving side, which is invoked upon receiving the
* active message.
**/
/** --------------MODIFIED AM -----------------------------------
*static void am_recv_cb(void *request, ucs_status_t status, size_t length,
* void *user_data)
*{
* common_cb(user_data, "am_recv_cb");
*}
* -------------------MODIFIED AM-------------------------------------
**/
/**
* The callback on the sending side, which is invoked after finishing sending
* the message.
Expand Down Expand Up @@ -401,48 +360,6 @@ fill_request_param(ucp_dt_iov_t *iov, int is_client,
return 0;
}

/**
* Send and receive a message using the Stream API.
* The client sends a message to the server and waits until the send it completed.
* The server receives a message from the client and waits for its completion.
*/

/**static int send_recv_stream(ucp_worker_h ucp_worker, ucp_ep_h ep, int is_server,
* int current_iter)
*{
* ucp_dt_iov_t *iov = alloca(iov_cnt * sizeof(ucp_dt_iov_t));
* ucp_request_param_t param;
* test_req_t *request;
* size_t msg_length;
* void *msg;
* test_req_t ctx;
* memset(iov, 0, iov_cnt * sizeof(*iov));
* if (fill_request_param(iov, !is_server, &msg, &msg_length,
* &ctx, &param) != 0) {
* return -1;
* }
* if (!is_server) {
* Client sends a message to the server using the stream API *
*
* param.cb.send = send_cb;
* request = ucp_stream_send_nbx(ep, msg, msg_length, &param);
* } else {
* Server receives a message from the client using the stream API **/
/** param.op_attr_mask |= UCP_OP_ATTR_FIELD_FLAGS;
* param.flags = UCP_STREAM_RECV_FLAG_WAITALL;
* param.cb.recv_stream = stream_recv_cb; -------------MODIFIED-------------------
* request = ucp_stream_recv_nbx(ep, msg, msg_length,
* &msg_length, &param);
* }
* return request_finalize(ucp_worker, request, &ctx, is_server, iov,
* current_iter);
*}
**/

/**
* Send and receive a message using the Tag-Matching API.
* The client sends a message to the server and waits until the send it completed.
Expand Down Expand Up @@ -480,125 +397,6 @@ static int send_recv_tag(ucp_worker_h ucp_worker, ucp_ep_h ep, int is_server,
current_iter);
}

/** --------------------------MODIFIED AM-----------------------------------------
*ucs_status_t ucp_am_data_cb(void *arg, const void *header, size_t header_length,
* void *data, size_t length,
* const ucp_am_recv_param_t *param)
*{
* ucp_dt_iov_t *iov;
* size_t idx;
* size_t offset;
* if (length != iov_cnt * test_string_length) {
* fprintf(stderr, "received wrong data length %ld (expected %ld)",
* length, iov_cnt * test_string_length);
* return UCS_OK;
* }
* if (header_length != 0) {
* fprintf(stderr, "received unexpected header, length %ld", header_length);
}
**/
/* am_data_desc.complete = 1; */

/* if (param->recv_attr & UCP_AM_RECV_ATTR_FLAG_RNDV) { */
/** Rendezvous request arrived, data contains an internal UCX descriptor,
* which has to be passed to ucp_am_recv_data_nbx function to confirm
* data transfer.
**/
/**
* am_data_desc.is_rndv = 1;
* am_data_desc.desc = data;
* return UCS_INPROGRESS;
* }
**/
/* Message delivered with eager protocol, data should be available
* immediately
*/
/* am_data_desc.is_rndv = 0; */

/**
* iov = am_data_desc.recv_buf;
* offset = 0;
* for (idx = 0; idx < iov_cnt; idx++) {
* mem_type_memcpy(iov[idx].buffer, UCS_PTR_BYTE_OFFSET(data, offset),
* iov[idx].length);
* offset += iov[idx].length;
* }
* return UCS_OK;
*}
**/


/**
* Send and receive a message using Active Message API.
* The client sends a message to the server and waits until the send is completed.
* The server gets a message from the client and if it is rendezvous request,
* initiates receive operation.
**/

/**
*static int send_recv_am(ucp_worker_h ucp_worker, ucp_ep_h ep, int is_server,
* int current_iter)
*{
* ucp_dt_iov_t *iov = alloca(iov_cnt * sizeof(ucp_dt_iov_t));
* test_req_t *request;
* ucp_request_param_t params;
* size_t msg_length;
* void *msg;
* test_req_t ctx;
* memset(iov, 0, iov_cnt * sizeof(*iov));
* if (fill_request_param(iov, !is_server, &msg, &msg_length,
* &ctx, &params) != 0) {
* return -1;
* }
* if (is_server) {
* am_data_desc.recv_buf = iov;
**/
/* waiting for AM callback has called */
/**
* while (!am_data_desc.complete) {
* ucp_worker_progress(ucp_worker);
* }
* am_data_desc.complete = 0;
* if (am_data_desc.is_rndv) {
**/
/* Rendezvous request has arrived, need to invoke receive operation
* to confirm data transfer from the sender to the "recv_message"
* buffer. */
/**
* params.op_attr_mask |= UCP_OP_ATTR_FLAG_NO_IMM_CMPL;
* params.cb.recv_am = am_recv_cb,
* request = ucp_am_recv_data_nbx(ucp_worker,
* am_data_desc.desc,
* msg, msg_length,
* &params);
**/
/* } else { */
/* Data has arrived eagerly and is ready for use, no need to
* initiate receive operation. */
/**
* request = NULL;
* }
* } else {
**/
/* Client sends a message to the server using the AM API */
/**
* params.cb.send = (ucp_send_nbx_callback_t)send_cb,
* request = ucp_am_send_nbx(ep, TEST_AM_ID, NULL, 0ul, msg,
* msg_length, &params);
* }
* return request_finalize(ucp_worker, request, &ctx, is_server, iov,
* current_iter);
*}
**/
/**
* Print this application's usage help message.
*/
Expand Down Expand Up @@ -755,23 +553,14 @@ static int client_server_communication(ucp_worker_h worker, ucp_ep_h ep,
int ret;

switch (send_recv_type) {
/*case CLIENT_SERVER_SEND_RECV_STREAM:*/
/* Client-Server communication via Stream API */
/* ret = send_recv_stream(worker, ep, is_server, current_iter); //----------MODIFIED--------------
break;*/
case CLIENT_SERVER_SEND_RECV_TAG:
/* Client-Server communication via Tag-Matching API */
ret = send_recv_tag(worker, ep, is_server, current_iter);
break;
/* case CLIENT_SERVER_SEND_RECV_AM: */
/* Client-Server communication via AM API. */
/* ret = send_recv_am(worker, ep, is_server, current_iter);
break; */
default:
fprintf(stderr, "unknown send-recv type %d\n", send_recv_type);
return -1;
}

return ret;
}

Expand Down Expand Up @@ -948,26 +737,6 @@ static int run_server(ucp_context_h ucp_context, ucp_worker_h ucp_worker,
goto err;
}


/* if (send_recv_type == CLIENT_SERVER_SEND_RECV_AM) { */
/* Initialize Active Message data handler */
/**
* param.field_mask = UCP_AM_HANDLER_PARAM_FIELD_ID |
* UCP_AM_HANDLER_PARAM_FIELD_CB |
* UCP_AM_HANDLER_PARAM_FIELD_ARG;
* param.id = TEST_AM_ID;
* param.cb = ucp_am_data_cb;
**/

/* param.arg = ucp_data_worker; */ /* not used in our callback */
/** status = ucp_worker_set_am_recv_handler(ucp_data_worker,
* &param);
* if (status != UCS_OK) {
* ret = -1;
* goto err_worker;
* }
* }
**/
/* Initialize the server's context. */
context.conn_request = NULL;

Expand Down Expand Up @@ -1068,15 +837,9 @@ static int init_context(ucp_context_h *ucp_context, ucp_worker_h *ucp_worker,
/* UCP initialization */
ucp_params.field_mask = UCP_PARAM_FIELD_FEATURES;

/**if (send_recv_type == CLIENT_SERVER_SEND_RECV_STREAM) { //-------MODIFIED------
* ucp_params.features = UCP_FEATURE_STREAM;
}**/
if (send_recv_type == CLIENT_SERVER_SEND_RECV_TAG) { /*---------elseif en tag-----*/
ucp_params.features = UCP_FEATURE_TAG;
} /**else {
*ucp_params.features = UCP_FEATURE_AM;
}**/

}
status = ucp_init(&ucp_params, NULL, ucp_context);
if (status != UCS_OK) {
fprintf(stderr, "failed to ucp_init (%s)\n", ucs_status_string(status));
Expand All @@ -1097,7 +860,6 @@ static int init_context(ucp_context_h *ucp_context, ucp_worker_h *ucp_worker,
return ret;
}


int main(int argc, char **argv)
{
send_recv_type_t send_recv_type = CLIENT_SERVER_SEND_RECV_DEFAULT;
Expand Down

0 comments on commit 5a315d6

Please sign in to comment.