Fix nullmask offset handling in parquet and orc writer #6889

Merged: 15 commits, Dec 13, 2020
Changes from 2 commits
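
The underlying issue: a sliced column shares its null mask with the parent allocation, so indexing the mask with the chunk-relative row alone reads the wrong bits; every reader of the mask has to add the column's element offset first. A minimal sketch of the corrected lookup, assuming cudf's 32-bit bitmask words (the helper name is illustrative, not taken from the diff):

#include <cstdint>

// Returns 1 if `row` of a sliced column is valid. The mask pointer refers to the parent
// column's allocation, so the bit lives at position row + column_offset.
inline uint32_t bit_is_set(uint32_t const *valid_map, int32_t row, int32_t column_offset)
{
  int32_t const bit = row + column_offset;           // absolute bit position in the parent mask
  return (valid_map[bit >> 5] >> (bit & 0x1f)) & 1;  // 32 bits per mask word
}

The diffs below apply the same row + column_offset adjustment in the ORC and Parquet dictionary, encoding, and statistics kernels, and add Slice round-trip tests for both writers.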
9 changes: 6 additions & 3 deletions cpp/src/io/orc/dict_enc.cu
@@ -75,10 +75,13 @@ static __device__ void LoadNonNullIndices(volatile dictinit_state_s *s, int t)
s->scratch_red[t] = 0xffffffffu;
} else {
uint32_t row = s->chunk.start_row + i + t * 32;
uint32_t v = (row < s->chunk.start_row + s->chunk.num_rows) ? valid_map[row >> 5] : 0;
uint32_t v = (row < s->chunk.start_row + s->chunk.num_rows)
? valid_map[(row + s->chunk.column_offset) >> 5]
: 0;
if (row & 0x1f) {
uint32_t v1 =
(row + 32 < s->chunk.start_row + s->chunk.num_rows) ? valid_map[(row >> 5) + 1] : 0;
uint32_t v1 = (row + 32 < s->chunk.start_row + s->chunk.num_rows)
? valid_map[((row + s->chunk.column_offset) >> 5) + 1]
: 0;
v = __funnelshift_r(v, v1, row & 0x1f);
Contributor Author:
@devavret Shouldn't this be changed to
v = __funnelshift_r(v, v1, (row + s->chunk.column_offset) & 0x1f);

Contributor:
It appears so. Also it seems the wrapping (& 0x1f) is redundant?
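
For reference, __funnelshift_r(lo, hi, shift) concatenates hi:lo into a 64-bit value, shifts it right by shift modulo 32, and returns the low 32 bits, so masking the shift amount with & 0x1f does not change the result. A host-side sketch of the equivalent computation (illustrative only):

#include <cstdint>

// Host equivalent of CUDA's __funnelshift_r(lo, hi, shift).
inline uint32_t funnelshift_r(uint32_t lo, uint32_t hi, uint32_t shift)
{
  uint64_t const concat = (static_cast<uint64_t>(hi) << 32) | lo;  // hi:lo
  return static_cast<uint32_t>(concat >> (shift & 31));            // only the low 5 bits of shift are used
}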

}
s->scratch_red[t] = v;
3 changes: 3 additions & 0 deletions cpp/src/io/orc/orc_gpu.h
@@ -21,6 +21,7 @@
#include <io/comp/gpuinflate.h>
#include <io/orc/orc_common.h>
#include <io/statistics/column_stats.h>
#include <cudf/types.hpp>

#include <rmm/cuda_stream_view.hpp>

@@ -127,6 +128,7 @@ struct EncChunk {
int32_t strm_id[CI_NUM_STREAMS]; // stream id or -1 if not present
uint32_t strm_len[CI_NUM_STREAMS]; // in: max length, out: actual length
const uint32_t *valid_map_base; // base ptr of input valid bit map
size_type column_offset; // index of the first element relative to the base memory
const void *column_data_base; // base ptr of input column data
uint32_t start_row; // start row of this chunk
uint32_t num_rows; // number of rows in this chunk
@@ -156,6 +158,7 @@ struct StripeStream {
**/
struct DictionaryChunk {
const uint32_t *valid_map_base; // base ptr of input valid bit map
size_type column_offset; // index of the first element relative to the base memory
const void *column_data_base; // base ptr of column data (ptr,len pair)
uint32_t *dict_data; // dictionary data (index of non-null rows)
uint32_t *dict_index; // row indices of corresponding string (row from dictionary index)
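
Both structs now carry the column's element offset so device code can translate a chunk-relative row into an absolute bit position in the shared null mask. On the host side the value comes from column_view::offset(); a sketch of how a chunk's validity fields might be filled (illustrative only, the actual call sites are in writer_impl.cu below):

#include <cudf/column/column_view.hpp>

// Populate the validity-related fields of a chunk from a (possibly sliced) column view.
void set_chunk_validity(EncChunk &ck, cudf::column_view const &col)
{
  ck.valid_map_base = col.nullable() ? col.null_mask() : nullptr;  // parent allocation's mask
  ck.column_offset  = col.offset();  // index of the slice's first element within that mask
}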
24 changes: 20 additions & 4 deletions cpp/src/io/orc/stripe_enc.cu
@@ -712,10 +712,26 @@
uint8_t valid = 0;
if (row < s->chunk.valid_rows) {
if (s->chunk.valid_map_base) {
uint8_t valid_map[4];
auto const valid_map_byte_idx = row >> 3;
memcpy(valid_map, &s->chunk.valid_map_base[valid_map_byte_idx / 4], 4);
valid = valid_map[valid_map_byte_idx % 4];
// uint8_t valid_map[4];
// auto const valid_map_byte_idx = row >> 3;
// memcpy(valid_map, &s->chunk.valid_map_base[valid_map_byte_idx / 4], 4);
// valid = valid_map[valid_map_byte_idx % 4];
////////////////////////

size_type current_valid_offset = row + s->chunk.column_offset;
size_type next_valid_offset = current_valid_offset + min(32, s->chunk.valid_rows);

size_type current_byte_index = current_valid_offset / 32;
size_type next_byte_index = next_valid_offset / 32;

bitmask_type current_mask_word = s->chunk.valid_map_base[current_byte_index];
bitmask_type next_mask_word = 0;
if (next_byte_index != current_byte_index) {
next_mask_word = s->chunk.valid_map_base[next_byte_index];
}
bitmask_type mask =
__funnelshift_r(current_mask_word, next_mask_word, current_valid_offset);
valid = 0xff & mask;
Contributor:
Would it be possible to use the existing function

__device__ bitmask_type get_mask_offset_word(bitmask_type const *__restrict__ source,
size_type destination_word_index,
size_type source_begin_bit,
size_type source_end_bit)
{
size_type source_word_index = destination_word_index + word_index(source_begin_bit);
bitmask_type curr_word = source[source_word_index];
bitmask_type next_word = 0;
if (word_index(source_end_bit) >
word_index(source_begin_bit +
destination_word_index * detail::size_in_bits<bitmask_type>())) {
next_word = source[source_word_index + 1];
}
return __funnelshift_r(curr_word, next_word, source_begin_bit);
}

Contributor Author:
This could be reused but it should be changed to an inline function and moved to a cuh file.

Contributor:
If it doesn't affect performance we can reuse it.

Contributor Author:
Moved the get_mask_offset_word function to column_device_view.cuh and inlined it so that it can be reused here.
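
With the helper inlined, the validity byte gathered above can be expressed as a single call; a sketch under that assumption (bounds handling omitted, names as quoted in this thread):

// Gather the 8 validity bits for rows [row, row + 8) of this chunk. The helper reads the
// mask word at the offset-adjusted bit position and funnel-shifts it so that bit 0 of the
// result corresponds to `row`.
size_type const begin_bit = row + s->chunk.column_offset;
bitmask_type const word   = get_mask_offset_word(s->chunk.valid_map_base, 0, begin_bit, begin_bit + 8);
valid                     = word & 0xff;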

} else {
valid = 0xff;
}
24 changes: 17 additions & 7 deletions cpp/src/io/orc/writer_impl.cu
@@ -115,11 +115,13 @@ __global__ void stringdata_to_nvstrdesc(gpu::nvstrdesc_s *dst,
const size_type *offsets,
const char *strdata,
const uint32_t *nulls,
const size_type column_offset,
size_type column_size)
{
size_type row = blockIdx.x * blockDim.x + threadIdx.x;
if (row < column_size) {
uint32_t is_valid = (nulls) ? (nulls[row >> 5] >> (row & 0x1f)) & 1 : 1;
uint32_t is_valid =
(nulls) ? (nulls[(row + column_offset) >> 5] >> ((row + column_offset) & 0x1f)) & 1 : 1;
size_t count;
const char *ptr;
if (is_valid) {
@@ -158,6 +160,7 @@ class orc_column_view {
_null_count(col.null_count()),
_data(col.head<uint8_t>() + col.offset() * _type_width),
_nulls(col.nullable() ? col.null_mask() : nullptr),
_column_offset(col.offset()),
_clockscale(to_clockscale<uint8_t>(col.type().id())),
_type_kind(to_orc_type(col.type().id()))
{
@@ -170,6 +173,7 @@ class orc_column_view {
view.offsets().data<size_type>() + view.offset(),
view.chars().data<char>(),
_nulls,
_column_offset,
_data_count);
_data = _indexes.data();

@@ -224,6 +228,7 @@ class orc_column_view {
bool nullable() const noexcept { return (_nulls != nullptr); }
void const *data() const noexcept { return _data; }
uint32_t const *nulls() const noexcept { return _nulls; }
size_type column_offset() const noexcept { return _column_offset; }
uint8_t clockscale() const noexcept { return _clockscale; }

void set_orc_encoding(ColumnEncodingKind e) { _encoding_kind = e; }
@@ -237,12 +242,13 @@ class orc_column_view {
size_t _str_id = 0;
bool _string_type = false;

size_t _type_width = 0;
size_t _data_count = 0;
size_t _null_count = 0;
void const *_data = nullptr;
uint32_t const *_nulls = nullptr;
uint8_t _clockscale = 0;
size_t _type_width = 0;
size_t _data_count = 0;
size_t _null_count = 0;
void const *_data = nullptr;
uint32_t const *_nulls = nullptr;
size_type _column_offset = 0;
uint8_t _clockscale = 0;

// ORC-related members
std::string _name{};
@@ -277,6 +283,7 @@ void writer::impl::init_dictionaries(orc_column_view *columns,
for (size_t g = 0; g < num_rowgroups; g++) {
auto *ck = &dict[g * str_col_ids.size() + i];
ck->valid_map_base = str_column.nulls();
ck->column_offset = str_column.column_offset();
ck->column_data_base = str_column.data();
ck->dict_data = dict_data + i * num_rows + g * row_index_stride_;
ck->dict_index = dict_index + i * num_rows; // Indexed by abs row
@@ -579,12 +586,14 @@ rmm::device_buffer writer::impl::encode_columns(orc_column_view *columns,
ck->type_kind = columns[i].orc_kind();
if (ck->type_kind == TypeKind::STRING) {
ck->valid_map_base = columns[i].nulls();
ck->column_offset = columns[i].column_offset();
ck->column_data_base = (ck->encoding_kind == DICTIONARY_V2)
? columns[i].host_stripe_dict(stripe_id)->dict_index
: columns[i].data();
ck->dtype_len = 1;
} else {
ck->valid_map_base = columns[i].nulls();
ck->column_offset = columns[i].column_offset();
ck->column_data_base = columns[i].data();
ck->dtype_len = columns[i].type_width();
}
@@ -760,6 +769,7 @@ std::vector<std::vector<uint8_t>> writer::impl::gather_statistic_blobs(
desc->num_rows = columns[i].data_count();
desc->num_values = columns[i].data_count();
desc->valid_map_base = columns[i].nulls();
desc->column_offset = columns[i].column_offset();
desc->column_data_base = columns[i].data();
if (desc->stats_dtype == dtype_timestamp64) {
// Timestamp statistics are in milliseconds
30 changes: 21 additions & 9 deletions cpp/src/io/parquet/page_enc.cu
@@ -190,12 +190,16 @@ __global__ void __launch_bounds__(block_size) gpuInitPageFragments(PageFragment
size_type nvals = s->frag.num_leaf_values;
size_type start_value_idx = s->start_value_idx;

size_type validity_offset = (s->col.nesting_offsets == nullptr) ? s->col.column_offset : 0;
for (uint32_t i = 0; i < nvals; i += block_size) {
const uint32_t *valid = s->col.valid_map_base;
uint32_t val_idx = start_value_idx + i + t;
uint32_t is_valid = (i + t < nvals && val_idx < s->col.num_values)
? (valid) ? (valid[val_idx >> 5] >> (val_idx & 0x1f)) & 1 : 1
: 0;
uint32_t is_valid =
(i + t < nvals && val_idx < s->col.num_values)
? (valid)
? (valid[(val_idx + validity_offset) >> 5] >> ((val_idx + validity_offset) & 0x1f)) & 1
: 1
: 0;
uint32_t valid_warp = ballot(is_valid);
uint32_t len, nz_pos, hash;
if (is_valid) {
@@ -963,6 +967,7 @@ __global__ void __launch_bounds__(128, 8) gpuEncodePages(EncPage *pages,
}
__syncthreads();

size_type validity_offset = (s->col.nesting_offsets == nullptr) ? s->col.column_offset : 0;
// Encode Repetition and Definition levels
if (s->page.page_type != PageType::DICTIONARY_PAGE && s->col.level_bits != 0 &&
s->col.nesting_levels == 0) {
@@ -983,9 +988,12 @@ __global__ void __launch_bounds__(128, 8) gpuEncodePages(EncPage *pages,
uint32_t row = s->page.start_row + rle_numvals + t;
// Definition level encodes validity. Checks the valid map and if it is valid, then sets the
// def_lvl accordingly and sets it in s->vals which is then given to RleEncode to encode
uint32_t def_lvl = (rle_numvals + t < s->page.num_rows && row < s->col.num_rows)
? (valid) ? (valid[row >> 5] >> (row & 0x1f)) & 1 : 1
: 0;
uint32_t def_lvl =
(rle_numvals + t < s->page.num_rows && row < s->col.num_rows)
? (valid)
? (valid[(row + validity_offset) >> 5] >> ((row + validity_offset) & 0x1f)) & 1
: 1
: 0;
s->vals[(rle_numvals + t) & (rle_buffer_size - 1)] = def_lvl;
__syncthreads();
rle_numvals += nrows;
@@ -1082,9 +1090,13 @@ __global__ void __launch_bounds__(128, 8) gpuEncodePages(EncPage *pages,
val_idx = (is_valid) ? s->col.dict_data[val_idx] : val_idx;
} else {
const uint32_t *valid = s->col.valid_map_base;
is_valid = (val_idx < s->col.num_values && cur_val_idx + t < s->page.num_leaf_values)
? (valid) ? (valid[val_idx >> 5] >> (val_idx & 0x1f)) & 1 : 1
: 0;
is_valid =
(val_idx < s->col.num_values && cur_val_idx + t < s->page.num_leaf_values)
? (valid)
? (valid[(val_idx + validity_offset) >> 5] >> ((val_idx + validity_offset) & 0x1f)) &
1
: 1
: 0;
}
warp_valids = ballot(is_valid);
cur_val_idx += nvals;
4 changes: 4 additions & 0 deletions cpp/src/io/parquet/writer_impl.cu
@@ -136,6 +136,7 @@ class parquet_column_view {
_null_count(_leaf_col.null_count()),
_data(col.head<uint8_t>() + col.offset() * _type_width),
_nulls(_leaf_col.nullable() ? _leaf_col.null_mask() : nullptr),
_offset(col.offset()),
_converted_type(ConvertedType::UNKNOWN),
_ts_scale(0),
_dremel_offsets(0, stream),
@@ -323,6 +324,7 @@ class parquet_column_view {
bool nullable() const noexcept { return (_nulls != nullptr); }
void const *data() const noexcept { return _data; }
uint32_t const *nulls() const noexcept { return _nulls; }
size_type offset() const noexcept { return _offset; }

// List related data
column_view cudf_col() const noexcept { return _col; }
@@ -396,6 +398,7 @@ class parquet_column_view {
size_t _null_count = 0;
void const *_data = nullptr;
uint32_t const *_nulls = nullptr;
size_type _offset = 0;

// parquet-related members
std::string _name{};
@@ -797,6 +800,7 @@ void writer::impl::write_chunk(table_view const &table, pq_chunked_state &state)
*desc = gpu::EncColumnDesc{}; // Zero out all fields
desc->column_data_base = col.data();
desc->valid_map_base = col.nulls();
desc->column_offset = col.offset();
desc->stats_dtype = col.stats_type();
desc->ts_scale = col.ts_scale();
// TODO (dm): Enable dictionary for list after refactor
15 changes: 12 additions & 3 deletions cpp/src/io/statistics/column_stats.cu
@@ -166,7 +166,10 @@
uint32_t row = r + s->group.start_row;
const uint32_t *valid_map = s->col.valid_map_base;
uint32_t is_valid = (r < s->group.num_rows && row < s->col.num_values)
? (valid_map) ? (valid_map[row >> 5] >> (row & 0x1f)) & 1 : 1
? (valid_map) ? (valid_map[(row + s->col.column_offset) >> 5] >>
((row + s->col.column_offset) & 0x1f)) &
1
: 1
: 0;
if (is_valid) {
switch (dtype) {
@@ -251,7 +254,10 @@
uint32_t row = r + s->group.start_row;
const uint32_t *valid_map = s->col.valid_map_base;
uint32_t is_valid = (r < s->group.num_rows && row < s->col.num_values)
? (valid_map) ? (valid_map[row >> 5] >> (row & 0x1f)) & 1 : 1
? (valid_map) ? (valid_map[(row + s->col.column_offset) >> 5] >>
((row + s->col.column_offset) & 0x1f)) &
1
: 1
: 0;
if (is_valid) {
if (dtype == dtype_float64) {
@@ -331,7 +337,10 @@
uint32_t row = r + s->group.start_row;
const uint32_t *valid_map = s->col.valid_map_base;
uint32_t is_valid = (r < s->group.num_rows && row < s->col.num_values)
? (valid_map) ? (valid_map[row >> 5] >> (row & 0x1f)) & 1 : 1
? (valid_map) ? (valid_map[(row + s->col.column_offset) >> 5] >>
((row + s->col.column_offset) & 0x1f)) &
1
: 1
: 0;
if (is_valid) {
const nvstrdesc_s *str_col = static_cast<const nvstrdesc_s *>(s->col.column_data_base);
2 changes: 2 additions & 0 deletions cpp/src/io/statistics/column_stats.h
@@ -16,6 +16,7 @@
#pragma once
#include <stdint.h>

#include <cudf/types.hpp>
#include <rmm/cuda_stream_view.hpp>

namespace cudf {
@@ -43,6 +44,7 @@ struct stats_column_desc {
uint32_t num_values; //!< Number of data values in column. Different from num_rows in case of
//!< nested columns
const uint32_t *valid_map_base; //!< base of valid bit map for this column (null if not present)
size_type column_offset; //!< index of the first element relative to the base memory
const void *column_data_base; //!< base ptr to column data
int32_t ts_scale; //!< timestamp scale (>0: multiply by scale, <0: divide by -scale)
};
35 changes: 28 additions & 7 deletions cpp/tests/io/orc_test.cpp
@@ -18,6 +18,7 @@
#include <cudf_test/column_utilities.hpp>
#include <cudf_test/column_wrapper.hpp>
#include <cudf_test/cudf_gtest.hpp>
#include <cudf_test/table_utilities.hpp>
#include <cudf_test/type_lists.hpp>

#include <cudf/concatenate.hpp>
@@ -142,13 +143,13 @@ inline auto random_values(size_t size)
}

// Helper function to compare two tables
void CUDF_TEST_EXPECT_TABLES_EQUAL(cudf::table_view const& lhs, cudf::table_view const& rhs)
{
EXPECT_EQ(lhs.num_columns(), rhs.num_columns());
auto expected = lhs.begin();
auto result = rhs.begin();
while (result != rhs.end()) { CUDF_TEST_EXPECT_COLUMNS_EQUAL(*expected++, *result++); }
}
// void CUDF_TEST_EXPECT_TABLES_EQUAL(cudf::table_view const& lhs, cudf::table_view const& rhs)
//{
// EXPECT_EQ(lhs.num_columns(), rhs.num_columns());
// auto expected = lhs.begin();
// auto result = rhs.begin();
// while (result != rhs.end()) { CUDF_TEST_EXPECT_COLUMNS_EQUAL(*expected++, *result++); }
//}

struct SkipRowTest {
int test_calls;
@@ -617,6 +618,26 @@ TEST_F(OrcWriterTest, negTimestampsNano)
CUDF_TEST_EXPECT_TABLES_EQUAL(expected->view(), result.tbl->view());
}

TEST_F(OrcWriterTest, Slice)
{
auto col =
cudf::test::fixed_width_column_wrapper<int>{{1, 2, 3, 4, 5}, {true, true, true, false, true}};
std::vector<cudf::size_type> indices{2, 5};
std::vector<cudf::column_view> result = cudf::slice(col, indices);
cudf::table_view tbl{{result[0]}};

auto filepath = temp_env->get_temp_filepath("Slice.orc");
cudf_io::orc_writer_options out_opts =
cudf_io::orc_writer_options::builder(cudf_io::sink_info{filepath}, tbl);
cudf_io::write_orc(out_opts);

cudf_io::orc_reader_options in_opts =
cudf_io::orc_reader_options::builder(cudf_io::source_info{filepath});
auto read_table = cudf_io::read_orc(in_opts);

CUDF_TEST_EXPECT_TABLES_EQUIVALENT(read_table.tbl->view(), tbl);
}

TEST_F(OrcChunkedWriterTest, SingleTable)
{
srand(31337);
21 changes: 21 additions & 0 deletions cpp/tests/io/parquet_test.cpp
@@ -476,6 +476,7 @@ TEST_F(ParquetWriterTest, SlicedTable)
expected_metadata.column_names.emplace_back("col_list");
expected_metadata.column_names.emplace_back("col_multi_level_list");

// auto expected = table_view({col0, col1, col2, col3});
auto expected = table_view({col0, col1, col2, col3, col4});

auto expected_slice = cudf::slice(expected, {2, static_cast<cudf::size_type>(num_rows) - 1});
Expand Down Expand Up @@ -816,6 +817,26 @@ TEST_F(ParquetWriterTest, MultipleMismatchedSources)
}
}

TEST_F(ParquetWriterTest, Slice)
{
auto col =
cudf::test::fixed_width_column_wrapper<int>{{1, 2, 3, 4, 5}, {true, true, true, false, true}};
std::vector<cudf::size_type> indices{2, 5};
std::vector<cudf::column_view> result = cudf::slice(col, indices);
cudf::table_view tbl{{result[0]}};

auto filepath = temp_env->get_temp_filepath("Slice.parquet");
cudf_io::parquet_writer_options out_opts =
cudf_io::parquet_writer_options::builder(cudf_io::sink_info{filepath}, tbl);
cudf_io::write_parquet(out_opts);

cudf_io::parquet_reader_options in_opts =
cudf_io::parquet_reader_options::builder(cudf_io::source_info{filepath});
auto read_table = cudf_io::read_parquet(in_opts);

CUDF_TEST_EXPECT_TABLES_EQUIVALENT(read_table.tbl->view(), tbl);
}

TEST_F(ParquetChunkedWriterTest, SingleTable)
{
srand(31337);