move filepath and mmap logic out of json/csv up to functions.cpp #9040

Merged 7 commits on Aug 24, 2021
36 changes: 35 additions & 1 deletion cpp/include/cudf/io/csv.hpp
@@ -177,6 +177,40 @@ class csv_reader_options {
*/
std::size_t get_byte_range_size() const { return _byte_range_size; }

/**
* @brief Returns number of bytes to read with padding.
*/
std::size_t get_byte_range_size_with_padding() const
{
if (_byte_range_size == 0) {
return 0;
} else {
return _byte_range_size + get_byte_range_padding();
}
}

/**
* @brief Returns number of bytes to pad when reading.
*/
std::size_t get_byte_range_padding() const
{
auto const num_names = _names.size();
auto const num_dtypes = std::visit([](const auto& dtypes) { return dtypes.size(); }, _dtypes);
auto const num_columns = std::max(num_dtypes, num_names);

auto const max_row_bytes = 16 * 1024; // 16KB
auto const column_bytes = 64;
auto const base_padding = 1024; // 1KB

if (num_columns == 0) {
// Use flat size if the number of columns is not known
return max_row_bytes;
}

// Expand the size based on the number of columns, if available
return base_padding + num_columns * column_bytes;
}
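
This heuristic replaces the file-local `calculateMaxRowSize` helper that is deleted from `cpp/src/io/csv/reader_impl.cu` further down in this diff: 1KB of base padding plus 64 bytes per column, falling back to a flat 16KB when the column count is unknown. A minimal, self-contained sketch of the arithmetic (illustrative only, with hypothetical column counts):

```cpp
#include <cstddef>
#include <iostream>

// Stand-alone copy of the padding heuristic above, for illustration only.
std::size_t byte_range_padding(std::size_t num_columns)
{
  constexpr std::size_t max_row_bytes = 16 * 1024;  // flat fallback: 16KB
  constexpr std::size_t column_bytes  = 64;         // per-column estimate
  constexpr std::size_t base_padding  = 1024;       // 1KB base
  if (num_columns == 0) { return max_row_bytes; }   // schema unknown
  return base_padding + num_columns * column_bytes;
}

int main()
{
  std::cout << byte_range_padding(10) << '\n';  // 10 columns -> 1024 + 10*64 = 1664
  std::cout << byte_range_padding(0) << '\n';   // unknown schema -> 16384
}
```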

/**
* @brief Returns names of the columns.
*/
@@ -1199,7 +1233,7 @@ class csv_reader_options_builder {
* @return The set of columns along with metadata.
*/
table_with_metadata read_csv(
csv_reader_options const& options,
csv_reader_options options,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());
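
The signature change from `csv_reader_options const&` to a by-value `csv_reader_options` does not break call sites; existing calls compile unchanged. A hedged example of the public call with made-up byte-range values (the builder setters are assumed to match the existing `csv_reader_options_builder` API):

```cpp
#include <cudf/io/csv.hpp>

#include <string>

// Illustrative only: read a 4 MiB byte range of an uncompressed CSV file.
cudf::io::table_with_metadata read_csv_range(std::string const& filepath)
{
  auto const options =
    cudf::io::csv_reader_options::builder(cudf::io::source_info{filepath})
      .byte_range_offset(0)       // example values; padding is added internally
      .byte_range_size(4 << 20)
      .build();
  return cudf::io::read_csv(options);
}
```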

/** @} */ // end of group
13 changes: 0 additions & 13 deletions cpp/include/cudf/io/detail/avro.hpp
@@ -38,19 +38,6 @@ class reader {
std::unique_ptr<impl> _impl;

public:
/**
* @brief Constructor from an array of file paths
*
* @param filepaths Paths to the files containing the input dataset
* @param options Settings for controlling reading behavior
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource to use for device memory allocation
*/
explicit reader(std::vector<std::string> const& filepaths,
avro_reader_options const& options,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

/**
* @brief Constructor from an array of datasources
*
13 changes: 0 additions & 13 deletions cpp/include/cudf/io/detail/orc.hpp
@@ -47,19 +47,6 @@ class reader {
std::unique_ptr<impl> _impl;

public:
/**
* @brief Constructor from an array of file paths
*
* @param filepaths Paths to the files containing the input dataset
* @param options Settings for controlling reading behavior
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource to use for device memory allocation
*/
explicit reader(std::vector<std::string> const& filepaths,
orc_reader_options const& options,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

/**
* @brief Constructor from an array of datasources
*
13 changes: 0 additions & 13 deletions cpp/include/cudf/io/detail/parquet.hpp
@@ -49,19 +49,6 @@ class reader {
std::unique_ptr<impl> _impl;

public:
/**
* @brief Constructor from an array of file paths
*
* @param filepaths Paths to the files containing the input dataset
* @param options Settings for controlling reading behavior
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource to use for device memory allocation
*/
explicit reader(std::vector<std::string> const& filepaths,
parquet_reader_options const& options,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

/**
* @brief Constructor from an array of datasources
*
34 changes: 33 additions & 1 deletion cpp/include/cudf/io/json.hpp
@@ -140,6 +140,38 @@ class json_reader_options {
*/
size_t get_byte_range_size() const { return _byte_range_size; }

/**
* @brief Returns number of bytes to read with padding.
*/
size_t get_byte_range_size_with_padding() const
{
if (_byte_range_size == 0) {
return 0;
} else {
return _byte_range_size + get_byte_range_padding();
}
}

/**
* @brief Returns number of bytes to pad when reading.
*/
size_t get_byte_range_padding() const
{
auto const num_columns = std::visit([](const auto& dtypes) { return dtypes.size(); }, _dtypes);

auto const max_row_bytes = 16 * 1024; // 16KB
auto const column_bytes = 64;
auto const base_padding = 1024; // 1KB

if (num_columns == 0) {
// Use flat size if the number of columns is not known
return max_row_bytes;
}

// Expand the size based on the number of columns, if available
return base_padding + num_columns * column_bytes;
}
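
With these accessors on the options object, the caller that now owns the memory-mapping logic (`read_json` in `functions.cpp`, not shown in this section) can ask the options for the padded mapping size instead of re-deriving the heuristic. A hedged usage sketch; the builder calls are assumed to match the existing `json_reader_options_builder` setters, and the byte-range values are made up:

```cpp
#include <cudf/io/json.hpp>

#include <cstddef>
#include <string>

// Illustrative only: how many bytes to map for a byte-range JSON-lines read.
std::size_t padded_map_size(std::string const& filepath)
{
  auto const opts =
    cudf::io::json_reader_options::builder(cudf::io::source_info{filepath})
      .lines(true)
      .byte_range_offset(1 << 20)  // start 1 MiB into the file (example value)
      .byte_range_size(4 << 20)    // read a 4 MiB window (example value)
      .build();

  // 4 MiB window plus heuristic padding; returns 0 when no byte range is set,
  // i.e. "read to the end of the file".
  return opts.get_byte_range_size_with_padding();
}
```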

/**
* @brief Whether to read the file as a json object per line.
*/
@@ -364,7 +396,7 @@ class json_reader_options_builder {
* @return The set of columns along with metadata.
*/
table_with_metadata read_json(
json_reader_options const& options,
json_reader_options options,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/** @} */ // end of group
10 changes: 0 additions & 10 deletions cpp/src/io/avro/reader_impl.cu
@@ -474,16 +474,6 @@ table_with_metadata reader::impl::read(avro_reader_options const& options,
return {std::make_unique<table>(std::move(out_columns)), std::move(metadata_out)};
}

// Forward to implementation
reader::reader(std::vector<std::string> const& filepaths,
avro_reader_options const& options,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_EXPECTS(filepaths.size() == 1, "Only a single source is currently supported.");
_impl = std::make_unique<impl>(datasource::create(filepaths[0]), options, mr);
}

// Forward to implementation
reader::reader(std::vector<std::unique_ptr<cudf::io::datasource>>&& sources,
avro_reader_options const& options,
7 changes: 4 additions & 3 deletions cpp/src/io/comp/io_uncomp.h
@@ -16,12 +16,13 @@

#pragma once

#include <cudf/io/types.hpp>
#include <cudf/utilities/span.hpp>

#include <memory>
#include <string>
#include <vector>

#include <cudf/utilities/span.hpp>

using cudf::host_span;

namespace cudf {
@@ -42,7 +43,7 @@ enum {

std::vector<char> io_uncompress_single_h2d(void const* src, size_t src_size, int stream_type);

std::vector<char> get_uncompressed_data(host_span<char const> data, std::string const& compression);
std::vector<char> get_uncompressed_data(host_span<char const> data, compression_type compression);

class HostDecompressor {
public:
19 changes: 10 additions & 9 deletions cpp/src/io/comp/uncomp.cpp
@@ -369,6 +369,7 @@ std::vector<char> io_uncompress_single_h2d(const void* src, size_t src_size, int
// Unsupported format
break;
}

CUDF_EXPECTS(comp_data != nullptr, "Unsupported compressed stream type");
CUDF_EXPECTS(comp_len > 0, "Unsupported compressed stream type");

@@ -422,17 +423,17 @@ std::vector<char> io_uncompress_single_h2d(const void* src, size_t src_size, int
* @return Vector containing the output uncompressed data
*/
std::vector<char> get_uncompressed_data(host_span<char const> const data,
std::string const& compression)
compression_type compression)
{
int comp_type = IO_UNCOMP_STREAM_TYPE_INFER;
if (compression == "gzip")
comp_type = IO_UNCOMP_STREAM_TYPE_GZIP;
else if (compression == "zip")
comp_type = IO_UNCOMP_STREAM_TYPE_ZIP;
else if (compression == "bz2")
comp_type = IO_UNCOMP_STREAM_TYPE_BZIP2;
else if (compression == "xz")
comp_type = IO_UNCOMP_STREAM_TYPE_XZ;

switch (compression) {
case compression_type::GZIP: comp_type = IO_UNCOMP_STREAM_TYPE_GZIP; break;
case compression_type::ZIP: comp_type = IO_UNCOMP_STREAM_TYPE_ZIP; break;
case compression_type::BZIP2: comp_type = IO_UNCOMP_STREAM_TYPE_BZIP2; break;
case compression_type::XZ: comp_type = IO_UNCOMP_STREAM_TYPE_XZ; break;
default: break;
}

return io_uncompress_single_h2d(data.data(), data.size(), comp_type);
}
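
Switching the parameter from a free-form string to the `compression_type` enum removes the stringly-typed contract between the readers and this helper; callers now pass `opts_.get_compression()` straight through (see the CSV reader change below). A hedged call-site sketch, assuming this detail header keeps the `cudf::io` namespace and include path shown above:

```cpp
#include <cudf/utilities/span.hpp>

#include <vector>

#include "io/comp/io_uncomp.h"  // internal detail header; include path assumed

// Illustrative only: decompress a gzip'd host buffer with the enum-based overload.
std::vector<char> decompress_gzip(std::vector<char> const& compressed)
{
  return cudf::io::get_uncompressed_data(
    cudf::host_span<char const>{compressed.data(), compressed.size()},
    cudf::io::compression_type::GZIP);
}
```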
81 changes: 13 additions & 68 deletions cpp/src/io/csv/reader_impl.cu
@@ -57,31 +57,6 @@ namespace csv {
using namespace cudf::io::csv;
using namespace cudf::io;

/**
* @brief Estimates the maximum expected length of a row, based on the number
* of columns
*
* If the number of columns is not available, it will return a value large
* enough for most use cases
*
* @param[in] num_columns Number of columns in the CSV file (optional)
*
* @return Estimated maximum size of a row, in bytes
*/
constexpr size_t calculateMaxRowSize(int num_columns = 0) noexcept
{
constexpr size_t max_row_bytes = 16 * 1024; // 16KB
constexpr size_t column_bytes = 64;
constexpr size_t base_padding = 1024; // 1KB
if (num_columns == 0) {
// Use flat size if the number of columns is not known
return max_row_bytes;
} else {
// Expand the size based on the number of columns, if available
return base_padding + num_columns * column_bytes;
}
}

/**
* @brief Translates a dtype string and returns its dtype enumeration and any
* extended dtype flags that are supported by cuIO. Often, this is a column
@@ -199,34 +174,21 @@ void erase_except_last(C& container, rmm::cuda_stream_view stream)
std::pair<rmm::device_uvector<char>, reader::impl::selected_rows_offsets>
reader::impl::select_data_and_row_offsets(rmm::cuda_stream_view stream)
{
auto range_offset = opts_.get_byte_range_offset();
auto range_size = opts_.get_byte_range_size();
auto skip_rows = opts_.get_skiprows();
auto skip_end_rows = opts_.get_skipfooter();
auto num_rows = opts_.get_nrows();
auto range_offset = opts_.get_byte_range_offset();
auto range_size = opts_.get_byte_range_size();
auto range_size_padded = opts_.get_byte_range_size_with_padding();
auto skip_rows = opts_.get_skiprows();
auto skip_end_rows = opts_.get_skipfooter();
auto num_rows = opts_.get_nrows();

if (range_offset > 0 || range_size > 0) {
CUDF_EXPECTS(compression_type_ == "none",
CUDF_EXPECTS(opts_.get_compression() == compression_type::NONE,
"Reading compressed data using `byte range` is unsupported");
}
size_t map_range_size = 0;
if (range_size != 0) {
auto num_given_dtypes =
std::visit([](const auto& dtypes) { return dtypes.size(); }, opts_.get_dtypes());
const auto num_columns = std::max(opts_.get_names().size(), num_given_dtypes);
map_range_size = range_size + calculateMaxRowSize(num_columns);
}

// Support delayed opening of the file if using memory mapping datasource
// This allows only mapping of a subset of the file if using byte range
if (source_ == nullptr) {
assert(!filepath_.empty());
source_ = datasource::create(filepath_, range_offset, map_range_size);
}

// Transfer source data to GPU
if (!source_->is_empty()) {
auto data_size = (map_range_size != 0) ? map_range_size : source_->size();
auto data_size = (range_size_padded != 0) ? range_size_padded : source_->size();
auto buffer = source_->host_read(range_offset, data_size);

auto h_data = host_span<char const>( //
@@ -235,10 +197,11 @@ reader::impl::select_data_and_row_offsets(rmm::cuda_stream_view stream)

std::vector<char> h_uncomp_data_owner;

if (compression_type_ != "none") {
h_uncomp_data_owner = get_uncompressed_data(h_data, compression_type_);
if (opts_.get_compression() != compression_type::NONE) {
h_uncomp_data_owner = get_uncompressed_data(h_data, opts_.get_compression());
h_data = h_uncomp_data_owner;
}

// None of the parameters for row selection is used, we are parsing the entire file
const bool load_whole_file = range_offset == 0 && range_size == 0 && skip_rows <= 0 &&
skip_end_rows <= 0 && num_rows == -1;
@@ -927,43 +890,25 @@ parse_options make_parse_options(csv_reader_options const& reader_opts,
}

reader::impl::impl(std::unique_ptr<datasource> source,
std::string filepath,
csv_reader_options const& options,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
: mr_(mr), source_(std::move(source)), filepath_(filepath), opts_(options)
: mr_(mr), source_(std::move(source)), opts_(options)
{
num_actual_cols_ = opts_.get_names().size();
num_active_cols_ = num_actual_cols_;

compression_type_ =
infer_compression_type(opts_.get_compression(),
filepath,
{{"gz", "gzip"}, {"zip", "zip"}, {"bz2", "bz2"}, {"xz", "xz"}});

opts = make_parse_options(options, stream);
}

// Forward to implementation
reader::reader(std::vector<std::string> const& filepaths,
csv_reader_options const& options,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_EXPECTS(filepaths.size() == 1, "Only a single source is currently supported.");
// Delay actual instantiation of data source until read to allow for
// partial memory mapping of file using byte ranges
_impl = std::make_unique<impl>(nullptr, filepaths[0], options, stream, mr);
}

// Forward to implementation
reader::reader(std::vector<std::unique_ptr<cudf::io::datasource>>&& sources,
csv_reader_options const& options,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_EXPECTS(sources.size() == 1, "Only a single source is currently supported.");
_impl = std::make_unique<impl>(std::move(sources[0]), "", options, stream, mr);
_impl = std::make_unique<impl>(std::move(sources[0]), options, stream, mr);
}
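
Because the filepath constructor and the lazy `datasource` creation are gone from the reader, the partial memory-mapping now has to happen before the reader is constructed. The `functions.cpp` side of that move is not part of this section; a minimal sketch of what the caller could look like, reusing the `datasource::create(filepath, offset, size)` overload that the removed constructor used (names and structure assumed):

```cpp
#include <cudf/io/csv.hpp>
#include <cudf/io/datasource.hpp>

#include <memory>
#include <string>

// Illustrative only: map just the requested byte range (plus padding) and hand
// the resulting datasource to the reader, roughly what read_csv would now do.
std::unique_ptr<cudf::io::datasource> make_csv_source(
  std::string const& filepath, cudf::io::csv_reader_options const& options)
{
  auto const offset = options.get_byte_range_offset();
  auto const size   = options.get_byte_range_size_with_padding();
  // offset == 0 && size == 0 maps the whole file.
  return cudf::io::datasource::create(filepath, offset, size);
}
```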

// Destructor within this translation unit
4 changes: 0 additions & 4 deletions cpp/src/io/csv/reader_impl.hpp
@@ -72,13 +72,11 @@ class reader::impl {
* @brief Constructor from a dataset source with reader options.
*
* @param source Dataset source
* @param filepath Filepath if reading dataset from a file
* @param options Settings for controlling reading behavior
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource to use for device memory allocation
*/
explicit impl(std::unique_ptr<datasource> source,
std::string filepath,
csv_reader_options const& options,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);
@@ -222,8 +220,6 @@ class reader::impl {
private:
rmm::mr::device_memory_resource* mr_ = nullptr;
std::unique_ptr<datasource> source_;
std::string filepath_;
std::string compression_type_;
const csv_reader_options opts_;

cudf::size_type num_records_ = 0; // Number of rows with actual data