Skip to content

Commit

Permalink
Unify batch and stream API check (#271)
Browse files Browse the repository at this point in the history
We now use the `cuFileReadAsync` symbol to check for the availability of both the stream and batch APIs of cuFile.

We used to look for the mangled symbol `_ZTS13CUfileOpError` to determine the availability of the batch API. However, since mangling is compiler specific, we prefer using `cuFileReadAsync` even though it means that the batch API isn't available until CUDA version 12.2.


Additionally, this PR also implements `CUFILE_CHECK_STREAM_IO()`, which is used to check async IO errors.
cc. @tell-rebanta

Authors:
  - Mads R. B. Kristensen (https://github.com/madsbk)

Approvers:
  - Robert Maynard (https://github.com/robertmaynard)
  - Lawrence Mitchell (https://github.com/wence-)

URL: #271
  • Loading branch information
madsbk authored Aug 30, 2023
1 parent 65343a1 commit 63d76a9
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 106 deletions.
5 changes: 2 additions & 3 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ else()
set(cuFile_BATCH_API_FOUND TRUE)
endif()
message(STATUS "Found cuFile's Batch API: ${cuFile_BATCH_API_FOUND}")
string(FIND "${CUFILE_H_STR}" "cuFileGetVersion" cuFileGetVersion_location)
if(cuFileGetVersion_location EQUAL "-1")
string(FIND "${CUFILE_H_STR}" "cuFileReadAsync" cuFileReadAsync_location)
if(cuFileReadAsync_location EQUAL "-1")
set(cuFile_STREAM_API_FOUND FALSE)
else()
set(cuFile_STREAM_API_FOUND TRUE)
Expand All @@ -94,7 +94,6 @@ target_link_libraries(kvikio INTERFACE CUDA::toolkit)
if(cuFile_FOUND)
target_link_libraries(kvikio INTERFACE cufile::cuFile_interface)
target_compile_definitions(kvikio INTERFACE KVIKIO_CUFILE_FOUND)

if(cuFile_BATCH_API_FOUND)
target_compile_definitions(kvikio INTERFACE KVIKIO_CUFILE_BATCH_API_FOUND)
endif()
Expand Down
52 changes: 27 additions & 25 deletions cpp/examples/basic_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ int main()
<< " threads): " << read << endl;
}

if (kvikio::is_batch_available() && !kvikio::defaults::compat_mode()) {
if (kvikio::is_batch_and_stream_available() && !kvikio::defaults::compat_mode()) {
// Here we use the batch API to read "/tmp/test-file" into `b_dev` by
// submitting 4 batch operations.
constexpr int num_ops_in_batch = 4;
Expand Down Expand Up @@ -195,39 +195,41 @@ int main()
check(statuses.empty());
cout << "Batch canceling of all 4 operations" << endl;
}
}

cout << "stream : " << kvikio::is_stream_available() << endl;
if (kvikio::is_stream_available()) {
{
cout << "Performing stream I/O using file handle" << endl;
off_t f_off = 0, d_off = 0;
ssize_t bytes_done;
CUstream stream;
cout << "Performing async I/O using file handle" << endl;
off_t f_off{0};
off_t d_off{0};
// Notice, we have to allocate the `bytes_done_p` argument on the heap and set it to 0.
ssize_t* bytes_done_p{};
check(cudaHostAlloc((void**)&bytes_done_p, SIZE, cudaHostAllocDefault) == cudaSuccess);
*bytes_done_p = 0;

// Let's create a new stream and submit an async write
CUstream stream{};
check(cudaStreamCreate(&stream) == cudaSuccess);
kvikio::FileHandle f_handle("/data/test-file", "w+", kvikio::FileHandle::m644, false);
check(cudaMemcpy(a_dev, a, SIZE, cudaMemcpyHostToDevice) == cudaSuccess);

/*
* For stream based I/Os, buffer registration is not mandatory. However,
* it gives a better performance.
*/
check(cudaMemcpyAsync(a_dev, a, SIZE, cudaMemcpyHostToDevice, stream) == cudaSuccess);
f_handle.write_async(a_dev, &io_size, &f_off, &d_off, bytes_done_p, stream);

kvikio::buffer_register(a_dev, SIZE);
f_handle.write_async(a_dev, &io_size, &f_off, &d_off, &bytes_done, stream);
// After synchronizing `stream`, we can read the number of bytes written
check(cudaStreamSynchronize(stream) == cudaSuccess);
check(bytes_done == SIZE);
cout << "File stream Write : " << bytes_done << endl;
kvikio::buffer_deregister(a_dev);
// Note, `*bytes_done_p` might be negative, which indicates an I/O error, so we
// use `CUFILE_CHECK_STREAM_IO` to check for errors.
CUFILE_CHECK_STREAM_IO(bytes_done_p);
check(*bytes_done_p == SIZE);
cout << "File async write : " << *bytes_done_p << endl;

/* Read */
bytes_done = 0;
kvikio::buffer_register(c_dev, SIZE);
f_handle.read_async(c_dev, &io_size, &f_off, &d_off, &bytes_done, stream);
*bytes_done_p = 0;
f_handle.read_async(c_dev, &io_size, &f_off, &d_off, bytes_done_p, stream);
check(cudaStreamSynchronize(stream) == cudaSuccess);
check(bytes_done == SIZE);
cout << "File stream Read : " << bytes_done << endl;
kvikio::buffer_deregister(c_dev);
CUFILE_CHECK_STREAM_IO(bytes_done_p);
check(*bytes_done_p == SIZE);
cout << "File async read : " << *bytes_done_p << endl;
check(cudaFreeHost((void*)bytes_done_p) == cudaSuccess);
}
} else {
cout << "The batch and stream API isn't available, requires CUDA 12.2+" << endl;
}
}
28 changes: 26 additions & 2 deletions cpp/include/kvikio/error.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ struct CUfileException : public std::runtime_error {
std::string(err_str) + ")"}; \
} \
} while (0)
#define CUDA_DRIVER_TRY_1(_call) CUDA_DRIVER_TRY_2(_call, CUfileException)
#define CUDA_DRIVER_TRY_1(_call) CUDA_DRIVER_TRY_2(_call, kvikio::CUfileException)
#endif

