Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes issue with null struct columns in ORC reader #8819

Merged
merged 17 commits into from
Jul 22, 2021
Merged
14 changes: 9 additions & 5 deletions cpp/src/io/orc/orc.h
Original file line number Diff line number Diff line change
Expand Up @@ -539,10 +539,7 @@ class OrcDecompressor {
};

/**
* @brief Stores orc id for each column and its adjacent number of children
* in case of struct or number of children in case of list column.
* If list column has struct column, then all child columns of that struct are treated as child
* column of list.
* @brief Stores orc id for each column and number of children in that column.
*
* @code{.pseudo}
* Consider following data where a struct has two members and a list column
Expand All @@ -559,11 +556,18 @@ class OrcDecompressor {
*
*/
struct orc_column_meta {
// orc_column_meta(uint32_t _id, uint32_t _num_children) : id(_id), num_children(_num_children){};
uint32_t id; // orc id for the column
uint32_t num_children; // number of children at the same level of nesting in case of struct
};

/**
* @brief Stores column's validity map and null count
*/
struct column_validity_info {
uint32_t* valid_map_base;
uint32_t null_count;
};

/**
* @brief A helper class for ORC file metadata. Provides some additional
* convenience methods for initializing and accessing metadata.
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/io/orc/orc_gpu.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <cudf/utilities/span.hpp>
#include <io/statistics/statistics.cuh>
#include <io/utilities/column_buffer.hpp>
#include "orc.h"
#include "orc_common.h"

#include <rmm/cuda_stream_view.hpp>
Expand Down Expand Up @@ -109,6 +110,7 @@ struct ColumnDesc {
uint8_t dtype_len; // data type length (for types that can be mapped to different sizes)
int32_t decimal_scale; // number of fractional decimal digits for decimal type
int32_t ts_clock_rate; // output timestamp clock frequency (0=default, 1000=ms, 1000000000=ns)
column_validity_info parent_validity_info; // consists of parent column valid_map and null count
};

/**
Expand Down
293 changes: 203 additions & 90 deletions cpp/src/io/orc/reader_impl.cu

Large diffs are not rendered by default.

14 changes: 10 additions & 4 deletions cpp/src/io/orc/reader_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,12 @@ class aggregate_orc_metadata;
*/
struct reader_column_meta {
std::vector<std::vector<int32_t>>
orc_col_map; // Mapping between column id in orc to processing order.
std::vector<uint32_t> num_child_rows; // number of rows in child columns
orc_col_map; // Mapping between column id in orc to processing order.
std::vector<uint32_t> num_child_rows; // number of rows in child columns

std::vector<column_validity_info>
parent_column_data; // consists of parent column valid_map and null count

std::vector<uint32_t> child_start_row; // start row of child columns [stripe][column]
std::vector<uint32_t>
num_child_rows_per_stripe; // number of rows of child columns [stripe][column]
Expand Down Expand Up @@ -151,12 +155,14 @@ class reader::impl {
* @brief Aggregate child metadata from parent column chunks.
*
* @param chunks Vector of list of parent column chunks.
* @param chunks Vector of list of parent column row groups.
* @param row_groups Vector of list of row index descriptors
* @param out_buffers Column buffers for columns.
* @param list_col Vector of column metadata of list type parent columns.
* @param level Current nesting level being processed.
*/
void aggregate_child_meta(cudf::detail::host_2dspan<gpu::ColumnDesc> chunks,
cudf::detail::host_2dspan<gpu::RowGroup> row_groups,
std::vector<column_buffer>& out_buffers,
std::vector<orc_column_meta> const& list_col,
const int32_t level);

Expand Down Expand Up @@ -207,7 +213,7 @@ class reader::impl {
bool _use_index = true;
bool _use_np_dtypes = true;
bool _has_timestamp_column = false;
bool _has_list_column = false;
bool _has_nested_column = false;
std::vector<std::string> _decimal_cols_as_float;
data_type _timestamp_type{type_id::EMPTY};
reader_column_meta _col_meta;
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/io/orc/stripe_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1152,7 +1152,7 @@ __global__ void __launch_bounds__(block_size)

if (t == 0) s->chunk = chunks[chunk_id];
__syncthreads();
const size_t max_num_rows = s->chunk.column_num_rows;
const size_t max_num_rows = s->chunk.column_num_rows - s->chunk.parent_validity_info.null_count;

if (is_nulldec) {
uint32_t null_count = 0;
Expand Down Expand Up @@ -1186,6 +1186,7 @@ __global__ void __launch_bounds__(block_size)
nrows = nrows_max;
}
__syncthreads();

row_in = s->chunk.start_row + s->top.nulls_desc_row;
if (row_in + nrows > first_row && row_in < first_row + max_num_rows &&
s->chunk.valid_map_base != NULL) {
Expand Down Expand Up @@ -1334,7 +1335,7 @@ static __device__ void DecodeRowPositions(orcdec_state_s* s,
s->top.data.cur_row + s->top.data.nrows < s->top.data.end_row) {
uint32_t nrows = min(s->top.data.end_row - (s->top.data.cur_row + s->top.data.nrows),
min((row_decoder_buffer_size - s->u.rowdec.nz_count) * 2, blockDim.x));
if (s->chunk.strm_len[CI_PRESENT] > 0) {
if (s->chunk.valid_map_base != NULL) {
// We have a present stream
uint32_t rmax = s->top.data.end_row - min((uint32_t)first_row, s->top.data.end_row);
uint32_t r = (uint32_t)(s->top.data.cur_row + s->top.data.nrows + t - first_row);
Expand Down
6 changes: 4 additions & 2 deletions cpp/src/io/orc/stripe_init.cu
Original file line number Diff line number Diff line change
Expand Up @@ -456,8 +456,10 @@ extern "C" __global__ void __launch_bounds__(128, 8)
((uint32_t*)&row_groups[(s->rowgroup_start + i) * num_columns + blockIdx.x])[j] =
((volatile uint32_t*)&s->rowgroups[i])[j];
}
row_groups[(s->rowgroup_start + i) * num_columns + blockIdx.x].num_rows = num_rows;
row_groups[(s->rowgroup_start + i) * num_columns + blockIdx.x].start_row = start_row;
row_groups[(s->rowgroup_start + i) * num_columns + blockIdx.x].num_rows = num_rows;
// Updating in case of struct
row_groups[(s->rowgroup_start + i) * num_columns + blockIdx.x].num_child_rows = num_rows;
row_groups[(s->rowgroup_start + i) * num_columns + blockIdx.x].start_row = start_row;
}
__syncthreads();
if (t == 0) { s->rowgroup_start += num_rowgroups; }
Expand Down
Binary file not shown.
107 changes: 60 additions & 47 deletions python/cudf/cudf/tests/test_orc.py
Original file line number Diff line number Diff line change
Expand Up @@ -843,9 +843,10 @@ def test_orc_string_stream_offset_issue():
assert_eq(df, cudf.read_orc(buffer))


# Data is generated using pyorc module
def generate_list_struct_buff(size=28000):
rgsl888prabhu marked this conversation as resolved.
Show resolved Hide resolved
rd = random.Random(0)
np.random.seed(seed=0)
rd = random.Random(1)
vuule marked this conversation as resolved.
Show resolved Hide resolved
np.random.seed(seed=1)

buff = BytesIO()

Expand Down Expand Up @@ -875,16 +876,21 @@ def generate_list_struct_buff(size=28000):
schema = po.Struct(**schema)

lvl3_list = [
[
rd.choice(
[
None,
[
rd.choice([None, np.random.randint(1, 3)])
for z in range(np.random.randint(1, 3))
]
for z in range(np.random.randint(0, 3))
[
[
rd.choice([None, np.random.randint(1, 3)])
for z in range(np.random.randint(1, 3))
]
for z in range(np.random.randint(0, 3))
]
for y in range(np.random.randint(0, 3))
],
]
for y in range(np.random.randint(0, 3))
]
)
for x in range(size)
]
lvl1_list = [
Expand All @@ -895,15 +901,21 @@ def generate_list_struct_buff(size=28000):
for x in range(size)
]
lvl1_struct = [
(np.random.randint(0, 3), np.random.randint(0, 3)) for x in range(size)
rd.choice([None, (np.random.randint(0, 3), np.random.randint(0, 3))])
for x in range(size)
]
lvl2_struct = [
(
rd.choice([None, np.random.randint(0, 3)]),
(
rd.choice([None, np.random.randint(0, 3)]),
np.random.randint(0, 3),
),
rd.choice(
[
None,
(
rd.choice([None, np.random.randint(0, 3)]),
(
rd.choice([None, np.random.randint(0, 3)]),
np.random.randint(0, 3),
),
),
]
)
for x in range(size)
]
Expand Down Expand Up @@ -953,47 +965,48 @@ def generate_list_struct_buff(size=28000):
)
@pytest.mark.parametrize("num_rows", [0, 15, 1005, 10561, 28000])
@pytest.mark.parametrize("use_index", [True, False])
@pytest.mark.parametrize("skip_rows", [0, 101, 1007, 27000])
def test_lists_struct_nests(
columns, num_rows, use_index, skip_rows,
columns, num_rows, use_index,
):

has_lists = (
any("list" in col_name for col_name in columns) if columns else True
gdf = cudf.read_orc(
list_struct_buff,
columns=columns,
num_rows=num_rows,
use_index=use_index,
)

if has_lists and skip_rows > 0:
with pytest.raises(
RuntimeError, match="skip_rows is not supported by list column"
):
cudf.read_orc(
list_struct_buff,
columns=columns,
num_rows=num_rows,
use_index=use_index,
skiprows=skip_rows,
)
pyarrow_tbl = pyarrow.orc.ORCFile(list_struct_buff).read()

pyarrow_tbl = (
pyarrow_tbl[:num_rows]
if columns is None
else pyarrow_tbl.select(columns)[:num_rows]
)

if num_rows > 0:
assert pyarrow_tbl.equals(gdf.to_arrow())
else:
gdf = cudf.read_orc(
list_struct_buff,
columns=columns,
num_rows=num_rows,
use_index=use_index,
skiprows=skip_rows,
)
assert_eq(pyarrow_tbl.to_pandas(), gdf)

pyarrow_tbl = pyarrow.orc.ORCFile(list_struct_buff).read()

pyarrow_tbl = (
pyarrow_tbl[skip_rows : skip_rows + num_rows]
if columns is None
else pyarrow_tbl.select(columns)[skip_rows : skip_rows + num_rows]
@pytest.mark.parametrize("columns", [None, ["lvl1_struct"], ["lvl1_list"]])
def test_skip_rows_for_nested_types(columns):
with pytest.raises(
RuntimeError, match="skip_rows is not supported by nested column"
):
cudf.read_orc(
list_struct_buff, columns=columns, use_index=True, skiprows=5,
)

if num_rows > 0:
assert_eq(True, pyarrow_tbl.equals(gdf.to_arrow()))
else:
assert_eq(pyarrow_tbl.to_pandas(), gdf)

def test_pyspark_struct(datadir):
path = datadir / "TestOrcFile.testPySparkStruct.orc"

pdf = pa.orc.ORCFile(path).read().to_pandas()
gdf = cudf.read_orc(path)

assert_eq(pdf, gdf)


@pytest.mark.parametrize(
Expand Down