[REVIEW] Struct and map support for Parquet reader #6318

Merged: 54 commits, Oct 7, 2020 (changes shown from 45 commits)
0281d87
[struct] Fix struct filtering.
mythrocks Sep 19, 2020
970e856
Merge remote-tracking branch 'origin/branch-0.16' into fix-struct-filter
mythrocks Sep 20, 2020
6e90840
Initial struct dtype
shwina Sep 21, 2020
cd6b491
Merge branch 'branch-0.16' of https://github.com/rapidsai/cudf into a…
shwina Sep 22, 2020
336eda5
Merge remote-tracking branch 'origin/branch-0.16' into fix-struct-filter
mythrocks Sep 22, 2020
a1833f2
[struct] Fix struct filtering.
mythrocks Sep 22, 2020
84bf750
[struct] Fix struct filtering.
mythrocks Sep 22, 2020
299df24
Merge branch 'branch-0.16' of https://github.com/rapidsai/cudf into a…
shwina Sep 23, 2020
d5f8f51
Add a __repr__ for struct dtype
shwina Sep 23, 2020
2ddd892
Merge branch 'branch-0.16' of https://github.com/rapidsai/cudf into a…
shwina Sep 23, 2020
6cce5e7
Merge remote-tracking branch 'origin/branch-0.16' into fix-struct-filter
mythrocks Sep 23, 2020
7bc0436
Merge remote-tracking branch 'mythrocks/fix-struct-filter' into add-s…
shwina Sep 23, 2020
a4430f7
Initial struct column support
shwina Sep 23, 2020
9238fc0
Post-process to fix struct names
shwina Sep 23, 2020
e63dcf2
Copy struct field names over to libcudf result
shwina Sep 24, 2020
015654d
Fix typo
shwina Sep 24, 2020
a3874ed
Handle all null child in struct
shwina Sep 24, 2020
e6beb03
Mask handling in StructColumn.from_arrow. Add tests
shwina Sep 24, 2020
d4920f8
Struct dtype equality tests
shwina Sep 24, 2020
4ae7fa5
Fields ordering test
shwina Sep 24, 2020
d25fbd0
Struct and map support for Parquet reader.
nvdbaranec Sep 24, 2020
6fda4cc
Changelog
shwina Sep 24, 2020
c51d8cc
Merge branch 'branch-0.16' into parquet_structs
nvdbaranec Sep 24, 2020
641b123
Clang format.
nvdbaranec Sep 24, 2020
4fa1b3e
Changelog for 6318
nvdbaranec Sep 24, 2020
0349dca
Merge branch 'branch-0.16' of https://github.com/rapidsai/cudf into a…
shwina Sep 30, 2020
27c6da4
Add tests for StructDtype.fields
shwina Sep 30, 2020
159435c
Fixed a bug in the column_buffer move constructor : was not propagati…
nvdbaranec Oct 2, 2020
425a798
Add hierarchy column name information to table_with_metadata struct f…
nvdbaranec Oct 2, 2020
a5e326d
Merge branch 'branch-0.16' into parquet_structs
nvdbaranec Oct 2, 2020
86f6fe3
PR review changes.
nvdbaranec Oct 2, 2020
88bef5e
Merge branch 'add-struct-columns-python' into parquet_structs
shwina Oct 2, 2020
8444c38
Change the code that returns column hierarchy names to include all cu…
nvdbaranec Oct 2, 2020
75e5972
Merge branch 'parquet_structs' of github.com:nvdbaranec/cudf into par…
shwina Oct 5, 2020
5fe6042
Add back ListColumn.to_arrow() for now
shwina Oct 5, 2020
ea8307d
Remove breakpoints
shwina Oct 5, 2020
d810c6d
Simplify categorical handling in to_arrow
shwina Oct 5, 2020
782bb24
Fix up more to_arrow issues
shwina Oct 5, 2020
1b4d3a7
Keep specialization for lists
shwina Oct 5, 2020
0528033
Construct dtype from children in to_arrow()
shwina Oct 5, 2020
df0a112
Test for parquet struct reader.
nvdbaranec Oct 6, 2020
05d120c
Merge branch 'branch-0.16' into parquet_structs
nvdbaranec Oct 6, 2020
f5c7e5a
PR comment changes.
nvdbaranec Oct 6, 2020
738c78d
Merge branch 'branch-0.16' into parquet_structs
nvdbaranec Oct 6, 2020
8f3d377
Address review comments
shwina Oct 6, 2020
972300f
Another round of cpp PR changes. Fixed merge conflicts in python.
nvdbaranec Oct 6, 2020
93f1445
Merge branch 'branch-0.16' of https://github.com/rapidsai/cudf into p…
shwina Oct 6, 2020
49f3450
Merge branch 'parquet_structs' of github.com:nvdbaranec/cudf into pq_…
shwina Oct 6, 2020
0c9811c
Fix base size of StructColumn
shwina Oct 6, 2020
2db1445
Fixing up logic for generating elements in ListColumn.to_arrow
shwina Oct 6, 2020
98c4013
Undo string compare change in is_list_dtype
shwina Oct 6, 2020
1e5cc7c
Remove duplicates in CHANGELOG. Parameterized struct tests.
nvdbaranec Oct 6, 2020
0b76032
Merge branch 'parquet_structs' of github.com:nvdbaranec/cudf into par…
nvdbaranec Oct 6, 2020
48de1f9
Make sure to assert on expect_eq
nvdbaranec Oct 6, 2020
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -15,12 +15,15 @@
- PR #6125 Add support for `Series.mode` and `DataFrame.mode`
- PR #6271 Add support to deep-copy struct columns from struct column-view
- PR #6262 Add nth_element series aggregation with null handling
- PR #6318 Add support for reading Struct and map types from Parquet files
- PR #6316 Add StructColumn to Python API
- PR #6247 Add `minmax` reduction function
- PR #6232 `Json` and `Avro` benchmarking in python
- PR #6139 Add column conversion to big endian byte list.
- PR #6220 Add `list_topics()` to supply list of underlying Kafka connection topics
- PR #6254 Add `cudf::make_dictionary_from_scalar` factory function
- PR #6262 Add nth_element series aggregation with null handling
- PR #6318 Add support for reading Struct and map types from Parquet files
- PR #6315 Native code for string-map lookups, for cudf-java
- PR #6302 Add custom dataframe accessors
- PR #6301 Add JNI bindings to nvcomp
17 changes: 16 additions & 1 deletion cpp/include/cudf/io/types.hpp
@@ -89,6 +89,19 @@ enum statistics_freq {
STATISTICS_PAGE = 2, //!< Per-page column statistics
};

/**
* @brief Detailed name information for output columns.
*
* The hierarchy of children matches the hierarchy of children in the output
* cudf columns.
*/
struct column_name_info {
std::string name;
std::vector<column_name_info> children;
column_name_info(std::string const& _name) : name(_name) {}
column_name_info() = default;
};

/**
* @brief Table metadata for io readers/writers (primarily column names)
* For nested types (structs, maps, unions), the ordering of names in the column_names vector
@@ -105,7 +118,9 @@
* f5 f6
*/
struct table_metadata {
std::vector<std::string> column_names; //!< Names of columns contained in the table
std::vector<std::string> column_names; //!< Names of columns contained in the table
std::vector<column_name_info>
schema_info; //!< Detailed name information for the entire output hierarchy
std::map<std::string, std::string> user_data; //!< Format-dependent metadata as key-values pairs
};
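To make the new `schema_info` field concrete, here is a small stand-alone sketch of how the name hierarchy nests like the children of the output column. This is a simplified mirror of the struct above, and the column names `col`, `a`, and `b` are hypothetical examples, not taken from the PR:

```cpp
#include <cassert>
#include <string>
#include <vector>

// Simplified mirror of cudf::io::column_name_info, for illustration only.
struct column_name_info {
  std::string name;
  std::vector<column_name_info> children;
  column_name_info(std::string const& _name) : name(_name) {}
  column_name_info() = default;
};

// Build the name hierarchy for a hypothetical struct column "col" with
// fields "a" and "b": the children vector nests exactly like the children
// of the corresponding output cudf column.
inline column_name_info make_struct_names()
{
  column_name_info col("col");
  col.children.push_back(column_name_info("a"));
  col.children.push_back(column_name_info("b"));
  return col;
}
```

A reader can then walk `schema_info` in parallel with the output column tree to recover fully-qualified field names.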

2 changes: 1 addition & 1 deletion cpp/src/io/csv/reader_impl.cu
@@ -660,7 +660,7 @@ std::vector<column_buffer> reader::impl::decode_data(std::vector<data_type> cons
is_final_allocation ? mr_ : rmm::mr::get_current_device_resource());

out_buffer.name = col_names_[col];
out_buffers.emplace_back(out_buffer);
out_buffers.emplace_back(std::move(out_buffer));
active_col++;
}
}
239 changes: 146 additions & 93 deletions cpp/src/io/parquet/page_data.cu

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions cpp/src/io/parquet/page_hdr.cu
@@ -218,10 +218,10 @@ extern "C" __global__ void __launch_bounds__(128)
PageInfo *page_info;

if (!t) {
bs->base = bs->cur = bs->ck.compressed_data;
bs->end = bs->base + bs->ck.compressed_size;
bs->page.chunk_idx = chunk;
bs->page.column_idx = bs->ck.dst_col_index;
bs->base = bs->cur = bs->ck.compressed_data;
bs->end = bs->base + bs->ck.compressed_size;
bs->page.chunk_idx = chunk;
bs->page.src_col_schema = bs->ck.src_col_schema;
// this computation is only valid for flat schemas. for nested schemas,
// they will be recomputed in the preprocess step by examining repetition and
// definition levels
26 changes: 11 additions & 15 deletions cpp/src/io/parquet/parquet.cpp
@@ -290,7 +290,7 @@ PARQUET_END_STRUCT()
*/
bool CompactProtocolReader::InitSchema(FileMetaData *md)
{
if (WalkSchema(md->schema) != md->schema.size()) return false;
if (WalkSchema(md) != md->schema.size()) return false;

/* Inside FileMetaData, there is a std::vector of RowGroups and each RowGroup contains a
* a std::vector of ColumnChunks. Each ColumnChunk has a member ColumnMetaData, which contains
@@ -313,24 +313,19 @@ bool CompactProtocolReader::InitSchema(FileMetaData *md)
}();
if (it == md->schema.cend()) return false;
current_schema_index = std::distance(md->schema.cbegin(), it);

// if the schema index is already pointing at a nested type, we'll leave it alone.
if (column.schema_idx < 0 ||
md->schema[column.schema_idx].converted_type != parquet::LIST) {
column.schema_idx = current_schema_index;
}
column.leaf_schema_idx = current_schema_index;
parent = current_schema_index;
column.schema_idx = current_schema_index;
parent = current_schema_index;
}
}
}

return true;
}

/**
* @brief Populates each node in the schema tree
*
* @param[out] schema Current node
* @param[out] md File metadata
* @param[in] idx Current node index
* @param[in] parent_idx Parent node index
* @param[in] max_def_level Max definition level
@@ -339,10 +334,10 @@ bool CompactProtocolReader::InitSchema(FileMetaData *md)
* @return The node index that was populated
*/
int CompactProtocolReader::WalkSchema(
std::vector<SchemaElement> &schema, int idx, int parent_idx, int max_def_level, int max_rep_level)
FileMetaData *md, int idx, int parent_idx, int max_def_level, int max_rep_level)
{
if (idx >= 0 && (size_t)idx < schema.size()) {
SchemaElement *e = &schema[idx];
if (idx >= 0 && (size_t)idx < md->schema.size()) {
SchemaElement *e = &md->schema[idx];
if (e->repetition_type == OPTIONAL) {
++max_def_level;
} else if (e->repetition_type == REPEATED) {
@@ -352,12 +347,13 @@ int CompactProtocolReader::WalkSchema(
e->max_definition_level = max_def_level;
e->max_repetition_level = max_rep_level;
e->parent_idx = parent_idx;
parent_idx = idx;

parent_idx = idx;
++idx;
if (e->num_children > 0) {
for (int i = 0; i < e->num_children; i++) {
int idx_old = idx;
idx = WalkSchema(schema, idx, parent_idx, max_def_level, max_rep_level);
idx = WalkSchema(md, idx, parent_idx, max_def_level, max_rep_level);
if (idx <= idx_old) break; // Error
}
}
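The level bookkeeping in `WalkSchema` can be illustrated in isolation: walking the flattened schema depth-first, each OPTIONAL node adds one to the maximum definition level, and each REPEATED node adds one to both the definition and repetition levels. The sketch below is a simplified stand-alone version under those assumptions — `Node` and `walk` are illustrative names, not the cudf implementation:

```cpp
#include <cassert>
#include <vector>

// Simplified repetition kinds, mirroring parquet's REQUIRED/OPTIONAL/REPEATED.
enum Repetition { REQUIRED, OPTIONAL, REPEATED };

struct Node {
  Repetition repetition_type;
  int num_children;  // immediate children follow this node in flat DFS order
  int max_definition_level = 0;
  int max_repetition_level = 0;
  int parent_idx = 0;
};

// Walks a flattened, depth-first schema and fills in max def/rep levels.
// Returns the index one past the subtree rooted at idx.
inline int walk(std::vector<Node>& schema, int idx = 0, int parent = 0,
                int def = 0, int rep = 0)
{
  if (idx < 0 || idx >= (int)schema.size()) return idx;
  Node& e = schema[idx];
  if (e.repetition_type == OPTIONAL) {
    ++def;
  } else if (e.repetition_type == REPEATED) {
    ++def;
    ++rep;
  }
  e.max_definition_level = def;
  e.max_repetition_level = rep;
  e.parent_idx = parent;
  parent = idx;
  ++idx;
  // Recurse into each immediate child subtree.
  for (int i = 0; i < schema[parent].num_children; i++) {
    idx = walk(schema, idx, parent, def, rep);
  }
  return idx;
}
```

For a schema `root { optional group my_list (LIST) { repeated group element { required binary str } } }`, the leaf ends up with definition level 2 and repetition level 1, which is what the page-decoding kernels later use to reconstruct nesting.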
40 changes: 34 additions & 6 deletions cpp/src/io/parquet/parquet.hpp
@@ -49,7 +49,7 @@ struct file_ender_s {
*
* Parquet is a strongly-typed format so the file layout can be interpreted as
* as a schema tree.
**/
*/
struct SchemaElement {
Type type = UNDEFINED_TYPE;
ConvertedType converted_type = UNKNOWN;
@@ -73,6 +73,38 @@ struct SchemaElement {
name == other.name && num_children == other.num_children &&
decimal_scale == other.decimal_scale && decimal_precision == other.decimal_precision;
}

// the parquet format is a little squishy when it comes to interpreting
// repeated fields. sometimes repeated fields act as "stubs" in the schema
// that don't represent a true nesting level.
//
// this is the case with plain lists:
//
// optional group my_list (LIST) {
// repeated group element { <-- not part of the output hierarchy
// required binary str (UTF8);
// };
// }
//
  // However, for backwards-compatibility reasons, there are a few special cases, namely
  // List<Struct<>> (which also corresponds to how the map type is specified), where
  // this does not hold true.
//
// optional group my_list (LIST) {
// repeated group element { <-- part of the hierarchy because it represents a struct
// required binary str (UTF8);
// required int32 num;
// };
// }
bool is_stub() const { return repetition_type == REPEATED && num_children == 1; }
// in parquet terms, a group is a level of nesting in the schema. a group
// can be a struct or a list
bool is_struct() const
{
return type == UNDEFINED_TYPE &&
// this assumption might be a little weak.
((repetition_type != REPEATED) || (repetition_type == REPEATED && num_children == 2));
}
};
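A minimal illustration of the two predicates added above — the enums and the `Element` struct here are simplified stand-ins for the real parquet schema types, kept just large enough to exercise the logic:

```cpp
#include <cassert>

// Simplified stand-ins for the parquet schema enums used by SchemaElement.
enum Type { UNDEFINED_TYPE, INT32, BYTE_ARRAY };
enum FieldRepetitionType { REQUIRED, OPTIONAL, REPEATED };

struct Element {
  Type type = UNDEFINED_TYPE;
  FieldRepetitionType repetition_type = REQUIRED;
  int num_children = 0;

  // A repeated group with exactly one child is a schema "stub": a nesting
  // level in the file that produces no level in the output hierarchy.
  bool is_stub() const { return repetition_type == REPEATED && num_children == 1; }

  // A group (no physical type) is a struct unless it is a plain repeated
  // single-child wrapper.
  bool is_struct() const
  {
    return type == UNDEFINED_TYPE &&
           ((repetition_type != REPEATED) ||
            (repetition_type == REPEATED && num_children == 2));
  }
};
```

So the `repeated group element` wrapper of a plain list is a stub, while the two-child repeated group of a `List<Struct<str, num>>` (the map layout) counts as a struct level.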

/**
@@ -114,10 +146,6 @@ struct ColumnChunk {

// Following fields are derived from other fields
int schema_idx = -1; // Index in flattened schema (derived from path_in_schema)
// if this is a non-nested type, this index will be the same as schema_idx.
// for a nested type, this will point to the fundamental leaf type schema
// element (int, string, etc)
int leaf_schema_idx = -1;
};

/**
Expand Down Expand Up @@ -308,7 +336,7 @@ class CompactProtocolReader {
bool InitSchema(FileMetaData *md);

protected:
int WalkSchema(std::vector<SchemaElement> &schema,
int WalkSchema(FileMetaData *md,
int idx = 0,
int parent_idx = 0,
int max_def_level = 0,
3 changes: 3 additions & 0 deletions cpp/src/io/parquet/parquet_common.hpp
@@ -18,6 +18,9 @@

#include <cstdint>

#include <string>
#include <vector>

namespace cudf {
namespace io {
namespace parquet {
67 changes: 44 additions & 23 deletions cpp/src/io/parquet/parquet_gpu.hpp
@@ -19,13 +19,28 @@
#include <cuda_runtime.h>
#include <io/comp/gpuinflate.h>
#include <io/statistics/column_stats.h>
#include <cudf/types.hpp>
#include <io/parquet/parquet_common.hpp>
#include <io/utilities/column_buffer.hpp>
#include <io/utilities/hostdevice_vector.hpp>
#include <vector>

namespace cudf {
namespace io {
namespace parquet {

/**
* @brief Struct representing an input column in the file.
*/
struct input_column_info {
int schema_idx;
std::string name;
// size == nesting depth. the associated real output
// buffer index in the dest column for each level of nesting.
std::vector<int> nesting;
auto nesting_depth() const { return nesting.size(); }
};
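For intuition about the `nesting` vector, here is a hypothetical example (the field name `a.b`, schema index 5, and buffer indices are invented for illustration, not taken from the PR): a column nested two levels deep carries one output-buffer index per nesting level.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Simplified mirror of input_column_info, for illustration only.
struct input_column_info {
  int schema_idx;
  std::string name;
  // size == nesting depth: the output buffer index in the destination
  // column for each level of nesting.
  std::vector<int> nesting;
  auto nesting_depth() const { return nesting.size(); }
};

inline input_column_info example_column()
{
  // Hypothetical: field "a.b" at schema node 5, whose two nesting levels
  // map to output buffers 0 and 1 respectively.
  return input_column_info{5, "a.b", {0, 1}};
}
```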

namespace gpu {

/**
@@ -57,15 +72,14 @@ struct nvstrdesc_s {
* @brief Nesting information
*/
struct PageNestingInfo {
// input definition levels are remapped with this into
// the corresponding output PageNestingInfo struct
// within the same PageInfo.
// eg.
// PageNestingInfo *out = &page.nesting[page.nesting[d].d_remap];
int32_t d_remap;
// input repetition/definition levels are remapped with these values
// into the corresponding real output nesting depths.
int32_t start_depth;
int32_t end_depth;

// set at initialization
int32_t max_def_level;
int32_t max_rep_level;

// set during preprocessing
int32_t size; // this page/nesting-level's size contribution to the output column
@@ -76,7 +90,7 @@
int32_t value_count; // total # of values decoded in this page/nesting-level
int32_t valid_map_offset; // current offset in bits relative to valid_map
uint8_t *data_out; // pointer into output buffer
uint32_t *valid_map; // pointer into output validity buferr
uint32_t *valid_map; // pointer into output validity buffer
};

/**
@@ -95,12 +109,12 @@ struct PageInfo {
// - In the case of a nested schema, you have to decode the repetition and definition
// levels to extract actual column values
int32_t num_input_values;
int32_t chunk_row; // starting row of this page relative to the start of the chunk
int32_t num_rows; // number of rows in this page
int32_t chunk_idx; // column chunk this page belongs to
int32_t column_idx; // column index this page belongs to
uint8_t flags; // PAGEINFO_FLAGS_XXX
uint8_t encoding; // Encoding for data or dictionary page
int32_t chunk_row; // starting row of this page relative to the start of the chunk
int32_t num_rows; // number of rows in this page
int32_t chunk_idx; // column chunk this page belongs to
int32_t src_col_schema; // schema index of this column
uint8_t flags; // PAGEINFO_FLAGS_XXX
uint8_t encoding; // Encoding for data or dictionary page
uint8_t definition_level_encoding; // Encoding used for definition levels (data page)
uint8_t repetition_level_encoding; // Encoding used for repetition levels (data page)

@@ -126,20 +140,22 @@ struct ColumnChunkDesc {
uint32_t num_rows_,
int16_t max_definition_level_,
int16_t max_repetition_level_,
int16_t max_nesting_depth_,
uint8_t def_level_bits_,
uint8_t rep_level_bits_,
int8_t codec_,
int8_t converted_type_,
int8_t decimal_scale_,
int32_t ts_clock_rate_,
int32_t dst_col_index_,
int32_t src_col_index_)
int32_t src_col_index_,
int32_t src_col_schema_)
: compressed_data(compressed_data_),
compressed_size(compressed_size_),
num_values(num_values_),
start_row(start_row_),
num_rows(num_rows_),
max_level{max_definition_level_, max_repetition_level_},
max_nesting_depth{max_nesting_depth_},
data_type(datatype_ | (datatype_length_ << 3)),
level_bits{def_level_bits_, rep_level_bits_},
num_data_pages(0),
@@ -153,8 +169,8 @@
converted_type(converted_type_),
decimal_scale(decimal_scale_),
ts_clock_rate(ts_clock_rate_),
dst_col_index(dst_col_index_),
src_col_index(src_col_index_)
src_col_index(src_col_index_),
src_col_schema(src_col_schema_)
{
}

@@ -164,6 +180,7 @@ struct ColumnChunkDesc {
size_t start_row; // starting row of this chunk
uint32_t num_rows; // number of rows in this chunk
int16_t max_level[level_type::NUM_LEVEL_TYPES]; // max definition/repetition level
int16_t max_nesting_depth; // max nesting depth of the output
uint16_t data_type; // basic column data type, ((type_length << 3) |
// parquet::Type)
uint8_t
Expand All @@ -181,8 +198,8 @@ struct ColumnChunkDesc {
int8_t decimal_scale; // decimal scale pow(10, -decimal_scale)
int32_t ts_clock_rate; // output timestamp clock frequency (0=default, 1000=ms, 1000000000=ns)

int32_t dst_col_index; // my output column index
int32_t src_col_index; // my source (order in the file) column index
int32_t src_col_index; // my input column index
int32_t src_col_schema; // my schema index in the file
};

/**
@@ -315,12 +332,14 @@ cudaError_t BuildStringDictionaryIndex(ColumnChunkDesc *chunks,
* the parquet headers when dealing with nested schemas.
* - The total sizes of all output columns at all nesting levels
* - The starting output buffer offset for each page, for each nesting level
*
* For flat schemas, these values are computed during header decoding (see gpuDecodePageHeaders)
*
* Note : this function is where output device memory is allocated for nested columns.
*
* @param[in,out] pages All pages to be decoded
* @param[in] chunks All chunks to be decoded
* @param[in,out] nested_info Per-output column nesting information (size, nullability)
* @param[in,out] input_columns Input column information
* @param[in,out] output_columns Output column information
* @param[in] num_rows Maximum number of rows to read
* @param[in] min_rows crop all rows below min_row
* @param[in] stream Cuda stream
@@ -329,10 +348,12 @@ cudaError_t BuildStringDictionaryIndex(ColumnChunkDesc *chunks,
*/
cudaError_t PreprocessColumnData(hostdevice_vector<PageInfo> &pages,
hostdevice_vector<ColumnChunkDesc> const &chunks,
std::vector<std::vector<std::pair<int, bool>>> &nested_sizes,
std::vector<input_column_info> &input_columns,
std::vector<cudf::io::detail::column_buffer> &output_columns,
size_t num_rows,
size_t min_row,
cudaStream_t stream = (cudaStream_t)0);
cudaStream_t stream,
rmm::mr::device_memory_resource *mr);

/**
* @brief Launches kernel for reading the column data stored in the pages