#ifdef KVIKIO_CUFILE_FOUND
Expand All @@ -75,8 +75,32 @@ struct CUfileException : public std::runtime_error {
cufileop_status_error(error.err)}; \
} \
} while (0)
#define CUFILE_TRY_1(_call) CUFILE_TRY_2(_call, CUfileException)
#define CUFILE_TRY_1(_call) CUFILE_TRY_2(_call, kvikio::CUfileException)
#endif
#endif

/**
 * @brief Check the byte count reported by a cuFile stream (async) I/O operation.
 *
 * Usage:
 *   CUFILE_CHECK_STREAM_IO(nbytes_done_ptr);                  // throws kvikio::CUfileException
 *   CUFILE_CHECK_STREAM_IO(nbytes_done_ptr, exception_type);  // throws `exception_type`
 *
 * `nbytes_done_ptr` is a pointer to the signed byte counter; it is dereferenced exactly
 * once. If the value is negative, an exception is thrown whose message contains the
 * file, the line and the negative value.
 *
 * The value is kept in the pointee's own type (`auto`) rather than being narrowed to
 * `int`, so that a large successful byte count (>= 2 GiB) cannot wrap around to a
 * negative number and be misreported as an error.
 *
 * When `KVIKIO_CUFILE_FOUND` isn't defined, the check expands to an empty statement and
 * the arguments are not evaluated.
 */
#ifndef CUFILE_CHECK_STREAM_IO
#define CUFILE_CHECK_STREAM_IO(...)                                  \
  GET_CUFILE_CHECK_STREAM_IO_MACRO(                                  \
    __VA_ARGS__, CUFILE_CHECK_STREAM_IO_2, CUFILE_CHECK_STREAM_IO_1) \
  (__VA_ARGS__)
