Bind read_parquet_metadata API to libcudf instead of pyarrow and extract RowGroup information #15398

Merged · 16 commits · Apr 17, 2024
Changes from 11 commits
41 changes: 38 additions & 3 deletions cpp/include/cudf/io/parquet_metadata.hpp
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -59,6 +59,13 @@ enum class TypeKind : int8_t {
*/
struct parquet_column_schema {
public:
/**
* @brief Default constructor.
*
* This has been added since Cython requires a default constructor to create objects on the stack.
*/
explicit parquet_column_schema() = default;

/**
* @brief constructor
*
@@ -134,6 +141,13 @@
*/
struct parquet_schema {
public:
/**
* @brief Default constructor.
*
* This has been added since Cython requires a default constructor to create objects on the stack.
*/
explicit parquet_schema() = default;

/**
* @brief constructor
*
@@ -165,6 +179,15 @@ class parquet_metadata {
public:
/// Key-value metadata in the file footer.
using key_value_metadata = std::unordered_map<std::string, std::string>;
/// Row group metadata from each RowGroup element.
using row_group_metadata = std::unordered_map<std::string, int64_t>;

/**
* @brief Default constructor.
*
* This has been added since Cython requires a default constructor to create objects on the stack.
*/
explicit parquet_metadata() = default;

/**
* @brief constructor
@@ -173,15 +196,18 @@
* @param num_rows number of rows
* @param num_rowgroups number of row groups
* @param file_metadata key-value metadata in the file footer
* @param rg_metadata vector of maps containing metadata for each row group
*/
parquet_metadata(parquet_schema schema,
int64_t num_rows,
size_type num_rowgroups,
key_value_metadata file_metadata)
key_value_metadata file_metadata,
std::vector<row_group_metadata> rg_metadata)
: _schema{std::move(schema)},
_num_rows{num_rows},
_num_rowgroups{num_rowgroups},
_file_metadata{std::move(file_metadata)}
_file_metadata{std::move(file_metadata)},
_rowgroup_metadata{rg_metadata}
{
}

@@ -207,18 +233,27 @@
* @return Number of row groups
*/
[[nodiscard]] auto num_rowgroups() const { return _num_rowgroups; }

/**
* @brief Returns the Key value metadata in the file footer.
*
* @return Key value metadata as a map
*/
[[nodiscard]] auto const& metadata() const { return _file_metadata; }

/**
* @brief Returns the row group metadata in the file footer.
*
* @return Vector of row group metadata as maps
*/
[[nodiscard]] auto const& rowgroup_metadata() const { return _rowgroup_metadata; }

private:
parquet_schema _schema;
int64_t _num_rows;
size_type _num_rowgroups;
key_value_metadata _file_metadata;
std::vector<row_group_metadata> _rowgroup_metadata;
};

/**
3 changes: 2 additions & 1 deletion cpp/src/io/parquet/reader_impl.cpp
@@ -607,7 +607,8 @@ parquet_metadata read_parquet_metadata(host_span<std::unique_ptr<datasource> con
return parquet_metadata{parquet_schema{walk_schema(&metadata, 0)},
metadata.get_num_rows(),
metadata.get_num_row_groups(),
metadata.get_key_value_metadata()[0]};
metadata.get_key_value_metadata()[0],
std::move(metadata.get_rowgroup_metadata())};
}

} // namespace cudf::io::parquet::detail
20 changes: 20 additions & 0 deletions cpp/src/io/parquet/reader_impl_helpers.cpp
@@ -560,6 +560,26 @@ ColumnChunkMetaData const& aggregate_reader_metadata::get_column_metadata(size_t
return col->meta_data;
}

std::vector<std::unordered_map<std::string, int64_t>>
aggregate_reader_metadata::get_rowgroup_metadata() const
{
std::vector<std::unordered_map<std::string, int64_t>> rg_metadata;

std::for_each(
per_file_metadata.cbegin(), per_file_metadata.cend(), [&rg_metadata](auto const& pfm) {
std::transform(pfm.row_groups.cbegin(),
pfm.row_groups.cend(),
std::back_inserter(rg_metadata),
[](auto const& rg) {
std::unordered_map<std::string, int64_t> rg_meta_map;
rg_meta_map["num_rows"] = rg.num_rows;
rg_meta_map["total_byte_size"] = rg.total_byte_size;
return rg_meta_map;
});
});
return rg_metadata;
}

std::string aggregate_reader_metadata::get_pandas_index() const
{
// Assumes that all input files have the same metadata
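The aggregation above flattens the row groups of every input file, in file order, into one list, with each row group contributing its num_rows and total_byte_size. A minimal Python sketch of the same logic (the per_file_metadata structure and field names are illustrative stand-ins for the parsed footers, not the PR's code):

# Illustrative sketch of get_rowgroup_metadata(); not part of the PR.
def get_rowgroup_metadata(per_file_metadata):
    rg_metadata = []
    for pfm in per_file_metadata:  # one entry per input file
        for rg in pfm["row_groups"]:
            rg_metadata.append(
                {"num_rows": rg["num_rows"], "total_byte_size": rg["total_byte_size"]}
            )
    return rg_metadata

# Two files with three row groups in total -> three dicts, in file order.
files = [
    {"row_groups": [{"num_rows": 100, "total_byte_size": 4096},
                    {"num_rows": 50, "total_byte_size": 2048}]},
    {"row_groups": [{"num_rows": 75, "total_byte_size": 3072}]},
]
assert len(get_rowgroup_metadata(files)) == 3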
8 changes: 8 additions & 0 deletions cpp/src/io/parquet/reader_impl_helpers.hpp
@@ -166,6 +166,13 @@ class aggregate_reader_metadata {
size_type src_idx,
int schema_idx) const;

/**
* @brief Extracts high-level metadata for all row groups
*
* @return List of maps containing metadata information for each row group
*/
[[nodiscard]] std::vector<std::unordered_map<std::string, int64_t>> get_rowgroup_metadata() const;

[[nodiscard]] auto get_num_rows() const { return num_rows; }

[[nodiscard]] auto get_num_row_groups() const { return num_row_groups; }
@@ -178,6 +185,7 @@
[[nodiscard]] auto const& get_key_value_metadata() const& { return keyval_maps; }

[[nodiscard]] auto&& get_key_value_metadata() && { return std::move(keyval_maps); }

/**
* @brief Gets the concrete nesting depth of output cudf columns
*
32 changes: 32 additions & 0 deletions python/cudf/cudf/_lib/cpp/io/parquet_metadata.pxd
@@ -0,0 +1,32 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

from libc.stdint cimport int64_t
from libcpp.string cimport string
from libcpp.unordered_map cimport unordered_map
from libcpp.vector cimport vector

cimport cudf._lib.cpp.io.types as cudf_io_types
from cudf._lib.cpp.types cimport size_type


cdef extern from "cudf/io/parquet_metadata.hpp" namespace "cudf::io" nogil:
cdef cppclass parquet_column_schema:
parquet_column_schema() except+
string name() except+
size_type num_children() except+
parquet_column_schema child(int idx) except+
vector[parquet_column_schema] children() except+

cdef cppclass parquet_schema:
parquet_schema() except+
parquet_column_schema root() except+

cdef cppclass parquet_metadata:
parquet_metadata() except+
parquet_schema schema() except+
int64_t num_rows() except+
size_type num_rowgroups() except+
unordered_map[string, string] metadata() except+
vector[unordered_map[string, int64_t]] rowgroup_metadata() except+

cdef parquet_metadata read_parquet_metadata(cudf_io_types.source_info src) except+
74 changes: 74 additions & 0 deletions python/cudf/cudf/_lib/parquet.pyx
Expand Up @@ -46,6 +46,10 @@ from cudf._lib.cpp.io.parquet cimport (
read_parquet as parquet_reader,
write_parquet as parquet_writer,
)
from cudf._lib.cpp.io.parquet_metadata cimport (
parquet_metadata,
read_parquet_metadata as parquet_metadata_reader,
)
from cudf._lib.cpp.io.types cimport column_in_metadata, table_input_metadata
from cudf._lib.cpp.table.table_view cimport table_view
from cudf._lib.cpp.types cimport data_type, size_type
@@ -316,6 +320,76 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None,
df._data.label_dtype = cudf.dtype(column_index_type)
return df

cpdef read_parquet_metadata(filepaths_or_buffers):
"""
Cython function to call into libcudf API, see `read_parquet_metadata`.

Returns a five-element tuple: (num_rows, num_rowgroups, col_names,
num_columns, row_group_metadata).

See Also
--------
cudf.io.parquet.read_parquet
cudf.io.parquet.to_parquet
"""
# Convert NativeFile buffers to NativeFileDatasource
for i, datasource in enumerate(filepaths_or_buffers):
if isinstance(datasource, NativeFile):
filepaths_or_buffers[i] = NativeFileDatasource(datasource)

cdef cudf_io_types.source_info source = make_source_info(filepaths_or_buffers)

args = move(source)

cdef parquet_metadata c_result

# Read Parquet metadata
with nogil:
c_result = move(parquet_metadata_reader(args))

# access and return results
num_rows = c_result.num_rows()
num_rowgroups = c_result.num_rowgroups()

# extract row group metadata and sanitize keys
row_group_metadata = []
row_group_metadata_unsanitized = c_result.rowgroup_metadata()
for metadata in row_group_metadata_unsanitized:
row_group_metadata.append({k.decode(): v for k, v in metadata})

# read all column names including index column, if any
col_names = [info.name().decode() for info in c_result.schema().root().children()]

# access the Parquet file_footer to find the index
index_col = None
cdef unordered_map[string, string] file_footer = c_result.metadata()

# get index column name(s)
index_col_names = None
json_str = file_footer[b'pandas'].decode('utf-8')
meta = None
if json_str != "":
meta = json.loads(json_str)
file_is_range_index, index_col, _ = _parse_metadata(meta)
if not file_is_range_index and index_col is not None \
and index_col_names is None:
index_col_names = {}
for idx_col in index_col:
for c in meta['columns']:
if c['field_name'] == idx_col:
index_col_names[idx_col] = c['name']

# remove the index column from the list of column names
# only if index_col_names is not None
if index_col_names is not None:
col_names = [name for name in col_names if name not in index_col_names]

# number of columns, excluding any dropped index column(s)
num_columns = len(col_names)

# return the metadata
return num_rows, num_rowgroups, col_names, num_columns, row_group_metadata


@acquire_spill_lock()
def write_parquet(
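A minimal sketch of how the Cython entry point above is expected to be consumed; the file name is hypothetical, and in practice the call is routed through the public wrapper in python/cudf/cudf/io/parquet.py shown below:

from cudf._lib import parquet as libparquet

# Hypothetical single-file input; the function takes a list of paths/buffers.
num_rows, num_rowgroups, col_names, num_columns, row_group_metadata = (
    libparquet.read_parquet_metadata(["example.parquet"])
)

print(num_rows, num_rowgroups, num_columns)
print(col_names)                # column names, with any pandas index columns removed
print(row_group_metadata[0])    # e.g. {'num_rows': ..., 'total_byte_size': ...}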
61 changes: 54 additions & 7 deletions python/cudf/cudf/io/parquet.py
@@ -267,17 +267,64 @@ def write_to_dataset(

@ioutils.doc_read_parquet_metadata()
@_cudf_nvtx_annotate
def read_parquet_metadata(path):
def read_parquet_metadata(filepath_or_buffer):
"""{docstring}"""
import pyarrow.parquet as pq
# Multiple sources are passed as a list. If a single source is passed,
# wrap it in a list for unified processing downstream.
if not is_list_like(filepath_or_buffer):
filepath_or_buffer = [filepath_or_buffer]

pq_file = pq.ParquetFile(path)
# Start by trying to construct a filesystem object, so we
# can handle remote file-systems
fs, paths = ioutils._get_filesystem_and_paths(
path_or_data=filepath_or_buffer, storage_options=None
)

num_rows = pq_file.metadata.num_rows
num_row_groups = pq_file.num_row_groups
col_names = pq_file.schema.names
# Use pyarrow dataset to detect/process directory-partitioned
# data. Note that we can only support partitioned data if the
# input is a single directory or list of paths.
partition_keys = []
partition_categories = {}
if fs and paths:
(
paths,
row_groups,
partition_keys,
partition_categories,
) = _process_dataset(
paths=paths,
fs=fs,
filters=None,
row_groups=None,
categorical_partitions=True,
dataset_kwargs=None,
)
filepath_or_buffer = paths if paths else filepath_or_buffer

filepaths_or_buffers = []

for source in filepath_or_buffer:
tmp_source, compression = ioutils.get_reader_filepath_or_buffer(
path_or_data=source,
compression=None,
fs=fs,
use_python_file_object=True,
open_file_options=None,
storage_options=None,
bytes_per_thread=None,
)

if compression is not None:
raise ValueError(
"URL content-encoding decompression is not supported"
)
if isinstance(tmp_source, list):
filepath_or_buffer.extend(tmp_source)
else:
filepaths_or_buffers.append(tmp_source)

return num_rows, num_row_groups, col_names
return libparquet.read_parquet_metadata(filepaths_or_buffers)


@_cudf_nvtx_annotate
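For completeness, an end-to-end usage sketch of the rewritten public function; the path, data, and row-group size are made up, and row_group_size_rows is assumed to be accepted by cuDF's to_parquet:

import cudf
from cudf.io.parquet import read_parquet_metadata

# Write a small illustrative file with two row groups.
cudf.DataFrame({"a": range(100), "b": range(100)}).to_parquet(
    "example.parquet", row_group_size_rows=50
)

num_rows, num_rowgroups, col_names, num_columns, row_group_meta = (
    read_parquet_metadata("example.parquet")
)

print(num_rows, num_rowgroups, num_columns)   # expected: 100 2 2
print(col_names)                              # expected: ['a', 'b']
# Each row-group entry carries 'num_rows' and 'total_byte_size'.
print(sum(rg["total_byte_size"] for rg in row_group_meta))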