Skip to content

Commit

Permalink
Adding optional parquet reader schema (#11524)
Browse files Browse the repository at this point in the history
Adding a schema for reading parquet files. This is useful for things like binary data reading where the default behavior of cudf is to read it as a string column, but users wish to read it as a list<int8> column instead. Using a schema allows for nested data types to be expressed completely.

Authors:
  - Mike Wilson (https://github.com/hyperbolic2346)

Approvers:
  - MithunR (https://github.com/mythrocks)
  - Vukasin Milovanovic (https://github.com/vuule)
  - Nghia Truong (https://github.com/ttnghia)

URL: #11524
  • Loading branch information
hyperbolic2346 authored Aug 16, 2022
1 parent 63a47d9 commit 4178a51
Show file tree
Hide file tree
Showing 13 changed files with 343 additions and 292 deletions.
32 changes: 14 additions & 18 deletions cpp/include/cudf/io/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ class parquet_reader_options {
bool _use_pandas_metadata = true;
// Cast timestamp columns to a specific type
data_type _timestamp_type{type_id::EMPTY};
// Whether to store binary data as a string column
std::optional<std::vector<bool>> _convert_binary_to_strings{std::nullopt};

std::optional<std::vector<reader_column_schema>> _reader_column_schema;

/**
* @brief Constructor from source info.
Expand Down Expand Up @@ -117,16 +117,13 @@ class parquet_reader_options {
[[nodiscard]] bool is_enabled_use_pandas_metadata() const { return _use_pandas_metadata; }

/**
* @brief Returns optional vector of true/false values depending on whether binary data should be
* converted to strings or not.
* @brief Returns the optional reader schema: a tree of per-column metadata describing how each column should be read.
*
* @return vector with ith value `true` if binary data should be converted to strings for the ith
* column. Will return std::nullopt if the user did not set this option, which defaults to all
* binary data being converted to strings.
* @return Optional vector of reader_column_schema objects, one per top-level column; std::nullopt if no schema was set.
*/
[[nodiscard]] std::optional<std::vector<bool>> get_convert_binary_to_strings() const
[[nodiscard]] std::optional<std::vector<reader_column_schema>> get_column_schema() const
{
return _convert_binary_to_strings;
return _reader_column_schema;
}

/**
Expand Down Expand Up @@ -182,14 +179,14 @@ class parquet_reader_options {
void enable_use_pandas_metadata(bool val) { _use_pandas_metadata = val; }

/**
* @brief Sets to enable/disable conversion of binary to strings per column.
* @brief Sets reader column schema.
*
* @param val Vector of boolean values to enable/disable conversion of binary to string columns.
* @param val Tree of schema nodes to enable/disable conversion of binary to string columns.
* Note default is to convert to string columns.
*/
void set_convert_binary_to_strings(std::vector<bool> val)
void set_column_schema(std::vector<reader_column_schema> val)
{
_convert_binary_to_strings = std::move(val);
_reader_column_schema = std::move(val);
}

/**
Expand Down Expand Up @@ -270,15 +267,14 @@ class parquet_reader_options_builder {
}

/**
* @brief Sets enable/disable conversion of binary to strings per column.
* @brief Sets reader metadata.
*
* @param val Vector of boolean values to enable/disable conversion of binary to string columns.
* Note default is to convert to string columns.
* @param val Vector of reader_column_schema trees, one per top-level column, describing how each column should be read.
* @return this for chaining
*/
parquet_reader_options_builder& convert_binary_to_strings(std::vector<bool> val)
parquet_reader_options_builder& set_column_schema(std::vector<reader_column_schema> val)
{
options._convert_binary_to_strings = std::move(val);
options._reader_column_schema = std::move(val);
return *this;
}

Expand Down
91 changes: 91 additions & 0 deletions cpp/include/cudf/io/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#pragma once

#include <cudf/types.hpp>
#include <cudf/utilities/span.hpp>

#include <map>
#include <memory>
Expand Down Expand Up @@ -645,5 +646,95 @@ struct partition_info {
}
};

/**
 * @brief Schema element for the reader, describing how a column (and,
 * recursively, its children) should be decoded.
 *
 * Allows nested reading options to be expressed completely, e.g. decoding
 * parquet binary data as a list<int8> column instead of the default string
 * column.
 */
class reader_column_schema {
  // Whether to read binary data as a string column. Defaults to true, which
  // matches cudf's historical behavior of decoding binary data as strings.
  bool _convert_binary_to_strings{true};

  // Per-child schema options, mirroring the nested structure of the column.
  std::vector<reader_column_schema> children;

 public:
  reader_column_schema() = default;

  /**
   * @brief Construct a new reader column schema object
   *
   * @param number_of_children number of child schema objects to default construct
   */
  reader_column_schema(size_type number_of_children) { children.resize(number_of_children); }

  /**
   * @brief Construct a new reader column schema object with a span defining the children
   *
   * @param child_span span of child schema objects
   */
  reader_column_schema(host_span<reader_column_schema> const& child_span)
  {
    children.assign(child_span.begin(), child_span.end());
  }

  /**
   * @brief Add the children metadata of this column
   *
   * @param child The children metadata of this column to add
   * @return this for chaining
   */
  reader_column_schema& add_child(reader_column_schema const& child)
  {
    children.push_back(child);
    return *this;
  }

  /**
   * @brief Get reference to a child of this column
   *
   * @param i Index of the child to get
   * @return Reference to the i-th child schema object
   */
  [[nodiscard]] reader_column_schema& child(size_type i) { return children[i]; }

  /**
   * @brief Get const reference to a child of this column
   *
   * @param i Index of the child to get
   * @return Const reference to the i-th child schema object
   */
  [[nodiscard]] reader_column_schema const& child(size_type i) const { return children[i]; }

  /**
   * @brief Specifies whether this column should be read as binary or string data
   * Only valid for the following column types:
   * string, list<int8>
   *
   * @param convert_to_string True = convert binary to strings, False = return binary
   * @return this for chaining
   */
  reader_column_schema& set_convert_binary_to_strings(bool convert_to_string)
  {
    _convert_binary_to_strings = convert_to_string;
    return *this;
  }

  /**
   * @brief Get whether to encode this column as binary or string data
   *
   * @return Boolean indicating whether to encode this column as binary data
   */
  [[nodiscard]] bool is_enabled_convert_binary_to_strings() const
  {
    return _convert_binary_to_strings;
  }

  /**
   * @brief Get the number of child objects
   *
   * @return number of children
   */
  [[nodiscard]] size_t get_num_children() const { return children.size(); }
};

} // namespace io
} // namespace cudf
2 changes: 1 addition & 1 deletion cpp/src/io/avro/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ table_with_metadata read_avro(std::unique_ptr<cudf::io::datasource>&& source,
mr);

for (size_t i = 0; i < column_types.size(); ++i) {
out_columns.emplace_back(make_column(out_buffers[i], nullptr, stream, mr));
out_columns.emplace_back(make_column(out_buffers[i], nullptr, std::nullopt, stream, mr));
}
} else {
// Create empty columns
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/csv/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -846,7 +846,7 @@ table_with_metadata read_csv(cudf::io::datasource* source,
out_columns.emplace_back(
cudf::strings::replace(col->view(), dblquotechar, quotechar, -1, mr));
} else {
out_columns.emplace_back(make_column(out_buffers[i], nullptr, stream, mr));
out_columns.emplace_back(make_column(out_buffers[i], nullptr, std::nullopt, stream, mr));
}
}
} else {
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/json/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ table_with_metadata convert_data_to_table(parse_options_view const& parse_opts,
for (size_t i = 0; i < num_columns; ++i) {
out_buffers[i].null_count() = num_records - h_valid_counts[i];

auto out_column = make_column(out_buffers[i], nullptr, stream, mr);
auto out_column = make_column(out_buffers[i], nullptr, std::nullopt, stream, mr);
if (out_column->type().id() == type_id::STRING) {
// Need to remove escape character in case of '\"' and '\\'
out_columns.emplace_back(cudf::strings::detail::replace(
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/orc/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -879,7 +879,7 @@ void reader::impl::create_columns(std::vector<std::vector<column_buffer>>&& col_
[&](auto const col_meta) {
schema_info.emplace_back("");
auto col_buffer = assemble_buffer(col_meta.id, col_buffers, 0, stream);
return make_column(col_buffer, &schema_info.back(), stream, _mr);
return make_column(col_buffer, &schema_info.back(), std::nullopt, stream, _mr);
});
}

Expand Down
27 changes: 7 additions & 20 deletions cpp/src/io/parquet/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1577,7 +1577,7 @@ reader::impl::impl(std::vector<std::unique_ptr<datasource>>&& sources,
_strings_to_categorical = options.is_enabled_convert_strings_to_categories();

// Binary columns can be read as binary or strings
_force_binary_columns_as_strings = options.get_convert_binary_to_strings();
_reader_column_schema = options.get_column_schema();

// Select only columns required by the options
std::tie(_input_columns, _output_columns, _output_column_schemas) =
Expand Down Expand Up @@ -1744,28 +1744,15 @@ table_with_metadata reader::impl::read(std::vector<std::vector<size_type>> const
// decoding of column data itself
decode_page_data(chunks, pages, page_nesting_info, num_rows);

auto make_output_column = [&](column_buffer& buf, column_name_info* schema_info, int i) {
auto col = make_column(buf, schema_info, _stream, _mr);
if (should_write_byte_array(i)) {
auto const& schema = _metadata->get_schema(_output_column_schemas[i]);
if (schema.converted_type == parquet::UNKNOWN) {
auto const num_rows = col->size();
auto data = col->release();
return make_lists_column(
num_rows,
std::move(data.children[strings_column_view::offsets_column_index]),
std::move(data.children[strings_column_view::chars_column_index]),
UNKNOWN_NULL_COUNT,
std::move(*data.null_mask));
}
}
return col;
};

// create the final output cudf columns
for (size_t i = 0; i < _output_columns.size(); ++i) {
column_name_info& col_name = out_metadata.schema_info.emplace_back("");
out_columns.emplace_back(make_output_column(_output_columns[i], &col_name, i));
auto const metadata =
_reader_column_schema.has_value()
? std::make_optional<reader_column_schema>((*_reader_column_schema)[i])
: std::nullopt;
out_columns.emplace_back(
make_column(_output_columns[i], &col_name, metadata, _stream, _mr));
}
}
}
Expand Down
16 changes: 1 addition & 15 deletions cpp/src/io/parquet/reader_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,20 +174,6 @@ class reader::impl {
hostdevice_vector<gpu::PageNestingInfo>& page_nesting,
size_t total_rows);

/**
* @brief Indicates if a column should be written as a byte array
*
* @param col column to check
* @return true if the column should be written as a byte array
* @return false if the column should be written as normal for that type
*/
bool should_write_byte_array(int col)
{
return _output_columns[col].type.id() == type_id::STRING &&
_force_binary_columns_as_strings.has_value() &&
!_force_binary_columns_as_strings.value()[col];
}

private:
rmm::cuda_stream_view _stream;
rmm::mr::device_memory_resource* _mr = nullptr;
Expand All @@ -203,7 +189,7 @@ class reader::impl {
std::vector<int> _output_column_schemas;

bool _strings_to_categorical = false;
std::optional<std::vector<bool>> _force_binary_columns_as_strings;
std::optional<std::vector<reader_column_schema>> _reader_column_schema;
data_type _timestamp_type{type_id::EMPTY};
};

Expand Down
67 changes: 50 additions & 17 deletions cpp/src/io/utilities/column_buffer.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
* Copyright (c) 2021-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,6 +21,7 @@

#include "column_buffer.hpp"
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/strings/strings_column_view.hpp>

namespace cudf {
namespace io {
Expand Down Expand Up @@ -58,18 +59,39 @@ void column_buffer::create(size_type _size,
*/
std::unique_ptr<column> make_column(column_buffer& buffer,
column_name_info* schema_info,
std::optional<reader_column_schema> const& schema,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
if (schema_info != nullptr) { schema_info->name = buffer.name; }

switch (buffer.type.id()) {
case type_id::STRING:
if (schema_info != nullptr) {
schema_info->children.push_back(column_name_info{"offsets"});
schema_info->children.push_back(column_name_info{"chars"});
if (schema.value_or(reader_column_schema{}).is_enabled_convert_binary_to_strings()) {
if (schema_info != nullptr) {
schema_info->children.push_back(column_name_info{"offsets"});
schema_info->children.push_back(column_name_info{"chars"});
}

return make_strings_column(*buffer._strings, stream, mr);
} else {
// convert to binary
auto const string_col = make_strings_column(*buffer._strings, stream, mr);
auto const num_rows = string_col->size();
auto col_contest = string_col->release();

if (schema_info != nullptr) {
schema_info->children.push_back(column_name_info{"offsets"});
schema_info->children.push_back(column_name_info{"binary"});
}

return make_lists_column(
num_rows,
std::move(col_contest.children[strings_column_view::offsets_column_index]),
std::move(col_contest.children[strings_column_view::chars_column_index]),
UNKNOWN_NULL_COUNT,
std::move(*col_contest.null_mask));
}
return make_strings_column(*buffer._strings, stream, mr);

case type_id::LIST: {
// make offsets column
Expand All @@ -83,9 +105,15 @@ std::unique_ptr<column> make_column(column_buffer& buffer,
child_info = &schema_info->children.back();
}

CUDF_EXPECTS(not schema.has_value() or schema->get_num_children() > 0,
"Invalid schema provided for read, expected child data for list!");
auto const child_schema = schema.has_value()
? std::make_optional<reader_column_schema>(schema->child(0))
: std::nullopt;

// make child column
CUDF_EXPECTS(buffer.children.size() > 0, "Encountered malformed column_buffer");
auto child = make_column(buffer.children[0], child_info, stream, mr);
auto child = make_column(buffer.children[0], child_info, child_schema, stream, mr);

// make the final list column (note : size is the # of offsets, so our actual # of rows is 1
// less)
Expand All @@ -101,17 +129,22 @@ std::unique_ptr<column> make_column(column_buffer& buffer,
case type_id::STRUCT: {
std::vector<std::unique_ptr<cudf::column>> output_children;
output_children.reserve(buffer.children.size());
std::transform(buffer.children.begin(),
buffer.children.end(),
std::back_inserter(output_children),
[&](column_buffer& col) {
column_name_info* child_info = nullptr;
if (schema_info != nullptr) {
schema_info->children.push_back(column_name_info{""});
child_info = &schema_info->children.back();
}
return make_column(col, child_info, stream, mr);
});
for (size_t i = 0; i < buffer.children.size(); ++i) {
column_name_info* child_info = nullptr;
if (schema_info != nullptr) {
schema_info->children.push_back(column_name_info{""});
child_info = &schema_info->children.back();
}

CUDF_EXPECTS(not schema.has_value() or schema->get_num_children() > i,
"Invalid schema provided for read, expected more child data for struct!");
auto const child_schema = schema.has_value()
? std::make_optional<reader_column_schema>(schema->child(i))
: std::nullopt;

output_children.emplace_back(
make_column(buffer.children[i], child_info, child_schema, stream, mr));
}

return make_structs_column(buffer.size,
std::move(output_children),
Expand Down
Loading

0 comments on commit 4178a51

Please sign in to comment.