#define GET_CUFILE_CHECK_STREAM_IO_MACRO(_1, _2, NAME, ...) NAME
#ifdef KVIKIO_CUFILE_FOUND
#define CUFILE_CHECK_STREAM_IO_2(_nbytes_done, _exception_type)                             \
  do {                                                                                      \
    auto const _nbytes = *(_nbytes_done);                                                   \
    if (_nbytes < 0) {                                                                      \
      throw(_exception_type){std::string{"cuFile error at: "} + __FILE__ + ":" +            \
                             KVIKIO_STRINGIFY(__LINE__) + ": " + std::to_string(_nbytes)};  \
    }                                                                                       \
  } while (0)
#else
// if cufile isn't available, we don't do anything in the body
#define CUFILE_CHECK_STREAM_IO_2(_nbytes_done, _exception_type) \
  do {                                                          \
  } while (0)
#endif
#define CUFILE_CHECK_STREAM_IO_1(_nbytes_done) \
  CUFILE_CHECK_STREAM_IO_2(_nbytes_done, kvikio::CUfileException)
#endif

} // namespace kvikio
86 changes: 49 additions & 37 deletions cpp/include/kvikio/file_handle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -497,28 +497,35 @@ class FileHandle {
}

/**
* @brief Reads specified bytes from the file into the device memory.
* @brief Reads specified bytes from the file into the device memory asynchronously.
*
* This API reads size bytes asynchronously from the file into device memory writing
* to a specified offset using GDS functionality. The API works correctly for unaligned
* offset and data sizes, although the performance is not on-par with aligned read.
* This is an asynchronous call and will be executed in sequence for the specified stream.
* This is an asynchronous version of `.read()`, which will be executed in sequence
* for the specified stream.
*
* @note For the `devPtr_offset`, if data will be read starting exactly from the
* `devPtr_base` that is registered with `buffer_register`, `devPtr_offset` should
* be set to 0. To read starting from an offset in the registered buffer range,
* the relative offset should be specified in the `devPtr_offset`, and the
* `devPtr_base` must remain set to the base address that was used in the
* `buffer_register` call.
* The arguments have the same meaning as in `.read()` but some of them are deferred. That is,
* the values of `size`, `file_offset` and `devPtr_offset` will not be evaluated until execution
* time. Notice, this behavior can be changed using cuFile's cuFileStreamRegister API.
*
* @param devPtr_base Base address of buffer in device memory. For registered buffers,
* `devPtr_base` must remain set to the base address used in the `buffer_register` call.
* @param size Size in bytes to read.
* @param file_offset Offset in the file to read from.
* @param devPtr_offset Offset relative to the `devPtr_base` pointer to read into.
* This parameter should be used only with registered buffers.
* @param bytes_read number of bytes that were successfully read.
* @param stream associated stream for this I/O.
* @param size Pointer to size in bytes to read. If the exact size is not known at the time of I/O
* submission, then you must set it to the maximum possible I/O size for that stream I/O. Later
* the actual size can be set prior to the stream I/O execution.
* @param file_offset Pointer to offset in the file from which to read. Unless otherwise set using
* cuFileStreamRegister API, this value will not be evaluated until execution time.
* @param devPtr_offset Pointer to the offset relative to the `devPtr_base` pointer to read
* into. Unless otherwise set using cuFileStreamRegister API, this value will not be evaluated
* until execution time.
* @param bytes_read Pointer to the bytes read from file. This pointer should be a non-NULL value
* and *bytes_read set to 0. The bytes_read memory should be allocated with cuMemHostAlloc/malloc/
* mmap or registered with cuMemHostRegister.
* After successful execution of the operation in the stream, the value *bytes_read will contain
* either:
* - The number of bytes successfully read.
* - -1 on IO errors.
* - All other errors return a negative integer value of the CUfileOpError enum value.
* @param stream CUDA stream in which to enqueue the operation. If NULL, make this operation
* synchronous.
*/
inline void read_async(void* devPtr_base,
std::size_t* size,
Expand All @@ -532,34 +539,39 @@ class FileHandle {
_handle, devPtr_base, size, file_offset, devPtr_offset, bytes_read, stream));
return;
#else
throw CUfileException("KvikIO not compiled with stream support.");
throw CUfileException("cuFile's stream API isn't available, please build with CUDA v12.2+.");
#endif
}

