From d221db0334c5edbe0b84f329912dd9e467fcf841 Mon Sep 17 00:00:00 2001 From: FNU Akshay Venkatesh Date: Thu, 10 Jan 2019 16:42:37 -0800 Subject: [PATCH 1/5] send/recv python objects prototype --- pybind/buffer_ops.c | 9 ++++ pybind/buffer_ops.h | 1 + pybind/ucp_py.pyx | 16 ++++++ pybind/ucp_py_buffer_helper.pyx | 6 ++- tests/test-send-recv-python-obj.py | 80 ++++++++++++++++++++++++++++++ 5 files changed, 111 insertions(+), 1 deletion(-) create mode 100644 tests/test-send-recv-python-obj.py diff --git a/pybind/buffer_ops.c b/pybind/buffer_ops.c index 1f1d6390a53..b90c07cd129 100644 --- a/pybind/buffer_ops.c +++ b/pybind/buffer_ops.c @@ -9,6 +9,15 @@ #include #include +struct data_buf *populate_buffer_region(void *src) +{ + struct data_buf *db = NULL; + db = (struct data_buf *) malloc(sizeof(struct data_buf)); + db->buf = src; + DEBUG_PRINT("allocated %p\n", db->buf); + return db; +} + struct data_buf *allocate_host_buffer(int length) { struct data_buf *db = NULL; diff --git a/pybind/buffer_ops.h b/pybind/buffer_ops.h index 7316f9362a1..c41b41566f2 100644 --- a/pybind/buffer_ops.h +++ b/pybind/buffer_ops.h @@ -5,6 +5,7 @@ #include "common.h" int set_device(int device); +struct data_buf *populate_buffer_region(void *src); struct data_buf *allocate_host_buffer(int length); struct data_buf *allocate_cuda_buffer(int length); int set_host_buffer(struct data_buf *db, int c, int length); diff --git a/pybind/ucp_py.pyx b/pybind/ucp_py.pyx index d3b8cc0eaec..5b1c0ee7b0b 100644 --- a/pybind/ucp_py.pyx +++ b/pybind/ucp_py.pyx @@ -132,6 +132,22 @@ cdef class ucp_py_ep: msg.ctx_ptr = ucp_py_ep_send_nb(self.ucp_ep, msg.buf, len) return msg.get_comm_request(len) + def recv_msg(self, msg, len): + buf_reg = buffer_region() + buf_reg.populate_ptr(msg) + buf_reg.is_cuda = 0 #for now + internal_msg = ucp_msg(buf_reg) + internal_msg.ctx_ptr = ucp_py_recv_nb(internal_msg.buf, len) + return internal_msg.get_comm_request(len) + + def send_msg(self, msg, len): + buf_reg = buffer_region() + buf_reg.populate_ptr(msg) + buf_reg.is_cuda = 0 #for now + internal_msg = ucp_msg(buf_reg) + internal_msg.ctx_ptr = ucp_py_ep_send_nb(self.ucp_ep, internal_msg.buf, len) + return internal_msg.get_comm_request(len) + def close(self): return ucp_py_put_ep(self.ucp_ep) diff --git a/pybind/ucp_py_buffer_helper.pyx b/pybind/ucp_py_buffer_helper.pyx index 1cc5a6ecbbf..23dc6447b68 100644 --- a/pybind/ucp_py_buffer_helper.pyx +++ b/pybind/ucp_py_buffer_helper.pyx @@ -3,6 +3,7 @@ cdef extern from "buffer_ops.h": int set_device(int) + data_buf* populate_buffer_region(void *) data_buf* allocate_host_buffer(int) data_buf* allocate_cuda_buffer(int) int free_host_buffer(data_buf*) @@ -31,4 +32,7 @@ cdef class buffer_region: free_host_buffer(self.buf) def free_cuda(self): - free_cuda_buffer(self.buf) \ No newline at end of file + free_cuda_buffer(self.buf) + + def populate_ptr(self, pyobj): + self.buf = populate_buffer_region( pyobj) \ No newline at end of file diff --git a/tests/test-send-recv-python-obj.py b/tests/test-send-recv-python-obj.py new file mode 100644 index 00000000000..92c11e887af --- /dev/null +++ b/tests/test-send-recv-python-obj.py @@ -0,0 +1,80 @@ +# Copyright (c) 2018, NVIDIA CORPORATION. All rights reserved. +# See file LICENSE for terms. + +import ucp_py as ucp +import time +import argparse +import asyncio +import socket +import sys +import concurrent.futures + +accept_cb_started = False +new_client_ep = None +max_msg_log = 23 + +async def talk_to_client(client_ep): + + print("in talk_to_client") + + print("about to send") + send_msg = "hello from ucx server @" + socket.gethostname() + send_req = await client_ep.send_msg(send_msg, sys.getsizeof(send_msg)) + + print("about to recv") + recv_msg = "hello from ucx server @" + socket.gethostname() + recv_req = await client_ep.recv_msg(recv_msg, sys.getsizeof(recv_msg)) + + print("server sent: " + send_msg) + print("server received: " + recv_msg) + + ucp.destroy_ep(client_ep) + print('talk_to_client done') + ucp.stop_server() + +async def talk_to_server(ip, port): + + msg_log = max_msg_log + + server_ep = ucp.get_endpoint(ip, port) + + print("about to recv") + recv_msg = "hello from ucx client @" + socket.gethostname() + recv_req = await server_ep.recv_msg(recv_msg, sys.getsizeof(recv_msg)) + + print("about to send") + send_msg = "hello from ucx client @" + socket.gethostname() + send_req = await server_ep.send_msg(send_msg, sys.getsizeof(send_msg)) + + print("client sent: " + send_msg) + print("client received: " + recv_msg) + + ucp.destroy_ep(server_ep) + print('talk_to_server done') + +parser = argparse.ArgumentParser() +parser.add_argument('-s','--server', help='enter server ip', required=False) +parser.add_argument('-p','--port', help='enter server port number', required=False) +args = parser.parse_args() + +## initiate ucp +init_str = "" +server = False +if args.server is None: + server = True +else: + server = False + init_str = args.server + +ucp.init() +loop = asyncio.get_event_loop() +# coro points to either client or server-side coroutine +if server: + coro = ucp.start_server(talk_to_client, is_coroutine = True) +else: + coro = talk_to_server(init_str.encode(), int(args.port)) + +loop.run_until_complete(coro) + +loop.close() +ucp.fin() From 79defcf18ff9fbcfcb6dd6428d6f54eb3e7cc31b Mon Sep 17 00:00:00 2001 From: FNU Akshay Venkatesh Date: Thu, 10 Jan 2019 19:02:37 -0800 Subject: [PATCH 2/5] blind recv and convert to python object prototype; free missing -- needs fix --- pybind/buffer_ops.c | 8 ++++++++ pybind/buffer_ops.h | 1 + pybind/ucp_py.pyx | 14 ++++++++++---- pybind/ucp_py_buffer_helper.pyx | 6 +++++- 4 files changed, 24 insertions(+), 5 deletions(-) diff --git a/pybind/buffer_ops.c b/pybind/buffer_ops.c index b90c07cd129..7db507bfa82 100644 --- a/pybind/buffer_ops.c +++ b/pybind/buffer_ops.c @@ -18,11 +18,18 @@ struct data_buf *populate_buffer_region(void *src) return db; } +void *return_ptr_from_buf(struct data_buf *db) +{ + printf("db %p db->buf %p\n", db, db->buf); + return (void *) db->buf; +} + struct data_buf *allocate_host_buffer(int length) { struct data_buf *db = NULL; db = (struct data_buf *) malloc(sizeof(struct data_buf)); db->buf = (void *) malloc(length); + printf("AH: db %p db->buf %p\n", db, db->buf); DEBUG_PRINT("allocated %p\n", db->buf); return db; } @@ -89,6 +96,7 @@ int free_host_buffer(struct data_buf *db) { free(db->buf); free(db); + printf("FH:db %p db->buf %p\n", db, db->buf); return 0; } diff --git a/pybind/buffer_ops.h b/pybind/buffer_ops.h index c41b41566f2..4953e23e5cf 100644 --- a/pybind/buffer_ops.h +++ b/pybind/buffer_ops.h @@ -6,6 +6,7 @@ int set_device(int device); struct data_buf *populate_buffer_region(void *src); +void *return_ptr_from_buf(struct data_buf *db); struct data_buf *allocate_host_buffer(int length); struct data_buf *allocate_cuda_buffer(int length); int set_host_buffer(struct data_buf *db, int c, int length); diff --git a/pybind/ucp_py.pyx b/pybind/ucp_py.pyx index 5b1c0ee7b0b..001778b5d6d 100644 --- a/pybind/ucp_py.pyx +++ b/pybind/ucp_py.pyx @@ -53,7 +53,7 @@ class CommFuture(concurrent.futures.Future): return self.result_state def __del__(self): - self.ucp_msg.free_mem() + pass #self.ucp_msg.free_mem() def __await__(self): if True == self.done_state: @@ -135,7 +135,7 @@ cdef class ucp_py_ep: def recv_msg(self, msg, len): buf_reg = buffer_region() buf_reg.populate_ptr(msg) - buf_reg.is_cuda = 0 #for now + buf_reg.is_cuda = 0 # for now but it does not matter internal_msg = ucp_msg(buf_reg) internal_msg.ctx_ptr = ucp_py_recv_nb(internal_msg.buf, len) return internal_msg.get_comm_request(len) @@ -143,7 +143,7 @@ cdef class ucp_py_ep: def send_msg(self, msg, len): buf_reg = buffer_region() buf_reg.populate_ptr(msg) - buf_reg.is_cuda = 0 #for now + buf_reg.is_cuda = 0 # for now but it does not matter internal_msg = ucp_msg(buf_reg) internal_msg.ctx_ptr = ucp_py_ep_send_nb(self.ucp_ep, internal_msg.buf, len) return internal_msg.get_comm_request(len) @@ -237,7 +237,10 @@ cdef class ucp_msg: self.free_host() def get_comm_len(self): - return self.comm_len + return self.comm_len + + def get_obj(self): + return self.buf_reg.return_obj() cdef class ucp_comm_request: cdef ucp_msg msg @@ -320,3 +323,6 @@ def destroy_ep(ucp_ep): def set_cuda_dev(dev): return set_device(dev) + +def get_obj_from_msg(ucp_msg): + return ucp_msg.get_obj() \ No newline at end of file diff --git a/pybind/ucp_py_buffer_helper.pyx b/pybind/ucp_py_buffer_helper.pyx index 23dc6447b68..76726dab282 100644 --- a/pybind/ucp_py_buffer_helper.pyx +++ b/pybind/ucp_py_buffer_helper.pyx @@ -4,6 +4,7 @@ cdef extern from "buffer_ops.h": int set_device(int) data_buf* populate_buffer_region(void *) + void* return_ptr_from_buf(data_buf*) data_buf* allocate_host_buffer(int) data_buf* allocate_cuda_buffer(int) int free_host_buffer(data_buf*) @@ -35,4 +36,7 @@ cdef class buffer_region: free_cuda_buffer(self.buf) def populate_ptr(self, pyobj): - self.buf = populate_buffer_region( pyobj) \ No newline at end of file + self.buf = populate_buffer_region( pyobj) + + def return_obj(self): + return return_ptr_from_buf(self.buf) From 82d409439c878dc377e9fea9236825d0b2dbb210 Mon Sep 17 00:00:00 2001 From: FNU Akshay Venkatesh Date: Thu, 10 Jan 2019 19:03:16 -0800 Subject: [PATCH 3/5] blind recv test --- tests/test-send-blind-recv-python-obj.py | 84 ++++++++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 tests/test-send-blind-recv-python-obj.py diff --git a/tests/test-send-blind-recv-python-obj.py b/tests/test-send-blind-recv-python-obj.py new file mode 100644 index 00000000000..bb70e2ef506 --- /dev/null +++ b/tests/test-send-blind-recv-python-obj.py @@ -0,0 +1,84 @@ +# Copyright (c) 2018, NVIDIA CORPORATION. All rights reserved. +# See file LICENSE for terms. + +import ucp_py as ucp +import time +import argparse +import asyncio +import socket +import sys +import concurrent.futures + +accept_cb_started = False +new_client_ep = None +max_msg_log = 23 + +async def talk_to_client(client_ep): + + print("in talk_to_client") + + print("about to send") + send_msg = "hello from ucx server @" + socket.gethostname() + send_req = await client_ep.send_msg(send_msg, sys.getsizeof(send_msg)) + + print("about to recv") + + recv_req = await client_ep.recv_future() + #ucp.get_obj_from_msg(recv_req) + recv_msg = ucp.get_obj_from_msg(recv_req) + + print("server sent: " + send_msg) + print("server received: " + recv_msg) + + ucp.destroy_ep(client_ep) + print('talk_to_client done') + ucp.stop_server() + +async def talk_to_server(ip, port): + + msg_log = max_msg_log + + server_ep = ucp.get_endpoint(ip, port) + + print("about to recv") + + recv_req = await server_ep.recv_future() + #ucp.get_obj_from_msg(recv_req) + recv_msg = ucp.get_obj_from_msg(recv_req) + + print("about to send") + send_msg = "hello from ucx client @" + socket.gethostname() + send_req = await server_ep.send_msg(send_msg, sys.getsizeof(send_msg)) + + print("client sent: " + send_msg) + print("client received: " + recv_msg) + + ucp.destroy_ep(server_ep) + print('talk_to_server done') + +parser = argparse.ArgumentParser() +parser.add_argument('-s','--server', help='enter server ip', required=False) +parser.add_argument('-p','--port', help='enter server port number', required=False) +args = parser.parse_args() + +## initiate ucp +init_str = "" +server = False +if args.server is None: + server = True +else: + server = False + init_str = args.server + +ucp.init() +loop = asyncio.get_event_loop() +# coro points to either client or server-side coroutine +if server: + coro = ucp.start_server(talk_to_client, is_coroutine = True) +else: + coro = talk_to_server(init_str.encode(), int(args.port)) + +loop.run_until_complete(coro) + +loop.close() +ucp.fin() From e021cd9e95745d613857a87c9a5b31d5a5314a8b Mon Sep 17 00:00:00 2001 From: FNU Akshay Venkatesh Date: Fri, 11 Jan 2019 10:14:14 -0800 Subject: [PATCH 4/5] removing prints --- pybind/buffer_ops.c | 3 --- tests/test-send-blind-recv-python-obj.py | 19 ++++++++++++------- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/pybind/buffer_ops.c b/pybind/buffer_ops.c index 7db507bfa82..1cca32bdab0 100644 --- a/pybind/buffer_ops.c +++ b/pybind/buffer_ops.c @@ -20,7 +20,6 @@ struct data_buf *populate_buffer_region(void *src) void *return_ptr_from_buf(struct data_buf *db) { - printf("db %p db->buf %p\n", db, db->buf); return (void *) db->buf; } @@ -29,7 +28,6 @@ struct data_buf *allocate_host_buffer(int length) struct data_buf *db = NULL; db = (struct data_buf *) malloc(sizeof(struct data_buf)); db->buf = (void *) malloc(length); - printf("AH: db %p db->buf %p\n", db, db->buf); DEBUG_PRINT("allocated %p\n", db->buf); return db; } @@ -96,7 +94,6 @@ int free_host_buffer(struct data_buf *db) { free(db->buf); free(db); - printf("FH:db %p db->buf %p\n", db, db->buf); return 0; } diff --git a/tests/test-send-blind-recv-python-obj.py b/tests/test-send-blind-recv-python-obj.py index bb70e2ef506..59e915519ca 100644 --- a/tests/test-send-blind-recv-python-obj.py +++ b/tests/test-send-blind-recv-python-obj.py @@ -5,7 +5,6 @@ import time import argparse import asyncio -import socket import sys import concurrent.futures @@ -18,7 +17,7 @@ async def talk_to_client(client_ep): print("in talk_to_client") print("about to send") - send_msg = "hello from ucx server @" + socket.gethostname() + send_msg = str(list(range(10))) send_req = await client_ep.send_msg(send_msg, sys.getsizeof(send_msg)) print("about to recv") @@ -27,8 +26,11 @@ async def talk_to_client(client_ep): #ucp.get_obj_from_msg(recv_req) recv_msg = ucp.get_obj_from_msg(recv_req) - print("server sent: " + send_msg) - print("server received: " + recv_msg) + print("server sent: " + str(send_msg)) + print("server received: " + str(recv_msg)) # test fails if I send + # list directly + # instead of as a + # string ucp.destroy_ep(client_ep) print('talk_to_client done') @@ -47,11 +49,14 @@ async def talk_to_server(ip, port): recv_msg = ucp.get_obj_from_msg(recv_req) print("about to send") - send_msg = "hello from ucx client @" + socket.gethostname() + send_msg = str(list(range(10, 20))) send_req = await server_ep.send_msg(send_msg, sys.getsizeof(send_msg)) - print("client sent: " + send_msg) - print("client received: " + recv_msg) + print("client sent: " + str(send_msg)) + print("client received: " + str(recv_msg)) # test fails if I send + # list directly + # instead of as a + # string ucp.destroy_ep(server_ep) print('talk_to_server done') From 783cf45c58e631b3d998201fe8f1484180c2acd4 Mon Sep 17 00:00:00 2001 From: FNU Akshay Venkatesh Date: Mon, 14 Jan 2019 11:01:27 -0800 Subject: [PATCH 5/5] adding test to check transfer of bytes object --- tests/test-send-blind-recv-python-bytes.py | 110 +++++++++++++++++++++ tests/test-send-blind-recv-python-obj.py | 4 +- 2 files changed, 112 insertions(+), 2 deletions(-) create mode 100644 tests/test-send-blind-recv-python-bytes.py diff --git a/tests/test-send-blind-recv-python-bytes.py b/tests/test-send-blind-recv-python-bytes.py new file mode 100644 index 00000000000..d88ac8993b8 --- /dev/null +++ b/tests/test-send-blind-recv-python-bytes.py @@ -0,0 +1,110 @@ +# Copyright (c) 2018, NVIDIA CORPORATION. All rights reserved. +# See file LICENSE for terms. + +# +# $ # server @ a.b.c.d +# $ python3 tests/test-send-blind-recv-python-bytes.py +# in talk_to_client +# about to send +# about to recv +# +# server sent: b'[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]' +# server received: [10, 11, 12, 13, 14, 15, 16, 17, 18, 19] +# talk_to_client done + +# +# $ # client @ p.q.r.s +# $ python3 tests/test-send-blind-recv-python-bytes.py -s a.b.c.d -p 13337 +# +# about to send +# client sent: b'[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]' +# client received: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] +# talk_to_server done +# + +import ucp_py as ucp +import time +import argparse +import asyncio +import sys +import concurrent.futures + +accept_cb_started = False +new_client_ep = None +max_msg_log = 23 + +async def talk_to_client(client_ep): + + print("in talk_to_client") + + print("about to send") + send_msg = bytes(str(list(range(10))), 'utf-8') + send_req = await client_ep.send_msg(send_msg, sys.getsizeof(send_msg)) + + print("about to recv") + + recv_req = await client_ep.recv_future() + recv_msg = bytes.decode(ucp.get_obj_from_msg(recv_req)) + print(type(recv_msg)) + + print("server sent: " + str(send_msg)) + print("server received: " + str(recv_msg)) # test fails if I send + # list directly + # instead of as a + # string + + ucp.destroy_ep(client_ep) + print('talk_to_client done') + ucp.stop_server() + +async def talk_to_server(ip, port): + + msg_log = max_msg_log + + server_ep = ucp.get_endpoint(ip, port) + + print("about to recv") + + recv_req = await server_ep.recv_future() + recv_msg = bytes.decode(ucp.get_obj_from_msg(recv_req)) + print(type(recv_msg)) + + print("about to send") + send_msg = bytes(str(list(range(10, 20))), 'utf-8') + send_req = await server_ep.send_msg(send_msg, sys.getsizeof(send_msg)) + + print("client sent: " + str(send_msg)) + print("client received: " + str(recv_msg)) # test fails if I send + # list directly + # instead of as a + # string + + ucp.destroy_ep(server_ep) + print('talk_to_server done') + +parser = argparse.ArgumentParser() +parser.add_argument('-s','--server', help='enter server ip', required=False) +parser.add_argument('-p','--port', help='enter server port number', required=False) +args = parser.parse_args() + +## initiate ucp +init_str = "" +server = False +if args.server is None: + server = True +else: + server = False + init_str = args.server + +ucp.init() +loop = asyncio.get_event_loop() +# coro points to either client or server-side coroutine +if server: + coro = ucp.start_server(talk_to_client, is_coroutine = True) +else: + coro = talk_to_server(init_str.encode(), int(args.port)) + +loop.run_until_complete(coro) + +loop.close() +ucp.fin() diff --git a/tests/test-send-blind-recv-python-obj.py b/tests/test-send-blind-recv-python-obj.py index 59e915519ca..93c5bad8a62 100644 --- a/tests/test-send-blind-recv-python-obj.py +++ b/tests/test-send-blind-recv-python-obj.py @@ -23,8 +23,8 @@ async def talk_to_client(client_ep): print("about to recv") recv_req = await client_ep.recv_future() - #ucp.get_obj_from_msg(recv_req) recv_msg = ucp.get_obj_from_msg(recv_req) + print(type(recv_msg)) print("server sent: " + str(send_msg)) print("server received: " + str(recv_msg)) # test fails if I send @@ -45,8 +45,8 @@ async def talk_to_server(ip, port): print("about to recv") recv_req = await server_ep.recv_future() - #ucp.get_obj_from_msg(recv_req) recv_msg = ucp.get_obj_from_msg(recv_req) + print(type(recv_msg)) print("about to send") send_msg = str(list(range(10, 20)))