Skip to content

Commit

Permalink
Implement a simple GDS-backed data source that does not rely upon Kvi…
Browse files Browse the repository at this point in the history
…kio.
  • Loading branch information
tpn committed Oct 24, 2024
1 parent 1dcb443 commit 47bccc7
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 1 deletion.
32 changes: 31 additions & 1 deletion cpp/include/cudf/io/datasource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ enum class datasource_kind {
*
* It supports asynchronous reads, and will use the provided CUDA stream for
* all I/O operations when possible.
*
* @see cudf::io::kvikio_datasource_params
*/
KVIKIO = 0,
DEFAULT = KVIKIO,
Expand All @@ -66,16 +68,27 @@ enum class datasource_kind {
*
* It supports asynchronous reads, but does not do any stream synchronization,
* as the reads are all performed on the host.
*
* @see cudf::io::kvikio_datasource_params
*/
KVIKIO_COMPAT,

/**
* @brief Kvikio-based data source that will fail if GDS is not available.
* Specifically, `cudf::io::datasource::create()` when called with this kind
* of data source will throw a `cudf::logic_error` if GDS is not available.
*
* @see cudf::io::kvikio_datasource_params
*/
KVIKIO_GDS,

/**
* @brief GDS-based data source that does not use Kvikio.
*
* @see cudf::io::gds_datasource_params
*/
GDS,

/**
* @brief Host-based data source that does not support any device or async
* operations.
Expand Down Expand Up @@ -238,10 +251,27 @@ struct odirect_datasource_params {
}
};

/**
* @brief Parameters for the GDS data source.
*/
struct gds_datasource_params {
/**
* @brief The threshold at which the data source will switch from using
* host-based reads to device-based (i.e. GDS) reads, if GDS is available.
*
* This parameter should represent the read size where GDS is faster than
* a posix read() plus the overhead of a host-to-device memcpy.
*
* Defaults to 128KB.
*/
size_t device_read_threshold{128 << 10};
};

/**
* @brief Union of parameters for different data sources.
*/
using datasource_params = std::variant<kvikio_datasource_params, odirect_datasource_params>;
using datasource_params =
std::variant<kvikio_datasource_params, odirect_datasource_params, gds_datasource_params>;

/**
* @brief Interface class for providing input data to the readers.
Expand Down
66 changes: 66 additions & 0 deletions cpp/src/io/utilities/datasource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,60 @@ class host_buffer_source final : public datasource {
cudf::host_span<std::byte const> _h_buffer; ///< A non-owning view of the existing host data
};

/**
* @brief Implementation class that wraps a GDS-enabled cuFile input object.
*
* N.B. Named `cufile_source` instead of `gds_source` as cuFile is more
* descriptive of the underlying implementation.
*/
class cufile_source : public host_source {
public:
cufile_source(std::string const& filepath, gds_datasource_params const& params)
: host_source(filepath), _params(params)
{
set_datasource_kind(datasource_kind::GDS);
_cufile_in = detail::make_cufile_input(filepath);
}

[[nodiscard]] bool supports_device_read() const override { return true; }

[[nodiscard]] bool is_device_read_preferred(size_t size) const override
{
return size >= _params.device_read_threshold;
}

std::future<size_t> device_read_async(size_t offset,
size_t size,
uint8_t* dst,
rmm::cuda_stream_view stream) override
{
auto const read_size = get_read_size(size, offset);
return _cufile_in->read_async(offset, read_size, dst, stream);
}

size_t device_read(size_t offset,
size_t size,
uint8_t* dst,
rmm::cuda_stream_view stream) override
{
return device_read_async(offset, size, dst, stream).get();
}

std::unique_ptr<datasource::buffer> device_read(size_t offset,
size_t size,
rmm::cuda_stream_view stream) override
{
rmm::device_buffer out_data(size, stream);
size_t read = device_read(offset, size, reinterpret_cast<uint8_t*>(out_data.data()), stream);
out_data.resize(read, stream);
return datasource::buffer::create(std::move(out_data));
}

private:
gds_datasource_params _params; ///< GDS parameters
std::unique_ptr<detail::cufile_input_impl> _cufile_in; ///< cuFile input obj
};

/**
* @brief Wrapper class for user implemented data sources
*
Expand Down Expand Up @@ -940,6 +994,18 @@ std::unique_ptr<datasource> datasource::create(std::string const& filepath,
}
return std::make_unique<kvikio_source>(filepath.c_str(), new_params);
}
case datasource_kind::GDS: {
gds_datasource_params new_params;
if (params) {
if (auto gds_params = std::get_if<gds_datasource_params>(&params.value())) {
// Copy the user-provided parameters into our local variable.
new_params = *gds_params;
} else {
CUDF_FAIL("Invalid parameters for GDS-based datasource.");
}
}
return std::make_unique<cufile_source>(filepath.c_str(), new_params);
}
case datasource_kind::HOST: return std::make_unique<host_source>(filepath);
case datasource_kind::ODIRECT: {
odirect_datasource_params new_params;
Expand Down

0 comments on commit 47bccc7

Please sign in to comment.