/**
* @brief Writes specified bytes from the device memory into the file.
* @brief Writes specified bytes from the device memory into the file asynchronously.
*
* This API writes asynchronously the data from the GPU memory to the file at a specified offset
* and size bytes by using GDS functionality. The API works correctly for unaligned
* offset and data sizes, although the performance is not on-par with aligned writes.
* This is an asynchronous call and will be executed in sequence for the specified stream.
* This is an asynchronous version of `.write()`, which will be executed in sequence
* for the specified stream.
*
* @note GDS functionality modified the standard file system metadata in SysMem.
* However, GDS functionality does not take any special responsibility for writing
* that metadata back to permanent storage. The data is not guaranteed to be present
* after a system crash unless the application uses an explicit `fsync(2)` call. If the
* file is opened with an `O_SYNC` flag, the metadata will be written to the disk before
* the call is complete.
* Refer to the note in read for more information about `devPtr_offset`.
* The arguments have the same meaning as in `.write()` but some of them are deferred. That is,
* the values of `size`, `file_offset` and `devPtr_offset` will not be evaluated until execution
* time. Notice, this behavior can be changed using cuFile's cuFileStreamRegister API.
*
* @param devPtr_base Base address of buffer in device memory. For registered buffers,
* `devPtr_base` must remain set to the base address used in the `buffer_register` call.
* @param size Size in bytes to write.
* @param file_offset Offset in the file to write at.
* @param devPtr_offset Offset relative to the `devPtr_base` pointer to write from.
* This parameter should be used only with registered buffers.
* @param bytes_written number of bytes that were successfully written.
* @param stream associated stream for this I/O.
* @param size Pointer to size in bytes to write. If the exact size is not known at the time of I/O
* submission, then you must set it to the maximum possible I/O size for that stream I/O. Later
* the actual size can be set prior to the stream I/O execution.
* @param file_offset Pointer to offset in the file at which to write. Unless otherwise set
* using cuFileStreamRegister API, this value will not be evaluated until execution time.
* @param devPtr_offset Pointer to the offset relative to the `devPtr_base` pointer from which to
* write.
* @param bytes_written Pointer to the bytes written to the file. This pointer should be a non-NULL
* value and *bytes_written set to 0. The bytes_written memory should be allocated with
* cuMemHostAlloc/malloc/mmap or registered with cuMemHostRegister.
* After successful execution of the operation in the stream, the value *bytes_written will
* contain either:
* - The number of bytes successfully written.
* - -1 on IO errors.
* - All other errors return a negative integer value of the CUfileOpError enum value.
* @param stream CUDA stream in which to enqueue the operation. If NULL, make this operation
* synchronous.
*/
inline void write_async(void* devPtr_base,
std::size_t* size,
Expand All @@ -573,7 +585,7 @@ class FileHandle {
_handle, devPtr_base, size, file_offset, devPtr_offset, bytes_written, stream));
return;
#else
throw CUfileException("KvikIO not compiled with stream support.");
throw CUfileException("cuFile's stream API isn't available, please build with CUDA v12.2+.");
#endif
}

