diff --git a/build.sh b/build.sh index 5beecdb7af..adbb5851d2 100755 --- a/build.sh +++ b/build.sh @@ -40,7 +40,7 @@ VERBOSE_FLAG="" BUILD_TYPE=Release INSTALL_TARGET=install RAN_CMAKE=0 -PYTHON_ARGS_FOR_INSTALL="--no-build-isolation --no-deps --config-settings rapidsai.disable-cuda=true" +PYTHON_ARGS_FOR_INSTALL="-v --no-build-isolation --no-deps --config-settings rapidsai.disable-cuda=true" # Set defaults for vars that may not have been defined externally diff --git a/python/kvikio/kvikio/__init__.py b/python/kvikio/kvikio/__init__.py index 5e2704adfa..758260229d 100644 --- a/python/kvikio/kvikio/__init__.py +++ b/python/kvikio/kvikio/__init__.py @@ -1,18 +1,18 @@ # Copyright (c) 2021-2024, NVIDIA CORPORATION. All rights reserved. # See file LICENSE for terms. -from ._lib import libkvikio # type: ignore +from ._lib import buffer, driver_properties # type: ignore from ._version import __git_commit__, __version__ # noqa: F401 from .cufile import CuFile # noqa: F401 def memory_register(buf) -> None: - return libkvikio.memory_register(buf) + return buffer.memory_register(buf) def memory_deregister(buf) -> None: - libkvikio.memory_deregister(buf) + buffer.memory_deregister(buf) # TODO: Wrap nicely, maybe as a dataclass? -DriverProperties = libkvikio.DriverProperties +DriverProperties = driver_properties.DriverProperties diff --git a/python/kvikio/kvikio/_lib/CMakeLists.txt b/python/kvikio/kvikio/_lib/CMakeLists.txt index 04cff87c08..c9b4725674 100644 --- a/python/kvikio/kvikio/_lib/CMakeLists.txt +++ b/python/kvikio/kvikio/_lib/CMakeLists.txt @@ -1,5 +1,5 @@ # ============================================================================= -# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except # in compliance with the License. You may obtain a copy of the License at @@ -12,11 +12,13 @@ # the License. # ============================================================================= -# Set the list of Cython files to build -set(cython_modules arr.pyx libnvcomp.pyx libnvcomp_ll.pyx libkvikio.pyx) +# Set the list of Cython files to build file(GLOB SOURCES "*.pyx") +set(SOURCES arr.pyx libnvcomp.pyx libnvcomp_ll.pyx file_handle.pyx driver_properties.pyx future.pyx + buffer.pyx defaults.pyx +) rapids_cython_create_modules( CXX - SOURCE_FILES "${cython_modules}" + SOURCE_FILES "${SOURCES}" LINKED_LIBRARIES kvikio::kvikio nvcomp::nvcomp ) diff --git a/python/kvikio/kvikio/_lib/arr.pxd b/python/kvikio/kvikio/_lib/arr.pxd index 7b79987c0b..a9d2b7e7f1 100644 --- a/python/kvikio/kvikio/_lib/arr.pxd +++ b/python/kvikio/kvikio/_lib/arr.pxd @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2021, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2020-2024, NVIDIA CORPORATION. All rights reserved. # See file LICENSE for terms. # distutils: language = c++ @@ -6,6 +6,7 @@ from libc.stdint cimport uintptr_t +from libcpp.utility cimport pair cdef class Array: @@ -25,3 +26,8 @@ cdef class Array: cpdef bint _f_contiguous(self) cpdef bint _contiguous(self) cpdef Py_ssize_t _nbytes(self) + + +cdef pair[uintptr_t, size_t] parse_buffer_argument( + buf, size, bint accept_host_buffer +) except * diff --git a/python/kvikio/kvikio/_lib/arr.pyx b/python/kvikio/kvikio/_lib/arr.pyx index d8a7b7b5a3..793a414b17 100644 --- a/python/kvikio/kvikio/_lib/arr.pyx +++ b/python/kvikio/kvikio/_lib/arr.pyx @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2021, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2020-2024, NVIDIA CORPORATION. All rights reserved. # See file LICENSE for terms. # cython: language_level=3 @@ -294,3 +294,24 @@ cpdef asarray(obj): return obj else: return Array(obj) + + +cdef pair[uintptr_t, size_t] parse_buffer_argument( + buf, size, bint accept_host_buffer +) except *: + """Parse `buf` and `size` argument and return a pointer and nbytes""" + if not isinstance(buf, Array): + buf = Array(buf) + cdef Array arr = buf + if not arr._contiguous(): + raise ValueError("Array must be contiguous") + if not accept_host_buffer and not arr.cuda: + raise ValueError("Non-CUDA buffers not supported") + cdef size_t nbytes + if size is None: + nbytes = arr.nbytes + elif size > arr.nbytes: + raise ValueError("Size is greater than the size of the buffer") + else: + nbytes = size + return pair[uintptr_t, size_t](arr.ptr, nbytes) diff --git a/python/kvikio/kvikio/_lib/buffer.pyx b/python/kvikio/kvikio/_lib/buffer.pyx new file mode 100644 index 0000000000..93d8379eef --- /dev/null +++ b/python/kvikio/kvikio/_lib/buffer.pyx @@ -0,0 +1,27 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# See file LICENSE for terms. + +# distutils: language = c++ +# cython: language_level=3 + + +from kvikio._lib.arr cimport Array + + +cdef extern from "<kvikio/buffer.hpp>" namespace "kvikio" nogil: + void cpp_memory_register "kvikio::memory_register"(const void* devPtr) except + + void cpp_memory_deregister "kvikio::memory_deregister"(const void* devPtr) except + + + +def memory_register(buf) -> None: + if not isinstance(buf, Array): + buf = Array(buf) + cdef Array arr = buf + cpp_memory_register(<void*>arr.ptr) + + +def memory_deregister(buf) -> None: + if not isinstance(buf, Array): + buf = Array(buf) + cdef Array arr = buf + cpp_memory_deregister(<void*>arr.ptr) diff --git a/python/kvikio/kvikio/_lib/defaults.pyx b/python/kvikio/kvikio/_lib/defaults.pyx new file mode 100644 index 0000000000..6ff1cd5997 --- /dev/null +++ b/python/kvikio/kvikio/_lib/defaults.pyx @@ -0,0 +1,54 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# See file LICENSE for terms. + +# distutils: language = c++ +# cython: language_level=3 + +from libcpp cimport bool + + +cdef extern from "<kvikio/defaults.hpp>" nogil: + bool cpp_compat_mode "kvikio::defaults::compat_mode"() except + + void cpp_compat_mode_reset \ + "kvikio::defaults::compat_mode_reset"(bool enable) except + + unsigned int cpp_thread_pool_nthreads \ + "kvikio::defaults::thread_pool_nthreads"() except + + void cpp_thread_pool_nthreads_reset \ + "kvikio::defaults::thread_pool_nthreads_reset" (unsigned int nthreads) except + + size_t cpp_task_size "kvikio::defaults::task_size"() except + + void cpp_task_size_reset "kvikio::defaults::task_size_reset"(size_t nbytes) except + + size_t cpp_gds_threshold "kvikio::defaults::gds_threshold"() except + + void cpp_gds_threshold_reset \ + "kvikio::defaults::gds_threshold_reset"(size_t nbytes) except + + + +def compat_mode() -> bool: + return cpp_compat_mode() + + +def compat_mode_reset(enable: bool) -> None: + cpp_compat_mode_reset(enable) + + +def thread_pool_nthreads() -> int: + return cpp_thread_pool_nthreads() + + +def thread_pool_nthreads_reset(nthreads: int) -> None: + cpp_thread_pool_nthreads_reset(nthreads) + + +def task_size() -> int: + return cpp_task_size() + + +def task_size_reset(nbytes: int) -> None: + cpp_task_size_reset(nbytes) + + +def gds_threshold() -> int: + return cpp_gds_threshold() + + +def gds_threshold_reset(nbytes: int) -> None: + cpp_gds_threshold_reset(nbytes) diff --git a/python/kvikio/kvikio/_lib/driver_properties.pyx b/python/kvikio/kvikio/_lib/driver_properties.pyx new file mode 100644 index 0000000000..674ef14cde --- /dev/null +++ b/python/kvikio/kvikio/_lib/driver_properties.pyx @@ -0,0 +1,85 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# See file LICENSE for terms. + +# distutils: language = c++ +# cython: language_level=3 + + +from libcpp cimport bool + + +cdef extern from "<kvikio/driver.hpp>" nogil: + cdef cppclass cpp_DriverProperties "kvikio::DriverProperties": + cpp_DriverProperties() except + + bool is_gds_available() except + + unsigned int get_nvfs_major_version() except + + unsigned int get_nvfs_minor_version() except + + bool get_nvfs_allow_compat_mode() except + + bool get_nvfs_poll_mode() except + + size_t get_nvfs_poll_thresh_size() except + + void set_nvfs_poll_mode(bool enable) except + + void set_nvfs_poll_thresh_size(size_t size_in_kb) except + + size_t get_max_device_cache_size() except + + void set_max_device_cache_size(size_t size_in_kb) except + + size_t get_per_buffer_cache_size() except + + size_t get_max_pinned_memory_size() except + + void set_max_pinned_memory_size(size_t size_in_kb) except + + + +cdef class DriverProperties: + cdef cpp_DriverProperties _handle + + @property + def is_gds_available(self) -> bool: + try: + return self._handle.is_gds_available() + except RuntimeError: + return False + + @property + def major_version(self) -> bool: + return self._handle.get_nvfs_major_version() + + @property + def minor_version(self) -> bool: + return self._handle.get_nvfs_minor_version() + + @property + def allow_compat_mode(self) -> bool: + return self._handle.get_nvfs_allow_compat_mode() + + @property + def poll_mode(self) -> bool: + return self._handle.get_nvfs_poll_mode() + + @poll_mode.setter + def poll_mode(self, enable: bool) -> None: + self._handle.set_nvfs_poll_mode(enable) + + @property + def poll_thresh_size(self) -> int: + return self._handle.get_nvfs_poll_thresh_size() + + @poll_thresh_size.setter + def poll_thresh_size(self, size_in_kb: int) -> None: + self._handle.set_nvfs_poll_thresh_size(size_in_kb) + + @property + def max_device_cache_size(self) -> int: + return self._handle.get_max_device_cache_size() + + @max_device_cache_size.setter + def max_device_cache_size(self, size_in_kb: int) -> None: + self._handle.set_max_device_cache_size(size_in_kb) + + @property + def per_buffer_cache_size(self) -> int: + return self._handle.get_per_buffer_cache_size() + + @property + def max_pinned_memory_size(self) -> int: + return self._handle.get_max_pinned_memory_size() + + @max_pinned_memory_size.setter + def max_pinned_memory_size(self, size_in_kb: int) -> None: + self._handle.set_max_pinned_memory_size(size_in_kb) diff --git a/python/kvikio/kvikio/_lib/file_handle.pyx b/python/kvikio/kvikio/_lib/file_handle.pyx new file mode 100644 index 0000000000..4c0bb56505 --- /dev/null +++ b/python/kvikio/kvikio/_lib/file_handle.pyx @@ -0,0 +1,177 @@ +# Copyright (c) 2021-2024, NVIDIA CORPORATION. All rights reserved. +# See file LICENSE for terms. + +# distutils: language = c++ +# cython: language_level=3 + +import pathlib +from typing import Optional + +from posix cimport fcntl + +from libc.stdint cimport uintptr_t +from libcpp cimport bool +from libcpp.string cimport string +from libcpp.utility cimport move, pair + +from kvikio._lib.arr cimport Array, parse_buffer_argument +from kvikio._lib.future cimport ( + IOFuture, + IOFutureStream, + _wrap_io_future, + _wrap_stream_future, + cpp_StreamFuture, + future, +) + +from kvikio._lib import defaults + + +cdef extern from "cuda.h": + ctypedef void* CUstream + + +cdef extern from "<kvikio/file_handle.hpp>" namespace "kvikio" nogil: + cdef cppclass FileHandle: + FileHandle() except + + FileHandle(int fd) except + + FileHandle( + string file_path, + string flags, + ) except + + FileHandle( + string file_path, + string flags, + fcntl.mode_t mode + ) except + + void close() + bool closed() + int fd() + int fd_open_flags() except + + future[size_t] pread( + void* devPtr, + size_t size, + size_t file_offset, + size_t task_size + ) except + + future[size_t] pwrite( + void* devPtr, + size_t size, + size_t file_offset, + size_t task_size + ) except + + size_t read( + void* devPtr_base, + size_t size, + size_t file_offset, + size_t devPtr_offset + ) except + + size_t write( + void* devPtr_base, + size_t size, + size_t file_offset, + size_t devPtr_offset + ) except + + cpp_StreamFuture read_async( + void* devPtr_base, + size_t size, + size_t file_offset, + size_t devPtr_offset, + CUstream stream + ) except + + cpp_StreamFuture write_async( + void* devPtr_base, + size_t size, + size_t file_offset, + size_t devPtr_offset, + CUstream stream + ) except + + + +cdef class CuFile: + """ File handle for GPUDirect Storage (GDS) """ + cdef FileHandle _handle + + def __init__(self, file_path, flags="r"): + self._handle = move( + FileHandle( + str.encode(str(pathlib.Path(file_path))), + str.encode(str(flags)) + ) + ) + + def close(self) -> None: + self._handle.close() + + def closed(self) -> bool: + return self._handle.closed() + + def fileno(self) -> int: + return self._handle.fd() + + def open_flags(self) -> int: + return self._handle.fd_open_flags() + + def pread(self, buf, size: Optional[int], file_offset: int, task_size) -> IOFuture: + cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, True) + return _wrap_io_future( + self._handle.pread( + <void*>info.first, + info.second, + file_offset, + task_size if task_size else defaults.task_size() + ) + ) + + def pwrite(self, buf, size: Optional[int], file_offset: int, task_size) -> IOFuture: + cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, True) + return _wrap_io_future( + self._handle.pwrite( + <void*>info.first, + info.second, + file_offset, + task_size if task_size else defaults.task_size() + ) + ) + + def read(self, buf, size: Optional[int], file_offset: int, dev_offset: int) -> int: + cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, False) + return self._handle.read( + <void*>info.first, + info.second, + file_offset, + dev_offset, + ) + + def write(self, buf, size: Optional[int], file_offset: int, dev_offset: int) -> int: + cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, False) + return self._handle.write( + <void*>info.first, + info.second, + file_offset, + dev_offset, + ) + + def read_async(self, buf, size: Optional[int], file_offset: int, dev_offset: int, + st: uintptr_t) -> IOFutureStream: + stream = <CUstream>st + cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, False) + return _wrap_stream_future(self._handle.read_async( + <void*>info.first, + info.second, + file_offset, + dev_offset, + stream, + )) + + def write_async(self, buf, size: Optional[int], file_offset: int, dev_offset: int, + st: uintptr_t) -> IOFutureStream: + stream = <CUstream>st + cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, False) + return _wrap_stream_future(self._handle.write_async( + <void*>info.first, + info.second, + file_offset, + dev_offset, + stream, + )) diff --git a/python/kvikio/kvikio/_lib/future.pxd b/python/kvikio/kvikio/_lib/future.pxd new file mode 100644 index 0000000000..b55d9018db --- /dev/null +++ b/python/kvikio/kvikio/_lib/future.pxd @@ -0,0 +1,28 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# See file LICENSE for terms. + +# distutils: language = c++ +# cython: language_level=3 + + +cdef extern from "<future>" namespace "std" nogil: + cdef cppclass future[T]: + future() except + + T get() except + + + +cdef extern from "<kvikio/stream.hpp>" nogil: + cdef cppclass cpp_StreamFuture "kvikio::StreamFuture": + cpp_StreamFuture() except + + size_t check_bytes_done() except + + + +cdef class IOFutureStream: + cdef cpp_StreamFuture _handle + +cdef IOFutureStream _wrap_stream_future(cpp_StreamFuture &fut) + +cdef class IOFuture: + cdef future[size_t] _handle + +cdef IOFuture _wrap_io_future(future[size_t] &fut) diff --git a/python/kvikio/kvikio/_lib/future.pyx b/python/kvikio/kvikio/_lib/future.pyx new file mode 100644 index 0000000000..da6ab308dc --- /dev/null +++ b/python/kvikio/kvikio/_lib/future.pyx @@ -0,0 +1,44 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# See file LICENSE for terms. + +# distutils: language = c++ +# cython: language_level=3 + +from libcpp cimport bool +from libcpp.utility cimport move + + +cdef extern from "<kvikio/utils.hpp>" namespace "kvikio" nogil: + bool is_future_done[T](const T& future) except + + + +cdef class IOFutureStream: + """Wrap a C++ StreamFuture in a Python object""" + def check_bytes_done(self) -> int: + return self._handle.check_bytes_done() + + +cdef IOFutureStream _wrap_stream_future(cpp_StreamFuture &fut): + """Wrap a C++ future (of a `size_t`) in a `IOFuture` instance""" + ret = IOFutureStream() + ret._handle = move(fut) + return ret + + +cdef class IOFuture: + """C++ future for CuFile reads and writes""" + def get(self) -> int: + cdef size_t ret + with nogil: + ret = self._handle.get() + return ret + + def done(self) -> bool: + return is_future_done(self._handle) + + +cdef IOFuture _wrap_io_future(future[size_t] &fut): + """Wrap a C++ future (of a `size_t`) in a `IOFuture` instance""" + ret = IOFuture() + ret._handle = move(fut) + return ret diff --git a/python/kvikio/kvikio/_lib/kvikio_cxx_api.pxd b/python/kvikio/kvikio/_lib/kvikio_cxx_api.pxd deleted file mode 100644 index afb644f7e6..0000000000 --- a/python/kvikio/kvikio/_lib/kvikio_cxx_api.pxd +++ /dev/null @@ -1,124 +0,0 @@ -# Copyright (c) 2021-2024, NVIDIA CORPORATION. All rights reserved. -# See file LICENSE for terms. - -# distutils: language = c++ -# cython: language_level=3 - -from posix cimport fcntl - -from libcpp cimport bool -from libcpp.string cimport string -from libcpp.utility cimport pair -from libcpp.vector cimport vector - - -cdef extern from "cuda.h": - ctypedef void* CUstream - - -cdef extern from "<future>" namespace "std" nogil: - cdef cppclass future[T]: - future() except + - T get() except + - - -cdef extern from "<kvikio/stream.hpp>" namespace "kvikio" nogil: - cdef cppclass StreamFuture: - StreamFuture() except + - StreamFuture(StreamFuture&&) except + - size_t check_bytes_done() except + - - -cdef extern from "<kvikio/utils.hpp>" namespace "kvikio" nogil: - bool is_future_done[T](const T& future) except + - - -cdef extern from "<kvikio/driver.hpp>" namespace "kvikio" nogil: - cdef cppclass DriverProperties: - DriverProperties() except + - bool is_gds_available() except + - unsigned int get_nvfs_major_version() except + - unsigned int get_nvfs_minor_version() except + - bool get_nvfs_allow_compat_mode() except + - bool get_nvfs_poll_mode() except + - size_t get_nvfs_poll_thresh_size() except + - void set_nvfs_poll_mode(bool enable) except + - void set_nvfs_poll_thresh_size(size_t size_in_kb) except + - size_t get_max_device_cache_size() except + - void set_max_device_cache_size(size_t size_in_kb) except + - size_t get_per_buffer_cache_size() except + - size_t get_max_pinned_memory_size() except + - void set_max_pinned_memory_size(size_t size_in_kb) except + - - -cdef extern from "<kvikio/buffer.hpp>" namespace "kvikio" nogil: - void memory_register(const void* devPtr) except + - void memory_deregister(const void* devPtr) except + - - -cdef extern from "<kvikio/defaults.hpp>" namespace "kvikio::defaults" nogil: - bool compat_mode() except + - void compat_mode_reset(bool enable) except + - unsigned int thread_pool_nthreads() except + - void thread_pool_nthreads_reset(unsigned int nthreads) except + - size_t task_size() except + - void task_size_reset(size_t nbytes) except + - size_t gds_threshold() except + - void gds_threshold_reset(size_t nbytes) except + - - -cdef extern from "<kvikio/file_handle.hpp>" namespace "kvikio" nogil: - cdef cppclass FileHandle: - FileHandle() except + - FileHandle(int fd) except + - FileHandle( - string file_path, - string flags, - ) except + - FileHandle( - string file_path, - string flags, - fcntl.mode_t mode - ) except + - void close() - bool closed() - int fd() - int fd_open_flags() except + - future[size_t] pread( - void* devPtr, - size_t size, - size_t file_offset, - size_t task_size - ) except + - future[size_t] pwrite( - void* devPtr, - size_t size, - size_t file_offset, - size_t task_size - ) except + - size_t read( - void* devPtr_base, - size_t size, - size_t file_offset, - size_t devPtr_offset - ) except + - size_t write( - void* devPtr_base, - size_t size, - size_t file_offset, - size_t devPtr_offset - ) except + - StreamFuture read_async( - void* devPtr_base, - size_t size, - size_t file_offset, - size_t devPtr_offset, - CUstream stream - ) except + - StreamFuture write_async( - void* devPtr_base, - size_t size, - size_t file_offset, - size_t devPtr_offset, - CUstream stream - ) except + diff --git a/python/kvikio/kvikio/_lib/libkvikio.pyx b/python/kvikio/kvikio/_lib/libkvikio.pyx deleted file mode 100644 index c9c539877f..0000000000 --- a/python/kvikio/kvikio/_lib/libkvikio.pyx +++ /dev/null @@ -1,264 +0,0 @@ -# Copyright (c) 2021-2024, NVIDIA CORPORATION. All rights reserved. -# See file LICENSE for terms. - -# distutils: language = c++ -# cython: language_level=3 - -import pathlib -from typing import Optional - -from libc.stdint cimport uintptr_t -from libcpp.utility cimport move, pair - -from . cimport kvikio_cxx_api -from .arr cimport Array -from .kvikio_cxx_api cimport CUstream, FileHandle, StreamFuture, future, is_future_done - - -cdef class IOFutureStream: - """Wrap a C++ StreamFuture in a Python object""" - cdef StreamFuture _handle - - def check_bytes_done(self) -> int: - return self._handle.check_bytes_done() - - -cdef IOFutureStream _wrap_stream_future(StreamFuture &fut): - """Wrap a C++ future (of a `size_t`) in a `IOFuture` instance""" - ret = IOFutureStream() - ret._handle = move(fut) - return ret - - -cdef class IOFuture: - """C++ future for CuFile reads and writes""" - cdef future[size_t] _handle - - def get(self) -> int: - cdef size_t ret - with nogil: - ret = self._handle.get() - return ret - - def done(self) -> bool: - return is_future_done(self._handle) - - -cdef IOFuture _wrap_io_future(future[size_t] &fut): - """Wrap a C++ future (of a `size_t`) in a `IOFuture` instance""" - ret = IOFuture() - ret._handle = move(fut) - return ret - - -def memory_register(buf) -> None: - if not isinstance(buf, Array): - buf = Array(buf) - cdef Array arr = buf - kvikio_cxx_api.memory_register(<void*>arr.ptr) - - -def memory_deregister(buf) -> None: - if not isinstance(buf, Array): - buf = Array(buf) - cdef Array arr = buf - kvikio_cxx_api.memory_deregister(<void*>arr.ptr) - - -def compat_mode() -> bool: - return kvikio_cxx_api.compat_mode() - - -def compat_mode_reset(enable: bool) -> None: - kvikio_cxx_api.compat_mode_reset(enable) - - -def thread_pool_nthreads() -> int: - return kvikio_cxx_api.thread_pool_nthreads() - - -def thread_pool_nthreads_reset(nthreads: int) -> None: - kvikio_cxx_api.thread_pool_nthreads_reset(nthreads) - - -def task_size() -> int: - return kvikio_cxx_api.task_size() - - -def task_size_reset(nbytes: int) -> None: - kvikio_cxx_api.task_size_reset(nbytes) - - -def gds_threshold() -> int: - return kvikio_cxx_api.gds_threshold() - - -def gds_threshold_reset(nbytes: int) -> None: - kvikio_cxx_api.gds_threshold_reset(nbytes) - - -cdef pair[uintptr_t, size_t] _parse_buffer(buf, size, bint accept_host_buffer) except *: - """Parse `buf` and `size` argument and return a pointer and nbytes""" - if not isinstance(buf, Array): - buf = Array(buf) - cdef Array arr = buf - if not arr._contiguous(): - raise ValueError("Array must be contiguous") - if not accept_host_buffer and not arr.cuda: - raise ValueError("Non-CUDA buffers not supported") - cdef size_t nbytes - if size is None: - nbytes = arr.nbytes - elif size > arr.nbytes: - raise ValueError("Size is greater than the size of the buffer") - else: - nbytes = size - return pair[uintptr_t, size_t](arr.ptr, nbytes) - - -cdef class CuFile: - """ File handle for GPUDirect Storage (GDS) """ - cdef FileHandle _handle - - def __init__(self, file_path, flags="r"): - self._handle = move( - FileHandle( - str.encode(str(pathlib.Path(file_path))), - str.encode(str(flags)) - ) - ) - - def close(self) -> None: - self._handle.close() - - def closed(self) -> bool: - return self._handle.closed() - - def fileno(self) -> int: - return self._handle.fd() - - def open_flags(self) -> int: - return self._handle.fd_open_flags() - - def pread(self, buf, size: Optional[int], file_offset: int, task_size) -> IOFuture: - cdef pair[uintptr_t, size_t] info = _parse_buffer(buf, size, True) - return _wrap_io_future( - self._handle.pread( - <void*>info.first, - info.second, - file_offset, - task_size if task_size else kvikio_cxx_api.task_size() - ) - ) - - def pwrite(self, buf, size: Optional[int], file_offset: int, task_size) -> IOFuture: - cdef pair[uintptr_t, size_t] info = _parse_buffer(buf, size, True) - return _wrap_io_future( - self._handle.pwrite( - <void*>info.first, - info.second, - file_offset, - task_size if task_size else kvikio_cxx_api.task_size() - ) - ) - - def read(self, buf, size: Optional[int], file_offset: int, dev_offset: int) -> int: - cdef pair[uintptr_t, size_t] info = _parse_buffer(buf, size, False) - return self._handle.read( - <void*>info.first, - info.second, - file_offset, - dev_offset, - ) - - def write(self, buf, size: Optional[int], file_offset: int, dev_offset: int) -> int: - cdef pair[uintptr_t, size_t] info = _parse_buffer(buf, size, False) - return self._handle.write( - <void*>info.first, - info.second, - file_offset, - dev_offset, - ) - - def read_async(self, buf, size: Optional[int], file_offset: int, dev_offset: int, - st: uintptr_t) -> IOFutureStream: - stream = <CUstream>st - cdef pair[uintptr_t, size_t] info = _parse_buffer(buf, size, False) - return _wrap_stream_future(self._handle.read_async( - <void*>info.first, - info.second, - file_offset, - dev_offset, - stream, - )) - - def write_async(self, buf, size: Optional[int], file_offset: int, dev_offset: int, - st: uintptr_t) -> IOFutureStream: - stream = <CUstream>st - cdef pair[uintptr_t, size_t] info = _parse_buffer(buf, size, False) - return _wrap_stream_future(self._handle.write_async( - <void*>info.first, - info.second, - file_offset, - dev_offset, - stream, - )) - - -cdef class DriverProperties: - cdef kvikio_cxx_api.DriverProperties _handle - - @property - def is_gds_available(self) -> bool: - try: - return self._handle.is_gds_available() - except RuntimeError: - return False - - @property - def major_version(self) -> bool: - return self._handle.get_nvfs_major_version() - - @property - def minor_version(self) -> bool: - return self._handle.get_nvfs_minor_version() - - @property - def allow_compat_mode(self) -> bool: - return self._handle.get_nvfs_allow_compat_mode() - - @property - def poll_mode(self) -> bool: - return self._handle.get_nvfs_poll_mode() - - @poll_mode.setter - def poll_mode(self, enable: bool) -> None: - self._handle.set_nvfs_poll_mode(enable) - - @property - def poll_thresh_size(self) -> int: - return self._handle.get_nvfs_poll_thresh_size() - - @poll_thresh_size.setter - def poll_thresh_size(self, size_in_kb: int) -> None: - self._handle.set_nvfs_poll_thresh_size(size_in_kb) - - @property - def max_device_cache_size(self) -> int: - return self._handle.get_max_device_cache_size() - - @max_device_cache_size.setter - def max_device_cache_size(self, size_in_kb: int) -> None: - self._handle.set_max_device_cache_size(size_in_kb) - - @property - def per_buffer_cache_size(self) -> int: - return self._handle.get_per_buffer_cache_size() - - @property - def max_pinned_memory_size(self) -> int: - return self._handle.get_max_pinned_memory_size() - - @max_pinned_memory_size.setter - def max_pinned_memory_size(self, size_in_kb: int) -> None: - self._handle.set_max_pinned_memory_size(size_in_kb) diff --git a/python/kvikio/kvikio/cufile.py b/python/kvikio/kvikio/cufile.py index 7bba7bc566..fb33748d51 100644 --- a/python/kvikio/kvikio/cufile.py +++ b/python/kvikio/kvikio/cufile.py @@ -4,7 +4,7 @@ import pathlib from typing import Optional, Union -from ._lib import libkvikio # type: ignore +from ._lib import file_handle # type: ignore class IOFutureStream: @@ -85,7 +85,7 @@ def __init__(self, file: Union[pathlib.Path, str], flags: str = "r"): "a" -> "open for writing, appending to the end of file if it exists" "+" -> "open for updating (reading and writing)" """ - self._handle = libkvikio.CuFile(file, flags) + self._handle = file_handle.CuFile(file, flags) def close(self) -> None: """Deregister the file and close the file""" diff --git a/python/kvikio/kvikio/defaults.py b/python/kvikio/kvikio/defaults.py index c300272ef4..ce66cc70f4 100644 --- a/python/kvikio/kvikio/defaults.py +++ b/python/kvikio/kvikio/defaults.py @@ -1,10 +1,10 @@ -# Copyright (c) 2021-2023, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2021-2024, NVIDIA CORPORATION. All rights reserved. # See file LICENSE for terms. import contextlib -from ._lib import libkvikio # type: ignore +import kvikio._lib.defaults def compat_mode() -> bool: @@ -28,7 +28,7 @@ def compat_mode() -> bool: bool Whether KvikIO is running in compatibility mode or not. """ - return libkvikio.compat_mode() + return kvikio._lib.defaults.compat_mode() def compat_mode_reset(enable: bool) -> None: @@ -41,7 +41,7 @@ def compat_mode_reset(enable: bool) -> None: enable : bool Set to True to enable and False to disable compatibility mode """ - libkvikio.compat_mode_reset(enable) + kvikio._lib.defaults.compat_mode_reset(enable) @contextlib.contextmanager @@ -73,7 +73,7 @@ def get_num_threads() -> int: nthreads: int The number of threads in the current thread pool. """ - return libkvikio.thread_pool_nthreads() + return kvikio._lib.defaults.thread_pool_nthreads() def num_threads_reset(nthreads: int) -> None: @@ -92,7 +92,7 @@ def num_threads_reset(nthreads: int) -> None: the `KVIKIO_NTHREADS` environment variable. If not set, the default value is 1. """ - libkvikio.thread_pool_nthreads_reset(nthreads) + kvikio._lib.defaults.thread_pool_nthreads_reset(nthreads) @contextlib.contextmanager @@ -124,7 +124,7 @@ def task_size() -> int: nbytes: int The default task size in bytes. """ - return libkvikio.task_size() + return kvikio._lib.defaults.task_size() def task_size_reset(nbytes: int) -> None: @@ -135,7 +135,7 @@ def task_size_reset(nbytes: int) -> None: nbytes : int The default task size in bytes. """ - libkvikio.task_size_reset(nbytes) + kvikio._lib.defaults.task_size_reset(nbytes) @contextlib.contextmanager @@ -171,7 +171,7 @@ def gds_threshold() -> int: nbytes : int The default GDS threshold size in bytes. """ - return libkvikio.gds_threshold() + return kvikio._lib.defaults.gds_threshold() def gds_threshold_reset(nbytes: int) -> None: @@ -182,7 +182,7 @@ def gds_threshold_reset(nbytes: int) -> None: nbytes : int The default GDS threshold size in bytes. """ - libkvikio.gds_threshold_reset(nbytes) + kvikio._lib.defaults.gds_threshold_reset(nbytes) @contextlib.contextmanager