Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding optional parquet reader schema #11524

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 14 additions & 18 deletions cpp/include/cudf/io/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ class parquet_reader_options {
bool _use_pandas_metadata = true;
// Cast timestamp columns to a specific type
data_type _timestamp_type{type_id::EMPTY};
// Whether to store binary data as a string column
std::optional<std::vector<bool>> _convert_binary_to_strings{std::nullopt};

std::optional<std::vector<reader_column_schema>> _reader_column_schema;

/**
* @brief Constructor from source info.
Expand Down Expand Up @@ -121,16 +121,13 @@ class parquet_reader_options {
[[nodiscard]] bool is_enabled_use_pandas_metadata() const { return _use_pandas_metadata; }

/**
* @brief Returns optional vector of true/false values depending on whether binary data should be
* converted to strings or not.
* @brief Returns optional tree of metadata.
*
* @return vector with ith value `true` if binary data should be converted to strings for the ith
* column. Will return std::nullopt if the user did not set this option, which defaults to all
* binary data being converted to strings.
* @return vector of reader_column_schema objects.
*/
[[nodiscard]] std::optional<std::vector<bool>> get_convert_binary_to_strings() const
[[nodiscard]] std::optional<std::vector<reader_column_schema>> get_column_schema() const
{
return _convert_binary_to_strings;
return _reader_column_schema;
}

/**
Expand Down Expand Up @@ -204,14 +201,14 @@ class parquet_reader_options {
void enable_use_pandas_metadata(bool val) { _use_pandas_metadata = val; }

/**
* @brief Sets to enable/disable conversion of binary to strings per column.
* @brief Sets reader column schema.
*
* @param val Vector of boolean values to enable/disable conversion of binary to string columns.
* @param val Tree of schema nodes to enable/disable conversion of binary to string columns.
* Note default is to convert to string columns.
*/
void set_convert_binary_to_strings(std::vector<bool> val)
void set_column_schema(std::vector<reader_column_schema> val)
{
_convert_binary_to_strings = std::move(val);
_reader_column_schema = std::move(val);
}

/**
Expand Down Expand Up @@ -320,15 +317,14 @@ class parquet_reader_options_builder {
}

/**
* @brief Sets enable/disable conversion of binary to strings per column.
* @brief Sets reader metadata.
*
* @param val Vector of boolean values to enable/disable conversion of binary to string columns.
* Note default is to convert to string columns.
* @param val Tree of metadata information.
* @return this for chaining
*/
parquet_reader_options_builder& convert_binary_to_strings(std::vector<bool> val)
parquet_reader_options_builder& set_column_schema(std::vector<reader_column_schema> val)
{
options._convert_binary_to_strings = std::move(val);
options._reader_column_schema = std::move(val);
return *this;
}

Expand Down
91 changes: 91 additions & 0 deletions cpp/include/cudf/io/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#pragma once

#include <cudf/types.hpp>
#include <cudf/utilities/span.hpp>

#include <map>
#include <memory>
Expand Down Expand Up @@ -645,5 +646,95 @@ struct partition_info {
}
};

/**
* @brief schema element for reader
*
*/
class reader_column_schema {
// Whether to read binary data as a string column
bool _convert_binary_to_strings{true};

std::vector<reader_column_schema> children;

public:
reader_column_schema() = default;

/**
* @brief Construct a new reader column schema object
*
* @param number_of_children number of child schema objects to default construct
*/
reader_column_schema(size_type number_of_children) { children.resize(number_of_children); }

/**
* @brief Construct a new reader column schema object with a span defining the children
*
* @param child_span span of child schema objects
*/
reader_column_schema(host_span<reader_column_schema> const& child_span)
{
children.assign(child_span.begin(), child_span.end());
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* @brief Add the children metadata of this column
*
* @param child The children metadata of this column to add
* @return this for chaining
*/
reader_column_schema& add_child(reader_column_schema const& child)
{
children.push_back(child);
return *this;
}

/**
* @brief Get reference to a child of this column
*
* @param i Index of the child to get
* @return this for chaining
*/
[[nodiscard]] reader_column_schema& child(size_type i) { return children[i]; }

/**
* @brief Get const reference to a child of this column
*
* @param i Index of the child to get
* @return this for chaining
*/
[[nodiscard]] reader_column_schema const& child(size_type i) const { return children[i]; }

/**
* @brief Specifies whether this column should be written as binary or string data
* Only valid for the following column types:
* string, list<int8>
*
* @param convert_to_string True = convert binary to strings False = return binary
* @return this for chaining
*/
reader_column_schema& set_convert_binary_to_strings(bool convert_to_string)
{
_convert_binary_to_strings = convert_to_string;
return *this;
}

/**
* @brief Get whether to encode this column as binary or string data
*
* @return Boolean indicating whether to encode this column as binary data
*/
[[nodiscard]] bool is_enabled_convert_binary_to_strings() const
{
return _convert_binary_to_strings;
}

/**
* @brief Get the number of child objects
*
* @return number of children
*/
[[nodiscard]] size_t get_num_children() const { return children.size(); }
};

} // namespace io
} // namespace cudf
2 changes: 1 addition & 1 deletion cpp/src/io/avro/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ table_with_metadata read_avro(std::unique_ptr<cudf::io::datasource>&& source,
mr);

