From 9de36fdebc2f59b95fef803bbafe817f2ce00728 Mon Sep 17 00:00:00 2001 From: vuule Date: Fri, 12 Mar 2021 11:23:06 -0800 Subject: [PATCH 1/5] use 64bit values for encoded nanoseconds --- cpp/src/io/orc/stripe_data.cu | 34 +++++++++++++++++++++++++--------- cpp/src/io/orc/stripe_enc.cu | 17 +++++++++++++---- 2 files changed, 38 insertions(+), 13 deletions(-) diff --git a/cpp/src/io/orc/stripe_data.cu b/cpp/src/io/orc/stripe_data.cu index 4bca725a16b..1ff752034ad 100644 --- a/cpp/src/io/orc/stripe_data.cu +++ b/cpp/src/io/orc/stripe_data.cu @@ -1455,8 +1455,9 @@ __global__ void __launch_bounds__(block_size) __syncthreads(); // Decode data streams { - uint32_t numvals = s->top.data.max_vals, secondary_val; - uint32_t vals_skipped = 0; + uint32_t numvals = s->top.data.max_vals; + uint64_t secondary_val = 0; + uint32_t vals_skipped = 0; if (s->is_string || s->chunk.type_kind == TIMESTAMP) { // For these data types, we have a secondary unsigned 32-bit data stream orc_bytestream_s *bs = (is_dictionary(s->chunk.encoding_kind)) ? 
&s->bs : &s->bs2; @@ -1471,9 +1472,15 @@ __global__ void __launch_bounds__(block_size) } if (numvals > ofs) { if (is_rlev1(s->chunk.encoding_kind)) { - numvals = ofs + Integer_RLEv1(bs, &s->u.rlev1, &s->vals.u32[ofs], numvals - ofs, t); + if (s->chunk.type_kind == TIMESTAMP) + numvals = ofs + Integer_RLEv1(bs, &s->u.rlev1, &s->vals.u64[ofs], numvals - ofs, t); + else + numvals = ofs + Integer_RLEv1(bs, &s->u.rlev1, &s->vals.u32[ofs], numvals - ofs, t); } else { - numvals = ofs + Integer_RLEv2(bs, &s->u.rlev2, &s->vals.u32[ofs], numvals - ofs, t); + if (s->chunk.type_kind == TIMESTAMP) + numvals = ofs + Integer_RLEv2(bs, &s->u.rlev2, &s->vals.u64[ofs], numvals - ofs, t); + else + numvals = ofs + Integer_RLEv2(bs, &s->u.rlev2, &s->vals.u32[ofs], numvals - ofs, t); } __syncthreads(); if (numvals <= ofs && t >= ofs && t < s->top.data.max_vals) { s->vals.u32[t] = 0; } @@ -1487,15 +1494,24 @@ __global__ void __launch_bounds__(block_size) __syncthreads(); if (t == 0) { s->top.data.index.run_pos[cid] = 0; } numvals -= vals_skipped; - if (t < numvals) { secondary_val = s->vals.u32[vals_skipped + t]; } + if (t < numvals) { + secondary_val = (s->chunk.type_kind == TIMESTAMP) ? s->vals.u64[vals_skipped + t] + : s->vals.u32[vals_skipped + t]; + } __syncthreads(); - if (t < numvals) { s->vals.u32[t] = secondary_val; } + if (t < numvals) { + if (s->chunk.type_kind == TIMESTAMP) + s->vals.u64[t] = secondary_val; + else + s->vals.u32[t] = secondary_val; + } } } __syncthreads(); // For strings with direct encoding, we need to convert the lengths into an offset if (!is_dictionary(s->chunk.encoding_kind)) { - secondary_val = (t < numvals) ? s->vals.u32[t] : 0; + if (t < numvals) + secondary_val = (s->chunk.type_kind == TIMESTAMP) ? 
s->vals.u64[t] : s->vals.u32[t]; if (s->chunk.type_kind != TIMESTAMP) { lengths_to_positions(s->vals.u32, numvals, t); __syncthreads(); @@ -1693,7 +1709,7 @@ __global__ void __launch_bounds__(block_size) } case TIMESTAMP: { int64_t seconds = s->vals.i64[t + vals_skipped] + s->top.data.utc_epoch; - uint32_t nanos = secondary_val; + uint64_t nanos = secondary_val; nanos = (nanos >> 3) * kTimestampNanoScale[nanos & 7]; if (!tz_table.ttimes.empty()) { seconds += get_gmt_offset(tz_table.ttimes, tz_table.offsets, seconds); @@ -1716,7 +1732,7 @@ __global__ void __launch_bounds__(block_size) if (s->chunk.type_kind == TIMESTAMP) { int buffer_pos = s->top.data.max_vals; if (t >= buffer_pos && t < buffer_pos + s->top.data.buffered_count) { - s->vals.u32[t - buffer_pos] = secondary_val; + s->vals.u64[t - buffer_pos] = secondary_val; } } else if (s->chunk.type_kind == BOOLEAN && t < s->top.data.buffered_count) { s->vals.u8[t] = secondary_val; diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu index 88cad005817..14a74e5595e 100644 --- a/cpp/src/io/orc/stripe_enc.cu +++ b/cpp/src/io/orc/stripe_enc.cu @@ -92,6 +92,7 @@ struct orcenc_state_s { union { uint8_t u8[2048]; uint32_t u32[1024]; + uint64_t u64[1024]; } lengths; }; @@ -101,6 +102,7 @@ static inline __device__ uint32_t zigzag(int32_t v) int32_t s = (v >> 31); return ((v ^ s) * 2) - s; } +static inline __device__ uint64_t zigzag(uint64_t v) { return v; } static inline __device__ uint64_t zigzag(int64_t v) { int64_t s = (v < 0) ? 
1 : 0; @@ -291,8 +293,11 @@ static inline __device__ void intrle_minmax(int64_t &vmin, int64_t &vmax) vmin = INT64_MIN; vmax = INT64_MAX; } -// static inline __device__ void intrle_minmax(uint64_t &vmin, uint64_t &vmax) { vmin = UINT64_C(0); -// vmax = UINT64_MAX; } +static inline __device__ void intrle_minmax(uint64_t &vmin, uint64_t &vmax) +{ + vmin = UINT64_C(0); + vmax = UINT64_MAX; +} static inline __device__ void intrle_minmax(int32_t &vmin, int32_t &vmax) { vmin = INT32_MIN; @@ -652,6 +657,7 @@ __global__ void __launch_bounds__(block_size) typename cub::BlockReduce::TempStorage i32; typename cub::BlockReduce::TempStorage i64; typename cub::BlockReduce::TempStorage u32; + typename cub::BlockReduce::TempStorage u64; } temp_storage; orcenc_state_s *const s = &state_g; @@ -763,7 +769,7 @@ __global__ void __launch_bounds__(block_size) int64_t ts = static_cast(base)[row]; int32_t ts_scale = kTimeScale[min(s->chunk.scale, 9)]; int64_t seconds = ts / ts_scale; - int32_t nanos = (ts - seconds * ts_scale); + int64_t nanos = (ts - seconds * ts_scale); // There is a bug in the ORC spec such that for negative timestamps, it is understood // between the writer and reader that nanos will be adjusted to their positive component // but the negative seconds will be left alone. This means that -2.6 is encoded as @@ -786,7 +792,7 @@ __global__ void __launch_bounds__(block_size) } nanos = (nanos << 3) + zeroes; } - s->lengths.u32[nz_idx] = nanos; + s->lengths.u64[nz_idx] = nanos; break; } case STRING: @@ -897,6 +903,9 @@ __global__ void __launch_bounds__(block_size) uint32_t flush = (s->cur_row == s->chunk.num_rows) ? 
1 : 0, n; switch (s->chunk.type_kind) { case TIMESTAMP: + n = IntegerRLE( + s, s->lengths.u64, s->nnz - s->numlengths, s->numlengths, flush, t, temp_storage.u64); + break; case STRING: n = IntegerRLE( s, s->lengths.u32, s->nnz - s->numlengths, s->numlengths, flush, t, temp_storage.u32); From b47527a478d634ff2c1789dfa8fb43f0065d3910 Mon Sep 17 00:00:00 2001 From: vuule Date: Fri, 12 Mar 2021 11:23:12 -0800 Subject: [PATCH 2/5] add test --- python/cudf/cudf/tests/test_orc.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/python/cudf/cudf/tests/test_orc.py b/python/cudf/cudf/tests/test_orc.py index ed91e909f25..25fe4b9de69 100644 --- a/python/cudf/cudf/tests/test_orc.py +++ b/python/cudf/cudf/tests/test_orc.py @@ -724,3 +724,21 @@ def test_orc_bool_encode_fail(): # Also validate data pdf = pa.orc.ORCFile(buffer).read().to_pandas() assert_eq(okay_df, pdf) + + +def test_nanoseconds_overflow(tmpdir): + path = tmpdir.join("nano64bit.orc") + # Use nanosecond values that take more than 32 bits to encode + s = cudf.Series([710424008, -338482640], dtype="datetime64[ns]") + expected = cudf.DataFrame({"s": s}) + expected.to_orc(path) + + cudf_got = cudf.read_orc(path) + assert_eq(expected, cudf_got) + + try: + orcfile = pa.orc.ORCFile(path) + except pa.ArrowIOError as e: + pytest.skip(".orc file is not found: %s" % e) + pyarrow_got = orcfile.read() + assert_eq(expected.to_pandas(), pyarrow_got.to_pandas()) From 536aed26ca7090c24d1bc6ff6f122cf850102b66 Mon Sep 17 00:00:00 2001 From: vuule Date: Fri, 12 Mar 2021 12:39:56 -0800 Subject: [PATCH 3/5] modify test to avoid the specs bug --- python/cudf/cudf/tests/test_orc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/cudf/cudf/tests/test_orc.py b/python/cudf/cudf/tests/test_orc.py index 25fe4b9de69..f0fe808265e 100644 --- a/python/cudf/cudf/tests/test_orc.py +++ b/python/cudf/cudf/tests/test_orc.py @@ -729,7 +729,7 @@ def test_orc_bool_encode_fail(): def 
test_nanoseconds_overflow(tmpdir): path = tmpdir.join("nano64bit.orc") # Use nanosecond values that take more than 32 bits to encode - s = cudf.Series([710424008, -338482640], dtype="datetime64[ns]") + s = cudf.Series([710424008, -1338482640], dtype="datetime64[ns]") expected = cudf.DataFrame({"s": s}) expected.to_orc(path) From 43fb9d2fbf0bf826cc3ea1a5acfdba6049f448cd Mon Sep 17 00:00:00 2001 From: vuule Date: Fri, 12 Mar 2021 12:45:33 -0800 Subject: [PATCH 4/5] remove intrle_minmax functions --- cpp/src/io/orc/stripe_enc.cu | 29 ++--------------------------- 1 file changed, 2 insertions(+), 27 deletions(-) diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu index 14a74e5595e..aef32efaf6e 100644 --- a/cpp/src/io/orc/stripe_enc.cu +++ b/cpp/src/io/orc/stripe_enc.cu @@ -288,27 +288,6 @@ static inline __device__ uint32_t StoreVarint(uint8_t *dst, uint64_t v) return bytecnt; } -static inline __device__ void intrle_minmax(int64_t &vmin, int64_t &vmax) -{ - vmin = INT64_MIN; - vmax = INT64_MAX; -} -static inline __device__ void intrle_minmax(uint64_t &vmin, uint64_t &vmax) -{ - vmin = UINT64_C(0); - vmax = UINT64_MAX; -} -static inline __device__ void intrle_minmax(int32_t &vmin, int32_t &vmax) -{ - vmin = INT32_MIN; - vmax = INT32_MAX; -} -static inline __device__ void intrle_minmax(uint32_t &vmin, uint32_t &vmax) -{ - vmin = UINT32_C(0); - vmax = UINT32_MAX; -} - template static inline __device__ void StoreBytesBigEndian(uint8_t *dst, T v, uint32_t w) { @@ -417,13 +396,9 @@ static __device__ uint32_t IntegerRLE(orcenc_state_s *s, // Find minimum and maximum values if (literal_run > 0) { // Find min & max - T vmin, vmax; + T vmin = (t < literal_run) ? v0 : std::numeric_limits<T>::max(); + T vmax = (t < literal_run) ? 
v0 : std::numeric_limits<T>::min(); uint32_t literal_mode, literal_w; - if (t < literal_run) { - vmin = vmax = v0; - } else { - intrle_minmax(vmax, vmin); - } vmin = block_reduce(temp_storage).Reduce(vmin, cub::Min()); __syncthreads(); vmax = block_reduce(temp_storage).Reduce(vmax, cub::Max()); From 1e1b78514452fbf5c82499d73135642b6e0fd35a Mon Sep 17 00:00:00 2001 From: vuule Date: Fri, 12 Mar 2021 13:19:53 -0800 Subject: [PATCH 5/5] use buffer instead of file for the test IO --- python/cudf/cudf/tests/test_orc.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/python/cudf/cudf/tests/test_orc.py b/python/cudf/cudf/tests/test_orc.py index f0fe808265e..ca8aa00f80c 100644 --- a/python/cudf/cudf/tests/test_orc.py +++ b/python/cudf/cudf/tests/test_orc.py @@ -726,19 +726,15 @@ def test_orc_bool_encode_fail(): assert_eq(okay_df, pdf) -def test_nanoseconds_overflow(tmpdir): - path = tmpdir.join("nano64bit.orc") +def test_nanoseconds_overflow(): + buffer = BytesIO() # Use nanosecond values that take more than 32 bits to encode s = cudf.Series([710424008, -1338482640], dtype="datetime64[ns]") expected = cudf.DataFrame({"s": s}) - expected.to_orc(path) + expected.to_orc(buffer) - cudf_got = cudf.read_orc(path) + cudf_got = cudf.read_orc(buffer) assert_eq(expected, cudf_got) - try: - orcfile = pa.orc.ORCFile(path) - except pa.ArrowIOError as e: - pytest.skip(".orc file is not found: %s" % e) - pyarrow_got = orcfile.read() + pyarrow_got = pa.orc.ORCFile(buffer).read() assert_eq(expected.to_pandas(), pyarrow_got.to_pandas())