Skip to content

Commit

Permalink
Fix ORC issue with incorrect timestamp nanosecond values (rapidsai#7581)
Browse files Browse the repository at this point in the history
Closes rapidsai#7355

Use 64-bit variables/buffers to handle nanosecond values, since the nanosecond encoding can overflow a 32-bit value in some cases.
Removed the overloaded `intrle_minmax` functions, replacing them with templated `std::numeric_limits` calls (the alternative would have been to add yet another overload).

Performance impact evaluation pending, but this fix seems unavoidable regardless of the impact.

Authors:
  - Vukasin Milovanovic (@vuule)

Approvers:
  - GALI PREM SAGAR (@galipremsagar)
  - Devavret Makkar (@devavret)
  - Kumar Aatish (@kaatish)

URL: rapidsai#7581
  • Loading branch information
vuule authored and hyperbolic2346 committed Mar 23, 2021
1 parent a7ff744 commit 6564581
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 35 deletions.
34 changes: 25 additions & 9 deletions cpp/src/io/orc/stripe_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1455,8 +1455,9 @@ __global__ void __launch_bounds__(block_size)
__syncthreads();
// Decode data streams
{
uint32_t numvals = s->top.data.max_vals, secondary_val;
uint32_t vals_skipped = 0;
uint32_t numvals = s->top.data.max_vals;
uint64_t secondary_val = 0;
uint32_t vals_skipped = 0;
if (s->is_string || s->chunk.type_kind == TIMESTAMP) {
// For these data types, we have a secondary unsigned 32-bit data stream
orc_bytestream_s *bs = (is_dictionary(s->chunk.encoding_kind)) ? &s->bs : &s->bs2;
Expand All @@ -1471,9 +1472,15 @@ __global__ void __launch_bounds__(block_size)
}
if (numvals > ofs) {
if (is_rlev1(s->chunk.encoding_kind)) {
numvals = ofs + Integer_RLEv1(bs, &s->u.rlev1, &s->vals.u32[ofs], numvals - ofs, t);
if (s->chunk.type_kind == TIMESTAMP)
numvals = ofs + Integer_RLEv1(bs, &s->u.rlev1, &s->vals.u64[ofs], numvals - ofs, t);
else
numvals = ofs + Integer_RLEv1(bs, &s->u.rlev1, &s->vals.u32[ofs], numvals - ofs, t);
} else {
numvals = ofs + Integer_RLEv2(bs, &s->u.rlev2, &s->vals.u32[ofs], numvals - ofs, t);
if (s->chunk.type_kind == TIMESTAMP)
numvals = ofs + Integer_RLEv2(bs, &s->u.rlev2, &s->vals.u64[ofs], numvals - ofs, t);
else
numvals = ofs + Integer_RLEv2(bs, &s->u.rlev2, &s->vals.u32[ofs], numvals - ofs, t);
}
__syncthreads();
if (numvals <= ofs && t >= ofs && t < s->top.data.max_vals) { s->vals.u32[t] = 0; }
Expand All @@ -1487,15 +1494,24 @@ __global__ void __launch_bounds__(block_size)
__syncthreads();
if (t == 0) { s->top.data.index.run_pos[cid] = 0; }
numvals -= vals_skipped;
if (t < numvals) { secondary_val = s->vals.u32[vals_skipped + t]; }
if (t < numvals) {
secondary_val = (s->chunk.type_kind == TIMESTAMP) ? s->vals.u64[vals_skipped + t]
: s->vals.u32[vals_skipped + t];
}
__syncthreads();
if (t < numvals) { s->vals.u32[t] = secondary_val; }
if (t < numvals) {
if (s->chunk.type_kind == TIMESTAMP)
s->vals.u64[t] = secondary_val;
else
s->vals.u32[t] = secondary_val;
}
}
}
__syncthreads();
// For strings with direct encoding, we need to convert the lengths into an offset
if (!is_dictionary(s->chunk.encoding_kind)) {
secondary_val = (t < numvals) ? s->vals.u32[t] : 0;
if (t < numvals)
secondary_val = (s->chunk.type_kind == TIMESTAMP) ? s->vals.u64[t] : s->vals.u32[t];
if (s->chunk.type_kind != TIMESTAMP) {
lengths_to_positions(s->vals.u32, numvals, t);
__syncthreads();
Expand Down Expand Up @@ -1693,7 +1709,7 @@ __global__ void __launch_bounds__(block_size)
}
case TIMESTAMP: {
int64_t seconds = s->vals.i64[t + vals_skipped] + s->top.data.utc_epoch;
uint32_t nanos = secondary_val;
uint64_t nanos = secondary_val;
nanos = (nanos >> 3) * kTimestampNanoScale[nanos & 7];
if (!tz_table.ttimes.empty()) {
seconds += get_gmt_offset(tz_table.ttimes, tz_table.offsets, seconds);
Expand All @@ -1716,7 +1732,7 @@ __global__ void __launch_bounds__(block_size)
if (s->chunk.type_kind == TIMESTAMP) {
int buffer_pos = s->top.data.max_vals;
if (t >= buffer_pos && t < buffer_pos + s->top.data.buffered_count) {
s->vals.u32[t - buffer_pos] = secondary_val;
s->vals.u64[t - buffer_pos] = secondary_val;
}
} else if (s->chunk.type_kind == BOOLEAN && t < s->top.data.buffered_count) {
s->vals.u8[t] = secondary_val;
Expand Down
36 changes: 10 additions & 26 deletions cpp/src/io/orc/stripe_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ struct orcenc_state_s {
union {
uint8_t u8[2048];
uint32_t u32[1024];
uint64_t u64[1024];
} lengths;
};

Expand All @@ -101,6 +102,7 @@ static inline __device__ uint32_t zigzag(int32_t v)
int32_t s = (v >> 31);
return ((v ^ s) * 2) - s;
}
// Identity overload for already-unsigned values: no sign-folding is applied
// (contrast with the int64_t overload below, which folds the sign bit).
static inline __device__ uint64_t zigzag(uint64_t v) { return v; }
static inline __device__ uint64_t zigzag(int64_t v)
{
int64_t s = (v < 0) ? 1 : 0;
Expand Down Expand Up @@ -286,24 +288,6 @@ static inline __device__ uint32_t StoreVarint(uint8_t *dst, uint64_t v)
return bytecnt;
}

static inline __device__ void intrle_minmax(int64_t &vmin, int64_t &vmax)
{
vmin = INT64_MIN;
vmax = INT64_MAX;
}
// static inline __device__ void intrle_minmax(uint64_t &vmin, uint64_t &vmax) { vmin = UINT64_C(0);
// vmax = UINT64_MAX; }
static inline __device__ void intrle_minmax(int32_t &vmin, int32_t &vmax)
{
vmin = INT32_MIN;
vmax = INT32_MAX;
}
static inline __device__ void intrle_minmax(uint32_t &vmin, uint32_t &vmax)
{
vmin = UINT32_C(0);
vmax = UINT32_MAX;
}

template <class T>
static inline __device__ void StoreBytesBigEndian(uint8_t *dst, T v, uint32_t w)
{
Expand Down Expand Up @@ -412,13 +396,9 @@ static __device__ uint32_t IntegerRLE(orcenc_state_s *s,
// Find minimum and maximum values
if (literal_run > 0) {
// Find min & max
T vmin, vmax;
T vmin = (t < literal_run) ? v0 : std::numeric_limits<T>::max();
T vmax = (t < literal_run) ? v0 : std::numeric_limits<T>::min();
uint32_t literal_mode, literal_w;
if (t < literal_run) {
vmin = vmax = v0;
} else {
intrle_minmax(vmax, vmin);
}
vmin = block_reduce(temp_storage).Reduce(vmin, cub::Min());
__syncthreads();
vmax = block_reduce(temp_storage).Reduce(vmax, cub::Max());
Expand Down Expand Up @@ -652,6 +632,7 @@ __global__ void __launch_bounds__(block_size)
typename cub::BlockReduce<int32_t, block_size>::TempStorage i32;
typename cub::BlockReduce<int64_t, block_size>::TempStorage i64;
typename cub::BlockReduce<uint32_t, block_size>::TempStorage u32;
typename cub::BlockReduce<uint64_t, block_size>::TempStorage u64;
} temp_storage;

orcenc_state_s *const s = &state_g;
Expand Down Expand Up @@ -763,7 +744,7 @@ __global__ void __launch_bounds__(block_size)
int64_t ts = static_cast<const int64_t *>(base)[row];
int32_t ts_scale = kTimeScale[min(s->chunk.scale, 9)];
int64_t seconds = ts / ts_scale;
int32_t nanos = (ts - seconds * ts_scale);
int64_t nanos = (ts - seconds * ts_scale);
// There is a bug in the ORC spec such that for negative timestamps, it is understood
// between the writer and reader that nanos will be adjusted to their positive component
// but the negative seconds will be left alone. This means that -2.6 is encoded as
Expand All @@ -786,7 +767,7 @@ __global__ void __launch_bounds__(block_size)
}
nanos = (nanos << 3) + zeroes;
}
s->lengths.u32[nz_idx] = nanos;
s->lengths.u64[nz_idx] = nanos;
break;
}
case STRING:
Expand Down Expand Up @@ -897,6 +878,9 @@ __global__ void __launch_bounds__(block_size)
uint32_t flush = (s->cur_row == s->chunk.num_rows) ? 1 : 0, n;
switch (s->chunk.type_kind) {
case TIMESTAMP:
n = IntegerRLE<CI_DATA2, uint64_t, false, 0x3ff, block_size>(
s, s->lengths.u64, s->nnz - s->numlengths, s->numlengths, flush, t, temp_storage.u64);
break;
case STRING:
n = IntegerRLE<CI_DATA2, uint32_t, false, 0x3ff, block_size>(
s, s->lengths.u32, s->nnz - s->numlengths, s->numlengths, flush, t, temp_storage.u32);
Expand Down
14 changes: 14 additions & 0 deletions python/cudf/cudf/tests/test_orc.py
Original file line number Diff line number Diff line change
Expand Up @@ -724,3 +724,17 @@ def test_orc_bool_encode_fail():
# Also validate data
pdf = pa.orc.ORCFile(buffer).read().to_pandas()
assert_eq(okay_df, pdf)


def test_nanoseconds_overflow():
    # Round-trip a datetime64[ns] column through ORC and verify the data
    # survives intact, both when read back with cudf and with pyarrow.
    # The chosen values have nanosecond components that take more than
    # 32 bits to encode.
    orc_buffer = BytesIO()
    nanos = cudf.Series([710424008, -1338482640], dtype="datetime64[ns]")
    written_df = cudf.DataFrame({"s": nanos})
    written_df.to_orc(orc_buffer)

    assert_eq(written_df, cudf.read_orc(orc_buffer))

    arrow_table = pa.orc.ORCFile(orc_buffer).read()
    assert_eq(written_df.to_pandas(), arrow_table.to_pandas())

0 comments on commit 6564581

Please sign in to comment.