Skip to content

Commit

Permalink
Merge pull request openucx#9 from Akshay-Venkatesh/send-recv-obj-ptrs
Browse files Browse the repository at this point in the history
send/recv python objects prototype
  • Loading branch information
Akshay-Venkatesh authored Jan 14, 2019
2 parents 8fc0ad3 + 783cf45 commit 776e726
Show file tree
Hide file tree
Showing 7 changed files with 328 additions and 3 deletions.
14 changes: 14 additions & 0 deletions pybind/buffer_ops.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,20 @@
#include <cuda.h>
#include <cuda_runtime.h>

struct data_buf *populate_buffer_region(void *src)
{
struct data_buf *db = NULL;
db = (struct data_buf *) malloc(sizeof(struct data_buf));
db->buf = src;
DEBUG_PRINT("allocated %p\n", db->buf);
return db;
}

void *return_ptr_from_buf(struct data_buf *db)
{
return (void *) db->buf;
}

struct data_buf *allocate_host_buffer(int length)
{
struct data_buf *db = NULL;
Expand Down
2 changes: 2 additions & 0 deletions pybind/buffer_ops.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#include "common.h"

int set_device(int device);
struct data_buf *populate_buffer_region(void *src);
void *return_ptr_from_buf(struct data_buf *db);
struct data_buf *allocate_host_buffer(int length);
struct data_buf *allocate_cuda_buffer(int length);
int set_host_buffer(struct data_buf *db, int c, int length);
Expand Down
26 changes: 24 additions & 2 deletions pybind/ucp_py.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class CommFuture(concurrent.futures.Future):
return self.result_state

def __del__(self):
self.ucp_msg.free_mem()
pass #self.ucp_msg.free_mem()

def __await__(self):
if True == self.done_state:
Expand Down Expand Up @@ -132,6 +132,22 @@ cdef class ucp_py_ep:
msg.ctx_ptr = ucp_py_ep_send_nb(self.ucp_ep, msg.buf, len)
return msg.get_comm_request(len)

def recv_msg(self, msg, len):
buf_reg = buffer_region()
buf_reg.populate_ptr(msg)
buf_reg.is_cuda = 0 # for now but it does not matter
internal_msg = ucp_msg(buf_reg)
internal_msg.ctx_ptr = ucp_py_recv_nb(internal_msg.buf, len)
return internal_msg.get_comm_request(len)

def send_msg(self, msg, len):
buf_reg = buffer_region()
buf_reg.populate_ptr(msg)
buf_reg.is_cuda = 0 # for now but it does not matter
internal_msg = ucp_msg(buf_reg)
internal_msg.ctx_ptr = ucp_py_ep_send_nb(self.ucp_ep, internal_msg.buf, len)
return internal_msg.get_comm_request(len)

def close(self):
return ucp_py_put_ep(self.ucp_ep)

Expand Down Expand Up @@ -221,7 +237,10 @@ cdef class ucp_msg:
self.free_host()

def get_comm_len(self):
return self.comm_len
return self.comm_len

def get_obj(self):
return self.buf_reg.return_obj()

cdef class ucp_comm_request:
cdef ucp_msg msg
Expand Down Expand Up @@ -304,3 +323,6 @@ def destroy_ep(ucp_ep):

def set_cuda_dev(dev):
return set_device(dev)

def get_obj_from_msg(ucp_msg):
return ucp_msg.get_obj()
10 changes: 9 additions & 1 deletion pybind/ucp_py_buffer_helper.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

cdef extern from "buffer_ops.h":
int set_device(int)
data_buf* populate_buffer_region(void *)
void* return_ptr_from_buf(data_buf*)
data_buf* allocate_host_buffer(int)
data_buf* allocate_cuda_buffer(int)
int free_host_buffer(data_buf*)
Expand Down Expand Up @@ -31,4 +33,10 @@ cdef class buffer_region:
free_host_buffer(self.buf)

def free_cuda(self):
free_cuda_buffer(self.buf)
free_cuda_buffer(self.buf)

def populate_ptr(self, pyobj):
self.buf = populate_buffer_region(<void *> pyobj)

def return_obj(self):
return <object> return_ptr_from_buf(self.buf)
110 changes: 110 additions & 0 deletions tests/test-send-blind-recv-python-bytes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# Copyright (c) 2018, NVIDIA CORPORATION. All rights reserved.
# See file LICENSE for terms.

#
# $ # server @ a.b.c.d
# $ python3 tests/test-send-blind-recv-python-bytes.py
# in talk_to_client
# about to send
# about to recv
# <class 'str'>
# server sent: b'[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]'
# server received: [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
# talk_to_client done

#
# $ # client @ p.q.r.s
# $ python3 tests/test-send-blind-recv-python-bytes.py -s a.b.c.d -p 13337
# <class 'str'>
# about to send
# client sent: b'[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]'
# client received: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
# talk_to_server done
#

import ucp_py as ucp
import time
import argparse
import asyncio
import sys
import concurrent.futures

accept_cb_started = False
new_client_ep = None
max_msg_log = 23

async def talk_to_client(client_ep):

print("in talk_to_client")

print("about to send")
send_msg = bytes(str(list(range(10))), 'utf-8')
send_req = await client_ep.send_msg(send_msg, sys.getsizeof(send_msg))

print("about to recv")

recv_req = await client_ep.recv_future()
recv_msg = bytes.decode(ucp.get_obj_from_msg(recv_req))
print(type(recv_msg))

print("server sent: " + str(send_msg))
print("server received: " + str(recv_msg)) # test fails if I send
# list directly
# instead of as a
# string

ucp.destroy_ep(client_ep)
print('talk_to_client done')
ucp.stop_server()

async def talk_to_server(ip, port):

msg_log = max_msg_log

server_ep = ucp.get_endpoint(ip, port)

print("about to recv")

recv_req = await server_ep.recv_future()
recv_msg = bytes.decode(ucp.get_obj_from_msg(recv_req))
print(type(recv_msg))

print("about to send")
send_msg = bytes(str(list(range(10, 20))), 'utf-8')
send_req = await server_ep.send_msg(send_msg, sys.getsizeof(send_msg))

print("client sent: " + str(send_msg))
print("client received: " + str(recv_msg)) # test fails if I send
# list directly
# instead of as a
# string

ucp.destroy_ep(server_ep)
print('talk_to_server done')

parser = argparse.ArgumentParser()
parser.add_argument('-s','--server', help='enter server ip', required=False)
parser.add_argument('-p','--port', help='enter server port number', required=False)
args = parser.parse_args()

## initiate ucp
init_str = ""
server = False
if args.server is None:
server = True
else:
server = False
init_str = args.server

ucp.init()
loop = asyncio.get_event_loop()
# coro points to either client or server-side coroutine
if server:
coro = ucp.start_server(talk_to_client, is_coroutine = True)
else:
coro = talk_to_server(init_str.encode(), int(args.port))

loop.run_until_complete(coro)

loop.close()
ucp.fin()
89 changes: 89 additions & 0 deletions tests/test-send-blind-recv-python-obj.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Copyright (c) 2018, NVIDIA CORPORATION. All rights reserved.
# See file LICENSE for terms.

import ucp_py as ucp
import time
import argparse
import asyncio
import sys
import concurrent.futures

accept_cb_started = False
new_client_ep = None
max_msg_log = 23

async def talk_to_client(client_ep):

print("in talk_to_client")

print("about to send")
send_msg = str(list(range(10)))
send_req = await client_ep.send_msg(send_msg, sys.getsizeof(send_msg))

print("about to recv")

recv_req = await client_ep.recv_future()
recv_msg = ucp.get_obj_from_msg(recv_req)
print(type(recv_msg))

print("server sent: " + str(send_msg))
print("server received: " + str(recv_msg)) # test fails if I send
# list directly
# instead of as a
# string

ucp.destroy_ep(client_ep)
print('talk_to_client done')
ucp.stop_server()

async def talk_to_server(ip, port):

msg_log = max_msg_log

server_ep = ucp.get_endpoint(ip, port)

print("about to recv")

recv_req = await server_ep.recv_future()
recv_msg = ucp.get_obj_from_msg(recv_req)
print(type(recv_msg))

print("about to send")
send_msg = str(list(range(10, 20)))
send_req = await server_ep.send_msg(send_msg, sys.getsizeof(send_msg))

print("client sent: " + str(send_msg))
print("client received: " + str(recv_msg)) # test fails if I send
# list directly
# instead of as a
# string

ucp.destroy_ep(server_ep)
print('talk_to_server done')

parser = argparse.ArgumentParser()
parser.add_argument('-s','--server', help='enter server ip', required=False)
parser.add_argument('-p','--port', help='enter server port number', required=False)
args = parser.parse_args()

## initiate ucp
init_str = ""
server = False
if args.server is None:
server = True
else:
server = False
init_str = args.server

ucp.init()
loop = asyncio.get_event_loop()
# coro points to either client or server-side coroutine
if server:
coro = ucp.start_server(talk_to_client, is_coroutine = True)
else:
coro = talk_to_server(init_str.encode(), int(args.port))

loop.run_until_complete(coro)

loop.close()
ucp.fin()
80 changes: 80 additions & 0 deletions tests/test-send-recv-python-obj.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Copyright (c) 2018, NVIDIA CORPORATION. All rights reserved.
# See file LICENSE for terms.

import ucp_py as ucp
import time
import argparse
import asyncio
import socket
import sys
import concurrent.futures

accept_cb_started = False
new_client_ep = None
max_msg_log = 23

async def talk_to_client(client_ep):

print("in talk_to_client")

print("about to send")
send_msg = "hello from ucx server @" + socket.gethostname()
send_req = await client_ep.send_msg(send_msg, sys.getsizeof(send_msg))

print("about to recv")
recv_msg = "hello from ucx server @" + socket.gethostname()
recv_req = await client_ep.recv_msg(recv_msg, sys.getsizeof(recv_msg))

print("server sent: " + send_msg)
print("server received: " + recv_msg)

ucp.destroy_ep(client_ep)
print('talk_to_client done')
ucp.stop_server()

async def talk_to_server(ip, port):

msg_log = max_msg_log

server_ep = ucp.get_endpoint(ip, port)

print("about to recv")
recv_msg = "hello from ucx client @" + socket.gethostname()
recv_req = await server_ep.recv_msg(recv_msg, sys.getsizeof(recv_msg))

print("about to send")
send_msg = "hello from ucx client @" + socket.gethostname()
send_req = await server_ep.send_msg(send_msg, sys.getsizeof(send_msg))

print("client sent: " + send_msg)
print("client received: " + recv_msg)

ucp.destroy_ep(server_ep)
print('talk_to_server done')

parser = argparse.ArgumentParser()
parser.add_argument('-s','--server', help='enter server ip', required=False)
parser.add_argument('-p','--port', help='enter server port number', required=False)
args = parser.parse_args()

## initiate ucp
init_str = ""
server = False
if args.server is None:
server = True
else:
server = False
init_str = args.server

ucp.init()
loop = asyncio.get_event_loop()
# coro points to either client or server-side coroutine
if server:
coro = ucp.start_server(talk_to_client, is_coroutine = True)
else:
coro = talk_to_server(init_str.encode(), int(args.port))

loop.run_until_complete(coro)

loop.close()
ucp.fin()

0 comments on commit 776e726

Please sign in to comment.