Expand statistics support in ORC writer #13848

Merged

48 commits, merged Sep 18, 2023. Changes shown from 12 commits.

Commits
b614fe8
sum w/o minmax; double sum, string sum
vuule Aug 10, 2023
9a7988a
write hasNull!
vuule Aug 10, 2023
70c3b28
tests
vuule Aug 10, 2023
b392376
style
vuule Aug 10, 2023
eb6cdae
clean up
vuule Aug 10, 2023
4776140
Merge branch 'branch-23.10' into fea-expand-stats
vuule Aug 10, 2023
e277645
Merge branch 'branch-23.10' into fea-expand-stats
vuule Aug 11, 2023
9fbe6c5
fix python tests
vuule Aug 14, 2023
d833ff0
remove incorrect bucket stats
vuule Aug 14, 2023
1ca376a
Merge branch 'branch-23.10' into fea-expand-stats
vuule Aug 14, 2023
1db56c5
remove bool column from C++ stats tests
vuule Aug 14, 2023
950cec8
Merge branch 'branch-23.10' into fea-expand-stats
vuule Aug 14, 2023
50b67d8
Merge branch 'branch-23.10' into fea-expand-stats
vuule Aug 17, 2023
cb61069
Merge branch 'branch-23.10' into fea-expand-stats
vuule Aug 17, 2023
96b3112
Merge branch 'branch-23.10' into fea-expand-stats
vuule Aug 21, 2023
01af60b
Merge branch 'branch-23.10' of https://github.com/rapidsai/cudf into …
vuule Aug 21, 2023
2f35d5a
test clean up
vuule Aug 21, 2023
cc54019
Merge branch 'fea-expand-stats' of https://github.com/vuule/cudf into…
vuule Aug 21, 2023
864bdd1
Merge branch 'branch-23.10' into fea-expand-stats
vuule Aug 24, 2023
fcfa662
Merge branch 'branch-23.10' of https://github.com/rapidsai/cudf into …
vuule Aug 24, 2023
ee1347f
restore bool stats
vuule Aug 24, 2023
d7facda
add bool test; fix docs
vuule Aug 24, 2023
bfa0d8b
add timestamp min/max
vuule Aug 24, 2023
cb71541
fix init buffersize
vuule Aug 24, 2023
227a9c1
use int128 for all decimal columns
vuule Aug 25, 2023
23a5e14
Merge branch 'branch-23.10' of https://github.com/rapidsai/cudf into …
vuule Aug 29, 2023
f2f6090
actual dec stats size
vuule Aug 29, 2023
64636f9
add decimal stats
vuule Aug 30, 2023
75fe574
Merge branch 'branch-23.10' of https://github.com/rapidsai/cudf into …
vuule Aug 30, 2023
c3b7410
Merge branch 'fea-expand-stats' of https://github.com/vuule/cudf into…
vuule Aug 30, 2023
e79496c
add timestamp nanoseconds
vuule Aug 31, 2023
7d0e3c7
Merge branch 'branch-23.10' of https://github.com/rapidsai/cudf into …
vuule Aug 31, 2023
b5dbea1
de-duplicate decimal to string code
vuule Aug 31, 2023
7b27f56
don't assume dec len
vuule Aug 31, 2023
e65310d
expand tests
vuule Aug 31, 2023
8c58c6f
mostly docs
vuule Sep 1, 2023
eacb578
style
vuule Sep 1, 2023
3afcd64
Merge branch 'branch-23.10' into fea-expand-stats
vuule Sep 1, 2023
3746cb4
Merge branch 'branch-23.10' of https://github.com/rapidsai/cudf into …
vuule Sep 5, 2023
7808cb3
test fix
vuule Sep 5, 2023
f181df2
Merge branch 'fea-expand-stats' of https://github.com/vuule/cudf into…
vuule Sep 5, 2023
3c0da37
Merge branch 'branch-23.10' into fea-expand-stats
vuule Sep 8, 2023
3a61eee
Merge branch 'branch-23.10' into fea-expand-stats
karthikeyann Sep 13, 2023
8742633
Merge branch 'branch-23.10' into fea-expand-stats
vuule Sep 16, 2023
8920b71
Merge branch 'branch-23.10' of https://github.com/rapidsai/cudf into …
vuule Sep 18, 2023
d027019
simplify lambda
vuule Sep 18, 2023
2136109
const
vuule Sep 18, 2023
62862b4
Merge branch 'fea-expand-stats' of https://github.com/vuule/cudf into…
vuule Sep 18, 2023
59 changes: 32 additions & 27 deletions cpp/src/io/orc/stats_enc.cu
@@ -126,9 +126,6 @@ struct stats_state_s {
statistics_chunk chunk;
statistics_merge_group group;
statistics_dtype stats_dtype; //!< Statistics data type for this column
// ORC stats
uint64_t numberOfValues;
uint8_t hasNull;
Comment on lines -129 to -131

Contributor Author: was unused

};

/*
@@ -228,12 +225,14 @@ __global__ void __launch_bounds__(encode_threads_per_block)

// Encode and update actual bfr size
if (idx < statistics_count && t == 0) {
s->chunk = chunks[idx];
s->group = groups[idx];
s->stats_dtype = s->group.stats_dtype;
s->base = blob_bfr + s->group.start_chunk;
s->end = blob_bfr + s->group.start_chunk + s->group.num_chunks;
uint8_t* cur = pb_put_uint(s->base, 1, s->chunk.non_nulls);
s->chunk = chunks[idx];
s->group = groups[idx];
s->stats_dtype = s->group.stats_dtype;
s->base = blob_bfr + s->group.start_chunk;
s->end = blob_bfr + s->group.start_chunk + s->group.num_chunks;
uint8_t* cur = pb_put_uint(s->base, 1, s->chunk.non_nulls);
cur = pb_put_uint(cur, 10, s->chunk.null_count != 0); // hasNull (bool)

uint8_t* fld_start = cur;
switch (s->stats_dtype) {
case dtype_int8:
@@ -265,11 +264,14 @@ __global__ void __launch_bounds__(encode_threads_per_block)
// optional double maximum = 2;
// optional double sum = 3;
// }
if (s->chunk.has_minmax) {
if (s->chunk.has_minmax || s->chunk.has_sum) {
*cur = 3 * 8 + ProtofType::FIXEDLEN;
cur += 2;
cur = pb_put_fixed64(cur, 1, &s->chunk.min_value.fp_val);
cur = pb_put_fixed64(cur, 2, &s->chunk.max_value.fp_val);
if (s->chunk.has_minmax) {
cur = pb_put_fixed64(cur, 1, &s->chunk.min_value.fp_val);
cur = pb_put_fixed64(cur, 2, &s->chunk.max_value.fp_val);
}
if (s->chunk.has_sum) { cur = pb_put_fixed64(cur, 3, &s->chunk.sum.fp_val); }
fld_start[1] = cur - (fld_start + 2);
}
break;
@@ -280,30 +282,33 @@ __global__ void __launch_bounds__(encode_threads_per_block)
// optional string maximum = 2;
// optional sint64 sum = 3; // sum will store the total length of all strings
// }
if (s->chunk.has_minmax && s->chunk.has_sum) {
uint32_t sz = (pb_put_int(cur, 3, s->chunk.sum.i_val) - cur) +
(pb_put_uint(cur, 1, s->chunk.min_value.str_val.length) - cur) +
(pb_put_uint(cur, 2, s->chunk.max_value.str_val.length) - cur) +
s->chunk.min_value.str_val.length + s->chunk.max_value.str_val.length;
if (s->chunk.has_minmax || s->chunk.has_sum) {
uint32_t sz = 0;
if (s->chunk.has_minmax) {
sz += (pb_put_uint(cur, 1, s->chunk.min_value.str_val.length) - cur) +
(pb_put_uint(cur, 2, s->chunk.max_value.str_val.length) - cur) +
s->chunk.min_value.str_val.length + s->chunk.max_value.str_val.length;
}
if (s->chunk.has_sum) { sz += pb_put_int(cur, 3, s->chunk.sum.i_val) - cur; }

cur[0] = 4 * 8 + ProtofType::FIXEDLEN;
cur = pb_encode_uint(cur + 1, sz);
cur = pb_put_binary(
cur, 1, s->chunk.min_value.str_val.ptr, s->chunk.min_value.str_val.length);
cur = pb_put_binary(
cur, 2, s->chunk.max_value.str_val.ptr, s->chunk.max_value.str_val.length);
cur = pb_put_int(cur, 3, s->chunk.sum.i_val);

if (s->chunk.has_minmax) {
cur = pb_put_binary(
cur, 1, s->chunk.min_value.str_val.ptr, s->chunk.min_value.str_val.length);
cur = pb_put_binary(
cur, 2, s->chunk.max_value.str_val.ptr, s->chunk.max_value.str_val.length);
}
if (s->chunk.has_sum) { cur = pb_put_int(cur, 3, s->chunk.sum.i_val); }
}
break;
case dtype_bool:
// bucketStatistics = 5
// message BucketStatistics {
// repeated uint64 count = 1 [packed=true];
// }
if (s->chunk.has_sum) { // Sum is equal to the number of 'true' values
cur[0] = 5 * 8 + ProtofType::FIXEDLEN;
cur = pb_put_packed_uint(cur + 2, 1, s->chunk.sum.u_val);
fld_start[1] = cur - (fld_start + 2);
}
// Not implemented, see https://github.com/rapidsai/cudf/issues/7087
break;
case dtype_decimal64:
case dtype_decimal128:
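The hunks above hand-write ORC's protobuf-encoded ColumnStatistics: each field starts with a tag computed as `field_number * 8 + wire_type` (hence `3 * 8 + ProtofType::FIXEDLEN`), varint or fixed64 payloads follow, and nested messages have their byte length patched back into the reserved slot at `fld_start[1]` once the size is known. Below is a minimal host-side sketch of that wire format, assuming standard protobuf varint rules; the helper names are illustrative and are not the `pb_put_*` device functions from `stats_enc.cu`.

```cpp
#include <cstdint>
#include <vector>

// Protobuf wire types used by the ORC statistics blobs.
enum wire_type : uint8_t { VARINT = 0, FIXED64 = 1, FIXEDLEN = 2 };

// Append a base-128 varint (7 value bits per byte, MSB = continuation flag).
void put_varint(std::vector<uint8_t>& out, uint64_t v)
{
  while (v > 0x7f) {
    out.push_back(static_cast<uint8_t>(v & 0x7f) | 0x80);
    v >>= 7;
  }
  out.push_back(static_cast<uint8_t>(v));
}

// Append a field tag; same formula as `3 * 8 + ProtofType::FIXEDLEN` above.
void put_tag(std::vector<uint8_t>& out, uint32_t field_number, wire_type wt)
{
  put_varint(out, field_number * 8 + wt);
}

// The per-column prefix written in the kernel: field 1 = numberOfValues
// (varint) and field 10 = hasNull (varint-encoded bool).
std::vector<uint8_t> encode_column_prefix(uint64_t non_nulls, bool has_null)
{
  std::vector<uint8_t> buf;
  put_tag(buf, 1, VARINT);
  put_varint(buf, non_nulls);
  put_tag(buf, 10, VARINT);
  put_varint(buf, has_null ? 1 : 0);
  return buf;
}

int main()
{
  // 4 valid values, at least one null -> bytes 0x08 0x04 0x50 0x01
  return encode_column_prefix(4, true).size() == 4 ? 0 : 1;
}
```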
2 changes: 1 addition & 1 deletion cpp/src/io/statistics/typed_statistics_chunk.cuh
@@ -244,9 +244,9 @@ get_untyped_chunk(typed_statistics_chunk<T, include_aggregate> const& chunk)
stat.null_count = chunk.null_count;
stat.has_minmax = chunk.has_minmax;
stat.has_sum = [&]() {
if (!chunk.has_minmax) return false;
// invalidate the sum if overflow or underflow is possible
if constexpr (std::is_floating_point_v<E> or std::is_integral_v<E>) {
if (!chunk.has_minmax) { return true; }
karthikeyann marked this conversation as resolved.
return std::numeric_limits<E>::max() / chunk.non_nulls >=
static_cast<E>(chunk.maximum_value) and
std::numeric_limits<E>::lowest() / chunk.non_nulls <=
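The rewritten `has_sum` lambda above relies on a simple bound: if every one of the `non_nulls` values lies in `[minimum_value, maximum_value]`, the exact sum lies in `[non_nulls * min, non_nulls * max]`, so verifying that both products stay representable is enough to rule out overflow, and a chunk with no valid elements trivially has a correct sum of zero. A small standalone sketch of that check, with illustrative names and without the chunk machinery:

```cpp
#include <cassert>
#include <cstdint>
#include <limits>

// If all `count` values lie in [min_value, max_value], the exact sum lies in
// [count * min_value, count * max_value]; when both bounds are representable
// in T, accumulating the sum cannot overflow. Illustrative helper only.
template <typename T>
bool sum_cannot_overflow(T min_value, T max_value, int count)
{
  if (count == 0) { return true; }  // no valid elements: the sum is exactly 0
  return std::numeric_limits<T>::max() / count >= max_value and
         std::numeric_limits<T>::lowest() / count <= min_value;
}

int main()
{
  // 1000 int32 values of at most 1'000'000: worst case ~1e9, fits in int32.
  assert(sum_cannot_overflow<int32_t>(0, 1'000'000, 1000));
  // 10'000 such values could reach ~1e10, which does not fit -> reject.
  assert(not sum_cannot_overflow<int32_t>(0, 1'000'000, 10'000));
  return 0;
}
```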
76 changes: 57 additions & 19 deletions cpp/tests/io/orc_test.cpp
@@ -987,10 +987,9 @@ TEST_F(OrcStatisticsTest, Basic)
column_wrapper<float, typename decltype(sequence)::value_type> col2(
sequence, sequence + num_rows, validity);
column_wrapper<cudf::string_view> col3{strings.begin(), strings.end()};
column_wrapper<bool, typename decltype(sequence)::value_type> col4(sequence, sequence + num_rows);
column_wrapper<cudf::timestamp_s, typename decltype(sequence)::value_type> col5(
column_wrapper<cudf::timestamp_s, typename decltype(sequence)::value_type> col4(
sequence, sequence + num_rows, validity);
table_view expected({col1, col2, col3, col4, col5});
table_view expected({col1, col2, col3, col4});

auto filepath = temp_env->get_temp_filepath("OrcStatsMerge.orc");

@@ -1001,7 +1000,7 @@ TEST_F(OrcStatisticsTest, Basic)
auto const stats = cudf::io::read_parsed_orc_statistics(cudf::io::source_info{filepath});

auto const expected_column_names =
std::vector<std::string>{"", "_col0", "_col1", "_col2", "_col3", "_col4"};
std::vector<std::string>{"", "_col0", "_col1", "_col2", "_col3"};
EXPECT_EQ(stats.column_names, expected_column_names);

auto validate_statistics = [&](std::vector<cudf::io::column_statistics> const& stats) {
@@ -1010,37 +1009,36 @@ TEST_F(OrcStatisticsTest, Basic)

auto& s1 = stats[1];
EXPECT_EQ(*s1.number_of_values, 4ul);
EXPECT_TRUE(*s1.has_null);
auto& ts1 = std::get<cudf::io::integer_statistics>(s1.type_specific_stats);
EXPECT_EQ(*ts1.minimum, 1);
EXPECT_EQ(*ts1.maximum, 7);
EXPECT_EQ(*ts1.sum, 16);

auto& s2 = stats[2];
EXPECT_EQ(*s2.number_of_values, 4ul);
EXPECT_TRUE(*s2.has_null);
auto& ts2 = std::get<cudf::io::double_statistics>(s2.type_specific_stats);
EXPECT_EQ(*ts2.minimum, 1.);
EXPECT_EQ(*ts2.maximum, 7.);
// No sum ATM, filed #7087
ASSERT_FALSE(ts2.sum);
EXPECT_EQ(*ts2.sum, 16.);

auto& s3 = stats[3];
EXPECT_EQ(*s3.number_of_values, 9ul);
EXPECT_FALSE(*s3.has_null);
auto& ts3 = std::get<cudf::io::string_statistics>(s3.type_specific_stats);
EXPECT_EQ(*ts3.minimum, "Friday");
EXPECT_EQ(*ts3.maximum, "Wednesday");
EXPECT_EQ(*ts3.sum, 58ul);

auto& s4 = stats[4];
EXPECT_EQ(*s4.number_of_values, 9ul);
EXPECT_EQ(std::get<cudf::io::bucket_statistics>(s4.type_specific_stats).count[0], 8ul);

auto& s5 = stats[5];
EXPECT_EQ(*s5.number_of_values, 4ul);
auto& ts5 = std::get<cudf::io::timestamp_statistics>(s5.type_specific_stats);
EXPECT_EQ(*ts5.minimum_utc, 1000);
EXPECT_EQ(*ts5.maximum_utc, 7000);
ASSERT_FALSE(ts5.minimum);
ASSERT_FALSE(ts5.maximum);
EXPECT_EQ(*s4.number_of_values, 4ul);
EXPECT_TRUE(*s4.has_null);
auto& ts4 = std::get<cudf::io::timestamp_statistics>(s4.type_specific_stats);
EXPECT_EQ(*ts4.minimum_utc, 1000);
EXPECT_EQ(*ts4.maximum_utc, 7000);
ASSERT_FALSE(ts4.minimum);
ASSERT_FALSE(ts4.maximum);
};

validate_statistics(stats.file_stats);
@@ -1259,9 +1257,8 @@ TEST_F(OrcStatisticsTest, Overflow)

TEST_F(OrcStatisticsTest, HasNull)
{
// cudf's ORC writer doesn't yet support the ability to encode the hasNull value in statistics so
// we're embedding a file created using pyorc
//
// This test can now be implemented with libcudf; keeping the pyorc version to keep the test
// inputs diversified
// Method to create file:
// >>> import pyorc
// >>> output = open("./temp.orc", "wb")
@@ -1861,4 +1858,45 @@ TEST_F(OrcWriterTest, EmptyChildStringColumn)
CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view());
}

template <typename T>
void check_all_null_stats(cudf::io::column_statistics const& stats)
{
EXPECT_EQ(stats.number_of_values, 0);
EXPECT_TRUE(stats.has_null);

auto const ts = std::get<T>(stats.type_specific_stats);
EXPECT_FALSE(ts.minimum.has_value());
EXPECT_FALSE(ts.maximum.has_value());
EXPECT_TRUE(ts.sum.has_value());
EXPECT_EQ(*ts.sum, 0);
}

TEST_F(OrcStatisticsTest, AllNulls)
{
bool all_null[] = {false, false, false}; // all null
vuule marked this conversation as resolved.

std::vector doubles{1.1, 2.2, 3.3};
float64_col double_col(doubles.begin(), doubles.end(), all_null);

std::vector ints{1, 2, 3};
int32_col int_col(ints.begin(), ints.end(), all_null);

std::vector strings{"1", "2", "3"};
str_col string_col(strings.begin(), strings.end(), all_null);

cudf::table_view expected({int_col, double_col, string_col});

std::vector<char> out_buffer;
cudf::io::orc_writer_options out_opts =
cudf::io::orc_writer_options::builder(cudf::io::sink_info{&out_buffer}, expected);
cudf::io::write_orc(out_opts);

auto const stats = cudf::io::read_parsed_orc_statistics(
cudf::io::source_info{out_buffer.data(), out_buffer.size()});

check_all_null_stats<cudf::io::integer_statistics>(stats.file_stats[1]);
check_all_null_stats<cudf::io::double_statistics>(stats.file_stats[2]);
check_all_null_stats<cudf::io::string_statistics>(stats.file_stats[3]);
}

CUDF_TEST_PROGRAM_MAIN()
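With this change an all-null column reports `number_of_values == 0` and `has_null == true` together with a type-specific `sum` of 0 but no minimum/maximum, so consumers of the parsed statistics should treat each field as an independent optional rather than an all-or-nothing group (the same point comes up in the Python test changes below). A hedged consumer sketch reusing the `column_statistics`/`integer_statistics` types from the test above; the header path and exact member access are assumptions:

```cpp
#include <cudf/io/orc_metadata.hpp>  // assumed header for the parsed-statistics types

#include <iostream>
#include <variant>

// Print whichever integer statistics were written. With the expanded writer,
// minimum/maximum can be absent while sum is present (e.g. all-null columns),
// so every field is checked independently.
void print_int_stats(cudf::io::column_statistics const& col_stats)
{
  if (col_stats.number_of_values) {
    std::cout << "values:  " << *col_stats.number_of_values << '\n';
  }
  if (col_stats.has_null) {
    std::cout << "hasNull: " << std::boolalpha << *col_stats.has_null << '\n';
  }
  if (auto const* ts =
        std::get_if<cudf::io::integer_statistics>(&col_stats.type_specific_stats)) {
    if (ts->minimum) { std::cout << "min: " << *ts->minimum << '\n'; }
    if (ts->maximum) { std::cout << "max: " << *ts->maximum << '\n'; }
    if (ts->sum) { std::cout << "sum: " << *ts->sum << '\n'; }
  }
}
```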
60 changes: 36 additions & 24 deletions python/cudf/cudf/tests/test_orc.py
@@ -633,16 +633,19 @@ def test_orc_write_statistics(tmpdir, datadir, nrows, stats_freq):
for col in gdf:
if "minimum" in file_stats[0][col]:
stats_min = file_stats[0][col]["minimum"]
actual_min = gdf[col].min()
assert normalized_equals(actual_min, stats_min)
if stats_min is not None:
Contributor: Under what circumstances does read_orc_statistics now return None in these slots when it didn't before?

Contributor Author: Great question! The change in behavior is for columns that contain only nulls. Previously we did not return any statistics for such a column, so this comparison was never performed in the test. This PR changes the behavior so that statistics containing only the sum are included when there are no valid elements. The test now needs to correctly handle "partial" statistics, i.e. min and max are not present, but sum is.

actual_min = gdf[col].min()
assert normalized_equals(actual_min, stats_min)
if "maximum" in file_stats[0][col]:
stats_max = file_stats[0][col]["maximum"]
actual_max = gdf[col].max()
assert normalized_equals(actual_max, stats_max)
if stats_max is not None:
actual_max = gdf[col].max()
assert normalized_equals(actual_max, stats_max)
if "number_of_values" in file_stats[0][col]:
stats_num_vals = file_stats[0][col]["number_of_values"]
actual_num_vals = gdf[col].count()
assert stats_num_vals == actual_num_vals
if stats_num_vals is not None:
actual_num_vals = gdf[col].count()
assert stats_num_vals == actual_num_vals

# compare stripe statistics with actual min/max
for stripe_idx in range(0, orc_file.nstripes):
@@ -651,21 +654,24 @@ def test_orc_write_statistics(tmpdir, datadir, nrows, stats_freq):
stripe_df = cudf.DataFrame(stripe.to_pandas())
for col in stripe_df:
if "minimum" in stripes_stats[stripe_idx][col]:
actual_min = stripe_df[col].min()
stats_min = stripes_stats[stripe_idx][col]["minimum"]
assert normalized_equals(actual_min, stats_min)
if stats_min is not None:
actual_min = stripe_df[col].min()
assert normalized_equals(actual_min, stats_min)

if "maximum" in stripes_stats[stripe_idx][col]:
actual_max = stripe_df[col].max()
stats_max = stripes_stats[stripe_idx][col]["maximum"]
assert normalized_equals(actual_max, stats_max)
if stats_max is not None:
actual_max = stripe_df[col].max()
assert normalized_equals(actual_max, stats_max)

if "number_of_values" in stripes_stats[stripe_idx][col]:
stats_num_vals = stripes_stats[stripe_idx][col][
"number_of_values"
]
actual_num_vals = stripe_df[col].count()
assert stats_num_vals == actual_num_vals
if stats_num_vals is not None:
actual_num_vals = stripe_df[col].count()
assert stats_num_vals == actual_num_vals


@pytest.mark.parametrize("stats_freq", ["STRIPE", "ROWGROUP"])
@@ -733,16 +739,19 @@ def test_orc_chunked_write_statistics(tmpdir, datadir, nrows, stats_freq):
for col in expect:
if "minimum" in file_stats[0][col]:
stats_min = file_stats[0][col]["minimum"]
actual_min = expect[col].min()
assert normalized_equals(actual_min, stats_min)
if stats_min is not None:
actual_min = expect[col].min()
assert normalized_equals(actual_min, stats_min)
if "maximum" in file_stats[0][col]:
stats_max = file_stats[0][col]["maximum"]
actual_max = expect[col].max()
assert normalized_equals(actual_max, stats_max)
if stats_max is not None:
actual_max = expect[col].max()
assert normalized_equals(actual_max, stats_max)
if "number_of_values" in file_stats[0][col]:
stats_num_vals = file_stats[0][col]["number_of_values"]
actual_num_vals = expect[col].count()
assert stats_num_vals == actual_num_vals
if stats_num_vals is not None:
actual_num_vals = expect[col].count()
assert stats_num_vals == actual_num_vals

# compare stripe statistics with actual min/max
for stripe_idx in range(0, orc_file.nstripes):
@@ -751,21 +760,24 @@ def test_orc_chunked_write_statistics(tmpdir, datadir, nrows, stats_freq):
stripe_df = cudf.DataFrame(stripe.to_pandas())
for col in stripe_df:
if "minimum" in stripes_stats[stripe_idx][col]:
actual_min = stripe_df[col].min()
stats_min = stripes_stats[stripe_idx][col]["minimum"]
assert normalized_equals(actual_min, stats_min)
if stats_min is not None:
actual_min = stripe_df[col].min()
assert normalized_equals(actual_min, stats_min)

if "maximum" in stripes_stats[stripe_idx][col]:
actual_max = stripe_df[col].max()
stats_max = stripes_stats[stripe_idx][col]["maximum"]
assert normalized_equals(actual_max, stats_max)
if stats_max is not None:
actual_max = stripe_df[col].max()
assert normalized_equals(actual_max, stats_max)

if "number_of_values" in stripes_stats[stripe_idx][col]:
stats_num_vals = stripes_stats[stripe_idx][col][
"number_of_values"
]
actual_num_vals = stripe_df[col].count()
assert stats_num_vals == actual_num_vals
if stats_num_vals is not None:
actual_num_vals = stripe_df[col].count()
assert stats_num_vals == actual_num_vals


@pytest.mark.parametrize("nrows", [1, 100, 6000000])