Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use cuFile for Parquet IO when available #7444

Merged
merged 74 commits into from
Mar 17, 2021
Merged
Show file tree
Hide file tree
Changes from 69 commits
Commits
Show all changes
74 commits
Select commit Hold shift + click to select a range
1dfd27d
add owning_buffer to datasource; add device_read path to parquet reader
Sep 17, 2020
f88569c
Merge branch 'branch-0.16' of https://github.com/rapidsai/cudf into f…
Sep 18, 2020
5cfc08f
name fix
Sep 18, 2020
eb0a075
CMake change to find and link to cufile lib
Sep 18, 2020
b494b43
Merge branch 'branch-0.16' of https://github.com/rapidsai/cudf into f…
Sep 18, 2020
582dbac
Merge branch 'branch-0.16' of https://github.com/rapidsai/cudf into f…
Sep 18, 2020
a7873f1
Merge branch 'branch-0.16' of https://github.com/rapidsai/cudf into f…
Sep 18, 2020
460af52
Merge branch 'branch-0.16' of https://github.com/rapidsai/cudf into f…
Sep 19, 2020
3d00911
link to cufile
Sep 22, 2020
8adf068
Merge branch 'branch-0.16' of https://github.com/rapidsai/cudf into f…
Sep 22, 2020
c834cdc
basic device_read
Sep 22, 2020
585d3a2
extract gds file into a separate source file
Sep 22, 2020
be84443
missing file from previous commit
Sep 22, 2020
62135c6
gds sink
Sep 22, 2020
f70974e
refactor cufile RAII into a separate struct
Sep 22, 2020
da3207b
Merge branch 'branch-0.16' of https://github.com/rapidsai/cudf into f…
Sep 25, 2020
9220769
Merge branch 'branch-1.16' of https://github.com/rapidsai/cudf into f…
Sep 30, 2020
2acfeca
refactor gds threshold logic
Oct 2, 2020
37cbb35
Merge branch 'branch-0.16' of https://github.com/rapidsai/cudf into f…
Oct 2, 2020
73092fc
override the cufile config file to always enable compatibility mode
Oct 7, 2020
f8017e9
Merge branch 'branch-0.16' of https://github.com/rapidsai/cudf into f…
Oct 7, 2020
2aa7da7
fix missed rename
Oct 7, 2020
689a4d2
add docs; rename data members
Oct 7, 2020
674023b
Merge branch 'branch-0.17' of https://github.com/rapidsai/cudf into f…
Oct 30, 2020
5807bfd
fix merge
Oct 30, 2020
5284591
add missing EOF newlines
Oct 30, 2020
7000d55
fall back to host read/write if cufile fails
vuule Nov 20, 2020
8548871
Merge branch 'branch-0.19' of https://github.com/rapidsai/cudf into f…
vuule Feb 22, 2021
5896b7e
update to 0.19 stuff
vuule Feb 23, 2021
e5071ab
don't enforce compatibility mode through config file
vuule Feb 23, 2021
10a1eee
add stream parameter to device_read
vuule Feb 24, 2021
3014923
compile-time disable cufile code if the library is not installed
vuule Feb 24, 2021
f388265
style fix
vuule Feb 24, 2021
f433757
fix building without cufile
vuule Feb 24, 2021
fb6ea8b
CMake fix
vuule Feb 25, 2021
1189cf1
Merge branch 'branch-0.19' of https://github.com/rapidsai/cudf into f…
vuule Feb 25, 2021
2cef6f3
Merge branch 'branch-0.19' of https://github.com/rapidsai/cudf into f…
vuule Feb 25, 2021
ec52e85
cmake clean up
vuule Feb 25, 2021
d181fec
add missing null check
vuule Feb 25, 2021
1b6070e
avoid repeated failed attempts to initialize cuFile
vuule Feb 26, 2021
349973c
link to libcufile at runtime
vuule Feb 26, 2021
3560949
remove newline
vuule Mar 2, 2021
d1a4c26
Merge branch 'branch-0.19' of https://github.com/rapidsai/cudf into f…
vuule Mar 2, 2021
378023b
add file path to error messages
vuule Mar 2, 2021
43a39da
Merge branch 'fea-gds-parquet' of https://github.com/vuule/cudf into …
vuule Mar 2, 2021
9c3e8fc
Merge branch 'branch-0.19' of https://github.com/rapidsai/cudf into f…
vuule Mar 2, 2021
5ed479b
use FindcuFile
vuule Mar 2, 2021
1e2727e
add new file to cmake list
vuule Mar 2, 2021
a2f4019
avoid using unique_ptr to store the singleton
vuule Mar 3, 2021
fb94b56
add is_valid to shim
vuule Mar 3, 2021
b2771ad
add config class
vuule Mar 3, 2021
3077bd4
read env var to enable GDS
vuule Mar 3, 2021
9d350db
rename file_util.hpp
vuule Mar 3, 2021
09730f4
control compatiblity mode
vuule Mar 4, 2021
d5c67f8
style fix
vuule Mar 4, 2021
2e4d33c
Merge branch 'branch-0.19' of https://github.com/rapidsai/cudf into f…
vuule Mar 4, 2021
8ac417e
update device_write condition in CSV writer
vuule Mar 4, 2021
273962c
fall back to host if cufile io initialization fails
vuule Mar 4, 2021
0ef4e31
Merge branch 'branch-0.19' of https://github.com/rapidsai/cudf into f…
vuule Mar 4, 2021
071c619
docs and such
vuule Mar 4, 2021
c1010a9
(c) year
vuule Mar 4, 2021
13b3c6a
one more (c) year
vuule Mar 4, 2021
67d901b
namespace
vuule Mar 4, 2021
851c0bd
docs, comments; disable fallback w/ compat mode
vuule Mar 5, 2021
b853f97
moar docs
vuule Mar 5, 2021
750460f
remove includes
vuule Mar 5, 2021
6815b38
Merge branch 'branch-0.19' of https://github.com/rapidsai/cudf into f…
vuule Mar 5, 2021
96c5b53
CMake improvement
vuule Mar 5, 2021
6be526e
address review feedback
vuule Mar 9, 2021
19d6ff4
Merge branch 'branch-0.19' of https://github.com/rapidsai/cudf into f…
vuule Mar 12, 2021
0ad1418
fix a CMake error
vuule Mar 13, 2021
380287e
allocate the pinned buffer only if used
vuule Mar 13, 2021
d374f49
Merge branch 'branch-0.19' of https://github.com/rapidsai/cudf into f…
vuule Mar 16, 2021
e5f12b9
fix up CMake for cufile header only builds
vuule Mar 16, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ include(cmake/thirdparty/CUDF_GetDLPack.cmake)
include(cmake/thirdparty/CUDF_GetLibcudacxx.cmake)
# Stringify libcudf and libcudacxx headers used in JIT operations
include(cmake/Modules/StringifyJITHeaders.cmake)
# find cuFile
include(cmake/Modules/FindcuFile.cmake)

