Optimize small reads and writes (#190)
Fixes #178

This PR adds a GDS threshold option, which is the minimum size at which GDS is used. To improve the performance of small IO, `.pread()` and `.pwrite()` implement a shortcut that circumvents the thread pool and uses the POSIX backend directly.

This should remove the final performance regression of the KvikIO backend observed in rapidsai/cudf#12841
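
As a usage sketch (not part of this diff), the new threshold can be tuned from Python through the `kvikio.defaults` functions added here; the `kvikio.CuFile` handle, its `pwrite(...).get()` call, and the file path are assumptions drawn from the existing test suite rather than from this PR:

```python
import cupy

import kvikio
import kvikio.defaults

# Defaults to 1 MiB; can also be set up front via the
# KVIKIO_GDS_THRESHOLD environment variable.
print(kvikio.defaults.gds_threshold())

# Temporarily lower the threshold, restoring the old value on exit.
with kvikio.defaults.set_gds_threshold(4096):
    a = cupy.arange(1024, dtype="u1")  # 1 KiB device buffer
    with kvikio.CuFile("/tmp/kvikio-small-io.bin", "w") as f:  # illustrative path
        # 1 KiB < 4 KiB threshold: the write bypasses the thread pool
        # and goes through the POSIX backend directly.
        f.pwrite(a).get()
```

Reads and writes at or above the threshold keep going through the thread pool (and GDS when available) exactly as before.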





<details>
<summary>cuDF ORC WRITE performance on a DGX-1</summary>


### `LIBCUDF_CUFILE_POLICY=OFF`
```
CUDF_BENCHMARK_DROP_CACHE=1 LIBCUDF_CUFILE_POLICY=OFF ./ORC_WRITER_NVBENCH --devices 7 --benchmark orc_write_io_compression
|     io      | compression | cardinality | run_length | Samples |  CPU Time  | Noise  |  GPU Time  | Noise  | bytes_per_second | peak_memory_usage | encoded_file_size |
|-------------|-------------|-------------|------------|---------|------------|--------|------------|--------|------------------|-------------------|-------------------|
|    FILEPATH |      SNAPPY |           0 |          1 |     13x |    1.176 s | 12.60% |    1.176 s | 12.60% |        456457427 |         1.670 GiB |       486.275 MiB |
|    FILEPATH |      SNAPPY |        1000 |          1 |     13x |    1.176 s | 19.83% |    1.176 s | 19.83% |        456525931 |         1.679 GiB |       354.557 MiB |
|    FILEPATH |      SNAPPY |           0 |         32 |     29x | 506.960 ms |  4.55% | 506.955 ms |  4.55% |       1059011363 |         1.197 GiB |        41.990 MiB |
|    FILEPATH |      SNAPPY |        1000 |         32 |     30x | 499.540 ms |  1.22% | 499.535 ms |  1.22% |       1074740259 |         1.206 GiB |        23.796 MiB |
|    FILEPATH |        NONE |           0 |          1 |     14x | 985.967 ms |  8.75% | 985.965 ms |  8.75% |        544512912 |       690.244 MiB |       489.816 MiB |
|    FILEPATH |        NONE |        1000 |          1 |     13x |    1.049 s | 15.51% |    1.049 s | 15.51% |        511739947 |       684.575 MiB |       483.465 MiB |
|    FILEPATH |        NONE |           0 |         32 |     30x | 494.754 ms |  1.76% | 494.749 ms |  1.76% |       1085137163 |       690.236 MiB |        57.157 MiB |
|    FILEPATH |        NONE |        1000 |         32 |     31x | 487.722 ms |  1.19% | 487.717 ms |  1.19% |       1100783818 |       683.858 MiB |        49.200 MiB |
| HOST_BUFFER |      SNAPPY |           0 |          1 |      6x |    1.300 s |  0.50% |    1.300 s |  0.50% |        412835052 |         1.670 GiB |       486.275 MiB |
| HOST_BUFFER |      SNAPPY |        1000 |          1 |      5x |    1.137 s |  0.41% |    1.137 s |  0.41% |        472025812 |         1.679 GiB |       354.557 MiB |
| HOST_BUFFER |      SNAPPY |           0 |         32 |     32x | 481.990 ms |  1.39% | 481.984 ms |  1.39% |       1113876547 |         1.197 GiB |        41.990 MiB |
| HOST_BUFFER |      SNAPPY |        1000 |         32 |     32x | 475.133 ms |  1.41% | 475.127 ms |  1.41% |       1129952705 |         1.206 GiB |        23.796 MiB |
| HOST_BUFFER |        NONE |           0 |          1 |      5x |    1.194 s |  0.30% |    1.194 s |  0.30% |        449806715 |       690.244 MiB |       489.816 MiB |
| HOST_BUFFER |        NONE |        1000 |          1 |     13x |    1.231 s |  0.73% |    1.231 s |  0.73% |        436166059 |       684.575 MiB |       483.465 MiB |
| HOST_BUFFER |        NONE |           0 |         32 |     32x | 479.830 ms |  1.05% | 479.824 ms |  1.05% |       1118890411 |       690.236 MiB |        57.157 MiB |
| HOST_BUFFER |        NONE |        1000 |         32 |     33x | 467.041 ms |  3.32% | 467.036 ms |  3.32% |       1149528753 |       683.858 MiB |        49.200 MiB |
|        VOID |      SNAPPY |           0 |          1 |     34x | 447.131 ms |  0.76% | 447.125 ms |  0.76% |       1200717349 |         1.670 GiB |       486.275 MiB |
|        VOID |      SNAPPY |        1000 |          1 |     25x | 617.968 ms |  0.67% | 617.964 ms |  0.67% |        868774327 |         1.679 GiB |       354.557 MiB |
|        VOID |      SNAPPY |           0 |         32 |      5x | 452.829 ms |  0.46% | 452.823 ms |  0.46% |       1185608038 |         1.197 GiB |        41.990 MiB |
|        VOID |      SNAPPY |        1000 |         32 |     33x | 466.512 ms |  1.52% | 466.506 ms |  1.52% |       1150833558 |         1.206 GiB |        23.796 MiB |
|        VOID |        NONE |           0 |          1 |     46x | 332.880 ms |  1.02% | 332.874 ms |  1.02% |       1612837327 |       690.244 MiB |       489.816 MiB |
|        VOID |        NONE |        1000 |          1 |     41x | 367.183 ms |  0.95% | 367.177 ms |  0.95% |       1462157417 |       684.575 MiB |       483.465 MiB |
|        VOID |        NONE |           0 |         32 |     36x | 421.991 ms |  1.58% | 421.985 ms |  1.58% |       1272251333 |       690.236 MiB |        57.157 MiB |
|        VOID |        NONE |        1000 |         32 |     36x | 423.722 ms |  1.22% | 423.716 ms |  1.22% |       1267053977 |       683.858 MiB |        49.200 MiB |
```

### `LIBCUDF_CUFILE_POLICY=KVIKIO` (with this PR)
```
CUDF_BENCHMARK_DROP_CACHE=1 LIBCUDF_CUFILE_POLICY=KVIKIO ./ORC_WRITER_NVBENCH --devices 7 --benchmark orc_write_io_compression

|     io      | compression | cardinality | run_length | Samples |  CPU Time  | Noise  |  GPU Time  | Noise  | bytes_per_second | peak_memory_usage | encoded_file_size |
|-------------|-------------|-------------|------------|---------|------------|--------|------------|--------|------------------|-------------------|-------------------|
|    FILEPATH |      SNAPPY |           0 |          1 |     13x |    1.117 s |  6.71% |    1.117 s |  6.71% |        480440387 |         1.670 GiB |       486.275 MiB |
|    FILEPATH |      SNAPPY |        1000 |          1 |     14x |    1.077 s |  2.63% |    1.077 s |  2.63% |        498567238 |         1.679 GiB |       354.557 MiB |
|    FILEPATH |      SNAPPY |           0 |         32 |     30x | 501.035 ms |  1.00% | 501.030 ms |  1.00% |       1071534335 |         1.197 GiB |        41.990 MiB |
|    FILEPATH |      SNAPPY |        1000 |         32 |     30x | 500.984 ms |  1.10% | 500.980 ms |  1.10% |       1071642316 |         1.206 GiB |        23.796 MiB |
|    FILEPATH |        NONE |           0 |          1 |     13x |    1.152 s | 21.69% |    1.152 s | 21.70% |        466206065 |       690.244 MiB |       489.816 MiB |
|    FILEPATH |        NONE |        1000 |          1 |     13x |    1.084 s | 13.24% |    1.084 s | 13.24% |        495359475 |       684.575 MiB |       483.465 MiB |
|    FILEPATH |        NONE |           0 |         32 |     30x | 498.005 ms |  2.03% | 498.000 ms |  2.03% |       1078053921 |       690.236 MiB |        57.157 MiB |
|    FILEPATH |        NONE |        1000 |         32 |     31x | 490.966 ms |  1.87% | 490.961 ms |  1.87% |       1093510944 |       683.858 MiB |        49.200 MiB |
| HOST_BUFFER |      SNAPPY |           0 |          1 |      5x |    1.333 s |  0.45% |    1.333 s |  0.45% |        402632204 |         1.670 GiB |       486.275 MiB |
| HOST_BUFFER |      SNAPPY |        1000 |          1 |      5x |    1.153 s |  0.32% |    1.153 s |  0.32% |        465578006 |         1.679 GiB |       354.557 MiB |
| HOST_BUFFER |      SNAPPY |           0 |         32 |     31x | 482.111 ms |  1.54% | 482.105 ms |  1.54% |       1113597063 |         1.197 GiB |        41.990 MiB |
| HOST_BUFFER |      SNAPPY |        1000 |         32 |     32x | 477.450 ms |  1.27% | 477.444 ms |  1.27% |       1124468186 |         1.206 GiB |        23.796 MiB |
| HOST_BUFFER |        NONE |           0 |          1 |      5x |    1.224 s |  0.40% |    1.224 s |  0.40% |        438723846 |       690.244 MiB |       489.816 MiB |
| HOST_BUFFER |        NONE |        1000 |          1 |      5x |    1.254 s |  0.34% |    1.254 s |  0.34% |        428072718 |       684.575 MiB |       483.465 MiB |
| HOST_BUFFER |        NONE |           0 |         32 |     31x | 483.396 ms |  1.32% | 483.391 ms |  1.32% |       1110635468 |       690.236 MiB |        57.157 MiB |
| HOST_BUFFER |        NONE |        1000 |         32 |     32x | 467.038 ms |  1.51% | 467.033 ms |  1.51% |       1149536489 |       683.858 MiB |        49.200 MiB |
|        VOID |      SNAPPY |           0 |          1 |     34x | 447.051 ms |  0.94% | 447.046 ms |  0.94% |       1200929426 |         1.670 GiB |       486.275 MiB |
|        VOID |      SNAPPY |        1000 |          1 |      5x | 617.419 ms |  0.50% | 617.415 ms |  0.50% |        869546716 |         1.679 GiB |       354.557 MiB |
|        VOID |      SNAPPY |           0 |         32 |     34x | 445.136 ms |  1.19% | 445.131 ms |  1.19% |       1206097674 |         1.197 GiB |        41.990 MiB |
|        VOID |      SNAPPY |        1000 |         32 |     33x | 467.527 ms |  1.77% | 467.521 ms |  1.77% |       1148335104 |         1.206 GiB |        23.796 MiB |
|        VOID |        NONE |           0 |          1 |     45x | 333.658 ms |  1.23% | 333.652 ms |  1.23% |       1609076322 |       690.244 MiB |       489.816 MiB |
|        VOID |        NONE |        1000 |          1 |     41x | 367.980 ms |  1.06% | 367.973 ms |  1.06% |       1458994436 |       684.575 MiB |       483.465 MiB |
|        VOID |        NONE |           0 |         32 |     36x | 423.013 ms |  1.67% | 423.007 ms |  1.67% |       1269177781 |       690.236 MiB |        57.157 MiB |
|        VOID |        NONE |        1000 |         32 |     36x | 424.873 ms |  1.23% | 424.868 ms |  1.23% |       1263619162 |       683.858 MiB |        49.200 MiB |
```


### `LIBCUDF_CUFILE_POLICY=KVIKIO` (**without** this PR)
```
CUDF_BENCHMARK_DROP_CACHE=1 LIBCUDF_CUFILE_POLICY=KVIKIO ./ORC_WRITER_NVBENCH --devices 7 --benchmark orc_write_io_compression
|     io      | compression | cardinality | run_length | Samples |  CPU Time  | Noise  |  GPU Time  | Noise  | bytes_per_second | peak_memory_usage | encoded_file_size |
|-------------|-------------|-------------|------------|---------|------------|--------|------------|--------|------------------|-------------------|-------------------|
|    FILEPATH |      SNAPPY |           0 |          1 |     12x |    1.195 s |  7.58% |    1.195 s |  7.58% |        449191663 |         1.670 GiB |       486.275 MiB |
|    FILEPATH |      SNAPPY |        1000 |          1 |     13x |    1.113 s |  2.17% |    1.113 s |  2.17% |        482223468 |         1.679 GiB |       354.557 MiB |
|    FILEPATH |      SNAPPY |           0 |         32 |     24x | 621.309 ms |  1.45% | 621.304 ms |  1.45% |        864102762 |         1.197 GiB |        41.990 MiB |
|    FILEPATH |      SNAPPY |        1000 |         32 |     27x | 559.675 ms |  1.21% | 559.670 ms |  1.21% |        959263320 |         1.206 GiB |        23.796 MiB |
|    FILEPATH |        NONE |           0 |          1 |     12x |    1.253 s | 17.82% |    1.253 s | 17.82% |        428429247 |       690.244 MiB |       489.816 MiB |
|    FILEPATH |        NONE |        1000 |          1 |     13x |    1.154 s |  9.07% |    1.154 s |  9.07% |        465144594 |       684.575 MiB |       483.465 MiB |
|    FILEPATH |        NONE |           0 |         32 |     23x | 655.856 ms |  1.64% | 655.852 ms |  1.64% |        818585291 |       690.236 MiB |        57.157 MiB |
|    FILEPATH |        NONE |        1000 |         32 |     26x | 587.785 ms |  1.43% | 587.781 ms |  1.43% |        913386635 |       683.858 MiB |        49.200 MiB |
| HOST_BUFFER |      SNAPPY |           0 |          1 |      5x |    1.327 s |  0.21% |    1.327 s |  0.21% |        404688167 |         1.670 GiB |       486.275 MiB |
| HOST_BUFFER |      SNAPPY |        1000 |          1 |      5x |    1.152 s |  0.11% |    1.152 s |  0.11% |        466042735 |         1.679 GiB |       354.557 MiB |
| HOST_BUFFER |      SNAPPY |           0 |         32 |     32x | 482.019 ms |  1.64% | 482.012 ms |  1.65% |       1113811263 |         1.197 GiB |        41.990 MiB |
| HOST_BUFFER |      SNAPPY |        1000 |         32 |      5x | 473.683 ms |  0.30% | 473.677 ms |  0.30% |       1133411483 |         1.206 GiB |        23.796 MiB |
| HOST_BUFFER |        NONE |           0 |          1 |      5x |    1.224 s |  0.45% |    1.224 s |  0.45% |        438631758 |       690.244 MiB |       489.816 MiB |
| HOST_BUFFER |        NONE |        1000 |          1 |      9x |    1.254 s |  0.50% |    1.254 s |  0.50% |        427995911 |       684.575 MiB |       483.465 MiB |
| HOST_BUFFER |        NONE |           0 |         32 |     32x | 481.819 ms |  1.15% | 481.813 ms |  1.15% |       1114271697 |       690.236 MiB |        57.157 MiB |
| HOST_BUFFER |        NONE |        1000 |         32 |      5x | 462.816 ms |  0.37% | 462.810 ms |  0.37% |       1160025243 |       683.858 MiB |        49.200 MiB |
|        VOID |      SNAPPY |           0 |          1 |     34x | 447.425 ms |  0.92% | 447.419 ms |  0.92% |       1199928350 |         1.670 GiB |       486.275 MiB |
|        VOID |      SNAPPY |        1000 |          1 |      9x | 618.225 ms |  0.48% | 618.221 ms |  0.48% |        868412207 |         1.679 GiB |       354.557 MiB |
|        VOID |      SNAPPY |           0 |         32 |     34x | 447.361 ms |  2.01% | 447.356 ms |  2.01% |       1200098149 |         1.197 GiB |        41.990 MiB |
|        VOID |      SNAPPY |        1000 |         32 |     33x | 467.867 ms |  1.08% | 467.861 ms |  1.08% |       1147500067 |         1.206 GiB |        23.796 MiB |
|        VOID |        NONE |           0 |          1 |     45x | 335.043 ms |  1.06% | 335.037 ms |  1.06% |       1602424306 |       690.244 MiB |       489.816 MiB |
|        VOID |        NONE |        1000 |          1 |      5x | 366.788 ms |  0.24% | 366.782 ms |  0.24% |       1463733567 |       684.575 MiB |       483.465 MiB |
|        VOID |        NONE |           0 |         32 |     36x | 422.473 ms |  1.26% | 422.467 ms |  1.26% |       1270798385 |       690.236 MiB |        57.157 MiB |
|        VOID |        NONE |        1000 |         32 |     36x | 426.112 ms |  1.70% | 426.107 ms |  1.70% |       1259945083 |       683.858 MiB |        49.200 MiB |
```





</details>




cc. @GregoryKimball

Authors:
  - Mads R. B. Kristensen (https://github.com/madsbk)

Approvers:
  - Lawrence Mitchell (https://github.com/wence-)

URL: #190
madsbk authored Apr 14, 2023
1 parent 08c3f71 commit 59a5f3e
Showing 8 changed files with 151 additions and 20 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -16,3 +16,4 @@ cufile.log
docs/build/
cpp/doxygen/html/
.mypy_cache
.hypothesis
30 changes: 29 additions & 1 deletion cpp/include/kvikio/defaults.hpp
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -81,6 +81,7 @@ class defaults {
kvikio::third_party::thread_pool _thread_pool{get_num_threads_from_env()};
bool _compat_mode;
std::size_t _task_size;
std::size_t _gds_threshold;

static unsigned int get_num_threads_from_env()
{
@@ -109,6 +110,14 @@ class defaults {
}
_task_size = env;
}
// Determine the default value of `gds_threshold`
{
const ssize_t env = detail::getenv_or("KVIKIO_GDS_THRESHOLD", 1024 * 1024);
if (env <= 0) {
throw std::invalid_argument("KVIKIO_GDS_THRESHOLD has to be a positive integer");
}
_gds_threshold = env;
}
}

static defaults* instance()
@@ -202,6 +211,25 @@ class defaults {
* @param nbytes The default task size in bytes.
*/
static void task_size_reset(std::size_t nbytes) { instance()->_task_size = nbytes; }

/**
* @brief Get the default GDS threshold, which is the minimum size to use GDS (in bytes).
*
* In order to improve the performance of small IO, `.pread()` and `.pwrite()` implement a shortcut
* that circumvents the thread pool and uses the POSIX backend directly.
*
* Set the default value using `kvikio::defaults::gds_threshold_reset()` or by setting the
* `KVIKIO_GDS_THRESHOLD` environment variable. If not set, the default value is 1 MiB.
*
* @return The default GDS threshold size in bytes.
*/
[[nodiscard]] static std::size_t gds_threshold() { return instance()->_gds_threshold; }

/**
* @brief Reset the default GDS threshold, which is the minimum size to use GDS (in bytes).
* @param nbytes The default GDS threshold size in bytes.
*/
static void gds_threshold_reset(std::size_t nbytes) { instance()->_gds_threshold = nbytes; }
};

} // namespace kvikio
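
The environment variable parsed above is read once, when the `defaults` singleton is first constructed. A minimal sketch of how that is expected to surface in Python, assuming `KVIKIO_GDS_THRESHOLD` is set before KvikIO queries its defaults for the first time:

```python
import os

# Must be set before KvikIO constructs its defaults singleton,
# i.e. before the first call into kvikio.defaults.
os.environ["KVIKIO_GDS_THRESHOLD"] = str(512 * 1024)  # 512 KiB

import kvikio.defaults

assert kvikio.defaults.gds_threshold() == 512 * 1024
```

Non-positive values are rejected with `std::invalid_argument`, mirroring the existing `KVIKIO_TASK_SIZE` handling.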
45 changes: 38 additions & 7 deletions cpp/include/kvikio/file_handle.hpp
@@ -364,6 +364,9 @@ class FileHandle {
* This API is a parallel async version of `.read()` that partition the operation
* into tasks of size `task_size` for execution in the default thread pool.
*
* In order to improve the performance of small buffers, when `size < gds_threshold` a shortcut
* that circumvents the thread pool and uses the POSIX backend directly is taken.
*
* @note For cuFile reads, the base address of the allocation that `buf` is part of is used.
* This means that when registering buffers, use the base address of the allocation.
* This is what `memory_register` and `memory_deregister` do automatically.
@@ -372,12 +375,14 @@
* @param size Size in bytes to read.
* @param file_offset Offset in the file to read from.
* @param task_size Size of each task in bytes.
* @param gds_threshold Minimum buffer size to use GDS and the thread pool.
* @return Future that on completion returns the size of bytes that were successfully read.
*/
std::future<std::size_t> pread(void* buf,
std::size_t size,
std::size_t file_offset = 0,
std::size_t task_size = defaults::task_size())
std::size_t file_offset = 0,
std::size_t task_size = defaults::task_size(),
std::size_t gds_threshold = defaults::gds_threshold())
{
if (is_host_memory(buf)) {
auto op = [this](void* hostPtr_base,
@@ -392,14 +397,24 @@
}

CUcontext ctx = get_context_from_pointer(buf);
auto task = [this, ctx](void* devPtr_base,

// Shortcut that circumvents the thread pool and uses the POSIX backend directly.
if (size < gds_threshold) {
auto task = [this, ctx, buf, size, file_offset]() -> std::size_t {
PushAndPopContext c(ctx);
return posix_device_read(_fd_direct_off, buf, size, file_offset, 0);
};
return std::async(std::launch::deferred, task);
}

// Regular case that uses the thread pool and runs the tasks in parallel
auto task = [this, ctx](void* devPtr_base,
std::size_t size,
std::size_t file_offset,
std::size_t devPtr_offset) -> std::size_t {
PushAndPopContext c(ctx);
return read(devPtr_base, size, file_offset, devPtr_offset);
};

auto [devPtr_base, base_size, devPtr_offset] = get_alloc_info(buf, &ctx);
return parallel_io(task, devPtr_base, size, file_offset, task_size, devPtr_offset);
}
@@ -410,6 +425,9 @@
* This API is a parallel async version of `.write()` that partition the operation
* into tasks of size `task_size` for execution in the default thread pool.
*
* In order to improve the performance of small buffers, when `size < gds_threshold` a shortcut
* that circumvents the thread pool and uses the POSIX backend directly is taken.
*
* @note For cuFile writes, the base address of the allocation that `buf` is part of is used.
* This means that when registering buffers, use the base address of the allocation.
* This is what `memory_register` and `memory_deregister` do automatically.
@@ -418,12 +436,14 @@
* @param size Size in bytes to write.
* @param file_offset Offset in the file to write from.
* @param task_size Size of each task in bytes.
* @param gds_threshold Minimum buffer size to use GDS and the thread pool.
* @return Future that on completion returns the size of bytes that were successfully written.
*/
std::future<std::size_t> pwrite(const void* buf,
std::size_t size,
std::size_t file_offset = 0,
std::size_t task_size = defaults::task_size())
std::size_t file_offset = 0,
std::size_t task_size = defaults::task_size(),
std::size_t gds_threshold = defaults::gds_threshold())
{
if (is_host_memory(buf)) {
auto op = [this](const void* hostPtr_base,
@@ -438,7 +458,18 @@
}

CUcontext ctx = get_context_from_pointer(buf);
auto op = [this, ctx](const void* devPtr_base,

// Shortcut that circumvents the thread pool and uses the POSIX backend directly.
if (size < gds_threshold) {
auto task = [this, ctx, buf, size, file_offset]() -> std::size_t {
PushAndPopContext c(ctx);
return posix_device_write(_fd_direct_off, buf, size, file_offset, 0);
};
return std::async(std::launch::deferred, task);
}

// Regular case that uses the thread pool and runs the tasks in parallel
auto op = [this, ctx](const void* devPtr_base,
std::size_t size,
std::size_t file_offset,
std::size_t devPtr_offset) -> std::size_t {
5 changes: 4 additions & 1 deletion python/kvikio/_lib/kvikio_cxx_api.pxd
@@ -1,4 +1,4 @@
# Copyright (c) 2021-2022, NVIDIA CORPORATION. All rights reserved.
# Copyright (c) 2021-2023, NVIDIA CORPORATION. All rights reserved.
# See file LICENSE for terms.

# distutils: language = c++
@@ -52,6 +52,9 @@ cdef extern from "<kvikio/defaults.hpp>" namespace "kvikio::defaults" nogil:
void thread_pool_nthreads_reset(unsigned int nthreads) except +
size_t task_size() except +
void task_size_reset(size_t nbytes) except +
size_t gds_threshold() except +
void gds_threshold_reset(size_t nbytes) except +


cdef extern from "<kvikio/file_handle.hpp>" namespace "kvikio" nogil:
cdef cppclass FileHandle:
12 changes: 10 additions & 2 deletions python/kvikio/_lib/libkvikio.pyx
@@ -69,8 +69,16 @@ def task_size() -> int:
return kvikio_cxx_api.task_size()


def task_size_reset(nthreads: int) -> None:
kvikio_cxx_api.task_size_reset(nthreads)
def task_size_reset(nbytes: int) -> None:
kvikio_cxx_api.task_size_reset(nbytes)


def gds_threshold() -> int:
return kvikio_cxx_api.gds_threshold()


def gds_threshold_reset(nbytes: int) -> None:
kvikio_cxx_api.gds_threshold_reset(nbytes)


cdef pair[uintptr_t, size_t] _parse_buffer(buf, size, bint accept_host_buffer) except *:
55 changes: 51 additions & 4 deletions python/kvikio/defaults.py
@@ -119,8 +119,8 @@ def task_size() -> int:
Return
------
nthreads: int
The number of threads in the current thread pool.
nbytes: int
The default task size in bytes.
"""
return libkvikio.task_size()

@@ -131,7 +131,7 @@ def task_size_reset(nbytes: int) -> None:
Parameters
----------
nbytes : int
The number of threads to use.
The default task size in bytes.
"""
libkvikio.task_size_reset(nbytes)

@@ -143,11 +143,58 @@ def set_task_size(nbytes: int):
Parameters
----------
nbytes : int
The number of threads to use.
The default task size in bytes.
"""
old_value = task_size()
try:
task_size_reset(nbytes)
yield
finally:
task_size_reset(old_value)


def gds_threshold() -> int:
"""Get the default GDS threshold, which is the minimum size to use GDS.
In order to improve the performance of small IO, `.pread()` and `.pwrite()`
implement a shortcut that circumvents the thread pool and uses the POSIX
backend directly.
Set the default value using `gds_threshold_reset()` or by setting the
`KVIKIO_GDS_THRESHOLD` environment variable. If not set, the default value
is 1 MiB.
Return
------
nbytes : int
The default GDS threshold size in bytes.
"""
return libkvikio.gds_threshold()


def gds_threshold_reset(nbytes: int) -> None:
"""Reset the default GDS threshold, which is the minimum size to use GDS.
Parameters
----------
nbytes : int
The default GDS threshold size in bytes.
"""
libkvikio.gds_threshold_reset(nbytes)


@contextlib.contextmanager
def set_gds_threshold(nbytes: int):
"""Context for resetting the default GDS threshold.
Parameters
----------
nbytes : int
The default GDS threshold size in bytes.
"""
old_value = gds_threshold()
try:
gds_threshold_reset(nbytes)
yield
finally:
gds_threshold_reset(old_value)
13 changes: 13 additions & 0 deletions python/tests/conftest.py
@@ -8,6 +8,8 @@

import pytest

import kvikio.defaults

mp = mp.get_context("spawn") # type: ignore


@@ -83,3 +85,14 @@ def xp(request):

with ctx:
yield pytest.importorskip(module_name)


@pytest.fixture(
params=[0, 2**20],
ids=["gds_threshold=0MB", "gds_threshold=1MB"],
)
def gds_threshold(request):
"""Fixture to parametrize over GDS threshold values"""

with kvikio.defaults.set_gds_threshold(request.param):
yield request.param
10 changes: 5 additions & 5 deletions python/tests/test_basic_io.py
@@ -22,7 +22,7 @@ def check_bit_flags(x: int, y: int) -> bool:
@pytest.mark.parametrize("size", [1, 10, 100, 1000, 1024, 4096, 4096 * 10])
@pytest.mark.parametrize("nthreads", [1, 3, 4, 16])
@pytest.mark.parametrize("tasksize", [199, 1024])
def test_read_write(tmp_path, xp, size, nthreads, tasksize):
def test_read_write(tmp_path, xp, gds_threshold, size, nthreads, tasksize):
"""Test basic read/write"""
filename = tmp_path / "test-file"

@@ -81,7 +81,7 @@ def test_set_compat_mode_between_io(tmp_path):
assert f.write(a) == a.nbytes


def test_write_to_files_in_chunks(tmp_path, xp):
def test_write_to_files_in_chunks(tmp_path, xp, gds_threshold):
"""Write to files in chunks"""
filename = tmp_path / "test-file"

@@ -118,7 +118,7 @@ def test_write_to_files_in_chunks(tmp_path, xp):
"start,end",
[(0, 10 * 4096), (1, int(1.3 * 4096)), (int(2.1 * 4096), int(5.6 * 4096))],
)
def test_read_write_slices(tmp_path, xp, nthreads, tasksize, start, end):
def test_read_write_slices(tmp_path, xp, gds_threshold, nthreads, tasksize, start, end):
"""Read and write different slices"""

with kvikio.defaults.set_num_threads(nthreads):
@@ -178,7 +178,7 @@ def with_no_cuda_context():
assert err == cuda.CUresult.CUDA_SUCCESS


def test_no_current_cuda_context(tmp_path, xp):
def test_no_current_cuda_context(tmp_path, xp, gds_threshold):
"""Test IO when CUDA context is current"""
filename = tmp_path / "test-file"
a = xp.arange(100)
@@ -194,7 +194,7 @@ def test_no_current_cuda_context(tmp_path, xp):
@pytest.mark.skipif(
cupy.cuda.runtime.getDeviceCount() < 2, reason="requires multiple GPUs"
)
def test_multiple_gpus(tmp_path, xp):
def test_multiple_gpus(tmp_path, xp, gds_threshold):
"""Test IO from two different GPUs"""
filename = tmp_path / "test-file"

