diff --git a/.gitignore b/.gitignore index b8c4a39f8a..5ebc04bb31 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,4 @@ cufile.log docs/build/ cpp/doxygen/html/ .mypy_cache +.hypothesis diff --git a/cpp/include/kvikio/defaults.hpp b/cpp/include/kvikio/defaults.hpp index bae02fcf88..e515297408 100644 --- a/cpp/include/kvikio/defaults.hpp +++ b/cpp/include/kvikio/defaults.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022, NVIDIA CORPORATION. + * Copyright (c) 2022-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -81,6 +81,7 @@ class defaults { kvikio::third_party::thread_pool _thread_pool{get_num_threads_from_env()}; bool _compat_mode; std::size_t _task_size; + std::size_t _gds_threshold; static unsigned int get_num_threads_from_env() { @@ -109,6 +110,14 @@ class defaults { } _task_size = env; } + // Determine the default value of `gds_threshold` + { + const ssize_t env = detail::getenv_or("KVIKIO_GDS_THRESHOLD", 1024 * 1024); + if (env <= 0) { + throw std::invalid_argument("KVIKIO_GDS_THRESHOLD has to be a positive integer"); + } + _gds_threshold = env; + } } static defaults* instance() @@ -202,6 +211,25 @@ class defaults { * @param nbytes The default task size in bytes. */ static void task_size_reset(std::size_t nbytes) { instance()->_task_size = nbytes; } + + /** + * @brief Get the default GDS threshold, which is the minimum size to use GDS (in bytes). + * + * In order to improve performance of small IO, `.pread()` and `.pwrite()` implement a shortcut + * that circumvents the threadpool and uses the POSIX backend directly. + * + * Set the default value using `kvikio::defaults::gds_threshold_reset()` or by setting the + * `KVIKIO_GDS_THRESHOLD` environment variable. If not set, the default value is 1 MiB. + * + * @return The default GDS threshold size in bytes. 
+ */ + [[nodiscard]] static std::size_t gds_threshold() { return instance()->_gds_threshold; } + + /** + * @brief Reset the default GDS threshold, which is the minimum size to use GDS (in bytes). + * @param nbytes The default GDS threshold size in bytes. + */ + static void gds_threshold_reset(std::size_t nbytes) { instance()->_gds_threshold = nbytes; } }; } // namespace kvikio diff --git a/cpp/include/kvikio/file_handle.hpp b/cpp/include/kvikio/file_handle.hpp index 716dcfc66c..5018af2328 100644 --- a/cpp/include/kvikio/file_handle.hpp +++ b/cpp/include/kvikio/file_handle.hpp @@ -364,6 +364,9 @@ class FileHandle { * This API is a parallel async version of `.read()` that partition the operation * into tasks of size `task_size` for execution in the default thread pool. * + * In order to improve performance of small buffers, when `size < gds_threshold` a shortcut + * that circumvent the threadpool and use the POSIX backend directly is used. + * * @note For cuFile reads, the base address of the allocation `buf` is part of is used. * This means that when registering buffers, use the base address of the allocation. * This is what `memory_register` and `memory_deregister` do automatically. @@ -372,12 +375,14 @@ class FileHandle { * @param size Size in bytes to read. * @param file_offset Offset in the file to read from. * @param task_size Size of each task in bytes. + * @param gds_threshold Minimum buffer size to use GDS and the thread pool. * @return Future that on completion returns the size of bytes that were successfully read. 
*/ std::future pread(void* buf, std::size_t size, - std::size_t file_offset = 0, - std::size_t task_size = defaults::task_size()) + std::size_t file_offset = 0, + std::size_t task_size = defaults::task_size(), + std::size_t gds_threshold = defaults::gds_threshold()) { if (is_host_memory(buf)) { auto op = [this](void* hostPtr_base, @@ -392,14 +397,24 @@ class FileHandle { } CUcontext ctx = get_context_from_pointer(buf); - auto task = [this, ctx](void* devPtr_base, + + // Shortcut that circumvent the threadpool and use the POSIX backend directly. + if (size < gds_threshold) { + auto task = [this, ctx, buf, size, file_offset]() -> std::size_t { + PushAndPopContext c(ctx); + return posix_device_read(_fd_direct_off, buf, size, file_offset, 0); + }; + return std::async(std::launch::deferred, task); + } + + // Regular case that use the threadpool and run the tasks in parallel + auto task = [this, ctx](void* devPtr_base, std::size_t size, std::size_t file_offset, std::size_t devPtr_offset) -> std::size_t { PushAndPopContext c(ctx); return read(devPtr_base, size, file_offset, devPtr_offset); }; - auto [devPtr_base, base_size, devPtr_offset] = get_alloc_info(buf, &ctx); return parallel_io(task, devPtr_base, size, file_offset, task_size, devPtr_offset); } @@ -410,6 +425,9 @@ class FileHandle { * This API is a parallel async version of `.write()` that partition the operation * into tasks of size `task_size` for execution in the default thread pool. * + * In order to improve performance of small buffers, when `size < gds_threshold` a shortcut + * that circumvent the threadpool and use the POSIX backend directly is used. + * * @note For cuFile reads, the base address of the allocation `buf` is part of is used. * This means that when registering buffers, use the base address of the allocation. * This is what `memory_register` and `memory_deregister` do automatically. @@ -418,12 +436,14 @@ class FileHandle { * @param size Size in bytes to write. 
* @param file_offset Offset in the file to write from. * @param task_size Size of each task in bytes. + * @param gds_threshold Minimum buffer size to use GDS and the thread pool. * @return Future that on completion returns the size of bytes that were successfully written. */ std::future pwrite(const void* buf, std::size_t size, - std::size_t file_offset = 0, - std::size_t task_size = defaults::task_size()) + std::size_t file_offset = 0, + std::size_t task_size = defaults::task_size(), + std::size_t gds_threshold = defaults::gds_threshold()) { if (is_host_memory(buf)) { auto op = [this](const void* hostPtr_base, @@ -438,7 +458,18 @@ class FileHandle { } CUcontext ctx = get_context_from_pointer(buf); - auto op = [this, ctx](const void* devPtr_base, + + // Shortcut that circumvent the threadpool and use the POSIX backend directly. + if (size < gds_threshold) { + auto task = [this, ctx, buf, size, file_offset]() -> std::size_t { + PushAndPopContext c(ctx); + return posix_device_write(_fd_direct_off, buf, size, file_offset, 0); + }; + return std::async(std::launch::deferred, task); + } + + // Regular case that use the threadpool and run the tasks in parallel + auto op = [this, ctx](const void* devPtr_base, std::size_t size, std::size_t file_offset, std::size_t devPtr_offset) -> std::size_t { diff --git a/python/kvikio/_lib/kvikio_cxx_api.pxd b/python/kvikio/_lib/kvikio_cxx_api.pxd index c9dbfcdca2..83c0a934bb 100644 --- a/python/kvikio/_lib/kvikio_cxx_api.pxd +++ b/python/kvikio/_lib/kvikio_cxx_api.pxd @@ -1,4 +1,4 @@ -# Copyright (c) 2021-2022, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2021-2023, NVIDIA CORPORATION. All rights reserved. # See file LICENSE for terms. 
# distutils: language = c++ @@ -52,6 +52,9 @@ cdef extern from "" namespace "kvikio::defaults" nogil: void thread_pool_nthreads_reset(unsigned int nthreads) except + size_t task_size() except + void task_size_reset(size_t nbytes) except + + size_t gds_threshold() except + + void gds_threshold_reset(size_t nbytes) except + + cdef extern from "" namespace "kvikio" nogil: cdef cppclass FileHandle: diff --git a/python/kvikio/_lib/libkvikio.pyx b/python/kvikio/_lib/libkvikio.pyx index f8f4d5b7c5..5be21bf6e9 100644 --- a/python/kvikio/_lib/libkvikio.pyx +++ b/python/kvikio/_lib/libkvikio.pyx @@ -69,8 +69,16 @@ def task_size() -> int: return kvikio_cxx_api.task_size() -def task_size_reset(nthreads: int) -> None: - kvikio_cxx_api.task_size_reset(nthreads) +def task_size_reset(nbytes: int) -> None: + kvikio_cxx_api.task_size_reset(nbytes) + + +def gds_threshold() -> int: + return kvikio_cxx_api.gds_threshold() + + +def gds_threshold_reset(nbytes: int) -> None: + kvikio_cxx_api.gds_threshold_reset(nbytes) cdef pair[uintptr_t, size_t] _parse_buffer(buf, size, bint accept_host_buffer) except *: diff --git a/python/kvikio/defaults.py b/python/kvikio/defaults.py index 7e08ebd21b..141ab2ba89 100644 --- a/python/kvikio/defaults.py +++ b/python/kvikio/defaults.py @@ -119,8 +119,8 @@ def task_size() -> int: Return ------ - nthreads: int - The number of threads in the current thread pool. + nbytes: int + The default task size in bytes. """ return libkvikio.task_size() @@ -131,7 +131,7 @@ def task_size_reset(nbytes: int) -> None: Parameters ---------- nbytes : int - The number of threads to use. + The default task size in bytes. """ libkvikio.task_size_reset(nbytes) @@ -143,7 +143,7 @@ def set_task_size(nbytes: int): Parameters ---------- nbytes : int - The number of threads to use. + The default task size in bytes. 
 """ old_value = task_size() try: @@ -151,3 +151,50 @@ def set_task_size(nbytes: int): yield finally: task_size_reset(old_value) + + +def gds_threshold() -> int: + """Get the default GDS threshold, which is the minimum size to use GDS. + + In order to improve performance of small IO, `.pread()` and `.pwrite()` + implement a shortcut that circumvents the threadpool and uses the POSIX + backend directly. + + Set the default value using `gds_threshold_reset()` or by setting the + `KVIKIO_GDS_THRESHOLD` environment variable. If not set, the default value + is 1 MiB. + + Return + ------ + nbytes : int + The default GDS threshold size in bytes. + """ + return libkvikio.gds_threshold() + + +def gds_threshold_reset(nbytes: int) -> None: + """Reset the default GDS threshold, which is the minimum size to use GDS. + + Parameters + ---------- + nbytes : int + The default GDS threshold size in bytes. + """ + libkvikio.gds_threshold_reset(nbytes) + + +@contextlib.contextmanager +def set_gds_threshold(nbytes: int): + """Context for resetting the default GDS threshold. + + Parameters + ---------- + nbytes : int + The default GDS threshold size in bytes. 
+ """ + old_value = gds_threshold() + try: + gds_threshold_reset(nbytes) + yield + finally: + gds_threshold_reset(old_value) diff --git a/python/tests/conftest.py b/python/tests/conftest.py index afd11af013..c1cc77026e 100644 --- a/python/tests/conftest.py +++ b/python/tests/conftest.py @@ -8,6 +8,8 @@ import pytest +import kvikio.defaults + mp = mp.get_context("spawn") # type: ignore @@ -83,3 +85,14 @@ def xp(request): with ctx: yield pytest.importorskip(module_name) + + +@pytest.fixture( + params=[0, 2**20], + ids=["gds_threshold=0MB", "gds_threshold=1MB"], +) +def gds_threshold(request): + """Fixture to parametrize over GDS threshold values""" + + with kvikio.defaults.set_gds_threshold(request.param): + yield request.param diff --git a/python/tests/test_basic_io.py b/python/tests/test_basic_io.py index c12bf395e4..28872e4f70 100644 --- a/python/tests/test_basic_io.py +++ b/python/tests/test_basic_io.py @@ -22,7 +22,7 @@ def check_bit_flags(x: int, y: int) -> bool: @pytest.mark.parametrize("size", [1, 10, 100, 1000, 1024, 4096, 4096 * 10]) @pytest.mark.parametrize("nthreads", [1, 3, 4, 16]) @pytest.mark.parametrize("tasksize", [199, 1024]) -def test_read_write(tmp_path, xp, size, nthreads, tasksize): +def test_read_write(tmp_path, xp, gds_threshold, size, nthreads, tasksize): """Test basic read/write""" filename = tmp_path / "test-file" @@ -81,7 +81,7 @@ def test_set_compat_mode_between_io(tmp_path): assert f.write(a) == a.nbytes -def test_write_to_files_in_chunks(tmp_path, xp): +def test_write_to_files_in_chunks(tmp_path, xp, gds_threshold): """Write to files in chunks""" filename = tmp_path / "test-file" @@ -118,7 +118,7 @@ def test_write_to_files_in_chunks(tmp_path, xp): "start,end", [(0, 10 * 4096), (1, int(1.3 * 4096)), (int(2.1 * 4096), int(5.6 * 4096))], ) -def test_read_write_slices(tmp_path, xp, nthreads, tasksize, start, end): +def test_read_write_slices(tmp_path, xp, gds_threshold, nthreads, tasksize, start, end): """Read and write different slices""" 
with kvikio.defaults.set_num_threads(nthreads): @@ -178,7 +178,7 @@ def with_no_cuda_context(): assert err == cuda.CUresult.CUDA_SUCCESS -def test_no_current_cuda_context(tmp_path, xp): +def test_no_current_cuda_context(tmp_path, xp, gds_threshold): """Test IO when CUDA context is current""" filename = tmp_path / "test-file" a = xp.arange(100) @@ -194,7 +194,7 @@ def test_no_current_cuda_context(tmp_path, xp): @pytest.mark.skipif( cupy.cuda.runtime.getDeviceCount() < 2, reason="requires multiple GPUs" ) -def test_multiple_gpus(tmp_path, xp): +def test_multiple_gpus(tmp_path, xp, gds_threshold): """Test IO from two different GPUs""" filename = tmp_path / "test-file"