Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix encode/decode of negative timestamps in ORC reader/writer #11586

Merged
merged 14 commits into from
Sep 13, 2022
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 17 additions & 10 deletions cpp/src/io/orc/stripe_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -745,17 +745,21 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs,
uint32_t bw = 1 + (byte2 >> 5); // base value width, 1 to 8 bytes
uint32_t pw = kRLEv2_W[byte2 & 0x1f]; // patch width, 1 to 64 bits
if constexpr (sizeof(T) <= 4) {
uint32_t baseval, mask;
uint32_t baseval;
bytestream_readbe(bs, pos * 8, bw * 8, baseval);
mask = (1 << (bw * 8 - 1)) - 1;
rle->baseval.u32[r] = (baseval > mask) ? (-(int32_t)(baseval & mask)) : baseval;
uint32_t const mask = (1u << (bw * 8 - 1)) - 1;
// Negative values are represented with the highest bit set to 1
rle->baseval.u32[r] = (std::is_signed_v<T> and baseval > mask)
? -static_cast<int32_t>(baseval & mask)
: baseval;
} else {
uint64_t baseval, mask;
uint64_t baseval;
bytestream_readbe(bs, pos * 8, bw * 8, baseval);
mask = 1;
mask <<= (bw * 8) - 1;
mask -= 1;
rle->baseval.u64[r] = (baseval > mask) ? (-(int64_t)(baseval & mask)) : baseval;
uint64_t const mask = (1ul << (bw * 8 - 1)) - 1;
// Negative values are represented with the highest bit set to 1
rle->baseval.u64[r] = (std::is_signed_v<T> and baseval > mask)
? -static_cast<int64_t>(baseval & mask)
: baseval;
}
rle->m2_pw_byte3[r] = (pw << 8) | byte3;
pos += bw;
Expand Down Expand Up @@ -1758,12 +1762,15 @@ __global__ void __launch_bounds__(block_size)
}
case TIMESTAMP: {
int64_t seconds = s->vals.i64[t + vals_skipped] + s->top.data.utc_epoch;
uint64_t nanos = secondary_val;
int64_t nanos = secondary_val;
nanos = (nanos >> 3) * kTimestampNanoScale[nanos & 7];
if (!tz_table.ttimes.empty()) {
seconds += get_gmt_offset(tz_table.ttimes, tz_table.offsets, seconds);
}
if (seconds < 0 && nanos != 0) { seconds -= 1; }
// Adjust seconds only for negative timestamps with positive nanoseconds.
// Alternative way to represent negative timestamps is with negative nanoseconds
// in which case the adjustment in not needed.
PointKernel marked this conversation as resolved.
Show resolved Hide resolved
if (seconds < 0 && nanos > 0) { seconds -= 1; }

duration_ns d_ns{nanos};
duration_s d_s{seconds};
Expand Down
19 changes: 7 additions & 12 deletions cpp/src/io/orc/stripe_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,9 @@ static __device__ uint32_t IntegerRLE(
s->u.intrle.literal_w = bytecnt;
} else {
uint32_t range, w;
if (mode1_w > mode2_w && (literal_run - 1) * (mode1_w - mode2_w) > 4) {
// Mode 2 base value cannot be bigger than max int64_t, i.e. the first bit has to be 0
if (vmin <= std::numeric_limits<int64_t>::max() and mode1_w > mode2_w and
(literal_run - 1) * (mode1_w - mode2_w) > 4) {
s->u.intrle.literal_mode = 2;
w = mode2_w;
range = (uint32_t)vrange_mode2;
Expand Down Expand Up @@ -806,17 +808,10 @@ __global__ void __launch_bounds__(block_size)
case BOOLEAN:
case BYTE: s->vals.u8[nz_idx] = column.element<uint8_t>(row); break;
case TIMESTAMP: {
int64_t ts = column.element<int64_t>(row);
int32_t ts_scale = powers_of_ten[9 - min(s->chunk.scale, 9)];
int64_t seconds = ts / ts_scale;
int64_t nanos = (ts - seconds * ts_scale);
// There is a bug in the ORC spec such that for negative timestamps, it is understood
// between the writer and reader that nanos will be adjusted to their positive component
// but the negative seconds will be left alone. This means that -2.6 is encoded as
// seconds = -2 and nanos = 1+(-0.6) = 0.4
// This leads to an error in decoding time where -1 < time (s) < 0
// Details: https://github.com/rapidsai/cudf/pull/5529#issuecomment-648768925
if (nanos < 0) { nanos += ts_scale; }
int64_t ts = column.element<int64_t>(row);
int32_t ts_scale = powers_of_ten[9 - min(s->chunk.scale, 9)];
int64_t seconds = ts / ts_scale;
int64_t nanos = (ts - seconds * ts_scale);
s->vals.i64[nz_idx] = seconds - kORCTimeToUTC;
if (nanos != 0) {
// Trailing zeroes are encoded in the lower 3-bits
Expand Down
32 changes: 32 additions & 0 deletions python/cudf/cudf/tests/test_orc.py
Original file line number Diff line number Diff line change
Expand Up @@ -1839,3 +1839,35 @@ def test_orc_writer_cols_as_map_type_error():
TypeError, match="cols_as_map_type must be a list of column names."
):
df.to_orc(buffer, cols_as_map_type=1)


@pytest.mark.parametrize("engine", ["cudf", "pyarrow"])
def test_negative_timestamp(tmpdir, engine):
expected = cudf.DataFrame(
{
"a": [
pd.Timestamp("1969-12-31 23:59:59.000123"),
pd.Timestamp("1969-12-31 23:59:58.000999"),
pd.Timestamp("1969-12-31 23:59:58.001001"),
pd.Timestamp("1839-12-24 03:58:56.000826"),
]
}
)
cudf_fname = tmpdir.join("cudf_neg_ts.orc")
expected.to_orc(cudf_fname)

cudf_got = cudf.read_orc(cudf_fname, engine=engine)
assert_eq(expected, cudf_got)
assert_eq(expected, pd.read_orc(cudf_fname))
assert pyarrow.orc.ORCFile(cudf_fname).read().equals(cudf_got.to_arrow())

pyorc_fname = tmpdir.join("pyorc_neg_ts.orc")
vuule marked this conversation as resolved.
Show resolved Hide resolved
pyorc_table = pa.Table.from_pandas(
expected.to_pandas(), preserve_index=False
)
pyarrow.orc.write_table(pyorc_table, pyorc_fname)

cudf_got = cudf.read_orc(pyorc_fname, engine=engine)
assert_eq(expected, cudf_got)
assert_eq(expected, pd.read_orc(pyorc_fname))
assert pyarrow.orc.ORCFile(pyorc_fname).read().equals(cudf_got.to_arrow())
vuule marked this conversation as resolved.
Show resolved Hide resolved