From 63d76a99a0847f3f978de57efe8076d36c237bfa Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Wed, 30 Aug 2023 14:01:24 +0200 Subject: [PATCH] Unify batch and stream API check (#271) We now use the `cuFileReadAsync` symbol to check for availability of both the stream and batch API of cuFile. We used to look for the mangled symbol `_ZTS13CUfileOpError` to determine the availability of the batch API. However, since mangling is compiler specific, we prefer using `cuFileReadAsync` even though it means that the batch API isn't available until CUDA version 12.2. Additionally, this PR also implements `CUFILE_CHECK_STREAM_IO()`, which is used to check async IO errors. cc. @tell-rebanta Authors: - Mads R. B. Kristensen (https://github.com/madsbk) Approvers: - Robert Maynard (https://github.com/robertmaynard) - Lawrence Mitchell (https://github.com/wence-) URL: https://github.com/rapidsai/kvikio/pull/271 --- cpp/CMakeLists.txt | 5 +- cpp/examples/basic_io.cpp | 52 +++++++++--------- cpp/include/kvikio/error.hpp | 28 +++++++++- cpp/include/kvikio/file_handle.hpp | 86 +++++++++++++++++------------- cpp/include/kvikio/shim/cufile.hpp | 48 ++++------------- 5 files changed, 113 insertions(+), 106 deletions(-) diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 149f21b981..5b218d2717 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -71,8 +71,8 @@ else() set(cuFile_BATCH_API_FOUND TRUE) endif() message(STATUS "Found cuFile's Batch API: ${cuFile_BATCH_API_FOUND}") - string(FIND "${CUFILE_H_STR}" "cuFileGetVersion" cuFileGetVersion_location) - if(cuFileGetVersion_location EQUAL "-1") + string(FIND "${CUFILE_H_STR}" "cuFileReadAsync" cuFileReadAsync_location) + if(cuFileReadAsync_location EQUAL "-1") set(cuFile_STREAM_API_FOUND FALSE) else() set(cuFile_STREAM_API_FOUND TRUE) @@ -94,7 +94,6 @@ target_link_libraries(kvikio INTERFACE CUDA::toolkit) if(cuFile_FOUND) target_link_libraries(kvikio INTERFACE cufile::cuFile_interface) target_compile_definitions(kvikio INTERFACE KVIKIO_CUFILE_FOUND) - if(cuFile_BATCH_API_FOUND) target_compile_definitions(kvikio INTERFACE KVIKIO_CUFILE_BATCH_API_FOUND) endif() diff --git a/cpp/examples/basic_io.cpp b/cpp/examples/basic_io.cpp index ec36ce32c6..2f3a72257c 100644 --- a/cpp/examples/basic_io.cpp +++ b/cpp/examples/basic_io.cpp @@ -148,7 +148,7 @@ int main() << " threads): " << read << endl; } - if (kvikio::is_batch_available() && !kvikio::defaults::compat_mode()) { + if (kvikio::is_batch_and_stream_available() && !kvikio::defaults::compat_mode()) { // Here we use the batch API to read "/tmp/test-file" into `b_dev` by // submitting 4 batch operations. constexpr int num_ops_in_batch = 4; @@ -195,39 +195,41 @@ int main() check(statuses.empty()); cout << "Batch canceling of all 4 operations" << endl; } - } - cout << "stream : " << kvikio::is_stream_available() << endl; - if (kvikio::is_stream_available()) { { - cout << "Performing stream I/O using file handle" << endl; - off_t f_off = 0, d_off = 0; - ssize_t bytes_done; - CUstream stream; + cout << "Performing async I/O using file handle" << endl; + off_t f_off{0}; + off_t d_off{0}; + // Notice, we have to allocate the `bytes_done_p` argument on the heap and set it to 0. + ssize_t* bytes_done_p{}; + check(cudaHostAlloc((void**)&bytes_done_p, SIZE, cudaHostAllocDefault) == cudaSuccess); + *bytes_done_p = 0; + + // Let's create a new stream and submit a sync write + CUstream stream{}; check(cudaStreamCreate(&stream) == cudaSuccess); kvikio::FileHandle f_handle("/data/test-file", "w+", kvikio::FileHandle::m644, false); - check(cudaMemcpy(a_dev, a, SIZE, cudaMemcpyHostToDevice) == cudaSuccess); - - /* - * For stream based I/Os, buffer registration is not mandatory. However, - * it gives a better performance. - */ + check(cudaMemcpyAsync(a_dev, a, SIZE, cudaMemcpyHostToDevice, stream) == cudaSuccess); + f_handle.write_async(a_dev, &io_size, &f_off, &d_off, bytes_done_p, stream); - kvikio::buffer_register(a_dev, SIZE); - f_handle.write_async(a_dev, &io_size, &f_off, &d_off, &bytes_done, stream); + // After synchronizing `stream`, we can read the number of bytes written check(cudaStreamSynchronize(stream) == cudaSuccess); - check(bytes_done == SIZE); - cout << "File stream Write : " << bytes_done << endl; - kvikio::buffer_deregister(a_dev); + // Note, `*bytes_done_p` might be negative, which indicate an IO error thus we + // use `CUFILE_CHECK_STREAM_IO` to check for errors. + CUFILE_CHECK_STREAM_IO(bytes_done_p); + check(*bytes_done_p == SIZE); + cout << "File async write : " << *bytes_done_p << endl; /* Read */ - bytes_done = 0; - kvikio::buffer_register(c_dev, SIZE); - f_handle.read_async(c_dev, &io_size, &f_off, &d_off, &bytes_done, stream); + *bytes_done_p = 0; + f_handle.read_async(c_dev, &io_size, &f_off, &d_off, bytes_done_p, stream); check(cudaStreamSynchronize(stream) == cudaSuccess); - check(bytes_done == SIZE); - cout << "File stream Read : " << bytes_done << endl; - kvikio::buffer_deregister(c_dev); + CUFILE_CHECK_STREAM_IO(bytes_done_p); + check(*bytes_done_p == SIZE); + cout << "File async read : " << *bytes_done_p << endl; + check(cudaFreeHost((void*)bytes_done_p) == cudaSuccess); } + } else { + cout << "The batch and stream API isn't available, requires CUDA 12.2+" << endl; } } diff --git a/cpp/include/kvikio/error.hpp b/cpp/include/kvikio/error.hpp index ca809c63b5..6ef5d915ac 100644 --- a/cpp/include/kvikio/error.hpp +++ b/cpp/include/kvikio/error.hpp @@ -53,7 +53,7 @@ struct CUfileException : public std::runtime_error { std::string(err_str) + ")"}; \ } \ } while (0) -#define CUDA_DRIVER_TRY_1(_call) CUDA_DRIVER_TRY_2(_call, CUfileException) +#define CUDA_DRIVER_TRY_1(_call) CUDA_DRIVER_TRY_2(_call, kvikio::CUfileException) #endif #ifdef KVIKIO_CUFILE_FOUND @@ -75,8 +75,32 @@ struct CUfileException : public std::runtime_error { cufileop_status_error(error.err)}; \ } \ } while (0) -#define CUFILE_TRY_1(_call) CUFILE_TRY_2(_call, CUfileException) +#define CUFILE_TRY_1(_call) CUFILE_TRY_2(_call, kvikio::CUfileException) #endif #endif +#ifndef CUFILE_CHECK_STREAM_IO +#define CUFILE_CHECK_STREAM_IO(...) \ + GET_CUFILE_CHECK_STREAM_IO_MACRO( \ + __VA_ARGS__, CUFILE_CHECK_STREAM_IO_2, CUFILE_CHECK_STREAM_IO_1) \ + (__VA_ARGS__) +#define GET_CUFILE_CHECK_STREAM_IO_MACRO(_1, _2, NAME, ...) NAME +#ifdef KVIKIO_CUFILE_FOUND +#define CUFILE_CHECK_STREAM_IO_2(_nbytes_done, _exception_type) \ + do { \ + int const _nbytes = (*_nbytes_done); \ + if (_nbytes < 0) { \ + throw(_exception_type){std::string{"cuFile error at: "} + __FILE__ + ":" + \ + KVIKIO_STRINGIFY(__LINE__) + ": " + std::to_string(_nbytes)}; \ + } \ + } while (0) +#else +// if cufile isn't available, we don't do anything in the body +#define CUFILE_CHECK_STREAM_IO_2(_nbytes_done, _exception_type) \ + do { \ + } while (0) +#endif +#define CUFILE_CHECK_STREAM_IO_1(_call) CUFILE_CHECK_STREAM_IO_2(_call, kvikio::CUfileException) +#endif + } // namespace kvikio diff --git a/cpp/include/kvikio/file_handle.hpp b/cpp/include/kvikio/file_handle.hpp index e2e61ffac2..dc1391ca6e 100644 --- a/cpp/include/kvikio/file_handle.hpp +++ b/cpp/include/kvikio/file_handle.hpp @@ -497,28 +497,35 @@ class FileHandle { } /** - * @brief Reads specified bytes from the file into the device memory. + * @brief Reads specified bytes from the file into the device memory asynchronously. * - * This API reads size bytes asynchronously from the file into device memory writing - * to a specified offset using GDS functionality. The API works correctly for unaligned - * offset and data sizes, although the performance is not on-par with aligned read. - * This is an asynchronous call and will be executed in sequence for the specified stream. + * This is an asynchronous version of `.read()`, which will be executed in sequence + * for the specified stream. * - * @note For the `devPtr_offset`, if data will be read starting exactly from the - * `devPtr_base` that is registered with `buffer_register`, `devPtr_offset` should - * be set to 0. To read starting from an offset in the registered buffer range, - * the relative offset should be specified in the `devPtr_offset`, and the - * `devPtr_base` must remain set to the base address that was used in the - * `buffer_register` call. + * The arguments have the same meaning as in `.read()` but some of them are deferred. That is, + * the values of `size`, `file_offset` and `devPtr_offset` will not be evaluated until execution + * time. Notice, this behavior can be changed using cuFile's cuFileStreamRegister API. * * @param devPtr_base Base address of buffer in device memory. For registered buffers, * `devPtr_base` must remain set to the base address used in the `buffer_register` call. - * @param size Size in bytes to read. - * @param file_offset Offset in the file to read from. - * @param devPtr_offset Offset relative to the `devPtr_base` pointer to read into. - * This parameter should be used only with registered buffers. - * @param bytes_read number of bytes that were successfully read. - * @param stream associated stream for this I/O. + * @param size Pointer to size in bytes to read. If the exact size is not known at the time of I/O + * submission, then you must set it to the maximum possible I/O size for that stream I/O. Later + * the actual size can be set prior to the stream I/O execution. + * @param file_offset Pointer to offset in the file from which to read. Unless otherwise set using + * cuFileStreamRegister API, this value will not be evaluated until execution time. + * @param devPtr_offset Pointer to the offset relative to the bufPtr_base pointer from which to + * write. Unless otherwise set using cuFileStreamRegister API, this value will not be evaluated + * until execution time. + * @param bytes_read Pointer to the bytes read from file. This pointer should be a non-NULL value + * and *bytes_read set to 0. The bytes_read memory should be allocated with cuMemHostAlloc/malloc/ + * mmap or registered with cuMemHostRegister. + * After successful execution of the operation in the stream, the value *bytes_read will contain + * either: + * - The number of bytes successfully read. + * - -1 on IO errors. + * - All other errors return a negative integer value of the CUfileOpError enum value. + * @param stream CUDA stream in which to enqueue the operation. If NULL, make this operation + * synchronous. */ inline void read_async(void* devPtr_base, std::size_t* size, @@ -532,34 +539,39 @@ class FileHandle { _handle, devPtr_base, size, file_offset, devPtr_offset, bytes_read, stream)); return; #else - throw CUfileException("KvikIO not compiled with stream support."); + throw CUfileException("cuFile's stream API isn't available, please build with CUDA v12.2+."); #endif } /** - * @brief Writes specified bytes from the device memory into the file. + * @brief Writes specified bytes from the device memory into the file asynchronously. * - * This API writes asynchronously the data from the GPU memory to the file at a specified offset - * and size bytes by using GDS functionality. The API works correctly for unaligned - * offset and data sizes, although the performance is not on-par with aligned writes. - * This is an asynchronous call and will be executed in sequence for the specified stream. + * This is an asynchronous version of `.write()`, which will be executed in sequence + * for the specified stream. * - * @note GDS functionality modified the standard file system metadata in SysMem. - * However, GDS functionality does not take any special responsibility for writing - * that metadata back to permanent storage. The data is not guaranteed to be present - * after a system crash unless the application uses an explicit `fsync(2)` call. If the - * file is opened with an `O_SYNC` flag, the metadata will be written to the disk before - * the call is complete. - * Refer to the note in read for more information about `devPtr_offset`. + * The arguments have the same meaning as in `.write()` but some of them are deferred. That is, + * the values of `size`, `file_offset` and `devPtr_offset` will not be evaluated until execution + * time. Notice, this behavior can be changed using cuFile's cuFileStreamRegister API. * * @param devPtr_base Base address of buffer in device memory. For registered buffers, * `devPtr_base` must remain set to the base address used in the `buffer_register` call. - * @param size Size in bytes to write. - * @param file_offset Offset in the file to write at. - * @param devPtr_offset Offset relative to the `devPtr_base` pointer to write from. - * This parameter should be used only with registered buffers. - * @param bytes_written number of bytes that were successfully written. - * @param stream associated stream for this I/O. + * @param size Pointer to size in bytes to read. If the exact size is not known at the time of I/O + * submission, then you must set it to the maximum possible I/O size for that stream I/O. Later + * the actual size can be set prior to the stream I/O execution. + * @param file_offset Pointer to offset in the file from which to read. Unless otherwise set + * using cuFileStreamRegister API, this value will not be evaluated until execution time. + * @param devPtr_offset Pointer to the offset relative to the bufPtr_base pointer from which to + * write. + * @param bytes_written Pointer to the bytes read from file. This pointer should be a non-NULL + * value and *bytes_written set to 0. The bytes_written memory should be allocated with + * cuMemHostAlloc/malloc/mmap or registered with cuMemHostRegister. + * After successful execution of the operation in the stream, the value *bytes_written will + * contain either: + * - The number of bytes successfully read. + * - -1 on IO errors. + * - All other errors return a negative integer value of the CUfileOpError enum value. + * @param stream CUDA stream in which to enqueue the operation. If NULL, make this operation + * synchronous. */ inline void write_async(void* devPtr_base, std::size_t* size, @@ -573,7 +585,7 @@ class FileHandle { _handle, devPtr_base, size, file_offset, devPtr_offset, bytes_written, stream)); return; #else - throw CUfileException("KvikIO not compiled with stream support."); + throw CUfileException("cuFile's stream API isn't available, please build with CUDA v12.2+."); #endif } diff --git a/cpp/include/kvikio/shim/cufile.hpp b/cpp/include/kvikio/shim/cufile.hpp index 08851b0994..84db69b72e 100644 --- a/cpp/include/kvikio/shim/cufile.hpp +++ b/cpp/include/kvikio/shim/cufile.hpp @@ -54,10 +54,8 @@ class cuFileAPI { decltype(cuFileBatchIOCancel)* BatchIOCancel{nullptr}; decltype(cuFileBatchIODestroy)* BatchIODestroy{nullptr}; #endif - bool batch_available = false; #ifdef KVIKIO_CUFILE_STREAM_API_FOUND - decltype(cuFileGetVersion)* GetVersion{nullptr}; decltype(cuFileReadAsync)* ReadAsync{nullptr}; decltype(cuFileWriteAsync)* WriteAsync{nullptr}; decltype(cuFileStreamRegister)* StreamRegister{nullptr}; @@ -98,32 +96,18 @@ class cuFileAPI { get_symbol(BatchIOGetStatus, lib, KVIKIO_STRINGIFY(cuFileBatchIOGetStatus)); get_symbol(BatchIOCancel, lib, KVIKIO_STRINGIFY(cuFileBatchIOCancel)); get_symbol(BatchIODestroy, lib, KVIKIO_STRINGIFY(cuFileBatchIODestroy)); - - // HACK: we use the mangled name of the `CUfileOpError` to determine if cuFile's - // batch API is available (v12.0.1+). Notice, the symbols of `cuFileBatchIOSetUp` & co. - // exist all the way back to CUDA v11.5 but calling them is undefined behavior. - // TODO: when CUDA v12.2 is released, use `cuFileReadAsync` to determine the availability - // of both the batch and async API. - try { - void* s{}; - get_symbol(s, lib, "_ZTS13CUfileOpError"); - batch_available = true; - } catch (const std::runtime_error&) { - } #endif #ifdef KVIKIO_CUFILE_STREAM_API_FOUND - get_symbol(GetVersion, lib, KVIKIO_STRINGIFY(cuFileGetVersion)); get_symbol(ReadAsync, lib, KVIKIO_STRINGIFY(cuFileReadAsync)); get_symbol(WriteAsync, lib, KVIKIO_STRINGIFY(cuFileWriteAsync)); get_symbol(StreamRegister, lib, KVIKIO_STRINGIFY(cuFileStreamRegister)); get_symbol(StreamDeregister, lib, KVIKIO_STRINGIFY(cuFileStreamDeregister)); try { void* s{}; - get_symbol(s, lib, "cuFileGetVersion"); + get_symbol(s, lib, "cuFileReadAsync"); stream_available = true; } catch (const std::runtime_error&) { - stream_available = false; } #endif @@ -193,39 +177,25 @@ inline bool is_cufile_available() } /** - * @brief Check if cuFile's batch API is available + * @brief Check if cuFile's batch and stream API is available * - * @return The boolean answer - */ -#ifdef KVIKIO_CUFILE_BATCH_API_FOUND -inline bool is_batch_available() -{ - try { - return is_cufile_available() && cuFileAPI::instance().batch_available; - } catch (const std::runtime_error&) { - return false; - } -} -#else -constexpr bool is_batch_available() { return false; } -#endif - -/** - * @brief Check if cuFile's stream API is available + * Technically, the batch API is available in CUDA 12.1 but since there is no good + * way to check CUDA version using the driver API, we check for the existing of the + * `cuFileReadAsync` symbol, which is defined in CUDA 12.2+. * * @return The boolean answer */ -#ifdef KVIKIO_CUFILE_STREAM_API_FOUND -inline bool is_stream_available() +#if defined(KVIKIO_CUFILE_STREAM_API_FOUND) && defined(KVIKIO_CUFILE_STREAM_API_FOUND) +inline bool is_batch_and_stream_available() { try { - return cuFileAPI::instance().stream_available; + return is_cufile_available() && cuFileAPI::instance().stream_available; } catch (const std::runtime_error&) { return false; } } #else -constexpr bool is_stream_available() { return false; } +constexpr bool is_batch_and_stream_available() { return false; } #endif } // namespace kvikio