Skip to content

Commit

Permalink
Return empty dataframe when reading a Parquet file using empty `colum…
Browse files Browse the repository at this point in the history
…ns` option (#11018)

Fixes #8668
Store the columns option as `optional`:

- `nullopt` when columns are not passed by caller - read all columns.
- Empty vector when caller explicitly passes an empty list/vector - return empty dataframe.
- Vector of column names - read columns with given names.

Authors:
  - Vukasin Milovanovic (https://github.com/vuule)
  - Andy Grove (https://github.com/andygrove)

Approvers:
  - GALI PREM SAGAR (https://github.com/galipremsagar)
  - Mike Wilson (https://github.com/hyperbolic2346)
  - Yunsong Wang (https://github.com/PointKernel)
  - Devavret Makkar (https://github.com/devavret)
  - Jason Lowe (https://github.com/jlowe)

URL: #11018
  • Loading branch information
vuule authored Jun 17, 2022
1 parent ef6a390 commit 379faf9
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 20 deletions.
11 changes: 7 additions & 4 deletions cpp/include/cudf/io/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class parquet_reader_options {
source_info _source;

// Path in schema of column to read; empty is all
std::vector<std::string> _columns;
std::optional<std::vector<std::string>> _columns;

// List of individual row groups to read (ignored if empty)
std::vector<std::vector<size_type>> _row_groups;
Expand Down Expand Up @@ -132,11 +132,14 @@ class parquet_reader_options {
[[nodiscard]] size_type get_num_rows() const { return _num_rows; }

/**
* @brief Returns names of column to be read.
* @brief Returns names of column to be read, if set.
*
* @return Names of column to be read
* @return Names of column to be read; `nullopt` if the option is not set
*/
[[nodiscard]] std::vector<std::string> const& get_columns() const { return _columns; }
[[nodiscard]] std::optional<std::vector<std::string>> const& get_columns() const
{
return _columns;
}

/**
* @brief Returns list of individual row groups to be read.
Expand Down
9 changes: 5 additions & 4 deletions cpp/src/io/parquet/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -590,15 +590,16 @@ class aggregate_reader_metadata {
/**
* @brief Filters and reduces down to a selection of columns
*
* @param use_names List of paths of column names to select
* @param use_names List of paths of column names to select; `nullopt` if user did not select
* columns to read
* @param include_index Whether to always include the PANDAS index column(s)
* @param strings_to_categorical Type conversion parameter
* @param timestamp_type_id Type conversion parameter
*
* @return input column information, output column information, list of output column schema
* indices
*/
[[nodiscard]] auto select_columns(std::vector<std::string> const& use_names,
[[nodiscard]] auto select_columns(std::optional<std::vector<std::string>> const& use_names,
bool include_index,
bool strings_to_categorical,
type_id timestamp_type_id) const
Expand Down Expand Up @@ -724,7 +725,7 @@ class aggregate_reader_metadata {
// ["name", "firstname"]
//
auto const& root = get_schema(0);
if (use_names.empty()) {
if (not use_names.has_value()) {
for (auto const& schema_idx : root.children_idx) {
build_column(nullptr, schema_idx, output_columns);
output_column_schemas.push_back(schema_idx);
Expand Down Expand Up @@ -752,7 +753,7 @@ class aggregate_reader_metadata {

// Find which of the selected paths are valid and get their schema index
std::vector<path_info> valid_selected_paths;
for (auto const& selected_path : use_names) {
for (auto const& selected_path : *use_names) {
auto found_path =
std::find_if(all_paths.begin(), all_paths.end(), [&](path_info& valid_path) {
return valid_path.full_path == selected_path;
Expand Down
20 changes: 20 additions & 0 deletions cpp/tests/io/parquet_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3398,4 +3398,24 @@ TEST_F(ParquetWriterTest, CheckPageRows)
EXPECT_EQ(nvals, page_rows);
}

TEST_F(ParquetReaderTest, EmptyColumnsParam)
{
srand(31337);
auto const expected = create_random_fixed_table<int>(2, 4, false);

std::vector<char> out_buffer;
cudf_io::parquet_writer_options args =
cudf_io::parquet_writer_options::builder(cudf_io::sink_info{&out_buffer}, *expected);
cudf_io::write_parquet(args);

cudf_io::parquet_reader_options read_opts =
cudf_io::parquet_reader_options::builder(
cudf_io::source_info{out_buffer.data(), out_buffer.size()})
.columns({});
auto const result = cudf_io::read_parquet(read_opts);

EXPECT_EQ(result.tbl->num_columns(), 0);
EXPECT_EQ(result.tbl->num_rows(), 0);
}

CUDF_TEST_PROGRAM_MAIN()
3 changes: 1 addition & 2 deletions java/src/main/java/ai/rapids/cudf/ColumnFilterOptions.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
*
* Copyright (c) 2019-2020, NVIDIA CORPORATION.
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,7 +21,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;

/**
* Base options class for input formats that can filter columns.
Expand Down
9 changes: 6 additions & 3 deletions java/src/main/native/src/TableJni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1497,10 +1497,13 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_readParquet(JNIEnv *env,
static_cast<std::size_t>(buffer_length)) :
cudf::io::source_info(filename.get());

auto builder = cudf::io::parquet_reader_options::builder(source);
if (n_filter_col_names.size() > 0) {
builder = builder.columns(n_filter_col_names.as_cpp_vector());
}

cudf::io::parquet_reader_options opts =
cudf::io::parquet_reader_options::builder(source)
.columns(n_filter_col_names.as_cpp_vector())
.convert_strings_to_categories(false)
builder.convert_strings_to_categories(false)
.timestamp_type(cudf::data_type(static_cast<cudf::type_id>(unit)))
.build();
return convert_table_for_return(env, cudf::io::read_parquet(opts).tbl);
Expand Down
1 change: 0 additions & 1 deletion python/cudf/cudf/_lib/cpp/io/parquet.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
cdef cppclass parquet_reader_options:
parquet_reader_options() except +
cudf_io_types.source_info get_source_info() except +
vector[string] get_columns() except +
vector[vector[size_type]] get_row_groups() except +
data_type get_timestamp_type() except +
bool is_enabled_convert_strings_to_categories() except +
Expand Down
17 changes: 11 additions & 6 deletions python/cudf/cudf/_lib/parquet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None,
cdef cudf_io_types.source_info source = make_source_info(
filepaths_or_buffers)

cdef vector[string] cpp_columns
cdef bool cpp_strings_to_categorical = strings_to_categorical
cdef bool cpp_use_pandas_metadata = use_pandas_metadata
cdef size_type cpp_skiprows = skiprows if skiprows is not None else 0
Expand All @@ -145,18 +144,13 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None,
cudf_types.type_id.EMPTY
)

if columns is not None:
cpp_columns.reserve(len(columns))
for col in columns or []:
cpp_columns.push_back(str(col).encode())
if row_groups is not None:
cpp_row_groups = row_groups

cdef parquet_reader_options args
# Setup parquet reader arguments
args = move(
parquet_reader_options.builder(source)
.columns(cpp_columns)
.row_groups(cpp_row_groups)
.convert_strings_to_categories(cpp_strings_to_categorical)
.use_pandas_metadata(cpp_use_pandas_metadata)
Expand All @@ -165,6 +159,15 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None,
.timestamp_type(cpp_timestamp_type)
.build()
)
cdef vector[string] cpp_columns
allow_range_index = True
if columns is not None:
cpp_columns.reserve(len(columns))
if len(cpp_columns) == 0:
allow_range_index = False
for col in columns or []:
cpp_columns.push_back(str(col).encode())
args.set_columns(cpp_columns)

# Read Parquet
cdef cudf_io_types.table_with_metadata c_out_table
Expand Down Expand Up @@ -218,6 +221,8 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None,
# Set the index column
if index_col is not None and len(index_col) > 0:
if is_range_index:
if not allow_range_index:
return df
range_index_meta = index_col[0]
if row_groups is not None:
per_file_metadata = [
Expand Down
13 changes: 13 additions & 0 deletions python/cudf/cudf/tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -2550,3 +2550,16 @@ def test_parquet_reader_zstd_compression(datadir):
assert_eq(df, pdf)
except RuntimeError:
pytest.mark.xfail(reason="zstd support is not enabled")


@pytest.mark.parametrize("index", [True, False, None])
@pytest.mark.parametrize("columns", [None, [], ["b", "a"]])
def test_parquet_columns_and_index_param(index, columns):
buffer = BytesIO()
df = cudf.DataFrame({"a": [1, 2, 3], "b": ["a", "b", "c"]})
df.to_parquet(buffer, index=index)

expected = pd.read_parquet(buffer, columns=columns)
got = cudf.read_parquet(buffer, columns=columns)

assert_eq(expected, got, check_index_type=True)

0 comments on commit 379faf9

Please sign in to comment.