for (size_t i = 0; i < column_types.size(); ++i) {
out_columns.emplace_back(make_column(out_buffers[i], nullptr, stream, mr));
out_columns.emplace_back(make_column(out_buffers[i], nullptr, std::nullopt, stream, mr));
}
} else {
// Create empty columns
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/csv/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -846,7 +846,7 @@ table_with_metadata read_csv(cudf::io::datasource* source,
out_columns.emplace_back(
cudf::strings::replace(col->view(), dblquotechar, quotechar, -1, mr));
} else {
out_columns.emplace_back(make_column(out_buffers[i], nullptr, stream, mr));
out_columns.emplace_back(make_column(out_buffers[i], nullptr, std::nullopt, stream, mr));
}
}
} else {
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/json/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ table_with_metadata convert_data_to_table(parse_options_view const& parse_opts,
for (size_t i = 0; i < num_columns; ++i) {
out_buffers[i].null_count() = num_records - h_valid_counts[i];

auto out_column = make_column(out_buffers[i], nullptr, stream, mr);
auto out_column = make_column(out_buffers[i], nullptr, std::nullopt, stream, mr);
if (out_column->type().id() == type_id::STRING) {
// Need to remove escape character in case of '\"' and '\\'
out_columns.emplace_back(cudf::strings::detail::replace(
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/orc/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -879,7 +879,7 @@ void reader::impl::create_columns(std::vector<std::vector<column_buffer>>&& col_
[&](auto const col_meta) {
schema_info.emplace_back("");
auto col_buffer = assemble_buffer(col_meta.id, col_buffers, 0, stream);
return make_column(col_buffer, &schema_info.back(), stream, _mr);
return make_column(col_buffer, &schema_info.back(), std::nullopt, stream, _mr);
});
}

Expand Down
27 changes: 7 additions & 20 deletions cpp/src/io/parquet/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1595,7 +1595,7 @@ reader::impl::impl(std::vector<std::unique_ptr<datasource>>&& sources,
_strings_to_categorical = options.is_enabled_convert_strings_to_categories();

// Binary columns can be read as binary or strings
_force_binary_columns_as_strings = options.get_convert_binary_to_strings();
_reader_column_schema = options.get_column_schema();

// Select only columns required by the options
std::tie(_input_columns, _output_columns, _output_column_schemas) =
Expand Down Expand Up @@ -1766,28 +1766,15 @@ table_with_metadata reader::impl::read(size_type skip_rows,
// decoding of column data itself
decode_page_data(chunks, pages, page_nesting_info, skip_rows, num_rows);

auto make_output_column = [&](column_buffer& buf, column_name_info* schema_info, int i) {
auto col = make_column(buf, schema_info, _stream, _mr);
if (should_write_byte_array(i)) {
auto const& schema = _metadata->get_schema(_output_column_schemas[i]);
if (schema.converted_type == parquet::UNKNOWN) {
auto const num_rows = col->size();
auto data = col->release();
return make_lists_column(
num_rows,
std::move(data.children[strings_column_view::offsets_column_index]),
std::move(data.children[strings_column_view::chars_column_index]),
UNKNOWN_NULL_COUNT,
std::move(*data.null_mask));
}
}
return col;
};

// create the final output cudf columns
for (size_t i = 0; i < _output_columns.size(); ++i) {
column_name_info& col_name = out_metadata.schema_info.emplace_back("");
out_columns.emplace_back(make_output_column(_output_columns[i], &col_name, i));
auto const metadata =
_reader_column_schema.has_value()
? std::make_optional<reader_column_schema>((*_reader_column_schema)[i])
: std::nullopt;
out_columns.emplace_back(
make_column(_output_columns[i], &col_name, metadata, _stream, _mr));
}
}
}
Expand Down
16 changes: 1 addition & 15 deletions cpp/src/io/parquet/reader_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,20 +188,6 @@ class reader::impl {
size_t min_row,
size_t total_rows);

/**
* @brief Indicates if a column should be written as a byte array
*
* @param col column to check
* @return true if the column should be written as a byte array
* @return false if the column should be written as normal for that type
*/
bool should_write_byte_array(int col)
{
return _output_columns[col].type.id() == type_id::STRING &&
_force_binary_columns_as_strings.has_value() &&
!_force_binary_columns_as_strings.value()[col];
}

private:
rmm::cuda_stream_view _stream;
rmm::mr::device_memory_resource* _mr = nullptr;
Expand All @@ -217,7 +203,7 @@ class reader::impl {
std::vector<int> _output_column_schemas;

bool _strings_to_categorical = false;
std::optional<std::vector<bool>> _force_binary_columns_as_strings;
std::optional<std::vector<reader_column_schema>> _reader_column_schema;
data_type _timestamp_type{type_id::EMPTY};
};

Expand Down
67 changes: 50 additions & 17 deletions cpp/src/io/utilities/column_buffer.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
* Copyright (c) 2021-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,6 +21,7 @@

#include "column_buffer.hpp"
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/strings/strings_column_view.hpp>

namespace cudf {
namespace io {
Expand Down Expand Up @@ -58,18 +59,39 @@ void column_buffer::create(size_type _size,
*/
std::unique_ptr<column> make_column(column_buffer& buffer,
column_name_info* schema_info,
std::optional<reader_column_schema> const& schema,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
if (schema_info != nullptr) { schema_info->name = buffer.name; }

switch (buffer.type.id()) {
case type_id::STRING:
if (schema_info != nullptr) {
schema_info->children.push_back(column_name_info{"offsets"});
schema_info->children.push_back(column_name_info{"chars"});
if (schema.value_or(reader_column_schema{}).is_enabled_convert_binary_to_strings()) {
if (schema_info != nullptr) {
schema_info->children.push_back(column_name_info{"offsets"});
schema_info->children.push_back(column_name_info{"chars"});
}

return make_strings_column(*buffer._strings, stream, mr);
} else {
// convert to binary
auto const string_col = make_strings_column(*buffer._strings, stream, mr);
auto const num_rows = string_col->size();
auto col_contest = string_col->release();

if (schema_info != nullptr) {
schema_info->children.push_back(column_name_info{"offsets"});
schema_info->children.push_back(column_name_info{"binary"});
}

return make_lists_column(
num_rows,
std::move(col_contest.children[strings_column_view::offsets_column_index]),
std::move(col_contest.children[strings_column_view::chars_column_index]),
UNKNOWN_NULL_COUNT,
std::move(*col_contest.null_mask));
}
return make_strings_column(*buffer._strings, stream, mr);

case type_id::LIST: {
// make offsets column
Expand All @@ -83,9 +105,15 @@ std::unique_ptr<column> make_column(column_buffer& buffer,
child_info = &schema_info->children.back();
}

CUDF_EXPECTS(not schema.has_value() or schema->get_num_children() > 0,
"Invalid schema provided for read, expected child data for list!");
auto const child_schema = schema.has_value()
? std::make_optional<reader_column_schema>(schema->child(0))
: std::nullopt;

// make child column
CUDF_EXPECTS(buffer.children.size() > 0, "Encountered malformed column_buffer");
auto child = make_column(buffer.children[0], child_info, stream, mr);
auto child = make_column(buffer.children[0], child_info, child_schema, stream, mr);

// make the final list column (note : size is the # of offsets, so our actual # of rows is 1
// less)
Expand All @@ -101,17 +129,22 @@ std::unique_ptr<column> make_column(column_buffer& buffer,
case type_id::STRUCT: {
std::vector<std::unique_ptr<cudf::column>> output_children;
output_children.reserve(buffer.children.size());
std::transform(buffer.children.begin(),
buffer.children.end(),
std::back_inserter(output_children),
[&](column_buffer& col) {
column_name_info* child_info = nullptr;
if (schema_info != nullptr) {
schema_info->children.push_back(column_name_info{""});
child_info = &schema_info->children.back();
}
return make_column(col, child_info, stream, mr);
});
for (size_t i = 0; i < buffer.children.size(); ++i) {
column_name_info* child_info = nullptr;
if (schema_info != nullptr) {
schema_info->children.push_back(column_name_info{""});
child_info = &schema_info->children.back();
}

CUDF_EXPECTS(not schema.has_value() or schema->get_num_children() > i,
"Invalid schema provided for read, expected more child data for struct!");
auto const child_schema = schema.has_value()
? std::make_optional<reader_column_schema>(schema->child(i))
: std::nullopt;

output_children.emplace_back(
make_column(buffer.children[i], child_info, child_schema, stream, mr));
}

return make_structs_column(buffer.size,
std::move(output_children),
Expand Down
Loading