Skip to content

Commit

Permalink
Fix timestamp truncation/overflow bugs in orc/parquet (#9382)
Browse files Browse the repository at this point in the history
Closes #9365

This PR removes the integer overflow issues, along with the clock rate logic, by operating directly on the timestamp type id. It also fixes a truncation bug in Parquet. Corresponding unit tests are added.

Authors:
  - Yunsong Wang (https://github.com/PointKernel)

Approvers:
  - Vukasin Milovanovic (https://github.com/vuule)

URL: #9382
  • Loading branch information
PointKernel authored Oct 7, 2021
1 parent aaea353 commit 8203d3d
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 46 deletions.
6 changes: 3 additions & 3 deletions cpp/src/io/orc/orc_gpu.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,9 @@ struct ColumnDesc {
uint32_t rowgroup_id; // row group position
ColumnEncodingKind encoding_kind; // column encoding kind
TypeKind type_kind; // column data type
uint8_t dtype_len; // data type length (for types that can be mapped to different sizes)
int32_t decimal_scale; // number of fractional decimal digits for decimal type
int32_t ts_clock_rate; // output timestamp clock frequency (0=default, 1000=ms, 1000000000=ns)
uint8_t dtype_len; // data type length (for types that can be mapped to different sizes)
int32_t decimal_scale; // number of fractional decimal digits for decimal type
type_id timestamp_type_id; // output timestamp type id (type_id::EMPTY by default)
column_validity_info parent_validity_info; // consists of parent column valid_map and null count
uint32_t* parent_null_count_prefix_sums; // per-stripe prefix sums of parent column's null count
};
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/orc/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1411,7 +1411,7 @@ table_with_metadata reader::impl::read(size_type skip_rows,
: cudf::size_of(column_types[col_idx]);
chunk.num_rowgroups = stripe_num_rowgroups;
if (chunk.type_kind == orc::TIMESTAMP) {
chunk.ts_clock_rate = to_clockrate(_timestamp_type.id());
chunk.timestamp_type_id = _timestamp_type.id();
}
if (not is_data_empty) {
for (int k = 0; k < gpu::CI_NUM_STREAMS; k++) {
Expand Down
34 changes: 23 additions & 11 deletions cpp/src/io/orc/stripe_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1789,17 +1789,29 @@ __global__ void __launch_bounds__(block_size)
seconds += get_gmt_offset(tz_table.ttimes, tz_table.offsets, seconds);
}
if (seconds < 0 && nanos != 0) { seconds -= 1; }
if (s->chunk.ts_clock_rate) {
duration_ns d_ns{nanos};
d_ns += duration_s{seconds};
static_cast<int64_t*>(data_out)[row] =
d_ns.count() * s->chunk.ts_clock_rate /
duration_ns::period::den; // Output to desired clock rate
} else {
cudf::duration_s d{seconds};
static_cast<int64_t*>(data_out)[row] =
cuda::std::chrono::duration_cast<cudf::duration_ns>(d).count() + nanos;
}

duration_ns d_ns{nanos};
duration_s d_s{seconds};

static_cast<int64_t*>(data_out)[row] = [&]() {
using cuda::std::chrono::duration_cast;
switch (s->chunk.timestamp_type_id) {
case type_id::TIMESTAMP_SECONDS:
return d_s.count() + duration_cast<duration_s>(d_ns).count();
case type_id::TIMESTAMP_MILLISECONDS:
return duration_cast<duration_ms>(d_s).count() +
duration_cast<duration_ms>(d_ns).count();
case type_id::TIMESTAMP_MICROSECONDS:
return duration_cast<duration_us>(d_s).count() +
duration_cast<duration_us>(d_ns).count();
case type_id::TIMESTAMP_NANOSECONDS:
default:
return duration_cast<duration_ns>(d_s).count() +
d_ns.count(); // nanoseconds as output in case of `type_id::EMPTY` and
// `type_id::TIMESTAMP_NANOSECONDS`
}
}();

break;
}
}
Expand Down
71 changes: 40 additions & 31 deletions cpp/src/io/parquet/page_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -623,15 +623,14 @@ inline __device__ void gpuStoreOutput(uint2* dst,
*
* @param[in,out] s Page state input/output
* @param[in] src_pos Source position
* @param[in] dst Pointer to row output data
* @param[out] dst Pointer to row output data
*/
inline __device__ void gpuOutputInt96Timestamp(volatile page_state_s* s, int src_pos, int64_t* dst)
{
using cuda::std::chrono::duration_cast;

const uint8_t* src8;
uint32_t dict_pos, dict_size = s->dict_size, ofs;
int64_t ts;

if (s->dict_base) {
// Dictionary
Expand All @@ -646,36 +645,46 @@ inline __device__ void gpuOutputInt96Timestamp(volatile page_state_s* s, int src
ofs = 3 & reinterpret_cast<size_t>(src8);
src8 -= ofs; // align to 32-bit boundary
ofs <<= 3; // bytes -> bits
if (dict_pos + 4 < dict_size) {
uint3 v;
int64_t nanos, days;
v.x = *reinterpret_cast<const uint32_t*>(src8 + dict_pos + 0);
v.y = *reinterpret_cast<const uint32_t*>(src8 + dict_pos + 4);
v.z = *reinterpret_cast<const uint32_t*>(src8 + dict_pos + 8);
if (ofs) {
uint32_t next = *reinterpret_cast<const uint32_t*>(src8 + dict_pos + 12);
v.x = __funnelshift_r(v.x, v.y, ofs);
v.y = __funnelshift_r(v.y, v.z, ofs);
v.z = __funnelshift_r(v.z, next, ofs);
}
nanos = v.y;
nanos <<= 32;
nanos |= v.x;
// Convert from Julian day at noon to UTC seconds
days = static_cast<int32_t>(v.z);
cudf::duration_D d{
days - 2440588}; // TBD: Should be noon instead of midnight, but this matches pyarrow
if (s->col.ts_clock_rate) {
int64_t secs = duration_cast<cudf::duration_s>(d).count() +
duration_cast<cudf::duration_s>(cudf::duration_ns{nanos}).count();
ts = secs * s->col.ts_clock_rate; // Output to desired clock rate
} else {
ts = duration_cast<cudf::duration_ns>(d).count() + nanos;
}
} else {
ts = 0;

if (dict_pos + 4 >= dict_size) {
*dst = 0;
return;
}
*dst = ts;

uint3 v;
int64_t nanos, days;
v.x = *reinterpret_cast<const uint32_t*>(src8 + dict_pos + 0);
v.y = *reinterpret_cast<const uint32_t*>(src8 + dict_pos + 4);
v.z = *reinterpret_cast<const uint32_t*>(src8 + dict_pos + 8);
if (ofs) {
uint32_t next = *reinterpret_cast<const uint32_t*>(src8 + dict_pos + 12);
v.x = __funnelshift_r(v.x, v.y, ofs);
v.y = __funnelshift_r(v.y, v.z, ofs);
v.z = __funnelshift_r(v.z, next, ofs);
}
nanos = v.y;
nanos <<= 32;
nanos |= v.x;
// Convert from Julian day at noon to UTC seconds
days = static_cast<int32_t>(v.z);
cudf::duration_D d_d{
days - 2440588}; // TBD: Should be noon instead of midnight, but this matches pyarrow

*dst = [&]() {
switch (s->col.ts_clock_rate) {
case 1: // seconds
return duration_cast<duration_s>(d_d).count() +
duration_cast<duration_s>(duration_ns{nanos}).count();
case 1'000: // milliseconds
return duration_cast<duration_ms>(d_d).count() +
duration_cast<duration_ms>(duration_ns{nanos}).count();
case 1'000'000: // microseconds
return duration_cast<duration_us>(d_d).count() +
duration_cast<duration_us>(duration_ns{nanos}).count();
case 1'000'000'000: // nanoseconds
default: return duration_cast<cudf::duration_ns>(d_d).count() + nanos;
}
}();
}

/**
Expand Down
25 changes: 25 additions & 0 deletions cpp/tests/io/orc_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,31 @@ TYPED_TEST(OrcWriterTimestampTypeTest, TimestampsWithNulls)
CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view());
}

// Writes timestamps whose raw values sit just below the int64_t maximum to an ORC file, reads
// them back with the same timestamp type requested, and expects the table to be unchanged.
TYPED_TEST(OrcWriterTimestampTypeTest, TimestampOverflow)
{
  constexpr auto num_rows       = 100;
  constexpr int64_t max_value   = std::numeric_limits<int64_t>::max();

  // Row i holds (max_value - i); every row is marked valid.
  auto values =
    cudf::detail::make_counting_transform_iterator(0, [](auto i) { return max_value - i; });
  auto all_valid = cudf::detail::make_counting_transform_iterator(0, [](auto) { return true; });

  column_wrapper<TypeParam, typename decltype(values)::value_type> input_col(
    values, values + num_rows, all_valid);
  table_view expected({input_col});

  // Write the single-column table to a temporary ORC file.
  auto filepath = temp_env->get_temp_filepath("OrcTimestampOverflow.orc");
  cudf_io::orc_writer_options write_opts =
    cudf_io::orc_writer_options::builder(cudf_io::sink_info{filepath}, expected);
  cudf_io::write_orc(write_opts);

  // Read it back without the row index, requesting the timestamp type under test.
  cudf_io::orc_reader_options read_opts =
    cudf_io::orc_reader_options::builder(cudf_io::source_info{filepath})
      .use_index(false)
      .timestamp_type(this->type());
  auto read_back = cudf_io::read_orc(read_opts);

  CUDF_TEST_EXPECT_TABLES_EQUAL(expected, read_back.tbl->view());
}

TEST_F(OrcWriterTest, MultiColumn)
{
constexpr auto num_rows = 10;
Expand Down
34 changes: 34 additions & 0 deletions cpp/tests/io/parquet_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,22 @@ struct ParquetWriterChronoTypeTest : public ParquetWriterTest {
auto type() { return cudf::data_type{cudf::type_to_id<T>()}; }
};

// Typed test fixture for timestamp type tests
template <typename T>
struct ParquetWriterTimestampTypeTest : public ParquetWriterTest {
auto type() { return cudf::data_type{cudf::type_to_id<T>()}; }
};

// Declare typed test cases
// TODO: Replace with `NumericTypes` when unsigned support is added. Issue #5352
using SupportedTypes = cudf::test::Types<int8_t, int16_t, int32_t, int64_t, bool, float, double>;
TYPED_TEST_CASE(ParquetWriterNumericTypeTest, SupportedTypes);
// Chrono tests run over the concatenation of `ChronoTypes` and `DurationTypes`.
using SupportedChronoTypes = cudf::test::Concat<cudf::test::ChronoTypes, cudf::test::DurationTypes>;
TYPED_TEST_CASE(ParquetWriterChronoTypeTest, SupportedChronoTypes);
// Timestamp tests are currently instantiated only for `timestamp_ms` and `timestamp_us`.
// TODO: debug truncation errors for `timestamp_ns` and overflow errors for `timestamp_s`; see
// issue #9393.
using SupportedTimestampTypes = cudf::test::Types<cudf::timestamp_ms, cudf::timestamp_us>;
TYPED_TEST_CASE(ParquetWriterTimestampTypeTest, SupportedTimestampTypes);

// Base test fixture for chunked writer tests
struct ParquetChunkedWriterTest : public cudf::test::BaseFixture {
Expand Down Expand Up @@ -363,6 +373,30 @@ TYPED_TEST(ParquetWriterChronoTypeTest, ChronosWithNulls)
CUDF_TEST_EXPECT_TABLES_EQUAL(expected->view(), result.tbl->view());
}

// Writes timestamps whose raw values sit just below the int64_t maximum to a Parquet file, reads
// them back with the same timestamp type requested, and expects the table to be unchanged.
TYPED_TEST(ParquetWriterTimestampTypeTest, TimestampOverflow)
{
  constexpr int64_t max = std::numeric_limits<int64_t>::max();
  // Row i holds (max - i); every row is marked valid.
  auto sequence = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return max - i; });
  auto validity = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return true; });

  constexpr auto num_rows = 100;
  column_wrapper<TypeParam, typename decltype(sequence)::value_type> col(
    sequence, sequence + num_rows, validity);
  table_view expected({col});

  // Use a Parquet-specific file name: the previous "OrcTimestampOverflow.orc" was copied from the
  // ORC test and mislabelled the format of the file written here.
  auto filepath = temp_env->get_temp_filepath("ParquetTimestampOverflow.parquet");
  cudf_io::parquet_writer_options out_opts =
    cudf_io::parquet_writer_options::builder(cudf_io::sink_info{filepath}, expected);
  cudf_io::write_parquet(out_opts);

  // Read the file back, requesting the timestamp type under test.
  cudf_io::parquet_reader_options in_opts =
    cudf_io::parquet_reader_options::builder(cudf_io::source_info{filepath})
      .timestamp_type(this->type());
  auto result = cudf_io::read_parquet(in_opts);

  CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view());
}

TEST_F(ParquetWriterTest, MultiColumn)
{
constexpr auto num_rows = 100;
Expand Down

0 comments on commit 8203d3d

Please sign in to comment.