###################################################################################################
# - library targets -------------------------------------------------------------------------------
Expand Down Expand Up @@ -236,6 +238,7 @@ add_library(cudf
src/io/statistics/column_stats.cu
src/io/utilities/data_sink.cpp
src/io/utilities/datasource.cpp
src/io/utilities/file_io_utilities.cpp
src/io/utilities/parsing_utils.cu
src/io/utilities/type_conversion.cpp
src/jit/cache.cpp
Expand Down Expand Up @@ -459,6 +462,11 @@ else()
target_link_libraries(cudf PUBLIC CUDA::nvrtc CUDA::cudart CUDA::cuda_driver)
endif()

# Add cuFile interface if available
if(cuFile_FOUND AND NOT TARGET cuFile::cuFile_interface)
target_link_libraries(cudf PRIVATE cuFile::cuFile_interface)
endif()

file(WRITE "${CUDF_BINARY_DIR}/fatbin.ld"
[=[
SECTIONS
Expand Down
2 changes: 1 addition & 1 deletion cpp/benchmarks/fixture/benchmark_fixture.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,4 @@ class benchmark : public ::benchmark::Fixture {
std::shared_ptr<rmm::mr::device_memory_resource> mr;
};

}; // namespace cudf
} // namespace cudf
6 changes: 6 additions & 0 deletions cpp/cmake/Modules/FindcuFile.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ find_package_handle_standard_args(cuFile
cuFile_VERSION
)

if (cuFile_FOUND)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You will want this to be:
if(cuFile_FOUND AND NOT TARGET cuFile::cuFile_interface)

That will allow multiple find_package(cuFile) to work correctly ( otherwise they will error out ).

add_library(cuFile::cuFile_interface IMPORTED INTERFACE)
target_include_directories(cuFile::cuFile_interface INTERFACE "$<BUILD_INTERFACE:${cuFile_INCLUDE_DIR}>")
target_compile_options(cuFile::cuFile_interface INTERFACE "${cuFile_COMPILE_OPTIONS}")
target_compile_definitions(cuFile::cuFile_interface INTERFACE CUFILE_FOUND)
endif ()

if (cuFile_FOUND AND NOT TARGET cuFile::cuFile)
add_library(cuFile::cuFile UNKNOWN IMPORTED)
Expand Down
26 changes: 19 additions & 7 deletions cpp/include/cudf/io/data_sink.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -107,23 +107,35 @@ class data_sink {
*/
virtual bool supports_device_write() const { return false; }

/**
* @brief Estimates whether a direct device write would be more optimal for the given size.
*
* @param size Number of bytes to write
* @return whether the device write is expected to be more performant for the given size
*/
virtual bool is_device_write_preferred(size_t size) const { return supports_device_write(); }

/**
* @brief Append the buffer content to the sink from a gpu address
*
* @param[in] data Pointer to the buffer to be written into the sink object
* @param[in] size Number of bytes to write
* For optimal performance, should only be called when `is_device_write_preferred` returns `true`.
* Data sink implementations that don't support direct device writes don't need to override
* this function.
*
* @return void
* @throws cudf::logic_error the object does not support direct device writes, i.e.
* `supports_device_write` returns `false`.
*
* @param gpu_data Pointer to the buffer to be written into the sink object
* @param size Number of bytes to write
* @param stream CUDA stream to use
*/
virtual void device_write(void const* gpu_data, size_t size, rmm::cuda_stream_view stream)
{
CUDF_FAIL("data_sink classes that support device_write must override this function.");
CUDF_FAIL("data_sink classes that support device_write must override it.");
}

/**
* @brief Flush the data written into the sink
*
* @return void
*/
virtual void flush() = 0;

Expand Down
94 changes: 81 additions & 13 deletions cpp/include/cudf/io/datasource.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,8 @@
#include <cudf/io/types.hpp>
#include <cudf/utilities/error.hpp>

#include <rmm/cuda_stream_view.hpp>

#include <arrow/buffer.h>
#include <arrow/io/file.h>
#include <arrow/io/interfaces.h>
Expand Down Expand Up @@ -50,12 +52,15 @@ class datasource {
/**
* @brief Returns the address of the data in the buffer.
*/
virtual const uint8_t* data() const = 0;
virtual uint8_t const* data() const = 0;

/**
* @brief Base class destructor
*/
virtual ~buffer() {}

template <typename Container>
static std::unique_ptr<buffer> create(Container&& data_owner);
};

/**
Expand Down Expand Up @@ -147,37 +152,57 @@ class datasource {
*/
virtual bool supports_device_read() const { return false; }

/**
* @brief Estimates whether a direct device read would be more optimal for the given size.
*
* @param size Number of bytes to read
* @return whether the device read is expected to be more performant for the given size
*/
virtual bool is_device_read_preferred(size_t size) const { return supports_device_read(); }

/**
* @brief Returns a device buffer with a subset of data from the source.
*
* For optimal performance, should only be called when `is_device_read_preferred` returns `true`.
* Data source implementations that don't support direct device reads don't need to override this
* function.
*
* @param[in] offset Bytes from the start
* @param[in] size Bytes to read
* @throws cudf::logic_error the object does not support direct device reads, i.e.
* `supports_device_read` returns `false`.
*
* @param offset Number of bytes from the start
* @param size Number of bytes to read
* @param stream CUDA stream to use
*
* @return The data buffer in the device memory
*/
virtual std::unique_ptr<datasource::buffer> device_read(size_t offset, size_t size)
virtual std::unique_ptr<datasource::buffer> device_read(size_t offset,
size_t size,
rmm::cuda_stream_view stream)
{
CUDF_FAIL("datasource classes that support device_read must override this function.");
CUDF_FAIL("datasource classes that support device_read must override it.");
}

/**
* @brief Reads a selected range into a preallocated device buffer
*
* For optimal performance, should only be called when `is_device_read_preferred` returns `true`.
* Data source implementations that don't support direct device reads don't need to override this
* function.
*
* @param[in] offset Bytes from the start
* @param[in] size Bytes to read
* @param[in] dst Address of the existing device memory
* @throws cudf::logic_error when the object does not support direct device reads, i.e.
* `supports_device_read` returns `false`.
*
* @param offset Number of bytes from the start
* @param size Number of bytes to read
* @param dst Address of the existing device memory
* @param stream CUDA stream to use
*
* @return The number of bytes read (can be smaller than size)
*/
virtual size_t device_read(size_t offset, size_t size, uint8_t* dst)
virtual size_t device_read(size_t offset, size_t size, uint8_t* dst, rmm::cuda_stream_view stream)
{
CUDF_FAIL("datasource classes that support device_read must override this function.");
CUDF_FAIL("datasource classes that support device_read must override it.");
}

/**
Expand Down Expand Up @@ -205,14 +230,57 @@ class datasource {

size_t size() const override { return _size; }

const uint8_t* data() const override { return _data; }
uint8_t const* data() const override { return _data; }

private:
uint8_t* const _data;
size_t const _size;
};

/**
* @brief Derived implementation of `buffer` that owns the data.
*
* Can use different container types to hold the data buffer.
*
* @tparam Container Type of the container object that owns the data
*/
template <typename Container>
class owning_buffer : public buffer {
public:
/**
* @brief Moves the input container into the newly created object.
*/
owning_buffer(Container&& data_owner)
: _data(std::move(data_owner)), _data_ptr(_data.data()), _size(_data.size())
{
}

/**
* @brief Moves the input container into the newly created object, and exposes a subspan of the
* buffer.
*/
owning_buffer(Container&& data_owner, uint8_t const* data_ptr, size_t size)
: _data(std::move(data_owner)), _data_ptr(data_ptr), _size(size)
{
}

size_t size() const override { return _size; }

uint8_t const* data() const override { return static_cast<uint8_t const*>(_data_ptr); }

private:
Container _data;
void const* _data_ptr;
size_t _size;
};
};

template <typename Container>
std::unique_ptr<datasource::buffer> datasource::buffer::create(Container&& data_owner)
{
return std::make_unique<owning_buffer<Container>>(std::move(data_owner));
}

/**
* @brief Implementation class for reading from an Apache Arrow file. The file
* could be a memory-mapped file or other implementation supported by Arrow.
Expand All @@ -230,7 +298,7 @@ class arrow_io_source : public datasource {
{
}
size_t size() const override { return arrow_buffer->size(); }
const uint8_t* data() const override { return arrow_buffer->data(); }
uint8_t const* data() const override { return arrow_buffer->data(); }
};

public:
Expand Down
32 changes: 12 additions & 20 deletions cpp/src/io/csv/writer_impl.cu
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -416,36 +416,28 @@ void writer::impl::write_chunked(strings_column_view const& str_column_view,
auto total_num_bytes = strings_column.chars_size();
char const* ptr_all_bytes = strings_column.chars().data<char>();

if (out_sink_->supports_device_write()) {
// host algorithm call, but the underlying call
// is a device_write taking a device buffer;
//
if (out_sink_->is_device_write_preferred(total_num_bytes)) {
// Direct write from device memory
out_sink_->device_write(ptr_all_bytes, total_num_bytes, stream);
out_sink_->device_write(newline.data(),
newline.size(),
stream); // needs newline at the end, to separate from next chunk
} else {
// no device write possible;
//
// copy the bytes to host, too:
//
// copy the bytes to host to write them out
thrust::host_vector<char> h_bytes(total_num_bytes);
CUDA_TRY(cudaMemcpyAsync(h_bytes.data(),
ptr_all_bytes,
total_num_bytes * sizeof(char),
cudaMemcpyDeviceToHost,
stream.value()));

stream.synchronize();

// host algorithm call, where the underlying call
// is also host_write taking a host buffer;
//
char const* ptr_h_bytes = h_bytes.data();
out_sink_->host_write(ptr_h_bytes, total_num_bytes);
out_sink_->host_write(h_bytes.data(), total_num_bytes);
}

// Needs newline at the end, to separate from next chunk
if (out_sink_->is_device_write_preferred(newline.size())) {
out_sink_->device_write(newline.data(), newline.size(), stream);
} else {
out_sink_->host_write(options_.get_line_terminator().data(),
options_.get_line_terminator()
.size()); // needs newline at the end, to separate from next chunk
options_.get_line_terminator().size());
}
}

Expand Down
4 changes: 2 additions & 2 deletions cpp/src/io/parquet/parquet_gpu.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2020, NVIDIA CORPORATION.
* Copyright (c) 2018-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -184,7 +184,7 @@ struct ColumnChunkDesc {
{
}

uint8_t *compressed_data; // pointer to compressed column chunk data
uint8_t const *compressed_data; // pointer to compressed column chunk data
size_t compressed_size; // total compressed data size for this chunk
size_t num_values; // total number of values in this column
size_t start_row; // starting row of this chunk
Expand Down
23 changes: 13 additions & 10 deletions cpp/src/io/parquet/reader_impl.cu
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2020, NVIDIA CORPORATION.
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -822,7 +822,7 @@ void generate_depth_remappings(std::map<int, std::pair<std::vector<int>, std::ve
* @copydoc cudf::io::detail::parquet::read_column_chunks
*/
void reader::impl::read_column_chunks(
std::vector<rmm::device_buffer> &page_data,
std::vector<std::unique_ptr<datasource::buffer>> &page_data,
hostdevice_vector<gpu::ColumnChunkDesc> &chunks, // TODO const?
size_t begin_chunk,
size_t end_chunk,
Expand Down Expand Up @@ -850,9 +850,15 @@ void reader::impl::read_column_chunks(
next_chunk++;
}
if (io_size != 0) {
auto buffer = _sources[chunk_source_map[chunk]]->host_read(io_offset, io_size);
page_data[chunk] = rmm::device_buffer(buffer->data(), buffer->size(), stream);
uint8_t *d_compdata = static_cast<uint8_t *>(page_data[chunk].data());
auto &source = _sources[chunk_source_map[chunk]];
if (source->is_device_read_preferred(io_size)) {
page_data[chunk] = source->device_read(io_offset, io_size, stream);
} else {
auto const buffer = source->host_read(io_offset, io_size);
page_data[chunk] =
datasource::buffer::create(rmm::device_buffer(buffer->data(), buffer->size(), stream));
}
auto d_compdata = page_data[chunk]->data();
do {
chunks[chunk].compressed_data = d_compdata;
d_compdata += chunks[chunk].compressed_size;
Expand Down Expand Up @@ -1414,7 +1420,7 @@ table_with_metadata reader::impl::read(size_type skip_rows,
std::vector<size_type> chunk_source_map(num_chunks);

// Tracker for eventually deallocating compressed and uncompressed data
std::vector<rmm::device_buffer> page_data(num_chunks);
std::vector<std::unique_ptr<datasource::buffer>> page_data(num_chunks);

// Keep track of column chunk file offsets
std::vector<size_t> column_chunk_offsets(num_chunks);
Expand Down Expand Up @@ -1516,10 +1522,7 @@ table_with_metadata reader::impl::read(size_type skip_rows,
decomp_page_data = decompress_page_data(chunks, pages, stream);
// Free compressed data
for (size_t c = 0; c < chunks.size(); c++) {
if (chunks[c].codec != parquet::Compression::UNCOMPRESSED && page_data[c].size() != 0) {
page_data[c].resize(0);
page_data[c].shrink_to_fit();
}
if (chunks[c].codec != parquet::Compression::UNCOMPRESSED) { page_data[c].reset(); }
}
}

Expand Down
Loading