Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ORC issue with incorrect timestamp nanosecond values #7581

Merged
merged 5 commits into from
Mar 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 25 additions & 9 deletions cpp/src/io/orc/stripe_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1455,8 +1455,9 @@ __global__ void __launch_bounds__(block_size)
__syncthreads();
// Decode data streams
{
uint32_t numvals = s->top.data.max_vals, secondary_val;
uint32_t vals_skipped = 0;
uint32_t numvals = s->top.data.max_vals;
uint64_t secondary_val = 0;
uint32_t vals_skipped = 0;
if (s->is_string || s->chunk.type_kind == TIMESTAMP) {
// For these data types, we have a secondary unsigned 32-bit data stream
orc_bytestream_s *bs = (is_dictionary(s->chunk.encoding_kind)) ? &s->bs : &s->bs2;
Expand All @@ -1471,9 +1472,15 @@ __global__ void __launch_bounds__(block_size)
}
if (numvals > ofs) {
if (is_rlev1(s->chunk.encoding_kind)) {
numvals = ofs + Integer_RLEv1(bs, &s->u.rlev1, &s->vals.u32[ofs], numvals - ofs, t);
if (s->chunk.type_kind == TIMESTAMP)
numvals = ofs + Integer_RLEv1(bs, &s->u.rlev1, &s->vals.u64[ofs], numvals - ofs, t);
else
numvals = ofs + Integer_RLEv1(bs, &s->u.rlev1, &s->vals.u32[ofs], numvals - ofs, t);
} else {
numvals = ofs + Integer_RLEv2(bs, &s->u.rlev2, &s->vals.u32[ofs], numvals - ofs, t);
if (s->chunk.type_kind == TIMESTAMP)
numvals = ofs + Integer_RLEv2(bs, &s->u.rlev2, &s->vals.u64[ofs], numvals - ofs, t);
else
numvals = ofs + Integer_RLEv2(bs, &s->u.rlev2, &s->vals.u32[ofs], numvals - ofs, t);
}
__syncthreads();
if (numvals <= ofs && t >= ofs && t < s->top.data.max_vals) { s->vals.u32[t] = 0; }
Expand All @@ -1487,15 +1494,24 @@ __global__ void __launch_bounds__(block_size)
__syncthreads();
if (t == 0) { s->top.data.index.run_pos[cid] = 0; }
numvals -= vals_skipped;
if (t < numvals) { secondary_val = s->vals.u32[vals_skipped + t]; }
if (t < numvals) {
secondary_val = (s->chunk.type_kind == TIMESTAMP) ? s->vals.u64[vals_skipped + t]
: s->vals.u32[vals_skipped + t];
}
__syncthreads();
if (t < numvals) { s->vals.u32[t] = secondary_val; }
if (t < numvals) {
if (s->chunk.type_kind == TIMESTAMP)
s->vals.u64[t] = secondary_val;
else
s->vals.u32[t] = secondary_val;
}
}
}
__syncthreads();
// For strings with direct encoding, we need to convert the lengths into an offset
if (!is_dictionary(s->chunk.encoding_kind)) {
secondary_val = (t < numvals) ? s->vals.u32[t] : 0;
if (t < numvals)
secondary_val = (s->chunk.type_kind == TIMESTAMP) ? s->vals.u64[t] : s->vals.u32[t];
if (s->chunk.type_kind != TIMESTAMP) {
lengths_to_positions(s->vals.u32, numvals, t);
__syncthreads();
Expand Down Expand Up @@ -1693,7 +1709,7 @@ __global__ void __launch_bounds__(block_size)
}
case TIMESTAMP: {
int64_t seconds = s->vals.i64[t + vals_skipped] + s->top.data.utc_epoch;
uint32_t nanos = secondary_val;
uint64_t nanos = secondary_val;
nanos = (nanos >> 3) * kTimestampNanoScale[nanos & 7];
if (!tz_table.ttimes.empty()) {
seconds += get_gmt_offset(tz_table.ttimes, tz_table.offsets, seconds);
Expand All @@ -1716,7 +1732,7 @@ __global__ void __launch_bounds__(block_size)
if (s->chunk.type_kind == TIMESTAMP) {
int buffer_pos = s->top.data.max_vals;
if (t >= buffer_pos && t < buffer_pos + s->top.data.buffered_count) {
s->vals.u32[t - buffer_pos] = secondary_val;
s->vals.u64[t - buffer_pos] = secondary_val;
}
} else if (s->chunk.type_kind == BOOLEAN && t < s->top.data.buffered_count) {
s->vals.u8[t] = secondary_val;
Expand Down
36 changes: 10 additions & 26 deletions cpp/src/io/orc/stripe_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ struct orcenc_state_s {
union {
uint8_t u8[2048];
uint32_t u32[1024];
uint64_t u64[1024];
} lengths;
};

Expand All @@ -101,6 +102,7 @@ static inline __device__ uint32_t zigzag(int32_t v)
int32_t s = (v >> 31);
return ((v ^ s) * 2) - s;
}
static inline __device__ uint64_t zigzag(uint64_t v) { return v; }
static inline __device__ uint64_t zigzag(int64_t v)
{
int64_t s = (v < 0) ? 1 : 0;
Expand Down Expand Up @@ -286,24 +288,6 @@ static inline __device__ uint32_t StoreVarint(uint8_t *dst, uint64_t v)
return bytecnt;
}

static inline __device__ void intrle_minmax(int64_t &vmin, int64_t &vmax)
{
vmin = INT64_MIN;
vmax = INT64_MAX;
}
// static inline __device__ void intrle_minmax(uint64_t &vmin, uint64_t &vmax) { vmin = UINT64_C(0);
// vmax = UINT64_MAX; }
static inline __device__ void intrle_minmax(int32_t &vmin, int32_t &vmax)
{
vmin = INT32_MIN;
vmax = INT32_MAX;
}
static inline __device__ void intrle_minmax(uint32_t &vmin, uint32_t &vmax)
{
vmin = UINT32_C(0);
vmax = UINT32_MAX;
}

template <class T>
static inline __device__ void StoreBytesBigEndian(uint8_t *dst, T v, uint32_t w)
{
Expand Down Expand Up @@ -412,13 +396,9 @@ static __device__ uint32_t IntegerRLE(orcenc_state_s *s,
// Find minimum and maximum values
if (literal_run > 0) {
// Find min & max
T vmin, vmax;
T vmin = (t < literal_run) ? v0 : std::numeric_limits<T>::max();
T vmax = (t < literal_run) ? v0 : std::numeric_limits<T>::min();
uint32_t literal_mode, literal_w;
if (t < literal_run) {
vmin = vmax = v0;
} else {
intrle_minmax(vmax, vmin);
}
vmin = block_reduce(temp_storage).Reduce(vmin, cub::Min());
__syncthreads();
vmax = block_reduce(temp_storage).Reduce(vmax, cub::Max());
Expand Down Expand Up @@ -652,6 +632,7 @@ __global__ void __launch_bounds__(block_size)
typename cub::BlockReduce<int32_t, block_size>::TempStorage i32;
typename cub::BlockReduce<int64_t, block_size>::TempStorage i64;
typename cub::BlockReduce<uint32_t, block_size>::TempStorage u32;
typename cub::BlockReduce<uint64_t, block_size>::TempStorage u64;
} temp_storage;

orcenc_state_s *const s = &state_g;
Expand Down Expand Up @@ -763,7 +744,7 @@ __global__ void __launch_bounds__(block_size)
int64_t ts = static_cast<const int64_t *>(base)[row];
int32_t ts_scale = kTimeScale[min(s->chunk.scale, 9)];
int64_t seconds = ts / ts_scale;
int32_t nanos = (ts - seconds * ts_scale);
int64_t nanos = (ts - seconds * ts_scale);
// There is a bug in the ORC spec such that for negative timestamps, it is understood
// between the writer and reader that nanos will be adjusted to their positive component
// but the negative seconds will be left alone. This means that -2.6 is encoded as
Expand All @@ -786,7 +767,7 @@ __global__ void __launch_bounds__(block_size)
}
nanos = (nanos << 3) + zeroes;
}
s->lengths.u32[nz_idx] = nanos;
s->lengths.u64[nz_idx] = nanos;
break;
}
case STRING:
Expand Down Expand Up @@ -897,6 +878,9 @@ __global__ void __launch_bounds__(block_size)
uint32_t flush = (s->cur_row == s->chunk.num_rows) ? 1 : 0, n;
switch (s->chunk.type_kind) {
case TIMESTAMP:
n = IntegerRLE<CI_DATA2, uint64_t, false, 0x3ff, block_size>(
s, s->lengths.u64, s->nnz - s->numlengths, s->numlengths, flush, t, temp_storage.u64);
break;
case STRING:
n = IntegerRLE<CI_DATA2, uint32_t, false, 0x3ff, block_size>(
s, s->lengths.u32, s->nnz - s->numlengths, s->numlengths, flush, t, temp_storage.u32);
Expand Down
14 changes: 14 additions & 0 deletions python/cudf/cudf/tests/test_orc.py
Original file line number Diff line number Diff line change
Expand Up @@ -724,3 +724,17 @@ def test_orc_bool_encode_fail():
# Also validate data
pdf = pa.orc.ORCFile(buffer).read().to_pandas()
assert_eq(okay_df, pdf)


def test_nanoseconds_overflow():
buffer = BytesIO()
# Use nanosecond values that take more than 32 bits to encode
s = cudf.Series([710424008, -1338482640], dtype="datetime64[ns]")
expected = cudf.DataFrame({"s": s})
expected.to_orc(buffer)

cudf_got = cudf.read_orc(buffer)
assert_eq(expected, cudf_got)

pyarrow_got = pa.orc.ORCFile(buffer).read()
assert_eq(expected.to_pandas(), pyarrow_got.to_pandas())