diff --git a/ucp/_libs/__init__.pxd b/ucp/_libs/__init__.pxd new file mode 100644 index 000000000..6c796d4fe --- /dev/null +++ b/ucp/_libs/__init__.pxd @@ -0,0 +1,4 @@ +# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved. +# See file LICENSE for terms. + +# cython: language_level=3 diff --git a/ucp/_libs/topological_distance.pyx b/ucp/_libs/topological_distance.pyx index 429bed547..fa327473a 100644 --- a/ucp/_libs/topological_distance.pyx +++ b/ucp/_libs/topological_distance.pyx @@ -1,9 +1,10 @@ -# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2019-2020, NVIDIA CORPORATION. All rights reserved. # See file LICENSE for terms. + # cython: language_level=3 import pynvml -from topological_distance_dep cimport * +from .topological_distance_dep cimport * cdef class TopologicalDistance: diff --git a/ucp/_libs/ucx_api.pyx b/ucp/_libs/ucx_api.pyx index c8f896774..518052add 100644 --- a/ucp/_libs/ucx_api.pyx +++ b/ucp/_libs/ucx_api.pyx @@ -1,6 +1,8 @@ # Copyright (c) 2019-2020, NVIDIA CORPORATION. All rights reserved. # See file LICENSE for terms. 
+ # cython: language_level=3 + import logging import socket import weakref @@ -12,7 +14,9 @@ from libc.stdint cimport uintptr_t from libc.stdio cimport FILE, fclose, fflush from libc.stdlib cimport free from libc.string cimport memset -from ucx_api_dep cimport * + +from .ucx_api_dep cimport * +from .utils cimport get_buffer_data from ..exceptions import ( UCXCanceled, @@ -22,7 +26,6 @@ from ..exceptions import ( log_errors, ) from ..utils import nvtx_annotate -from .utils import get_buffer_data # Struct used as requests by UCX @@ -722,9 +725,7 @@ def tag_send_nb( name: str, optional Descriptive name of the operation """ - cdef void *data = ( - get_buffer_data(buffer, check_writable=False) - ) + cdef void *data = get_buffer_data(buffer, check_writable=False) cdef ucp_send_callback_t _send_cb = _send_callback cdef ucs_status_ptr_t status = ucp_tag_send_nb( ep._handle, @@ -836,9 +837,7 @@ def tag_recv_nb( when the `worker` closes. """ - cdef void *data = ( - get_buffer_data(buffer, check_writable=True) - ) + cdef void *data = get_buffer_data(buffer, check_writable=True) cdef ucp_tag_recv_callback_t _tag_recv_cb = ( _tag_recv_callback ) @@ -904,8 +903,7 @@ def stream_send_nb( name: str, optional Descriptive name of the operation """ - cdef void *data = (get_buffer_data(buffer, - check_writable=False)) + cdef void *data = get_buffer_data(buffer, check_writable=False) cdef ucp_send_callback_t _send_cb = _send_callback cdef ucs_status_ptr_t status = ucp_stream_send_nb( ep._handle, @@ -999,9 +997,7 @@ def stream_recv_nb( Descriptive name of the operation """ - cdef void *data = ( - get_buffer_data(buffer, check_writable=True) - ) + cdef void *data = get_buffer_data(buffer, check_writable=True) cdef size_t length cdef ucp_stream_recv_callback_t _stream_recv_cb = ( _stream_recv_callback diff --git a/ucp/_libs/utils.pxd b/ucp/_libs/utils.pxd new file mode 100644 index 000000000..f2a924420 --- /dev/null +++ b/ucp/_libs/utils.pxd @@ -0,0 +1,11 @@ +# Copyright (c) 2020, NVIDIA 
CORPORATION. All rights reserved. +# See file LICENSE for terms. + +# cython: language_level=3 + + +from libc.stdint cimport uintptr_t + + +cpdef uintptr_t get_buffer_data(buffer, bint check_writable=*) except * +cpdef Py_ssize_t get_buffer_nbytes(buffer, check_min_size, bint cuda_support) except * diff --git a/ucp/_libs/utils.pyx b/ucp/_libs/utils.pyx index a6f7f0d34..1ab6f1352 100644 --- a/ucp/_libs/utils.pyx +++ b/ucp/_libs/utils.pyx @@ -1,38 +1,30 @@ -# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2019-2020, NVIDIA CORPORATION. All rights reserved. # See file LICENSE for terms. + # cython: language_level=3 -import asyncio -import operator -from functools import reduce + from cpython.memoryview cimport PyMemoryView_GET_BUFFER +from cython cimport boundscheck, wraparound from libc.stdint cimport uintptr_t -from ..exceptions import UCXCloseError, UCXError - -def get_buffer_data(buffer, check_writable=False): +cpdef uintptr_t get_buffer_data(buffer, bint check_writable=False) except *: """ Returns data pointer of the buffer. Raising ValueError if the buffer is read only and check_writable=True is set. """ - iface = None - if hasattr(buffer, "__cuda_array_interface__"): - iface = buffer.__cuda_array_interface__ - elif hasattr(buffer, "__array_interface__"): - iface = buffer.__array_interface__ + cdef dict iface = getattr(buffer, "__cuda_array_interface__", None) + + cdef uintptr_t data_ptr + cdef bint data_readonly if iface is not None: - data_ptr, data_readonly = iface['data'] + data_ptr, data_readonly = iface["data"] else: mview = memoryview(buffer) - data_ptr = int(PyMemoryView_GET_BUFFER(mview).buf) - data_readonly = mview.readonly - - # Workaround for numba giving None, rather than an 0. - # https://github.com/cupy/cupy/issues/2104 for more info. 
- if data_ptr is None: - data_ptr = 0 + data_ptr = PyMemoryView_GET_BUFFER(mview).buf + data_readonly = PyMemoryView_GET_BUFFER(mview).readonly if data_ptr == 0: raise NotImplementedError("zero-sized buffers isn't supported") @@ -43,51 +35,61 @@ def get_buffer_data(buffer, check_writable=False): return data_ptr -def get_buffer_nbytes(buffer, check_min_size, cuda_support): +@boundscheck(False) +@wraparound(False) +cpdef Py_ssize_t get_buffer_nbytes(buffer, check_min_size, bint cuda_support) except *: """ Returns the size of the buffer in bytes. Returns ValueError if `check_min_size` is greater than the size of the buffer """ - iface = None - if hasattr(buffer, "__cuda_array_interface__"): - iface = buffer.__cuda_array_interface__ - if not cuda_support: - msg = "UCX is not configured with CUDA support, please add " \ - "`cuda_copy` and/or `cuda_ipc` to the UCX_TLS environment" \ - "variable and that the ucx-proc=*=gpu package is " \ - "installed. See " \ - "https://ucx-py.readthedocs.io/en/latest/install.html for " \ - "more information." - raise ValueError(msg) - elif hasattr(buffer, "__array_interface__"): - iface = buffer.__array_interface__ + cdef dict iface = getattr(buffer, "__cuda_array_interface__", None) + if not cuda_support and iface is not None: + raise ValueError( + "UCX is not configured with CUDA support, please add " + "`cuda_copy` and/or `cuda_ipc` to the UCX_TLS environment " + "variable and that the ucx-proc=*=gpu package is " + "installed. See " + "https://ucx-py.readthedocs.io/en/latest/install.html for " + "more information." 
+ ) + cdef tuple shape, strides + cdef Py_ssize_t i, s, itemsize, ndim, nbytes if iface is not None: import numpy - itemsize = int(numpy.dtype(iface['typestr']).itemsize) + itemsize = numpy.dtype(iface["typestr"]).itemsize # Making sure that the elements in shape is integers - shape = [int(s) for s in iface['shape']] - nbytes = reduce(operator.mul, shape, 1) * itemsize + shape = iface["shape"] + ndim = len(shape) + nbytes = itemsize + for i in range(ndim): + nbytes *= shape[i] # Check that data is contiguous - if len(shape) > 0 and iface.get("strides", None) is not None: - strides = [int(s) for s in iface['strides']] - if len(strides) != len(shape): - msg = "The length of shape and strides must be equal" - raise ValueError(msg) + strides = iface.get("strides") + if strides is not None and ndim > 0: + if len(strides) != ndim: + raise ValueError( + "The length of shape and strides must be equal" + ) s = itemsize - for i in reversed(range(len(shape))): - if s != strides[i]: + for i from ndim > i >= 0 by 1: + if s != strides[i]: raise ValueError("Array must be contiguous") - s *= shape[i] - if iface.get("mask", None) is not None: + s *= shape[i] + if iface.get("mask") is not None: raise NotImplementedError("mask attribute not supported") else: mview = memoryview(buffer) nbytes = mview.nbytes - if not mview.contiguous: - raise ValueError("buffer must be contiguous") + if not mview.c_contiguous: + raise ValueError("buffer must be C-contiguous") - if check_min_size is not None and nbytes < check_min_size: - raise ValueError("the nbytes is greater than the size of the buffer!") + cdef Py_ssize_t min_size + if check_min_size is not None: + min_size = check_min_size + if nbytes < min_size: + raise ValueError( + "the nbytes is greater than the size of the buffer!" + ) return nbytes