Skip to content

Commit

Permalink
Fixes issue with null struct columns in ORC reader (#8819)
Browse files Browse the repository at this point in the history
In case of liborc, pyarrow and pyorc:
If the parent has a null element, that element is skipped while writing child data, and same goes with mask
So, you would have to keep track of null count and null mask in parent column, so that you can merge both the parent and child null masks.

In case of pyspark, spark:

If the parent has a null element, and if child column also has null element, then upper explanation holds.
But if all the child rows are valid, then you need to copy the mask from parent.

These scenarios have been take care in the code changes.

Earlier struct  column and its child columns used to be in the same level of nesting, but since we need parent null mask before decoding child,  changes have been made so that child columns will be moved one level down for all types of nested columns. 

closes #8704

Authors:
  - Ram (Ramakrishna Prabhu) (https://github.com/rgsl888prabhu)

Approvers:
  - GALI PREM SAGAR (https://github.com/galipremsagar)
  - Devavret Makkar (https://github.com/devavret)
  - Vukasin Milovanovic (https://github.com/vuule)

URL: #8819
  • Loading branch information
rgsl888prabhu authored Jul 22, 2021
1 parent 3d0583c commit 825f132
Show file tree
Hide file tree
Showing 8 changed files with 290 additions and 150 deletions.
14 changes: 9 additions & 5 deletions cpp/src/io/orc/orc.h
Original file line number Diff line number Diff line change
Expand Up @@ -540,10 +540,7 @@ class OrcDecompressor {
};

/**
* @brief Stores orc id for each column and its adjacent number of children
* in case of struct or number of children in case of list column.
* If list column has struct column, then all child columns of that struct are treated as child
* column of list.
* @brief Stores orc id for each column and number of children in that column.
*
* @code{.pseudo}
* Consider following data where a struct has two members and a list column
Expand All @@ -560,11 +557,18 @@ class OrcDecompressor {
*
*/
struct orc_column_meta {
// orc_column_meta(uint32_t _id, uint32_t _num_children) : id(_id), num_children(_num_children){};
uint32_t id; // orc id for the column
uint32_t num_children; // number of children at the same level of nesting in case of struct
};

/**
* @brief Stores column's validity map and null count
*/
struct column_validity_info {
uint32_t* valid_map_base;
uint32_t null_count;
};

/**
* @brief A helper class for ORC file metadata. Provides some additional
* convenience methods for initializing and accessing metadata.
Expand Down
1 change: 1 addition & 0 deletions cpp/src/io/orc/orc_gpu.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ struct ColumnDesc {
uint8_t dtype_len; // data type length (for types that can be mapped to different sizes)
int32_t decimal_scale; // number of fractional decimal digits for decimal type
int32_t ts_clock_rate; // output timestamp clock frequency (0=default, 1000=ms, 1000000000=ns)
column_validity_info parent_validity_info; // consists of parent column valid_map and null count
};

/**
Expand Down
293 changes: 203 additions & 90 deletions cpp/src/io/orc/reader_impl.cu

Large diffs are not rendered by default.

14 changes: 10 additions & 4 deletions cpp/src/io/orc/reader_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,12 @@ class aggregate_orc_metadata;
*/
struct reader_column_meta {
std::vector<std::vector<int32_t>>
orc_col_map; // Mapping between column id in orc to processing order.
std::vector<uint32_t> num_child_rows; // number of rows in child columns
orc_col_map; // Mapping between column id in orc to processing order.
std::vector<uint32_t> num_child_rows; // number of rows in child columns

std::vector<column_validity_info>
parent_column_data; // consists of parent column valid_map and null count

std::vector<uint32_t> child_start_row; // start row of child columns [stripe][column]
std::vector<uint32_t>
num_child_rows_per_stripe; // number of rows of child columns [stripe][column]
Expand Down Expand Up @@ -151,12 +155,14 @@ class reader::impl {
* @brief Aggregate child metadata from parent column chunks.
*
* @param chunks Vector of list of parent column chunks.
* @param chunks Vector of list of parent column row groups.
* @param row_groups Vector of list of row index descriptors
* @param out_buffers Column buffers for columns.
* @param list_col Vector of column metadata of list type parent columns.
* @param level Current nesting level being processed.
*/
void aggregate_child_meta(cudf::detail::host_2dspan<gpu::ColumnDesc> chunks,
cudf::detail::host_2dspan<gpu::RowGroup> row_groups,
std::vector<column_buffer>& out_buffers,
std::vector<orc_column_meta> const& list_col,
const int32_t level);

Expand Down Expand Up @@ -207,7 +213,7 @@ class reader::impl {
bool _use_index = true;
bool _use_np_dtypes = true;
bool _has_timestamp_column = false;
bool _has_list_column = false;
bool _has_nested_column = false;
std::vector<std::string> _decimal_cols_as_float;
data_type _timestamp_type{type_id::EMPTY};
reader_column_meta _col_meta;
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/io/orc/stripe_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1152,7 +1152,7 @@ __global__ void __launch_bounds__(block_size)

if (t == 0) s->chunk = chunks[chunk_id];
__syncthreads();
const size_t max_num_rows = s->chunk.column_num_rows;
const size_t max_num_rows = s->chunk.column_num_rows - s->chunk.parent_validity_info.null_count;

if (is_nulldec) {
uint32_t null_count = 0;
Expand Down Expand Up @@ -1186,6 +1186,7 @@ __global__ void __launch_bounds__(block_size)
nrows = nrows_max;
}
__syncthreads();

row_in = s->chunk.start_row + s->top.nulls_desc_row;
if (row_in + nrows > first_row && row_in < first_row + max_num_rows &&
s->chunk.valid_map_base != NULL) {
Expand Down Expand Up @@ -1334,7 +1335,7 @@ static __device__ void DecodeRowPositions(orcdec_state_s* s,
s->top.data.cur_row + s->top.data.nrows < s->top.data.end_row) {
uint32_t nrows = min(s->top.data.end_row - (s->top.data.cur_row + s->top.data.nrows),
min((row_decoder_buffer_size - s->u.rowdec.nz_count) * 2, blockDim.x));
if (s->chunk.strm_len[CI_PRESENT] > 0) {
if (s->chunk.valid_map_base != NULL) {
// We have a present stream
uint32_t rmax = s->top.data.end_row - min((uint32_t)first_row, s->top.data.end_row);
uint32_t r = (uint32_t)(s->top.data.cur_row + s->top.data.nrows + t - first_row);
Expand Down
6 changes: 4 additions & 2 deletions cpp/src/io/orc/stripe_init.cu
Original file line number Diff line number Diff line change
Expand Up @@ -456,8 +456,10 @@ extern "C" __global__ void __launch_bounds__(128, 8)
((uint32_t*)&row_groups[(s->rowgroup_start + i) * num_columns + blockIdx.x])[j] =
((volatile uint32_t*)&s->rowgroups[i])[j];
}
row_groups[(s->rowgroup_start + i) * num_columns + blockIdx.x].num_rows = num_rows;
row_groups[(s->rowgroup_start + i) * num_columns + blockIdx.x].start_row = start_row;
row_groups[(s->rowgroup_start + i) * num_columns + blockIdx.x].num_rows = num_rows;
// Updating in case of struct
row_groups[(s->rowgroup_start + i) * num_columns + blockIdx.x].num_child_rows = num_rows;
row_groups[(s->rowgroup_start + i) * num_columns + blockIdx.x].start_row = start_row;
}
__syncthreads();
if (t == 0) { s->rowgroup_start += num_rowgroups; }
Expand Down
Binary file not shown.
107 changes: 60 additions & 47 deletions python/cudf/cudf/tests/test_orc.py
Original file line number Diff line number Diff line change
Expand Up @@ -843,9 +843,10 @@ def test_orc_string_stream_offset_issue():
assert_eq(df, cudf.read_orc(buffer))


# Data is generated using pyorc module
def generate_list_struct_buff(size=28000):
rd = random.Random(0)
np.random.seed(seed=0)
rd = random.Random(1)
np.random.seed(seed=1)

buff = BytesIO()

Expand Down Expand Up @@ -875,16 +876,21 @@ def generate_list_struct_buff(size=28000):
schema = po.Struct(**schema)

lvl3_list = [
[
rd.choice(
[
None,
[
rd.choice([None, np.random.randint(1, 3)])
for z in range(np.random.randint(1, 3))
]
for z in range(np.random.randint(0, 3))
[
[
rd.choice([None, np.random.randint(1, 3)])
for z in range(np.random.randint(1, 3))
]
for z in range(np.random.randint(0, 3))
]
for y in range(np.random.randint(0, 3))
],
]
for y in range(np.random.randint(0, 3))
]
)
for x in range(size)
]
lvl1_list = [
Expand All @@ -895,15 +901,21 @@ def generate_list_struct_buff(size=28000):
for x in range(size)
]
lvl1_struct = [
(np.random.randint(0, 3), np.random.randint(0, 3)) for x in range(size)
rd.choice([None, (np.random.randint(0, 3), np.random.randint(0, 3))])
for x in range(size)
]
lvl2_struct = [
(
rd.choice([None, np.random.randint(0, 3)]),
(
rd.choice([None, np.random.randint(0, 3)]),
np.random.randint(0, 3),
),
rd.choice(
[
None,
(
rd.choice([None, np.random.randint(0, 3)]),
(
rd.choice([None, np.random.randint(0, 3)]),
np.random.randint(0, 3),
),
),
]
)
for x in range(size)
]
Expand Down Expand Up @@ -953,47 +965,48 @@ def generate_list_struct_buff(size=28000):
)
@pytest.mark.parametrize("num_rows", [0, 15, 1005, 10561, 28000])
@pytest.mark.parametrize("use_index", [True, False])
@pytest.mark.parametrize("skip_rows", [0, 101, 1007, 27000])
def test_lists_struct_nests(
columns, num_rows, use_index, skip_rows,
columns, num_rows, use_index,
):

has_lists = (
any("list" in col_name for col_name in columns) if columns else True
gdf = cudf.read_orc(
list_struct_buff,
columns=columns,
num_rows=num_rows,
use_index=use_index,
)

if has_lists and skip_rows > 0:
with pytest.raises(
RuntimeError, match="skip_rows is not supported by list column"
):
cudf.read_orc(
list_struct_buff,
columns=columns,
num_rows=num_rows,
use_index=use_index,
skiprows=skip_rows,
)
pyarrow_tbl = pyarrow.orc.ORCFile(list_struct_buff).read()

pyarrow_tbl = (
pyarrow_tbl[:num_rows]
if columns is None
else pyarrow_tbl.select(columns)[:num_rows]
)

if num_rows > 0:
assert pyarrow_tbl.equals(gdf.to_arrow())
else:
gdf = cudf.read_orc(
list_struct_buff,
columns=columns,
num_rows=num_rows,
use_index=use_index,
skiprows=skip_rows,
)
assert_eq(pyarrow_tbl.to_pandas(), gdf)

pyarrow_tbl = pyarrow.orc.ORCFile(list_struct_buff).read()

pyarrow_tbl = (
pyarrow_tbl[skip_rows : skip_rows + num_rows]
if columns is None
else pyarrow_tbl.select(columns)[skip_rows : skip_rows + num_rows]
@pytest.mark.parametrize("columns", [None, ["lvl1_struct"], ["lvl1_list"]])
def test_skip_rows_for_nested_types(columns):
with pytest.raises(
RuntimeError, match="skip_rows is not supported by nested column"
):
cudf.read_orc(
list_struct_buff, columns=columns, use_index=True, skiprows=5,
)

if num_rows > 0:
assert_eq(True, pyarrow_tbl.equals(gdf.to_arrow()))
else:
assert_eq(pyarrow_tbl.to_pandas(), gdf)

def test_pyspark_struct(datadir):
path = datadir / "TestOrcFile.testPySparkStruct.orc"

pdf = pa.orc.ORCFile(path).read().to_pandas()
gdf = cudf.read_orc(path)

assert_eq(pdf, gdf)


@pytest.mark.parametrize(
Expand Down

0 comments on commit 825f132

Please sign in to comment.