From 65645815777adcbd99396b7694765207857fcc26 Mon Sep 17 00:00:00 2001
From: Vukasin Milovanovic
Date: Mon, 15 Mar 2021 11:09:39 -0700
Subject: [PATCH] Fix ORC issue with incorrect timestamp nanosecond values (#7581)

Closes #7355

Use 64-bit variables/buffers to handle nanosecond values, since the encoded
nanosecond value can overflow a 32-bit integer in some cases.
Removed the overloaded `intrle_minmax` functions, using the templated
`std::numeric_limits` functions instead (the alternative was to add yet
another overload).
Performance impact evaluation is pending, but this fix seems unavoidable
regardless of the impact.

Authors:
  - Vukasin Milovanovic (@vuule)

Approvers:
  - GALI PREM SAGAR (@galipremsagar)
  - Devavret Makkar (@devavret)
  - Kumar Aatish (@kaatish)

URL: https://github.com/rapidsai/cudf/pull/7581
---
 cpp/src/io/orc/stripe_data.cu      | 34 ++++++++++++++++++++--------
 cpp/src/io/orc/stripe_enc.cu       | 36 +++++++++---------------------
 python/cudf/cudf/tests/test_orc.py | 14 ++++++++++++
 3 files changed, 49 insertions(+), 35 deletions(-)
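
The overflow called out above comes from the ORC nanosecond encoding: trailing decimal zeros are stripped from the nanosecond field and their count is packed into the low three bits, so a value with few trailing zeros is shifted left by three bits and can exceed UINT32_MAX. The following standalone sketch of that arithmetic is for illustration only; encode_nanos is a made-up helper that mirrors, but is not, the kernel code in the stripe_enc.cu hunks below.

    #include <cstdint>
    #include <cstdio>

    // Mirrors the ORC secondary-stream encoding: strip trailing decimal zeros
    // (up to 8) and record the stripped count, minus one, in the low 3 bits.
    // The decoder in stripe_data.cu reverses this with
    //   nanos = (encoded >> 3) * kTimestampNanoScale[encoded & 7];
    uint64_t encode_nanos(uint64_t nanos)
    {
      uint32_t zeroes = 0;
      if (nanos >= 100 && nanos % 100 == 0) {
        nanos /= 100;
        zeroes = 1;
        while (zeroes < 7 && nanos % 10 == 0) {
          nanos /= 10;
          ++zeroes;
        }
      }
      return (nanos << 3) + zeroes;
    }

    int main()
    {
      // 710424008 ns (one of the values in the new test) has no trailing
      // zeros, so the encoded value is 710424008 << 3 = 5683392064, which
      // does not fit in a 32-bit unsigned integer.
      std::printf("encoded: %llu, UINT32_MAX: %llu\n",
                  static_cast<unsigned long long>(encode_nanos(710424008)),
                  static_cast<unsigned long long>(UINT32_MAX));
      return 0;
    }
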
diff --git a/cpp/src/io/orc/stripe_data.cu b/cpp/src/io/orc/stripe_data.cu
index 4bca725a16b..1ff752034ad 100644
--- a/cpp/src/io/orc/stripe_data.cu
+++ b/cpp/src/io/orc/stripe_data.cu
@@ -1455,8 +1455,9 @@ __global__ void __launch_bounds__(block_size)
     __syncthreads();
     // Decode data streams
     {
-      uint32_t numvals = s->top.data.max_vals, secondary_val;
-      uint32_t vals_skipped = 0;
+      uint32_t numvals = s->top.data.max_vals;
+      uint64_t secondary_val = 0;
+      uint32_t vals_skipped = 0;
       if (s->is_string || s->chunk.type_kind == TIMESTAMP) {
         // For these data types, we have a secondary unsigned 32-bit data stream
         orc_bytestream_s *bs = (is_dictionary(s->chunk.encoding_kind)) ? &s->bs : &s->bs2;
@@ -1471,9 +1472,15 @@ __global__ void __launch_bounds__(block_size)
         }
         if (numvals > ofs) {
           if (is_rlev1(s->chunk.encoding_kind)) {
-            numvals = ofs + Integer_RLEv1(bs, &s->u.rlev1, &s->vals.u32[ofs], numvals - ofs, t);
+            if (s->chunk.type_kind == TIMESTAMP)
+              numvals = ofs + Integer_RLEv1(bs, &s->u.rlev1, &s->vals.u64[ofs], numvals - ofs, t);
+            else
+              numvals = ofs + Integer_RLEv1(bs, &s->u.rlev1, &s->vals.u32[ofs], numvals - ofs, t);
           } else {
-            numvals = ofs + Integer_RLEv2(bs, &s->u.rlev2, &s->vals.u32[ofs], numvals - ofs, t);
+            if (s->chunk.type_kind == TIMESTAMP)
+              numvals = ofs + Integer_RLEv2(bs, &s->u.rlev2, &s->vals.u64[ofs], numvals - ofs, t);
+            else
+              numvals = ofs + Integer_RLEv2(bs, &s->u.rlev2, &s->vals.u32[ofs], numvals - ofs, t);
           }
           __syncthreads();
           if (numvals <= ofs && t >= ofs && t < s->top.data.max_vals) { s->vals.u32[t] = 0; }
@@ -1487,15 +1494,24 @@ __global__ void __launch_bounds__(block_size)
         __syncthreads();
         if (t == 0) { s->top.data.index.run_pos[cid] = 0; }
         numvals -= vals_skipped;
-        if (t < numvals) { secondary_val = s->vals.u32[vals_skipped + t]; }
+        if (t < numvals) {
+          secondary_val = (s->chunk.type_kind == TIMESTAMP) ? s->vals.u64[vals_skipped + t]
+                                                            : s->vals.u32[vals_skipped + t];
+        }
         __syncthreads();
-        if (t < numvals) { s->vals.u32[t] = secondary_val; }
+        if (t < numvals) {
+          if (s->chunk.type_kind == TIMESTAMP)
+            s->vals.u64[t] = secondary_val;
+          else
+            s->vals.u32[t] = secondary_val;
+        }
       }
     }
     __syncthreads();
     // For strings with direct encoding, we need to convert the lengths into an offset
     if (!is_dictionary(s->chunk.encoding_kind)) {
-      secondary_val = (t < numvals) ? s->vals.u32[t] : 0;
+      if (t < numvals)
+        secondary_val = (s->chunk.type_kind == TIMESTAMP) ? s->vals.u64[t] : s->vals.u32[t];
       if (s->chunk.type_kind != TIMESTAMP) {
         lengths_to_positions(s->vals.u32, numvals, t);
         __syncthreads();
@@ -1693,7 +1709,7 @@ __global__ void __launch_bounds__(block_size)
           }
           case TIMESTAMP: {
            int64_t seconds = s->vals.i64[t + vals_skipped] + s->top.data.utc_epoch;
-           uint32_t nanos = secondary_val;
+           uint64_t nanos = secondary_val;
            nanos = (nanos >> 3) * kTimestampNanoScale[nanos & 7];
            if (!tz_table.ttimes.empty()) {
              seconds += get_gmt_offset(tz_table.ttimes, tz_table.offsets, seconds);
@@ -1716,7 +1732,7 @@ __global__ void __launch_bounds__(block_size)
     if (s->chunk.type_kind == TIMESTAMP) {
       int buffer_pos = s->top.data.max_vals;
       if (t >= buffer_pos && t < buffer_pos + s->top.data.buffered_count) {
-        s->vals.u32[t - buffer_pos] = secondary_val;
+        s->vals.u64[t - buffer_pos] = secondary_val;
       }
     } else if (s->chunk.type_kind == BOOLEAN && t < s->top.data.buffered_count) {
       s->vals.u8[t] = secondary_val;
diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu
index 88cad005817..aef32efaf6e 100644
--- a/cpp/src/io/orc/stripe_enc.cu
+++ b/cpp/src/io/orc/stripe_enc.cu
@@ -92,6 +92,7 @@ struct orcenc_state_s {
   union {
     uint8_t u8[2048];
     uint32_t u32[1024];
+    uint64_t u64[1024];
   } lengths;
 };
 
@@ -101,6 +102,7 @@ static inline __device__ uint32_t zigzag(int32_t v)
   int32_t s = (v >> 31);
   return ((v ^ s) * 2) - s;
 }
+static inline __device__ uint64_t zigzag(uint64_t v) { return v; }
 static inline __device__ uint64_t zigzag(int64_t v)
 {
   int64_t s = (v < 0) ? 1 : 0;
@@ -286,24 +288,6 @@ static inline __device__ uint32_t StoreVarint(uint8_t *dst, uint64_t v)
   return bytecnt;
 }
 
-static inline __device__ void intrle_minmax(int64_t &vmin, int64_t &vmax)
-{
-  vmin = INT64_MIN;
-  vmax = INT64_MAX;
-}
-// static inline __device__ void intrle_minmax(uint64_t &vmin, uint64_t &vmax) { vmin = UINT64_C(0);
-// vmax = UINT64_MAX; }
-static inline __device__ void intrle_minmax(int32_t &vmin, int32_t &vmax)
-{
-  vmin = INT32_MIN;
-  vmax = INT32_MAX;
-}
-static inline __device__ void intrle_minmax(uint32_t &vmin, uint32_t &vmax)
-{
-  vmin = UINT32_C(0);
-  vmax = UINT32_MAX;
-}
-
 template <class T>
 static inline __device__ void StoreBytesBigEndian(uint8_t *dst, T v, uint32_t w)
 {
@@ -412,13 +396,9 @@ static __device__ uint32_t IntegerRLE(orcenc_state_s *s,
     // Find minimum and maximum values
     if (literal_run > 0) {
       // Find min & max
-      T vmin, vmax;
+      T vmin = (t < literal_run) ? v0 : std::numeric_limits<T>::max();
+      T vmax = (t < literal_run) ? v0 : std::numeric_limits<T>::min();
       uint32_t literal_mode, literal_w;
-      if (t < literal_run) {
-        vmin = vmax = v0;
-      } else {
-        intrle_minmax(vmax, vmin);
-      }
       vmin = block_reduce(temp_storage).Reduce(vmin, cub::Min());
       __syncthreads();
       vmax = block_reduce(temp_storage).Reduce(vmax, cub::Max());
@@ -652,6 +632,7 @@ __global__ void __launch_bounds__(block_size)
     typename cub::BlockReduce<int32_t, block_size>::TempStorage i32;
     typename cub::BlockReduce<int64_t, block_size>::TempStorage i64;
     typename cub::BlockReduce<uint32_t, block_size>::TempStorage u32;
+    typename cub::BlockReduce<uint64_t, block_size>::TempStorage u64;
   } temp_storage;
 
   orcenc_state_s *const s = &state_g;
@@ -763,7 +744,7 @@ __global__ void __launch_bounds__(block_size)
           int64_t ts = static_cast<const int64_t *>(base)[row];
           int32_t ts_scale = kTimeScale[min(s->chunk.scale, 9)];
           int64_t seconds = ts / ts_scale;
-          int32_t nanos = (ts - seconds * ts_scale);
+          int64_t nanos = (ts - seconds * ts_scale);
           // There is a bug in the ORC spec such that for negative timestamps, it is understood
           // between the writer and reader that nanos will be adjusted to their positive component
           // but the negative seconds will be left alone. This means that -2.6 is encoded as
@@ -786,7 +767,7 @@ __global__ void __launch_bounds__(block_size)
            }
            nanos = (nanos << 3) + zeroes;
          }
-         s->lengths.u32[nz_idx] = nanos;
+         s->lengths.u64[nz_idx] = nanos;
          break;
        }
        case STRING:
@@ -897,6 +878,9 @@ __global__ void __launch_bounds__(block_size)
       uint32_t flush = (s->cur_row == s->chunk.num_rows) ? 1 : 0, n;
       switch (s->chunk.type_kind) {
         case TIMESTAMP:
+          n = IntegerRLE(
+            s, s->lengths.u64, s->nnz - s->numlengths, s->numlengths, flush, t, temp_storage.u64);
+          break;
         case STRING:
           n = IntegerRLE(
             s, s->lengths.u32, s->nnz - s->numlengths, s->numlengths, flush, t, temp_storage.u32);
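
With the intrle_minmax overloads gone, threads that fall outside the literal run seed the block-wide reductions with std::numeric_limits<T>::max() for the minimum and std::numeric_limits<T>::min() for the maximum, the identity elements of the two reductions, so the same code works for signed and unsigned T alike, including the new uint64_t lengths. A host-side sketch of that seeding pattern, for illustration only; min_max_valid is a made-up helper, not the CUDA kernel.

    #include <algorithm>
    #include <cstddef>
    #include <cstdint>
    #include <cstdio>
    #include <limits>
    #include <utility>
    #include <vector>

    // Compute min/max over the first `valid_count` entries by seeding with the
    // identity elements, the same trick IntegerRLE now uses for threads that
    // are past the literal run.
    template <typename T>
    std::pair<T, T> min_max_valid(const std::vector<T> &values, std::size_t valid_count)
    {
      T vmin = std::numeric_limits<T>::max();  // identity for the min reduction
      T vmax = std::numeric_limits<T>::min();  // identity for the max reduction
      for (std::size_t i = 0; i < valid_count && i < values.size(); ++i) {
        vmin = std::min(vmin, values[i]);
        vmax = std::max(vmax, values[i]);
      }
      return {vmin, vmax};
    }

    int main()
    {
      // Unsigned element types work too (numeric_limits<uint64_t>::min() is 0),
      // which is why the dedicated intrle_minmax overloads are no longer needed.
      auto [lo, hi] = min_max_valid<uint64_t>({7, 42, 5, 0, 0, 0}, 3);
      std::printf("min=%llu max=%llu\n",
                  static_cast<unsigned long long>(lo),
                  static_cast<unsigned long long>(hi));  // prints min=5 max=42
      return 0;
    }
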
diff --git a/python/cudf/cudf/tests/test_orc.py b/python/cudf/cudf/tests/test_orc.py
index ed91e909f25..ca8aa00f80c 100644
--- a/python/cudf/cudf/tests/test_orc.py
+++ b/python/cudf/cudf/tests/test_orc.py
@@ -724,3 +724,17 @@ def test_orc_bool_encode_fail():
     # Also validate data
     pdf = pa.orc.ORCFile(buffer).read().to_pandas()
     assert_eq(okay_df, pdf)
+
+
+def test_nanoseconds_overflow():
+    buffer = BytesIO()
+    # Use nanosecond values that take more than 32 bits to encode
+    s = cudf.Series([710424008, -1338482640], dtype="datetime64[ns]")
+    expected = cudf.DataFrame({"s": s})
+    expected.to_orc(buffer)
+
+    cudf_got = cudf.read_orc(buffer)
+    assert_eq(expected, cudf_got)
+
+    pyarrow_got = pa.orc.ORCFile(buffer).read()
+    assert_eq(expected.to_pandas(), pyarrow_got.to_pandas())