Bind read_parquet_metadata API to libcudf instead of pyarrow and extract RowGroup information #15398

Merged: 16 commits, Apr 17, 2024

Changes from 5 commits
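In short, this change reroutes cuDF's read_parquet_metadata from pyarrow to libcudf and extends the result with the column count and per-row-group metadata. A minimal usage sketch of the rebound API (the path is hypothetical; actual values depend on the file):

import cudf

# Hypothetical single-file example; lists of paths are also accepted.
num_rows, num_rowgroups, names, num_columns, row_group_metadata = (
    cudf.io.parquet.read_parquet_metadata("example.parquet")
)

# Each row-group entry is a dict carrying "num_rows" and "total_byte_size".
for rg in row_group_metadata:
    print(rg["num_rows"], rg["total_byte_size"])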
52 changes: 49 additions & 3 deletions cpp/include/cudf/io/parquet_metadata.hpp
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -59,6 +59,13 @@ enum class TypeKind : int8_t {
*/
struct parquet_column_schema {
public:
/**
* @brief Default constructor.
*
* This has been added since Cython requires a default constructor to create objects on the stack.
*/
explicit parquet_column_schema() = default;

/**
* @brief constructor
*
@@ -134,6 +141,13 @@ struct parquet_column_schema {
*/
struct parquet_schema {
public:
/**
* @brief Default constructor.
*
* This has been added since Cython requires a default constructor to create objects on the stack.
*/
explicit parquet_schema() = default;

/**
* @brief constructor
*
@@ -165,23 +179,38 @@ class parquet_metadata {
public:
/// Key-value metadata in the file footer.
using key_value_metadata = std::unordered_map<std::string, std::string>;
/// Row group metadata from each RowGroup element.
using row_group_metadata = std::unordered_map<std::string, int64_t>;

/**
* @brief Default constructor.
*
* This has been added since Cython requires a default constructor to create objects on the stack.
*/
explicit parquet_metadata() = default;

/**
* @brief constructor
*
* @param schema parquet schema
* @param num_rows number of rows
* @param num_rowgroups number of row groups
* @param num_columns number of columns
* @param file_metadata key-value metadata in the file footer
* @param rg_metadata vector of maps containing metadata for each row group
*/
parquet_metadata(parquet_schema schema,
int64_t num_rows,
size_type num_rowgroups,
key_value_metadata file_metadata)
size_type num_columns,
key_value_metadata file_metadata,
std::vector<row_group_metadata> rg_metadata)
: _schema{std::move(schema)},
_num_rows{num_rows},
_num_rowgroups{num_rowgroups},
_file_metadata{std::move(file_metadata)}
_num_columns{num_columns},
_file_metadata{std::move(file_metadata)},
_rowgroup_metadata{std::move(rg_metadata)}
{
}

@@ -207,18 +236,35 @@ class parquet_metadata {
* @return Number of row groups
*/
[[nodiscard]] auto num_rowgroups() const { return _num_rowgroups; }

/**
* @brief Returns the number of columns in the file.
*
* @return Number of columns
*/
[[nodiscard]] auto num_columns() const { return _num_columns; }

/**
* @brief Returns the Key value metadata in the file footer.
*
* @return Key value metadata as a map
*/
[[nodiscard]] auto const& metadata() const { return _file_metadata; }

/**
* @brief Returns the row group metadata in the file footer.
*
* @return vector of row group metadata as maps
*/
[[nodiscard]] auto const& rowgroup_metadata() const { return _rowgroup_metadata; }

private:
parquet_schema _schema;
int64_t _num_rows;
size_type _num_rowgroups;
size_type _num_columns;
key_value_metadata _file_metadata;
std::vector<row_group_metadata> _rowgroup_metadata;
};

/**
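Since rowgroup_metadata() returns one map per row group, concatenated across all inputs, callers can derive aggregates by simple reduction. A sketch at the Python level, assuming the dict shape this PR produces (values invented):

# Illustrative input: one dict per row group, as surfaced by this PR.
row_group_metadata = [
    {"num_rows": 1000, "total_byte_size": 65536},
    {"num_rows": 1000, "total_byte_size": 64512},
]

total_rows = sum(rg["num_rows"] for rg in row_group_metadata)          # 2000
total_bytes = sum(rg["total_byte_size"] for rg in row_group_metadata)  # 130048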
4 changes: 3 additions & 1 deletion cpp/src/io/parquet/reader_impl.cpp
@@ -607,7 +607,9 @@ parquet_metadata read_parquet_metadata(host_span<std::unique_ptr<datasource> con
return parquet_metadata{parquet_schema{walk_schema(&metadata, 0)},
metadata.get_num_rows(),
metadata.get_num_row_groups(),
metadata.get_key_value_metadata()[0]};
metadata.get_num_columns(),
metadata.get_key_value_metadata()[0],
metadata.get_rowgroup_metadata()};
}

} // namespace cudf::io::parquet::detail
30 changes: 29 additions & 1 deletion cpp/src/io/parquet/reader_impl_helpers.cpp
@@ -379,6 +379,13 @@ size_type aggregate_reader_metadata::calc_num_row_groups() const
});
}

size_type aggregate_reader_metadata::calc_num_columns() const
{
return per_file_metadata.size()
? (per_file_metadata[0].schema.size() ? per_file_metadata[0].schema[0].num_children : 0)
: 0;
}

// Copies info from the column and offset indexes into the passed in row_group_info.
void aggregate_reader_metadata::column_info_for_row_group(row_group_info& rg_info,
size_type chunk_start_row) const
@@ -520,7 +527,8 @@ aggregate_reader_metadata::aggregate_reader_metadata(
: per_file_metadata(metadatas_from_sources(sources)),
keyval_maps(collect_keyval_metadata()),
num_rows(calc_num_rows()),
num_row_groups(calc_num_row_groups())
num_row_groups(calc_num_row_groups()),
num_columns(calc_num_columns())
{
if (per_file_metadata.size() > 0) {
auto const& first_meta = per_file_metadata.front();
@@ -560,6 +568,26 @@ ColumnChunkMetaData const& aggregate_reader_metadata::get_column_metadata(size_t
return col->meta_data;
}

std::vector<std::unordered_map<std::string, int64_t>>
aggregate_reader_metadata::get_rowgroup_metadata() const
{
std::vector<std::unordered_map<std::string, int64_t>> rg_metadata;

std::for_each(
per_file_metadata.cbegin(), per_file_metadata.cend(), [&rg_metadata](auto const& pfm) {
std::transform(pfm.row_groups.cbegin(),
pfm.row_groups.cend(),
std::back_inserter(rg_metadata),
[](auto const& rg) {
std::unordered_map<std::string, int64_t> rg_meta_map;
rg_meta_map["num_rows"] = rg.num_rows;
rg_meta_map["total_byte_size"] = rg.total_byte_size;
return rg_meta_map;
});
});
return rg_metadata;
}

std::string aggregate_reader_metadata::get_pandas_index() const
{
// Assumes that all input files have the same metadata
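For readers less familiar with the for_each/transform idiom, get_rowgroup_metadata above amounts to a flattening loop. A Python rendering of the same logic, with hypothetical plain dicts standing in for the C++ metadata structs:

def get_rowgroup_metadata(per_file_metadata):
    # Flatten row groups from every input file into one list, keeping only
    # the two fields this PR extracts from each RowGroup element.
    rg_metadata = []
    for pfm in per_file_metadata:
        for rg in pfm["row_groups"]:
            rg_metadata.append(
                {"num_rows": rg["num_rows"],
                 "total_byte_size": rg["total_byte_size"]}
            )
    return rg_metadata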
13 changes: 13 additions & 0 deletions cpp/src/io/parquet/reader_impl_helpers.hpp
@@ -126,6 +126,7 @@ class aggregate_reader_metadata {
std::vector<std::unordered_map<std::string, std::string>> keyval_maps;
int64_t num_rows;
size_type num_row_groups;
size_type num_columns;

/**
* @brief Create a metadata object from each element in the source vector
@@ -149,6 +150,8 @@
*/
[[nodiscard]] size_type calc_num_row_groups() const;

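/**
* @brief Calculates the number of top-level columns, based on the schema of
* the first input file (assumes all files share a schema)
*
* @return Number of columns
*/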
[[nodiscard]] size_type calc_num_columns() const;

/**
* @brief Calculate column index info for the given `row_group_info`
*
@@ -166,10 +169,19 @@
size_type src_idx,
int schema_idx) const;

/**
* @brief Extracts high-level metadata for all row groups
*
* @return List of maps containing metadata information for each row group
*/
[[nodiscard]] std::vector<std::unordered_map<std::string, int64_t>> get_rowgroup_metadata() const;

[[nodiscard]] auto get_num_rows() const { return num_rows; }

[[nodiscard]] auto get_num_row_groups() const { return num_row_groups; }

[[nodiscard]] auto get_num_columns() const { return num_columns; }

[[nodiscard]] auto const& get_schema(int schema_idx) const
{
return per_file_metadata[0].schema[schema_idx];
@@ -178,6 +190,7 @@
[[nodiscard]] auto const& get_key_value_metadata() const& { return keyval_maps; }

[[nodiscard]] auto&& get_key_value_metadata() && { return std::move(keyval_maps); }

/**
* @brief Gets the concrete nesting depth of output cudf columns
*
33 changes: 33 additions & 0 deletions python/cudf/cudf/_lib/cpp/io/parquet_metadata.pxd
@@ -0,0 +1,33 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

from libc.stdint cimport int64_t
from libcpp.string cimport string
from libcpp.unordered_map cimport unordered_map
from libcpp.vector cimport vector

cimport cudf._lib.cpp.io.types as cudf_io_types
from cudf._lib.cpp.types cimport size_type


cdef extern from "cudf/io/parquet_metadata.hpp" namespace "cudf::io" nogil:
cdef cppclass parquet_column_schema:
parquet_column_schema() except+
string name() except+
size_type num_children() except+
parquet_column_schema child(int idx) except+
vector[parquet_column_schema] children() except+

cdef cppclass parquet_schema:
parquet_schema() except+
parquet_column_schema root() except+

cdef cppclass parquet_metadata:
parquet_metadata() except+
parquet_schema schema() except+
int64_t num_rows() except+
size_type num_rowgroups() except+
size_type num_columns() except+
unordered_map[string, string] metadata() except+
vector[unordered_map[string, int64_t]] rowgroup_metadata() except+

cdef parquet_metadata read_parquet_metadata(cudf_io_types.source_info src) except+
45 changes: 45 additions & 0 deletions python/cudf/cudf/_lib/parquet.pyx
@@ -46,6 +46,10 @@ from cudf._lib.cpp.io.parquet cimport (
read_parquet as parquet_reader,
write_parquet as parquet_writer,
)
from cudf._lib.cpp.io.parquet_metadata cimport (
parquet_metadata,
read_parquet_metadata as parquet_metadata_reader,
)
from cudf._lib.cpp.io.types cimport column_in_metadata, table_input_metadata
from cudf._lib.cpp.table.table_view cimport table_view
from cudf._lib.cpp.types cimport data_type, size_type
@@ -316,6 +320,47 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None,
df._data.label_dtype = cudf.dtype(column_index_type)
return df

cpdef read_parquet_metadata(filepaths_or_buffers):
"""
Cython function to call into libcudf API, see `read_parquet_metadata`.

Returns a tuple of (num_rows, num_rowgroups, names, num_columns,
row_group_metadata) extracted from the Parquet file footer via libcudf.

See Also
--------
cudf.io.parquet.read_parquet
cudf.io.parquet.to_parquet
"""
# Convert NativeFile buffers to NativeFileDatasource
for i, datasource in enumerate(filepaths_or_buffers):
if isinstance(datasource, NativeFile):
filepaths_or_buffers[i] = NativeFileDatasource(datasource)

cdef cudf_io_types.source_info source = make_source_info(filepaths_or_buffers)

args = move(source)

cdef parquet_metadata c_result

# Read Parquet metadata
with nogil:
c_result = move(parquet_metadata_reader(args))

# Access and return results
num_columns = c_result.num_columns()
num_rows = c_result.num_rows()
num_rowgroups = c_result.num_rowgroups()
names = [info.name().decode() for info in c_result.schema().root().children()]

# extract row group metadata and sanitize keys
row_group_metadata = []
row_group_metadata_unsanitized = c_result.rowgroup_metadata()
for meta in row_group_metadata_unsanitized:
row_group_metadata.append({k.decode(): v for k, v in meta})

return num_rows, num_rowgroups, names, num_columns, row_group_metadata
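The sanitization loop above converts the C++ unordered_map's bytes keys into Python strings; a pure-Python equivalent of that step (sample values invented):

row_group_metadata_unsanitized = [{b"num_rows": 100, b"total_byte_size": 4096}]
row_group_metadata = [
    {k.decode(): v for k, v in meta.items()}
    for meta in row_group_metadata_unsanitized
]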


@acquire_spill_lock()
def write_parquet(
Expand Down
61 changes: 54 additions & 7 deletions python/cudf/cudf/io/parquet.py
@@ -267,17 +267,64 @@ def write_to_dataset(

@ioutils.doc_read_parquet_metadata()
@_cudf_nvtx_annotate
def read_parquet_metadata(path):
def read_parquet_metadata(filepath_or_buffer):
"""{docstring}"""
import pyarrow.parquet as pq
# Multiple sources are passed as a list. If a single source is passed,
# wrap it in a list for unified processing downstream.
if not is_list_like(filepath_or_buffer):
filepath_or_buffer = [filepath_or_buffer]

pq_file = pq.ParquetFile(path)
# Start by trying to construct a filesystem object, so we
# can handle remote file-systems
fs, paths = ioutils._get_filesystem_and_paths(
path_or_data=filepath_or_buffer, storage_options=None
)

num_rows = pq_file.metadata.num_rows
num_row_groups = pq_file.num_row_groups
col_names = pq_file.schema.names
# Use pyarrow dataset to detect/process directory-partitioned
# data. Note that partitioned data can only be supported if the
# input is a single directory or list of paths.
partition_keys = []
partition_categories = {}
if fs and paths:
(
paths,
row_groups,
partition_keys,
partition_categories,
) = _process_dataset(
paths=paths,
fs=fs,
filters=None,
row_groups=None,
categorical_partitions=True,
dataset_kwargs=None,
)
filepath_or_buffer = paths if paths else filepath_or_buffer

filepaths_or_buffers = []

for source in filepath_or_buffer:
tmp_source, compression = ioutils.get_reader_filepath_or_buffer(
path_or_data=source,
compression=None,
fs=fs,
use_python_file_object=True,
open_file_options=None,
storage_options=None,
bytes_per_thread=None,
)

if compression is not None:
raise ValueError(
"URL content-encoding decompression is not supported"
)
if isinstance(tmp_source, list):
filepaths_or_buffers.extend(tmp_source)
else:
filepaths_or_buffers.append(tmp_source)

return num_rows, num_row_groups, col_names
return libparquet.read_parquet_metadata(filepaths_or_buffers)
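With the source handling added above, metadata can now be read from a list of files or from a partitioned directory, not just a single local path. A sketch assuming the (hypothetical) paths exist:

import cudf

# A list of files and a partitioned dataset directory (paths are hypothetical).
meta_from_list = cudf.io.parquet.read_parquet_metadata(
    ["part-0.parquet", "part-1.parquet"]
)
meta_from_dir = cudf.io.parquet.read_parquet_metadata("dataset_dir/")

# Each call returns (num_rows, num_rowgroups, names, num_columns,
# row_group_metadata), with row groups aggregated across all inputs.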


@_cudf_nvtx_annotate