Expand Down
48 changes: 9 additions & 39 deletions cpp/include/kvikio/shim/cufile.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,8 @@ class cuFileAPI {
decltype(cuFileBatchIOCancel)* BatchIOCancel{nullptr};
decltype(cuFileBatchIODestroy)* BatchIODestroy{nullptr};
#endif
bool batch_available = false;

#ifdef KVIKIO_CUFILE_STREAM_API_FOUND
decltype(cuFileGetVersion)* GetVersion{nullptr};
decltype(cuFileReadAsync)* ReadAsync{nullptr};
decltype(cuFileWriteAsync)* WriteAsync{nullptr};
decltype(cuFileStreamRegister)* StreamRegister{nullptr};
Expand Down Expand Up @@ -98,32 +96,18 @@ class cuFileAPI {
get_symbol(BatchIOGetStatus, lib, KVIKIO_STRINGIFY(cuFileBatchIOGetStatus));
get_symbol(BatchIOCancel, lib, KVIKIO_STRINGIFY(cuFileBatchIOCancel));
get_symbol(BatchIODestroy, lib, KVIKIO_STRINGIFY(cuFileBatchIODestroy));

// HACK: we use the mangled name of the `CUfileOpError` to determine if cuFile's
// batch API is available (v12.0.1+). Notice, the symbols of `cuFileBatchIOSetUp` & co.
// exist all the way back to CUDA v11.5 but calling them is undefined behavior.
// TODO: when CUDA v12.2 is released, use `cuFileReadAsync` to determine the availability
// of both the batch and async API.
try {
void* s{};
get_symbol(s, lib, "_ZTS13CUfileOpError");
batch_available = true;
} catch (const std::runtime_error&) {
}
#endif

#ifdef KVIKIO_CUFILE_STREAM_API_FOUND
get_symbol(GetVersion, lib, KVIKIO_STRINGIFY(cuFileGetVersion));
get_symbol(ReadAsync, lib, KVIKIO_STRINGIFY(cuFileReadAsync));
get_symbol(WriteAsync, lib, KVIKIO_STRINGIFY(cuFileWriteAsync));
get_symbol(StreamRegister, lib, KVIKIO_STRINGIFY(cuFileStreamRegister));
get_symbol(StreamDeregister, lib, KVIKIO_STRINGIFY(cuFileStreamDeregister));
try {
void* s{};
get_symbol(s, lib, "cuFileGetVersion");
get_symbol(s, lib, "cuFileReadAsync");
stream_available = true;
} catch (const std::runtime_error&) {
stream_available = false;
}
#endif

Expand Down Expand Up @@ -193,39 +177,25 @@ inline bool is_cufile_available()
}

/**
* @brief Check if cuFile's batch API is available
* @brief Check if cuFile's batch and stream APIs are available
*
* @return The boolean answer
*/
#ifdef KVIKIO_CUFILE_BATCH_API_FOUND
inline bool is_batch_available()
{
try {
return is_cufile_available() && cuFileAPI::instance().batch_available;
} catch (const std::runtime_error&) {
return false;
}
}
#else
constexpr bool is_batch_available() { return false; }
#endif

/**
* @brief Check if cuFile's stream API is available
* Technically, the batch API is available in CUDA 12.1 but since there is no good
* way to check the CUDA version using the driver API, we check for the existence of the
* `cuFileReadAsync` symbol, which is defined in CUDA 12.2+.
*
* @return The boolean answer
*/
#ifdef KVIKIO_CUFILE_STREAM_API_FOUND
inline bool is_stream_available()
// The runtime check is only compiled in when BOTH the batch and the stream API were
// found at build time; otherwise the function is a constant `false`.
#if defined(KVIKIO_CUFILE_BATCH_API_FOUND) && defined(KVIKIO_CUFILE_STREAM_API_FOUND)
inline bool is_batch_and_stream_available()
{
try {
return cuFileAPI::instance().stream_available;
return is_cufile_available() && cuFileAPI::instance().stream_available;
} catch (const std::runtime_error&) {
return false;
}
}
#else
constexpr bool is_stream_available() { return false; }
constexpr bool is_batch_and_stream_available() { return false; }
#endif

} // namespace kvikio

0 comments on commit 63d76a9

Please sign in to comment.