From ff671b4f4568e0f6015d49641970954b80a40a93 Mon Sep 17 00:00:00 2001 From: vuule Date: Wed, 24 Aug 2022 01:15:54 -0700 Subject: [PATCH 1/7] fix'n'test --- cpp/src/io/orc/stripe_data.cu | 4 ++-- cpp/src/io/orc/stripe_enc.cu | 15 ++++----------- python/cudf/cudf/tests/test_orc.py | 25 +++++++++++++++++++++++++ 3 files changed, 31 insertions(+), 13 deletions(-) diff --git a/cpp/src/io/orc/stripe_data.cu b/cpp/src/io/orc/stripe_data.cu index a4cd5de8ec8..5db1459bb62 100644 --- a/cpp/src/io/orc/stripe_data.cu +++ b/cpp/src/io/orc/stripe_data.cu @@ -1758,12 +1758,12 @@ __global__ void __launch_bounds__(block_size) } case TIMESTAMP: { int64_t seconds = s->vals.i64[t + vals_skipped] + s->top.data.utc_epoch; - uint64_t nanos = secondary_val; + int64_t nanos = secondary_val; nanos = (nanos >> 3) * kTimestampNanoScale[nanos & 7]; if (!tz_table.ttimes.empty()) { seconds += get_gmt_offset(tz_table.ttimes, tz_table.offsets, seconds); } - if (seconds < 0 && nanos != 0) { seconds -= 1; } + if (seconds < 0 && nanos > 0) { seconds -= 1; } duration_ns d_ns{nanos}; duration_s d_s{seconds}; diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu index 5e9a6f8df6b..16d1a071bca 100644 --- a/cpp/src/io/orc/stripe_enc.cu +++ b/cpp/src/io/orc/stripe_enc.cu @@ -806,17 +806,10 @@ __global__ void __launch_bounds__(block_size) case BOOLEAN: case BYTE: s->vals.u8[nz_idx] = column.element(row); break; case TIMESTAMP: { - int64_t ts = column.element(row); - int32_t ts_scale = powers_of_ten[9 - min(s->chunk.scale, 9)]; - int64_t seconds = ts / ts_scale; - int64_t nanos = (ts - seconds * ts_scale); - // There is a bug in the ORC spec such that for negative timestamps, it is understood - // between the writer and reader that nanos will be adjusted to their positive component - // but the negative seconds will be left alone. 
This means that -2.6 is encoded as - // seconds = -2 and nanos = 1+(-0.6) = 0.4 - // This leads to an error in decoding time where -1 < time (s) < 0 - // Details: https://github.com/rapidsai/cudf/pull/5529#issuecomment-648768925 - if (nanos < 0) { nanos += ts_scale; } + int64_t ts = column.element(row); + int32_t ts_scale = powers_of_ten[9 - min(s->chunk.scale, 9)]; + int64_t seconds = ts / ts_scale; + int64_t nanos = (ts - seconds * ts_scale); s->vals.i64[nz_idx] = seconds - kORCTimeToUTC; if (nanos != 0) { // Trailing zeroes are encoded in the lower 3-bits diff --git a/python/cudf/cudf/tests/test_orc.py b/python/cudf/cudf/tests/test_orc.py index db52e51bd33..45cd3398efd 100644 --- a/python/cudf/cudf/tests/test_orc.py +++ b/python/cudf/cudf/tests/test_orc.py @@ -1839,3 +1839,28 @@ def test_orc_writer_cols_as_map_type_error(): TypeError, match="cols_as_map_type must be a list of column names." ): df.to_orc(buffer, cols_as_map_type=1) + + +def test_negative_timestamp(tmpdir): + ref = cudf.DataFrame( + { + "a": [ + pd.Timestamp("1969-12-31 23:59:58.000999"), + pd.Timestamp("1969-12-31 23:59:58.001001"), + pd.Timestamp("1839-12-24 03:58:56.000826"), + ] + } + ) + + cudf_fname = tmpdir.join("cudf_neg_ts.orc") + ref.to_orc(cudf_fname) + df = cudf.read_orc(cudf_fname) + pdf = pd.read_orc(cudf_fname) + assert_eq(df, pdf) + + pyorc_fname = tmpdir.join("pyorc_neg_ts.orc") + pyorc_table = pa.Table.from_pandas(ref.to_pandas(), preserve_index=False) + pyarrow.orc.write_table(pyorc_table, pyorc_fname) + df = cudf.read_orc(pyorc_fname) + pdf = pd.read_orc(pyorc_fname) + assert_eq(df, pdf) From 14deab92d326a53fec0505a08963898743ecbc1b Mon Sep 17 00:00:00 2001 From: vuule Date: Wed, 7 Sep 2022 23:04:12 -0700 Subject: [PATCH 2/7] fix decode of large uint64_t values(?) 
--- cpp/src/io/orc/stripe_data.cu | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/cpp/src/io/orc/stripe_data.cu b/cpp/src/io/orc/stripe_data.cu index 5db1459bb62..53ec2721c35 100644 --- a/cpp/src/io/orc/stripe_data.cu +++ b/cpp/src/io/orc/stripe_data.cu @@ -755,7 +755,11 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs, mask = 1; mask <<= (bw * 8) - 1; mask -= 1; - rle->baseval.u64[r] = (baseval > mask) ? (-(int64_t)(baseval & mask)) : baseval; + if (std::is_signed::value and baseval > mask) { + rle->baseval.u64[r] = -(int64_t)(baseval & mask); + } else { + rle->baseval.u64[r] = baseval; + } } rle->m2_pw_byte3[r] = (pw << 8) | byte3; pos += bw; From 47e7c87d469b3caabdb5d6ff0d80233fe093adaf Mon Sep 17 00:00:00 2001 From: vuule Date: Thu, 8 Sep 2022 00:21:40 -0700 Subject: [PATCH 3/7] limit mode 2 to miv <= int64_max; expand test --- cpp/src/io/orc/stripe_enc.cu | 3 ++- python/cudf/cudf/tests/test_orc.py | 28 +++++++++++++++++----------- 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu index 16d1a071bca..69a96354315 100644 --- a/cpp/src/io/orc/stripe_enc.cu +++ b/cpp/src/io/orc/stripe_enc.cu @@ -442,7 +442,8 @@ static __device__ uint32_t IntegerRLE( s->u.intrle.literal_w = bytecnt; } else { uint32_t range, w; - if (mode1_w > mode2_w && (literal_run - 1) * (mode1_w - mode2_w) > 4) { + if (mode1_w > mode2_w && (literal_run - 1) * (mode1_w - mode2_w) > 4 and + vmin <= std::numeric_limits::max()) { s->u.intrle.literal_mode = 2; w = mode2_w; range = (uint32_t)vrange_mode2; diff --git a/python/cudf/cudf/tests/test_orc.py b/python/cudf/cudf/tests/test_orc.py index 45cd3398efd..8ba6c4df021 100644 --- a/python/cudf/cudf/tests/test_orc.py +++ b/python/cudf/cudf/tests/test_orc.py @@ -1841,8 +1841,9 @@ def test_orc_writer_cols_as_map_type_error(): df.to_orc(buffer, cols_as_map_type=1) -def test_negative_timestamp(tmpdir): - ref = cudf.DataFrame( 
+@pytest.mark.parametrize("engine", ["cudf", "pyarrow"]) +def test_negative_timestamp(tmpdir, engine): + expected = cudf.DataFrame( { "a": [ pd.Timestamp("1969-12-31 23:59:58.000999"), @@ -1851,16 +1852,21 @@ def test_negative_timestamp(tmpdir): ] } ) - cudf_fname = tmpdir.join("cudf_neg_ts.orc") - ref.to_orc(cudf_fname) - df = cudf.read_orc(cudf_fname) - pdf = pd.read_orc(cudf_fname) - assert_eq(df, pdf) + expected.to_orc(cudf_fname) + + cudf_got = cudf.read_orc(cudf_fname, engine=engine) + assert_eq(expected, cudf_got) + assert_eq(expected, pd.read_orc(cudf_fname)) + assert pyarrow.orc.ORCFile(cudf_fname).read().equals(cudf_got.to_arrow()) pyorc_fname = tmpdir.join("pyorc_neg_ts.orc") - pyorc_table = pa.Table.from_pandas(ref.to_pandas(), preserve_index=False) + pyorc_table = pa.Table.from_pandas( + expected.to_pandas(), preserve_index=False + ) pyarrow.orc.write_table(pyorc_table, pyorc_fname) - df = cudf.read_orc(pyorc_fname) - pdf = pd.read_orc(pyorc_fname) - assert_eq(df, pdf) + + cudf_got = cudf.read_orc(pyorc_fname, engine=engine) + assert_eq(expected, cudf_got) + assert_eq(expected, pd.read_orc(pyorc_fname)) + assert pyarrow.orc.ORCFile(pyorc_fname).read().equals(cudf_got.to_arrow()) From a814a1c12cc4d6746631b6d1b60b517d91fcfd4f Mon Sep 17 00:00:00 2001 From: vuule Date: Thu, 8 Sep 2022 13:34:46 -0700 Subject: [PATCH 4/7] clean up --- cpp/src/io/orc/stripe_data.cu | 27 +++++++++++++++------------ cpp/src/io/orc/stripe_enc.cu | 5 +++-- python/cudf/cudf/tests/test_orc.py | 1 + 3 files changed, 19 insertions(+), 14 deletions(-) diff --git a/cpp/src/io/orc/stripe_data.cu b/cpp/src/io/orc/stripe_data.cu index 53ec2721c35..21dbe599b96 100644 --- a/cpp/src/io/orc/stripe_data.cu +++ b/cpp/src/io/orc/stripe_data.cu @@ -745,21 +745,21 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs, uint32_t bw = 1 + (byte2 >> 5); // base value width, 1 to 8 bytes uint32_t pw = kRLEv2_W[byte2 & 0x1f]; // patch width, 1 to 64 bits if constexpr (sizeof(T) <= 4) { - 
uint32_t baseval, mask; + uint32_t baseval; bytestream_readbe(bs, pos * 8, bw * 8, baseval); - mask = (1 << (bw * 8 - 1)) - 1; - rle->baseval.u32[r] = (baseval > mask) ? (-(int32_t)(baseval & mask)) : baseval; + uint32_t const mask = (1u << (bw * 8 - 1)) - 1; + // Negative values are represented with the highest bit set to 1 + rle->baseval.u32[r] = (std::is_signed::value and baseval > mask) + ? -static_cast(baseval & mask) + : baseval; } else { - uint64_t baseval, mask; + uint64_t baseval; bytestream_readbe(bs, pos * 8, bw * 8, baseval); - mask = 1; - mask <<= (bw * 8) - 1; - mask -= 1; - if (std::is_signed::value and baseval > mask) { - rle->baseval.u64[r] = -(int64_t)(baseval & mask); - } else { - rle->baseval.u64[r] = baseval; - } + uint64_t const mask = 1ul << ((bw * 8) - 1) - 1; + // Negative values are represented with the highest bit set to 1 + rle->baseval.u64[r] = (std::is_signed::value and baseval > mask) + ? -static_cast(baseval & mask) + : baseval; } rle->m2_pw_byte3[r] = (pw << 8) | byte3; pos += bw; @@ -1767,6 +1767,9 @@ __global__ void __launch_bounds__(block_size) if (!tz_table.ttimes.empty()) { seconds += get_gmt_offset(tz_table.ttimes, tz_table.offsets, seconds); } + // Adjust seconds only for negative timestamps with positive nanoseconds. + // Alternative way to represent negative timestamps is with negative nanoseconds + // in which case the adjustment is not needed. if (seconds < 0 && nanos > 0) { seconds -= 1; } duration_ns d_ns{nanos}; diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu index 69a96354315..206f755ce77 100644 --- a/cpp/src/io/orc/stripe_enc.cu +++ b/cpp/src/io/orc/stripe_enc.cu @@ -442,8 +442,9 @@ static __device__ uint32_t IntegerRLE( s->u.intrle.literal_w = bytecnt; } else { uint32_t range, w; - if (mode1_w > mode2_w && (literal_run - 1) * (mode1_w - mode2_w) > 4 and - vmin <= std::numeric_limits::max()) { + // Mode 2 base value cannot be bigger than max int64_t, i.e.
the first bit has to be 0 + if (vmin <= std::numeric_limits::max() and mode1_w > mode2_w and + (literal_run - 1) * (mode1_w - mode2_w) > 4) { s->u.intrle.literal_mode = 2; w = mode2_w; range = (uint32_t)vrange_mode2; diff --git a/python/cudf/cudf/tests/test_orc.py b/python/cudf/cudf/tests/test_orc.py index 8ba6c4df021..4d3917a16cc 100644 --- a/python/cudf/cudf/tests/test_orc.py +++ b/python/cudf/cudf/tests/test_orc.py @@ -1846,6 +1846,7 @@ def test_negative_timestamp(tmpdir, engine): expected = cudf.DataFrame( { "a": [ + pd.Timestamp("1969-12-31 23:59:59.000123"), pd.Timestamp("1969-12-31 23:59:58.000999"), pd.Timestamp("1969-12-31 23:59:58.001001"), pd.Timestamp("1839-12-24 03:58:56.000826"), From 8c49d53b4443af12f10ad80681ce5c95b049b1a1 Mon Sep 17 00:00:00 2001 From: vuule Date: Thu, 8 Sep 2022 17:24:14 -0700 Subject: [PATCH 5/7] fix :| --- cpp/src/io/orc/stripe_data.cu | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/io/orc/stripe_data.cu b/cpp/src/io/orc/stripe_data.cu index 21dbe599b96..cca4128fb35 100644 --- a/cpp/src/io/orc/stripe_data.cu +++ b/cpp/src/io/orc/stripe_data.cu @@ -755,7 +755,7 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs, } else { uint64_t baseval; bytestream_readbe(bs, pos * 8, bw * 8, baseval); - uint64_t const mask = 1ul << ((bw * 8) - 1) - 1; + uint64_t const mask = (1ul << (bw * 8 - 1)) - 1; // Negative values are represented with the highest bit set to 1 rle->baseval.u64[r] = (std::is_signed::value and baseval > mask) ? 
-static_cast(baseval & mask) From f63381e2ba035f93033270bd3247bfb69fe74c89 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Fri, 9 Sep 2022 12:47:32 -0700 Subject: [PATCH 6/7] Apply suggestions from code review Co-authored-by: Yunsong Wang --- cpp/src/io/orc/stripe_data.cu | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/io/orc/stripe_data.cu b/cpp/src/io/orc/stripe_data.cu index cca4128fb35..4fa407f4e88 100644 --- a/cpp/src/io/orc/stripe_data.cu +++ b/cpp/src/io/orc/stripe_data.cu @@ -749,7 +749,7 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs, bytestream_readbe(bs, pos * 8, bw * 8, baseval); uint32_t const mask = (1u << (bw * 8 - 1)) - 1; // Negative values are represented with the highest bit set to 1 - rle->baseval.u32[r] = (std::is_signed::value and baseval > mask) + rle->baseval.u32[r] = (std::is_signed_v and baseval > mask) ? -static_cast(baseval & mask) : baseval; } else { @@ -757,7 +757,7 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs, bytestream_readbe(bs, pos * 8, bw * 8, baseval); uint64_t const mask = (1ul << (bw * 8 - 1)) - 1; // Negative values are represented with the highest bit set to 1 - rle->baseval.u64[r] = (std::is_signed::value and baseval > mask) + rle->baseval.u64[r] = (std::is_signed_v and baseval > mask) ? 
-static_cast(baseval & mask) : baseval; } From e77ba7e254cefa72260da51ad1d97bcd4b1095d7 Mon Sep 17 00:00:00 2001 From: vuule Date: Mon, 12 Sep 2022 12:58:06 -0700 Subject: [PATCH 7/7] test clean up --- python/cudf/cudf/tests/test_orc.py | 33 +++++++++++++++--------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/python/cudf/cudf/tests/test_orc.py b/python/cudf/cudf/tests/test_orc.py index 4d3917a16cc..a3db64faa5b 100644 --- a/python/cudf/cudf/tests/test_orc.py +++ b/python/cudf/cudf/tests/test_orc.py @@ -1841,9 +1841,9 @@ def test_orc_writer_cols_as_map_type_error(): df.to_orc(buffer, cols_as_map_type=1) -@pytest.mark.parametrize("engine", ["cudf", "pyarrow"]) -def test_negative_timestamp(tmpdir, engine): - expected = cudf.DataFrame( +@pytest.fixture +def negative_timestamp_df(): + return cudf.DataFrame( { "a": [ pd.Timestamp("1969-12-31 23:59:59.000123"), @@ -1853,21 +1853,22 @@ def test_negative_timestamp(tmpdir, engine): ] } ) - cudf_fname = tmpdir.join("cudf_neg_ts.orc") - expected.to_orc(cudf_fname) - cudf_got = cudf.read_orc(cudf_fname, engine=engine) - assert_eq(expected, cudf_got) - assert_eq(expected, pd.read_orc(cudf_fname)) - assert pyarrow.orc.ORCFile(cudf_fname).read().equals(cudf_got.to_arrow()) - pyorc_fname = tmpdir.join("pyorc_neg_ts.orc") +@pytest.mark.parametrize("engine", ["cudf", "pyarrow"]) +def test_orc_reader_negative_timestamp(negative_timestamp_df, engine): + buffer = BytesIO() pyorc_table = pa.Table.from_pandas( - expected.to_pandas(), preserve_index=False + negative_timestamp_df.to_pandas(), preserve_index=False ) - pyarrow.orc.write_table(pyorc_table, pyorc_fname) + pyarrow.orc.write_table(pyorc_table, buffer) - cudf_got = cudf.read_orc(pyorc_fname, engine=engine) - assert_eq(expected, cudf_got) - assert_eq(expected, pd.read_orc(pyorc_fname)) - assert pyarrow.orc.ORCFile(pyorc_fname).read().equals(cudf_got.to_arrow()) + assert_eq(negative_timestamp_df, cudf.read_orc(buffer, engine=engine)) + + +def 
test_orc_writer_negative_timestamp(negative_timestamp_df): + buffer = BytesIO() + negative_timestamp_df.to_orc(buffer) + + assert_eq(negative_timestamp_df, pd.read_orc(buffer)) + assert_eq(negative_timestamp_df, pyarrow.orc.ORCFile(buffer).read())