Struct and map support for Parquet reader (#6318)
* [struct] Fix struct filtering.

Filtering a table that contains struct-columns fails
because struct columns cannot yet be deep-copied from
a column-view.
This commit fixes the problem.

* Initial struct dtype

* [struct] Fix struct filtering.

Added Struct<List> test.
Removed errant prints, extra whitespace.

* [struct] Fix struct filtering.

Added tests for cloning Struct<List> and List<Struct<List>> columns.
Also fixed code formatting.

* Add a __repr__ for struct dtype

* Initial struct column support

* Post-process to fix struct names

* Copy struct field names over to libcudf result

* Fix typo

* Handle all null child in struct

* Mask handling in StructColumn.from_arrow. Add tests

* Struct dtype equality tests

* Fields ordering test

* Struct and map support for Parquet reader.

* Changelog

* Clang format.

* Changelog for 6318

* Add tests for StructDtype.fields

* Fixed a bug in the column_buffer move constructor: it was not propagating the is_nullable flag.

* Add hierarchy column name information to table_with_metadata struct for the parquet reader.

* PR review changes.

* Change the code that returns column hierarchy names to include all cudf columns in the hierarchy, including columns such as list or string offsets that aren't actually part of the schema. Python needs this info to intelligently traverse the hierarchy and map columns to specific field names in structs.

* Add back ListColumn.to_arrow() for now

* Remove breakpoints

* Simplify categorical handling in to_arrow

* Fix up more to_arrow issues

* Keep specialization for lists

* Construct dtype from children in to_arrow()

* Test for parquet struct reader.

* PR comment changes.

* Address review comments

* Another round of cpp PR changes.  Fixed merge conflicts in python.

* Fix base size of StructColumn

* Fixing up logic for generating elements in ListColumn.to_arrow

* Undo string compare change in is_list_dtype

* Remove duplicates in CHANGELOG.  Parameterized struct tests.

* Make sure to assert on expect_eq

Co-authored-by: Mithun RK <[email protected]>
Co-authored-by: Ashwin Srinath <[email protected]>
3 people authored Oct 7, 2020
1 parent 305e424 commit 84557ea
Showing 22 changed files with 1,148 additions and 564 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -21,6 +21,7 @@
- PR #6139 Add column conversion to big endian byte list.
- PR #6220 Add `list_topics()` to supply list of underlying Kafka connection topics
- PR #6254 Add `cudf::make_dictionary_from_scalar` factory function
- PR #6318 Add support for reading Struct and map types from Parquet files
- PR #6315 Native code for string-map lookups, for cudf-java
- PR #6302 Add custom dataframe accessors
- PR #6301 Add JNI bindings to nvcomp
17 changes: 16 additions & 1 deletion cpp/include/cudf/io/types.hpp
@@ -89,6 +89,19 @@ enum statistics_freq {
STATISTICS_PAGE = 2, //!< Per-page column statistics
};

/**
* @brief Detailed name information for output columns.
*
* The hierarchy of children matches the hierarchy of children in the output
* cudf columns.
*/
struct column_name_info {
std::string name;
std::vector<column_name_info> children;
column_name_info(std::string const& _name) : name(_name) {}
column_name_info() = default;
};

/**
* @brief Table metadata for io readers/writers (primarily column names)
* For nested types (structs, maps, unions), the ordering of names in the column_names vector
@@ -105,7 +118,9 @@ enum statistics_freq {
* f5 f6
*/
struct table_metadata {
std::vector<std::string> column_names; //!< Names of columns contained in the table
std::vector<std::string> column_names; //!< Names of columns contained in the table
std::vector<column_name_info>
schema_info; //!< Detailed name information for the entire output hierarchy
std::map<std::string, std::string> user_data; //!< Format-dependent metadata as key-values pairs
};
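The new `schema_info` hierarchy mirrors the output column hierarchy, so a consumer can walk it to recover field names for nested types. As an illustration (a standalone sketch, not reader code; `flatten` is a hypothetical helper):

```cpp
#include <cassert>
#include <string>
#include <vector>

// Mirror of the column_name_info struct introduced above.
struct column_name_info {
  std::string name;
  std::vector<column_name_info> children;
  column_name_info(std::string const &_name) : name(_name) {}
  column_name_info() = default;
};

// Hypothetical consumer-side helper: flatten the hierarchy into dotted paths,
// the way nested Parquet fields can be mapped back to struct field names.
void flatten(column_name_info const &node, std::string const &prefix,
             std::vector<std::string> &out)
{
  std::string path = prefix.empty() ? node.name : prefix + "." + node.name;
  out.push_back(path);
  for (auto const &child : node.children) { flatten(child, path, out); }
}
```

For a struct column `s` with fields `a` and `b`, this yields the paths `s`, `s.a`, and `s.b`.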

2 changes: 1 addition & 1 deletion cpp/src/io/csv/reader_impl.cu
@@ -660,7 +660,7 @@ std::vector<column_buffer> reader::impl::decode_data(std::vector<data_type> cons
is_final_allocation ? mr_ : rmm::mr::get_current_device_resource());

out_buffer.name = col_names_[col];
out_buffers.emplace_back(out_buffer);
out_buffers.emplace_back(std::move(out_buffer));
active_col++;
}
}
239 changes: 146 additions & 93 deletions cpp/src/io/parquet/page_data.cu

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions cpp/src/io/parquet/page_hdr.cu
@@ -218,10 +218,10 @@ extern "C" __global__ void __launch_bounds__(128)
PageInfo *page_info;

if (!t) {
bs->base = bs->cur = bs->ck.compressed_data;
bs->end = bs->base + bs->ck.compressed_size;
bs->page.chunk_idx = chunk;
bs->page.column_idx = bs->ck.dst_col_index;
bs->base = bs->cur = bs->ck.compressed_data;
bs->end = bs->base + bs->ck.compressed_size;
bs->page.chunk_idx = chunk;
bs->page.src_col_schema = bs->ck.src_col_schema;
// this computation is only valid for flat schemas. for nested schemas,
// they will be recomputed in the preprocess step by examining repetition and
// definition levels
26 changes: 11 additions & 15 deletions cpp/src/io/parquet/parquet.cpp
@@ -290,7 +290,7 @@ PARQUET_END_STRUCT()
*/
bool CompactProtocolReader::InitSchema(FileMetaData *md)
{
if (WalkSchema(md->schema) != md->schema.size()) return false;
if (WalkSchema(md) != md->schema.size()) return false;

/* Inside FileMetaData, there is a std::vector of RowGroups and each RowGroup contains a
* a std::vector of ColumnChunks. Each ColumnChunk has a member ColumnMetaData, which contains
@@ -313,24 +313,19 @@ bool CompactProtocolReader::InitSchema(FileMetaData *md)
}();
if (it == md->schema.cend()) return false;
current_schema_index = std::distance(md->schema.cbegin(), it);

// if the schema index is already pointing at a nested type, we'll leave it alone.
if (column.schema_idx < 0 ||
md->schema[column.schema_idx].converted_type != parquet::LIST) {
column.schema_idx = current_schema_index;
}
column.leaf_schema_idx = current_schema_index;
parent = current_schema_index;
column.schema_idx = current_schema_index;
parent = current_schema_index;
}
}
}

return true;
}

/**
* @brief Populates each node in the schema tree
*
* @param[out] schema Current node
* @param[out] md File metadata
* @param[in] idx Current node index
* @param[in] parent_idx Parent node index
* @param[in] max_def_level Max definition level
@@ -339,10 +334,10 @@ bool CompactProtocolReader::InitSchema(FileMetaData *md)
* @return The node index that was populated
*/
int CompactProtocolReader::WalkSchema(
std::vector<SchemaElement> &schema, int idx, int parent_idx, int max_def_level, int max_rep_level)
FileMetaData *md, int idx, int parent_idx, int max_def_level, int max_rep_level)
{
if (idx >= 0 && (size_t)idx < schema.size()) {
SchemaElement *e = &schema[idx];
if (idx >= 0 && (size_t)idx < md->schema.size()) {
SchemaElement *e = &md->schema[idx];
if (e->repetition_type == OPTIONAL) {
++max_def_level;
} else if (e->repetition_type == REPEATED) {
@@ -352,12 +347,13 @@ int CompactProtocolReader::WalkSchema(
e->max_definition_level = max_def_level;
e->max_repetition_level = max_rep_level;
e->parent_idx = parent_idx;
parent_idx = idx;

parent_idx = idx;
++idx;
if (e->num_children > 0) {
for (int i = 0; i < e->num_children; i++) {
int idx_old = idx;
idx = WalkSchema(schema, idx, parent_idx, max_def_level, max_rep_level);
idx = WalkSchema(md, idx, parent_idx, max_def_level, max_rep_level);
if (idx <= idx_old) break; // Error
}
}
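The level bookkeeping in `WalkSchema` can be sketched outside the reader. In Parquet semantics an OPTIONAL node raises only the max definition level, while a REPEATED node raises both the definition and repetition levels; the sketch below is a simplified, hedged mirror of that walk over a depth-first flattened schema, not the library code:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

enum Repetition { REQUIRED, OPTIONAL, REPEATED };

struct Node {
  Repetition repetition_type;
  int num_children;
  int max_definition_level;
  int max_repetition_level;
  int parent_idx;
};

// Walks the depth-first flattened schema vector, assigning max def/rep levels
// and parent indices. Returns the index one past the subtree rooted at idx.
int walk(std::vector<Node> &schema, int idx = 0, int parent_idx = 0,
         int max_def_level = 0, int max_rep_level = 0)
{
  if (idx < 0 || static_cast<std::size_t>(idx) >= schema.size()) return idx;
  Node &e = schema[idx];
  if (e.repetition_type == OPTIONAL) {
    ++max_def_level;
  } else if (e.repetition_type == REPEATED) {
    ++max_def_level;
    ++max_rep_level;
  }
  e.max_definition_level = max_def_level;
  e.max_repetition_level = max_rep_level;
  e.parent_idx = parent_idx;
  parent_idx = idx;
  ++idx;
  for (int i = 0; i < e.num_children; i++) {
    int idx_old = idx;
    idx = walk(schema, idx, parent_idx, max_def_level, max_rep_level);
    if (idx <= idx_old) break;  // malformed schema
  }
  return idx;
}
```

For `optional group my_list (LIST) { repeated group element { required int32 v } }`, the leaf ends up with max definition level 2 and max repetition level 1.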
40 changes: 34 additions & 6 deletions cpp/src/io/parquet/parquet.hpp
@@ -49,7 +49,7 @@ struct file_ender_s {
*
* Parquet is a strongly-typed format so the file layout can be interpreted as
* as a schema tree.
**/
*/
struct SchemaElement {
Type type = UNDEFINED_TYPE;
ConvertedType converted_type = UNKNOWN;
@@ -73,6 +73,38 @@ struct SchemaElement {
name == other.name && num_children == other.num_children &&
decimal_scale == other.decimal_scale && decimal_precision == other.decimal_precision;
}

// the parquet format is a little squishy when it comes to interpreting
// repeated fields. sometimes repeated fields act as "stubs" in the schema
// that don't represent a true nesting level.
//
// this is the case with plain lists:
//
// optional group my_list (LIST) {
// repeated group element { <-- not part of the output hierarchy
// required binary str (UTF8);
// };
// }
//
// However, for backwards compatibility reasons, there are a few special cases, namely
// List<Struct<>> (which also corresponds to how the map type is specified), where
// this does not hold true
//
// optional group my_list (LIST) {
// repeated group element { <-- part of the hierarchy because it represents a struct
// required binary str (UTF8);
// required int32 num;
// };
// }
bool is_stub() const { return repetition_type == REPEATED && num_children == 1; }
// in parquet terms, a group is a level of nesting in the schema. a group
// can be a struct or a list
bool is_struct() const
{
return type == UNDEFINED_TYPE &&
// this assumption might be a little weak.
((repetition_type != REPEATED) || (repetition_type == REPEATED && num_children == 2));
}
};
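The stub-vs-struct distinction above can be exercised in isolation. The following is a simplified, hedged mirror of the two predicates (the `Element` type and enum values are illustrative stand-ins, not the real `SchemaElement`):

```cpp
#include <cassert>

enum Type { UNDEFINED_TYPE, INT32 };
enum Repetition { REQUIRED, OPTIONAL, REPEATED };

struct Element {
  Type type             = UNDEFINED_TYPE;
  Repetition repetition_type = REQUIRED;
  int num_children      = 0;

  // A REPEATED group with exactly one child is a list "stub": it does not
  // represent a level in the output hierarchy.
  bool is_stub() const { return repetition_type == REPEATED && num_children == 1; }

  // A group (no physical type) counts as a struct level unless it is such a
  // stub; a two-child REPEATED group (List<Struct<...>>, maps) does count.
  bool is_struct() const
  {
    return type == UNDEFINED_TYPE &&
           ((repetition_type != REPEATED) ||
            (repetition_type == REPEATED && num_children == 2));
  }
};
```

A single-child repeated group is a stub and not a struct; a two-child repeated group (the map/`List<Struct<>>` layout) is a struct; a physical leaf is neither.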

/**
@@ -114,10 +146,6 @@ struct ColumnChunk {

// Following fields are derived from other fields
int schema_idx = -1; // Index in flattened schema (derived from path_in_schema)
// if this is a non-nested type, this index will be the same as schema_idx.
// for a nested type, this will point to the fundamental leaf type schema
// element (int, string, etc)
int leaf_schema_idx = -1;
};

/**
Expand Down Expand Up @@ -308,7 +336,7 @@ class CompactProtocolReader {
bool InitSchema(FileMetaData *md);

protected:
int WalkSchema(std::vector<SchemaElement> &schema,
int WalkSchema(FileMetaData *md,
int idx = 0,
int parent_idx = 0,
int max_def_level = 0,
3 changes: 3 additions & 0 deletions cpp/src/io/parquet/parquet_common.hpp
@@ -18,6 +18,9 @@

#include <cstdint>

#include <string>
#include <vector>

namespace cudf {
namespace io {
namespace parquet {
67 changes: 44 additions & 23 deletions cpp/src/io/parquet/parquet_gpu.hpp
@@ -19,13 +19,28 @@
#include <cuda_runtime.h>
#include <io/comp/gpuinflate.h>
#include <io/statistics/column_stats.h>
#include <cudf/types.hpp>
#include <io/parquet/parquet_common.hpp>
#include <io/utilities/column_buffer.hpp>
#include <io/utilities/hostdevice_vector.hpp>
#include <vector>

namespace cudf {
namespace io {
namespace parquet {

/**
* @brief Struct representing an input column in the file.
*/
struct input_column_info {
int schema_idx;
std::string name;
// size == nesting depth. the associated real output
// buffer index in the dest column for each level of nesting.
std::vector<int> nesting;
auto nesting_depth() const { return nesting.size(); }
};

namespace gpu {

/**
Expand Down Expand Up @@ -57,15 +72,14 @@ struct nvstrdesc_s {
* @brief Nesting information
*/
struct PageNestingInfo {
// input definition levels are remapped with this into
// the corresponding output PageNestingInfo struct
// within the same PageInfo.
// eg.
// PageNestingInfo *out = &page.nesting[page.nesting[d].d_remap];
int32_t d_remap;
// input repetition/definition levels are remapped with these values
// into the corresponding real output nesting depths.
int32_t start_depth;
int32_t end_depth;

// set at initialization
int32_t max_def_level;
int32_t max_rep_level;

// set during preprocessing
int32_t size; // this page/nesting-level's size contribution to the output column
Expand All @@ -76,7 +90,7 @@ struct PageNestingInfo {
int32_t value_count; // total # of values decoded in this page/nesting-level
int32_t valid_map_offset; // current offset in bits relative to valid_map
uint8_t *data_out; // pointer into output buffer
uint32_t *valid_map; // pointer into output validity buferr
uint32_t *valid_map; // pointer into output validity buffer
};
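The `start_depth`/`end_depth` remapping rests on the standard Dremel decoding rule: a value's definition level says how deep its non-null path reaches. A hedged sketch of that rule follows (`present_at_depth` and the `max_def_level` vector are illustrative, not kernel code):

```cpp
#include <cassert>
#include <vector>

// max_def_level[d] is the definition level at which a value exists at nesting
// depth d. A decoded value is present at depth d iff its definition level
// reaches max_def_level[d]; anything shallower means a null (or an empty
// list) occurred at some ancestor depth.
bool present_at_depth(std::vector<int> const &max_def_level, int def_level, int depth)
{
  return def_level >= max_def_level[depth];
}
```

For an optional `List<int32>` with per-depth max definition levels `{1, 2}`: definition level 2 is a real leaf value, level 1 is a list that is present but contributes no leaf, and level 0 is a null list.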

/**
Expand All @@ -95,12 +109,12 @@ struct PageInfo {
// - In the case of a nested schema, you have to decode the repetition and definition
// levels to extract actual column values
int32_t num_input_values;
int32_t chunk_row; // starting row of this page relative to the start of the chunk
int32_t num_rows; // number of rows in this page
int32_t chunk_idx; // column chunk this page belongs to
int32_t column_idx; // column index this page belongs to
uint8_t flags; // PAGEINFO_FLAGS_XXX
uint8_t encoding; // Encoding for data or dictionary page
int32_t chunk_row; // starting row of this page relative to the start of the chunk
int32_t num_rows; // number of rows in this page
int32_t chunk_idx; // column chunk this page belongs to
int32_t src_col_schema; // schema index of this column
uint8_t flags; // PAGEINFO_FLAGS_XXX
uint8_t encoding; // Encoding for data or dictionary page
uint8_t definition_level_encoding; // Encoding used for definition levels (data page)
uint8_t repetition_level_encoding; // Encoding used for repetition levels (data page)

Expand All @@ -126,20 +140,22 @@ struct ColumnChunkDesc {
uint32_t num_rows_,
int16_t max_definition_level_,
int16_t max_repetition_level_,
int16_t max_nesting_depth_,
uint8_t def_level_bits_,
uint8_t rep_level_bits_,
int8_t codec_,
int8_t converted_type_,
int8_t decimal_scale_,
int32_t ts_clock_rate_,
int32_t dst_col_index_,
int32_t src_col_index_)
int32_t src_col_index_,
int32_t src_col_schema_)
: compressed_data(compressed_data_),
compressed_size(compressed_size_),
num_values(num_values_),
start_row(start_row_),
num_rows(num_rows_),
max_level{max_definition_level_, max_repetition_level_},
max_nesting_depth{max_nesting_depth_},
data_type(datatype_ | (datatype_length_ << 3)),
level_bits{def_level_bits_, rep_level_bits_},
num_data_pages(0),
Expand All @@ -153,8 +169,8 @@ struct ColumnChunkDesc {
converted_type(converted_type_),
decimal_scale(decimal_scale_),
ts_clock_rate(ts_clock_rate_),
dst_col_index(dst_col_index_),
src_col_index(src_col_index_)
src_col_index(src_col_index_),
src_col_schema(src_col_schema_)
{
}

@@ -164,6 +180,7 @@ struct ColumnChunkDesc {
size_t start_row; // starting row of this chunk
uint32_t num_rows; // number of rows in this chunk
int16_t max_level[level_type::NUM_LEVEL_TYPES]; // max definition/repetition level
int16_t max_nesting_depth; // max nesting depth of the output
uint16_t data_type; // basic column data type, ((type_length << 3) |
// parquet::Type)
uint8_t
Expand All @@ -181,8 +198,8 @@ struct ColumnChunkDesc {
int8_t decimal_scale; // decimal scale pow(10, -decimal_scale)
int32_t ts_clock_rate; // output timestamp clock frequency (0=default, 1000=ms, 1000000000=ns)

int32_t dst_col_index; // my output column index
int32_t src_col_index; // my source (order in the file) column index
int32_t src_col_index; // my input column index
int32_t src_col_schema; // my schema index in the file
};

/**
@@ -315,12 +332,14 @@ cudaError_t BuildStringDictionaryIndex(ColumnChunkDesc *chunks,
* the parquet headers when dealing with nested schemas.
* - The total sizes of all output columns at all nesting levels
* - The starting output buffer offset for each page, for each nesting level
*
* For flat schemas, these values are computed during header decoding (see gpuDecodePageHeaders)
*
* Note : this function is where output device memory is allocated for nested columns.
*
* @param[in,out] pages All pages to be decoded
* @param[in] chunks All chunks to be decoded
* @param[in,out] nested_info Per-output column nesting information (size, nullability)
* @param[in,out] input_columns Input column information
* @param[in,out] output_columns Output column information
* @param[in] num_rows Maximum number of rows to read
* @param[in] min_rows crop all rows below min_row
* @param[in] stream Cuda stream
@@ -329,10 +348,12 @@ cudaError_t BuildStringDictionaryIndex(ColumnChunkDesc *chunks,
*/
cudaError_t PreprocessColumnData(hostdevice_vector<PageInfo> &pages,
hostdevice_vector<ColumnChunkDesc> const &chunks,
std::vector<std::vector<std::pair<int, bool>>> &nested_sizes,
std::vector<input_column_info> &input_columns,
std::vector<cudf::io::detail::column_buffer> &output_columns,
size_t num_rows,
size_t min_row,
cudaStream_t stream = (cudaStream_t)0);
cudaStream_t stream,
rmm::mr::device_memory_resource *mr);

/**
* @brief Launches kernel for reading the column data stored in the pages