Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persist string statistics data across multiple calls to orc chunked write #10694

Merged
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 136 additions & 54 deletions cpp/src/io/orc/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@
#include <numeric>
#include <utility>

#include <cooperative_groups.h>
#include <cooperative_groups/memcpy_async.h>

#include <cuda/std/limits>

namespace cudf {
Expand Down Expand Up @@ -1233,55 +1236,61 @@ writer::impl::encoded_footer_statistics writer::impl::finish_statistic_blobs(
auto const num_stripe_blobs =
thrust::reduce(stripe_size_iter, stripe_size_iter + per_chunk_stats.stripe_stat_merge.size());
auto const num_file_blobs = num_columns;
auto const num_blobs = single_write_mode ? static_cast<int>(num_stripe_blobs + num_file_blobs)
: static_cast<int>(num_stripe_blobs);
auto const num_blobs = static_cast<int>(num_stripe_blobs + num_file_blobs);

if (num_stripe_blobs == 0) { return {}; }

// merge the stripe persisted data and add file data
rmm::device_uvector<statistics_chunk> stat_chunks(num_blobs, stream);
hostdevice_vector<statistics_merge_group> stats_merge(num_blobs, stream);

size_t chunk_offset = 0;
size_t merge_offset = 0;
// we need to merge the stat arrays from the persisted data.
// this needs to be done carefully because each array can contain
// a different number of stripes and stripes from each column must be
// located next to each other. We know the total number of stripes and
// we know the size of each array. The number of stripes per column in a chunk array can
// be calculated by dividing the number of chunks by the number of columns.
// That many chunks need to be copied at a time to the proper destination.
size_t num_entries_seen = 0;
for (size_t i = 0; i < per_chunk_stats.stripe_stat_chunks.size(); ++i) {
auto chunk_bytes = per_chunk_stats.stripe_stat_chunks[i].size() * sizeof(statistics_chunk);
auto merge_bytes = per_chunk_stats.stripe_stat_merge[i].size() * sizeof(statistics_merge_group);
cudaMemcpyAsync(stat_chunks.data() + chunk_offset,
per_chunk_stats.stripe_stat_chunks[i].data(),
chunk_bytes,
cudaMemcpyDeviceToDevice,
stream);
cudaMemcpyAsync(stats_merge.device_ptr() + merge_offset,
per_chunk_stats.stripe_stat_merge[i].device_ptr(),
merge_bytes,
cudaMemcpyDeviceToDevice,
stream);
chunk_offset += per_chunk_stats.stripe_stat_chunks[i].size();
merge_offset += per_chunk_stats.stripe_stat_merge[i].size();
auto const stripes_per_col = per_chunk_stats.stripe_stat_chunks[i].size() / num_columns;

auto const chunk_bytes = stripes_per_col * sizeof(statistics_chunk);
auto const merge_bytes = stripes_per_col * sizeof(statistics_merge_group);
for (size_t col = 0; col < num_columns; ++col) {
cudaMemcpyAsync(stat_chunks.data() + (num_stripes * col) + num_entries_seen,
vyasr marked this conversation as resolved.
Show resolved Hide resolved
per_chunk_stats.stripe_stat_chunks[i].data() + col * stripes_per_col,
chunk_bytes,
cudaMemcpyDeviceToDevice,
stream);
cudaMemcpyAsync(stats_merge.device_ptr() + (num_stripes * col) + num_entries_seen,
per_chunk_stats.stripe_stat_merge[i].device_ptr() + col * stripes_per_col,
merge_bytes,
cudaMemcpyDeviceToDevice,
stream);
}
num_entries_seen += stripes_per_col;
}

if (single_write_mode) {
vyasr marked this conversation as resolved.
Show resolved Hide resolved
std::vector<statistics_merge_group> file_stats_merge(num_file_blobs);
for (auto i = 0u; i < num_file_blobs; ++i) {
auto col_stats = &file_stats_merge[i];
col_stats->col_dtype = per_chunk_stats.col_types[i];
col_stats->stats_dtype = per_chunk_stats.stats_dtypes[i];
col_stats->start_chunk = static_cast<uint32_t>(i * num_stripes);
col_stats->num_chunks = static_cast<uint32_t>(num_stripes);
}
std::vector<statistics_merge_group> file_stats_merge(num_file_blobs);
hyperbolic2346 marked this conversation as resolved.
Show resolved Hide resolved
for (auto i = 0u; i < num_file_blobs; ++i) {
auto col_stats = &file_stats_merge[i];
col_stats->col_dtype = per_chunk_stats.col_types[i];
col_stats->stats_dtype = per_chunk_stats.stats_dtypes[i];
col_stats->start_chunk = static_cast<uint32_t>(i * num_stripes);
col_stats->num_chunks = static_cast<uint32_t>(num_stripes);
}

auto d_file_stats_merge = stats_merge.device_ptr(num_stripe_blobs);
cudaMemcpyAsync(d_file_stats_merge,
file_stats_merge.data(),
num_file_blobs * sizeof(statistics_merge_group),
cudaMemcpyHostToDevice,
stream);
auto d_file_stats_merge = stats_merge.device_ptr(num_stripe_blobs);
cudaMemcpyAsync(d_file_stats_merge,
file_stats_merge.data(),
num_file_blobs * sizeof(statistics_merge_group),
cudaMemcpyHostToDevice,
stream);

auto file_stat_chunks = stat_chunks.data() + num_stripe_blobs;
detail::merge_group_statistics<detail::io_file_format::ORC>(
file_stat_chunks, stat_chunks.data(), d_file_stats_merge, num_file_blobs, stream);
}
auto file_stat_chunks = stat_chunks.data() + num_stripe_blobs;
detail::merge_group_statistics<detail::io_file_format::ORC>(
file_stat_chunks, stat_chunks.data(), d_file_stats_merge, num_file_blobs, stream);

hostdevice_vector<uint8_t> blobs =
allocate_and_encode_blobs(stats_merge, stat_chunks, num_blobs, stream);
Expand All @@ -1295,14 +1304,12 @@ writer::impl::encoded_footer_statistics writer::impl::finish_statistic_blobs(
stripe_blobs[i].assign(stat_begin, stat_end);
}

std::vector<ColStatsBlob> file_blobs(single_write_mode ? num_file_blobs : 0);
if (single_write_mode) {
auto file_stat_merge = stats_merge.host_ptr(num_stripe_blobs);
for (auto i = 0u; i < num_file_blobs; i++) {
auto const stat_begin = blobs.host_ptr(file_stat_merge[i].start_chunk);
auto const stat_end = stat_begin + file_stat_merge[i].num_chunks;
file_blobs[i].assign(stat_begin, stat_end);
}
std::vector<ColStatsBlob> file_blobs(num_file_blobs);
auto file_stat_merge = stats_merge.host_ptr(num_stripe_blobs);
for (auto i = 0u; i < num_file_blobs; i++) {
auto const stat_begin = blobs.host_ptr(file_stat_merge[i].start_chunk);
auto const stat_end = stat_begin + file_stat_merge[i].num_chunks;
file_blobs[i].assign(stat_begin, stat_end);
}

return {std::move(stripe_blobs), std::move(file_blobs)};
Expand Down Expand Up @@ -1937,6 +1944,87 @@ string_dictionaries allocate_dictionaries(orc_table_view const& orc_table,
std::move(is_dict_enabled)};
}

/**
 * @brief Functor yielding the pooled-string length for each (chunk, min/max) slot.
 *
 * Index layout: slot `i` maps to chunk `i / 2`; even slots are the chunk's
 * minimum string, odd slots its maximum. Non-string chunks and the one
 * out-of-range trailing slot (used by the exclusive scan to produce a total)
 * contribute a length of zero.
 */
struct string_length_functor {
  __device__ inline size_type operator()(int const i) const
  {
    // The scan iterates one element past the last slot; that extra element is zero.
    if (i >= num_chunks * 2) { return 0; }

    auto const chunk_idx = i / 2;
    // Only string-typed statistics carry string payloads to pool.
    if (stripe_stat_merge[chunk_idx].stats_dtype != dtype_string) { return 0; }

    auto const is_min = (i % 2) == 0;
    auto const& value = is_min ? stripe_stat_chunks[chunk_idx].min_value
                               : stripe_stat_chunks[chunk_idx].max_value;
    return value.str_val.length;
  }

  int num_chunks;                            // number of statistics chunks covered
  statistics_chunk* stripe_stat_chunks;      // per-chunk min/max values (device)
  statistics_merge_group* stripe_stat_merge; // per-chunk type info (device)
};

/**
 * @brief Copies min/max statistic strings into a contiguous pool and repoints
 * the statistic chunks at the pooled copies.
 *
 * Launch layout: two blocks per chunk — even `blockIdx.x` handles the chunk's
 * minimum string, odd handles the maximum. `offsets[blockIdx.x]` gives each
 * string's destination offset within @p string_pool (from an exclusive scan of
 * the string lengths). Blocks whose chunk is not string-typed do nothing.
 */
__global__ void copy_string_data(char* string_pool,
                                 size_type* offsets,
                                 statistics_chunk* chunks,
                                 statistics_merge_group const* groups)
{
  auto const chunk_idx = blockIdx.x / 2;
  if (groups[chunk_idx].stats_dtype != dtype_string) { return; }

  auto const copying_min = blockIdx.x % 2 == 0;
  auto& str_val =
    copying_min ? chunks[chunk_idx].min_value.str_val : chunks[chunk_idx].max_value.str_val;
  char* const dst       = &string_pool[offsets[blockIdx.x]];
  char const* const src = str_val.ptr;

  // Block-stride copy of the string bytes into the pool.
  for (int i = threadIdx.x; i < str_val.length; i += blockDim.x) {
    dst[i] = src[i];
  }
  // A single thread updates the chunk to reference its pooled copy.
  if (threadIdx.x == 0) { str_val.ptr = dst; }
}

/**
 * @brief Moves one chunk's intermediate statistics into the persisted state
 * kept across chunked-write calls.
 *
 * In chunked-write mode the source column data may be released before the
 * file is finalized, so the min/max string values referenced by the stat
 * chunks are first copied into a string pool owned by this object and the
 * chunks are repointed at the pooled copies.
 *
 * @param num_table_rows Number of rows in the table written by this call
 * @param single_write_mode True when the whole file is written in one call
 * @param intermediate_stats Statistics gathered for this call (consumed)
 * @param stream CUDA stream used for device work
 */
void writer::impl::persisted_statistics::persist(int num_table_rows,
                                                 bool single_write_mode,
                                                 intermediate_statistics& intermediate_stats,
                                                 rmm::cuda_stream_view stream)
{
  if (not single_write_mode) {
    auto const num_chunks = static_cast<int>(intermediate_stats.stripe_stat_chunks.size());

    // Two slots per chunk (min and max) plus one extra element so the
    // exclusive scan's final entry is the total pool size.
    rmm::device_uvector<size_type> offsets((num_chunks * 2) + 1, stream);
    auto length_iter = cudf::detail::make_counting_transform_iterator(
      0,
      string_length_functor{num_chunks,
                            intermediate_stats.stripe_stat_chunks.data(),
                            intermediate_stats.stripe_stat_merge.device_ptr()});
    thrust::exclusive_scan(
      rmm::exec_policy(stream), length_iter, length_iter + (num_chunks * 2) + 1, offsets.begin());

    // Bring the total size back to the host to size the pool allocation.
    auto const total_string_pool_size = offsets.element(num_chunks * 2, stream);
    if (total_string_pool_size > 0) {
      rmm::device_uvector<char> string_pool(total_string_pool_size, stream);

      // One block per string (2 per chunk). A simple per-block copy is used
      // here; min/max strings are rarely uniform in length, so a fancier
      // byte-balanced scheme could help but is not attempted yet.
      copy_string_data<<<num_chunks * 2, 256, 0, stream.value()>>>(
        string_pool.data(),
        offsets.data(),
        intermediate_stats.stripe_stat_chunks.data(),
        intermediate_stats.stripe_stat_merge.device_ptr());
      string_pools.emplace_back(std::move(string_pool));
    }
  }

  stripe_stat_chunks.emplace_back(std::move(intermediate_stats.stripe_stat_chunks));
  stripe_stat_merge.emplace_back(std::move(intermediate_stats.stripe_stat_merge));
  stats_dtypes = std::move(intermediate_stats.stats_dtypes);
  col_types    = std::move(intermediate_stats.col_types);
  num_rows     = num_table_rows;
}

void writer::impl::write(table_view const& table)
{
CUDF_EXPECTS(not closed, "Data has already been flushed to out and closed");
Expand Down Expand Up @@ -2073,13 +2161,8 @@ void writer::impl::write(table_view const& table)
auto intermediate_stats = gather_statistic_blobs(stats_freq_, orc_table, segmentation);

if (intermediate_stats.stripe_stat_chunks.size() > 0) {
persisted_stripe_statistics.stripe_stat_chunks.emplace_back(
std::move(intermediate_stats.stripe_stat_chunks));
persisted_stripe_statistics.stripe_stat_merge.emplace_back(
std::move(intermediate_stats.stripe_stat_merge));
persisted_stripe_statistics.stats_dtypes = std::move(intermediate_stats.stats_dtypes);
persisted_stripe_statistics.col_types = std::move(intermediate_stats.col_types);
persisted_stripe_statistics.num_rows = orc_table.num_rows();
persisted_stripe_statistics.persist(
orc_table.num_rows(), single_write_mode, intermediate_stats, stream);
}

// Write stripes
Expand Down Expand Up @@ -2139,7 +2222,6 @@ void writer::impl::write(table_view const& table)
}
out_sink_->host_write(buffer_.data(), buffer_.size());
}

for (auto const& task : write_tasks) {
task.wait();
}
Expand Down Expand Up @@ -2202,7 +2284,7 @@ void writer::impl::close()
auto const statistics = finish_statistic_blobs(ff.stripes.size(), persisted_stripe_statistics);

// File-level statistics
if (single_write_mode and not statistics.file_level.empty()) {
if (not statistics.file_level.empty()) {
buffer_.resize(0);
pbw_.put_uint(encode_field_number<size_type>(1));
pbw_.put_uint(persisted_stripe_statistics.num_rows);
Expand Down
9 changes: 8 additions & 1 deletion cpp/src/io/orc/writer_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ class writer::impl {
stats_dtypes(std::move(sdt)),
col_types(std::move(sct)){};

// blobs for the rowgroups and stripes. Not persisted
// blobs for the rowgroups. Not persisted
std::vector<ColStatsBlob> rowgroup_blobs;

rmm::device_uvector<statistics_chunk> stripe_stat_chunks;
Expand All @@ -322,13 +322,20 @@ class writer::impl {
{
stripe_stat_chunks.clear();
stripe_stat_merge.clear();
string_pools.clear();
stats_dtypes.clear();
col_types.clear();
num_rows = 0;
}

void persist(int num_table_rows,
bool single_write_mode,
intermediate_statistics& intermediate_stats,
rmm::cuda_stream_view stream);

std::vector<rmm::device_uvector<statistics_chunk>> stripe_stat_chunks;
std::vector<hostdevice_vector<statistics_merge_group>> stripe_stat_merge;
std::vector<rmm::device_uvector<char>> string_pools;
std::vector<statistics_dtype> stats_dtypes;
std::vector<data_type> col_types;
int num_rows = 0;
Expand Down
6 changes: 5 additions & 1 deletion python/cudf/cudf/testing/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,11 @@ def gen_rand(dtype, size, **kwargs):
np.random.randint(low=low, high=high, size=size), unit=time_unit
)
elif dtype.kind in ("O", "U"):
return pd.util.testing.rands_array(10, size)
low = kwargs.get("low", 10)
high = kwargs.get("high", 11)
return pd.util.testing.rands_array(
hyperbolic2346 marked this conversation as resolved.
Show resolved Hide resolved
np.random.randint(low=low, high=high, size=1)[0], size
)
raise NotImplementedError(f"dtype.kind={dtype.kind}")


Expand Down
96 changes: 96 additions & 0 deletions python/cudf/cudf/tests/test_orc.py
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,102 @@ def test_orc_write_statistics(tmpdir, datadir, nrows, stats_freq):
assert stats_num_vals == actual_num_vals


@pytest.mark.parametrize("stats_freq", ["STRIPE", "ROWGROUP"])
@pytest.mark.parametrize("nrows", [2, 100, 6000000])
def test_orc_chunked_write_statistics(tmpdir, datadir, nrows, stats_freq):
    # Write two random half-sized tables through the chunked ORC writer and
    # verify that file-level and stripe-level statistics match the data.
    supported_stat_types = supported_numpy_dtypes + ["str"]
    # Can't write random bool columns until issue #6763 is fixed
    if nrows == 6000000:
        supported_stat_types.remove("bool")

    gdf_fname = tmpdir.join("chunked_stats.orc")
    writer = ORCWriter(gdf_fname)

    max_char_length = 1000 if nrows < 10000 else 100

    def build_random_frame():
        # One column per supported dtype, each holding half the total rows.
        return cudf.DataFrame(
            {
                "col_"
                + str(dtype): gen_rand_series(
                    dtype,
                    int(nrows / 2),
                    has_nulls=True,
                    low=0,
                    high=max_char_length,
                )
                for dtype in supported_stat_types
            }
        )

    gdf = build_random_frame()
    pdf1 = gdf.to_pandas()
    writer.write_table(gdf)

    gdf = build_random_frame()
    pdf2 = gdf.to_pandas()
    writer.write_table(gdf)
    writer.close()

    # pandas is unable to handle min/max of string col with nulls
    expect = cudf.DataFrame(pd.concat([pdf1, pdf2]).reset_index(drop=True))

    # Read back written ORC's statistics
    orc_file = pa.orc.ORCFile(gdf_fname)
    (
        file_stats,
        stripes_stats,
    ) = cudf.io.orc.read_orc_statistics([gdf_fname])

    # File-level statistics must reflect the concatenation of both chunks.
    for col in expect:
        if "minimum" in file_stats[0][col]:
            assert normalized_equals(
                expect[col].min(), file_stats[0][col]["minimum"]
            )
        if "maximum" in file_stats[0][col]:
            assert normalized_equals(
                expect[col].max(), file_stats[0][col]["maximum"]
            )
        if "number_of_values" in file_stats[0][col]:
            assert (
                file_stats[0][col]["number_of_values"] == expect[col].count()
            )

    # Stripe-level statistics must match each stripe's actual contents.
    for stripe_idx in range(0, orc_file.nstripes):
        stripe = orc_file.read_stripe(stripe_idx)
        # pandas is unable to handle min/max of string col with nulls
        stripe_df = cudf.DataFrame(stripe.to_pandas())
        for col in stripe_df:
            stats = stripes_stats[stripe_idx][col]
            if "minimum" in stats:
                assert normalized_equals(
                    stripe_df[col].min(), stats["minimum"]
                )

            if "maximum" in stats:
                assert normalized_equals(
                    stripe_df[col].max(), stats["maximum"]
                )

            if "number_of_values" in stats:
                assert stats["number_of_values"] == stripe_df[col].count()


@pytest.mark.parametrize("nrows", [1, 100, 6000000])
def test_orc_write_bool_statistics(tmpdir, datadir, nrows):
# Make a dataframe
Expand Down