Skip to content

Commit

Permalink
Add support for decimal types in ORC writer (#8198)
Browse files Browse the repository at this point in the history
Closes #8159, #7126

Current implementation uses an array to hold the exact size of each encoded element before the encode step. This allows us to simplify the encoding (each element encode is independent) and to allocate streams of exact size instead of the worst-case. The process is different from other types because decimal data streams do not use RLE encoding.

Will add benchmarks once data generator can produce decimal data.

Authors:
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - Michael Wang (https://github.com/isVoid)
  - Devavret Makkar (https://github.com/devavret)
  - Ram (Ramakrishna Prabhu) (https://github.com/rgsl888prabhu)

URL: #8198
  • Loading branch information
vuule authored May 18, 2021
1 parent bbce6bc commit 72e017b
Show file tree
Hide file tree
Showing 9 changed files with 384 additions and 59 deletions.
6 changes: 3 additions & 3 deletions cpp/src/io/orc/orc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ void ProtobufWriter::put_row_index_entry(int32_t present_blk,
if (data_blk >= 0) { sz += put_uint(data_blk); }
if (data_ofs >= 0) {
sz += put_uint(data_ofs);
if (kind != STRING && kind != FLOAT && kind != DOUBLE) {
if (kind != STRING && kind != FLOAT && kind != DOUBLE && kind != DECIMAL) {
putb(0); // RLE run pos always zero (assumes RLE aligned with row index boundaries)
sz++;
if (kind == BOOLEAN) {
Expand Down Expand Up @@ -293,8 +293,8 @@ size_t ProtobufWriter::write(const SchemaType &s)
w.field_packed_uint(2, s.subtypes);
w.field_repeated_string(3, s.fieldNames);
// w.field_uint(4, s.maximumLength);
// w.field_uint(5, s.precision);
// w.field_uint(6, s.scale);
if (s.precision) w.field_uint(5, *s.precision);
if (s.scale) w.field_uint(6, *s.scale);
return w.value();
}

Expand Down
8 changes: 4 additions & 4 deletions cpp/src/io/orc/orc.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ struct SchemaType {
TypeKind kind = INVALID_TYPE_KIND; // the kind of this type
std::vector<uint32_t> subtypes; // the type ids of any subcolumns for list, map, struct, or union
std::vector<std::string> fieldNames; // the list of field names for struct
uint32_t maximumLength =
0; // optional: the maximum length of the type for varchar or char in UTF-8 characters
uint32_t precision = 0; // optional: the precision and scale for decimal
uint32_t scale = 0;
std::optional<uint32_t>
maximumLength; // the maximum length of the type for varchar or char in UTF-8 characters
std::optional<uint32_t> precision; // the precision for decimal
std::optional<uint32_t> scale; // the scale for decimal
};

struct UserMetadataItem {
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/io/orc/orc_gpu.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,7 @@ struct ColumnDesc {
uint8_t encoding_kind; // column encoding kind (orc::ColumnEncodingKind)
uint8_t type_kind; // column data type (orc::TypeKind)
uint8_t dtype_len; // data type length (for types that can be mapped to different sizes)
uint8_t decimal_scale; // number of fractional decimal digits for decimal type (bit 7 set if
// converting to float64)
int32_t decimal_scale; // number of fractional decimal digits for decimal type
int32_t ts_clock_rate; // output timestamp clock frequency (0=default, 1000=ms, 1000000000=ns)
};

Expand All @@ -122,9 +121,10 @@ struct EncChunk {
uint8_t encoding_kind; // column encoding kind (orc::ColumnEncodingKind)
uint8_t type_kind; // column data type (orc::TypeKind)
uint8_t dtype_len; // data type length
uint8_t scale; // scale for decimals or timestamps
int32_t scale; // scale for decimals or timestamps

uint32_t *dict_index; // dictionary index from row index
device_span<uint32_t> decimal_offsets;
column_device_view *leaf_column;
};

Expand Down
4 changes: 2 additions & 2 deletions cpp/src/io/orc/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ table_with_metadata reader::impl::read(size_type skip_rows,
// sign of the scale is changed since cuDF follows c++ libraries like CNL
// which uses negative scaling, but liborc and other libraries
// follow positive scaling.
auto const scale = -static_cast<int32_t>(_metadata->ff.types[col].scale);
auto const scale = -static_cast<int32_t>(_metadata->ff.types[col].scale.value_or(0));
column_types.emplace_back(col_type, scale);
} else {
column_types.emplace_back(col_type);
Expand Down Expand Up @@ -526,7 +526,7 @@ table_with_metadata reader::impl::read(size_type skip_rows,
chunk.num_rows = stripe_info->numberOfRows;
chunk.encoding_kind = stripe_footer->columns[_selected_columns[j]].kind;
chunk.type_kind = _metadata->ff.types[_selected_columns[j]].kind;
chunk.decimal_scale = _metadata->ff.types[_selected_columns[j]].scale;
chunk.decimal_scale = _metadata->ff.types[_selected_columns[j]].scale.value_or(0);
chunk.rowgroup_id = num_rowgroups;
chunk.dtype_len = (column_types[j].id() == type_id::STRING)
? sizeof(std::pair<const char *, size_t>)
Expand Down
21 changes: 19 additions & 2 deletions cpp/src/io/orc/stripe_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,9 @@ __global__ void __launch_bounds__(block_size)
s->lengths.u32[nz_idx] = value.size_bytes();
}
break;
// Reusing the lengths array for the scale stream
// Note: can be written in a faster manner, given that all values are equal
case DECIMAL: s->lengths.u32[nz_idx] = zigzag(s->chunk.scale); break;
default: break;
}
}
Expand Down Expand Up @@ -814,7 +817,7 @@ __global__ void __launch_bounds__(block_size)
uint32_t nz = s->buf.u32[511];
s->nnz += nz;
s->numvals += nz;
s->numlengths += (s->chunk.type_kind == TIMESTAMP ||
s->numlengths += (s->chunk.type_kind == TIMESTAMP || s->chunk.type_kind == DECIMAL ||
(s->chunk.type_kind == STRING && s->chunk.encoding_kind != DICTIONARY_V2))
? nz
: 0;
Expand Down Expand Up @@ -865,6 +868,17 @@ __global__ void __launch_bounds__(block_size)
n = s->numvals;
}
break;
case DECIMAL: {
if (valid) {
uint64_t const zz_val = (s->chunk.leaf_column->type().id() == type_id::DECIMAL32)
? zigzag(s->chunk.leaf_column->element<int32_t>(row))
: zigzag(s->chunk.leaf_column->element<int64_t>(row));
auto const offset =
(row == s->chunk.start_row) ? 0 : s->chunk.decimal_offsets[row - 1];
StoreVarint(s->stream.data_ptrs[CI_DATA] + offset, zz_val);
}
n = s->numvals;
} break;
default: n = s->numvals; break;
}
__syncthreads();
Expand All @@ -878,6 +892,7 @@ __global__ void __launch_bounds__(block_size)
n = IntegerRLE<CI_DATA2, uint64_t, false, 0x3ff, block_size>(
s, s->lengths.u64, s->nnz - s->numlengths, s->numlengths, flush, t, temp_storage.u64);
break;
case DECIMAL:
case STRING:
n = IntegerRLE<CI_DATA2, uint32_t, false, 0x3ff, block_size>(
s, s->lengths.u32, s->nnz - s->numlengths, s->numlengths, flush, t, temp_storage.u32);
Expand All @@ -893,7 +908,9 @@ __global__ void __launch_bounds__(block_size)
__syncthreads();
if (t <= CI_PRESENT && s->stream.ids[t] >= 0) {
// Update actual compressed length
streams[col_id][group_id].lengths[t] = s->strm_pos[t];
// (not needed for decimal data, whose exact size is known before encode)
if (!(t == CI_DATA && s->chunk.type_kind == DECIMAL))
streams[col_id][group_id].lengths[t] = s->strm_pos[t];
if (!s->stream.data_ptrs[t]) {
streams[col_id][group_id].data_ptrs[t] =
static_cast<uint8_t *>(const_cast<void *>(s->chunk.leaf_column->head())) +
Expand Down
Loading

0 comments on commit 72e017b

Please sign in to comment.