Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix timestamp truncation/overflow bugs in orc/parquet #9382

Merged
merged 10 commits into from
Oct 7, 2021
6 changes: 3 additions & 3 deletions cpp/src/io/orc/orc_gpu.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,9 @@ struct ColumnDesc {
uint32_t rowgroup_id; // row group position
ColumnEncodingKind encoding_kind; // column encoding kind
TypeKind type_kind; // column data type
uint8_t dtype_len; // data type length (for types that can be mapped to different sizes)
int32_t decimal_scale; // number of fractional decimal digits for decimal type
int32_t ts_clock_rate; // output timestamp clock frequency (0=default, 1000=ms, 1000000000=ns)
uint8_t dtype_len; // data type length (for types that can be mapped to different sizes)
int32_t decimal_scale; // number of fractional decimal digits for decimal type
type_id timestamp_type_id; // output timestamp type id (type_id::EMPTY by default)
column_validity_info parent_validity_info; // consists of parent column valid_map and null count
uint32_t* parent_null_count_prefix_sums; // per-stripe prefix sums of parent column's null count
};
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/orc/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1411,7 +1411,7 @@ table_with_metadata reader::impl::read(size_type skip_rows,
: cudf::size_of(column_types[col_idx]);
chunk.num_rowgroups = stripe_num_rowgroups;
if (chunk.type_kind == orc::TIMESTAMP) {
chunk.ts_clock_rate = to_clockrate(_timestamp_type.id());
chunk.timestamp_type_id = _timestamp_type.id();
}
if (not is_data_empty) {
for (int k = 0; k < gpu::CI_NUM_STREAMS; k++) {
Expand Down
34 changes: 23 additions & 11 deletions cpp/src/io/orc/stripe_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1789,17 +1789,29 @@ __global__ void __launch_bounds__(block_size)
seconds += get_gmt_offset(tz_table.ttimes, tz_table.offsets, seconds);
}
if (seconds < 0 && nanos != 0) { seconds -= 1; }
if (s->chunk.ts_clock_rate) {
duration_ns d_ns{nanos};
d_ns += duration_s{seconds};
static_cast<int64_t*>(data_out)[row] =
d_ns.count() * s->chunk.ts_clock_rate /
duration_ns::period::den; // Output to desired clock rate
} else {
cudf::duration_s d{seconds};
static_cast<int64_t*>(data_out)[row] =
cuda::std::chrono::duration_cast<cudf::duration_ns>(d).count() + nanos;
}

duration_ns d_ns{nanos};
duration_s d_s{seconds};

static_cast<int64_t*>(data_out)[row] = [&]() {
using cuda::std::chrono::duration_cast;
switch (s->chunk.timestamp_type_id) {
case type_id::TIMESTAMP_SECONDS:
return d_s.count() + duration_cast<duration_s>(d_ns).count();
case type_id::TIMESTAMP_MILLISECONDS:
return duration_cast<duration_ms>(d_s).count() +
duration_cast<duration_ms>(d_ns).count();
case type_id::TIMESTAMP_MICROSECONDS:
return duration_cast<duration_us>(d_s).count() +
duration_cast<duration_us>(d_ns).count();
case type_id::TIMESTAMP_NANOSECONDS:
default:
return duration_cast<duration_ns>(d_s).count() +
d_ns.count(); // nanoseconds as output in case of `type_id::EMPTY` and
// `type_id::TIMESTAMP_NANOSECONDS`
}
}();

break;
}
}
Expand Down
71 changes: 40 additions & 31 deletions cpp/src/io/parquet/page_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -623,15 +623,14 @@ inline __device__ void gpuStoreOutput(uint2* dst,
*
* @param[in,out] s Page state input/output
* @param[in] src_pos Source position
* @param[in] dst Pointer to row output data
* @param[out] dst Pointer to row output data
*/
inline __device__ void gpuOutputInt96Timestamp(volatile page_state_s* s, int src_pos, int64_t* dst)
{
using cuda::std::chrono::duration_cast;

const uint8_t* src8;
uint32_t dict_pos, dict_size = s->dict_size, ofs;
int64_t ts;

if (s->dict_base) {
// Dictionary
Expand All @@ -646,36 +645,46 @@ inline __device__ void gpuOutputInt96Timestamp(volatile page_state_s* s, int src
ofs = 3 & reinterpret_cast<size_t>(src8);
src8 -= ofs; // align to 32-bit boundary
ofs <<= 3; // bytes -> bits
if (dict_pos + 4 < dict_size) {
uint3 v;
int64_t nanos, days;
v.x = *reinterpret_cast<const uint32_t*>(src8 + dict_pos + 0);
v.y = *reinterpret_cast<const uint32_t*>(src8 + dict_pos + 4);
v.z = *reinterpret_cast<const uint32_t*>(src8 + dict_pos + 8);
if (ofs) {
uint32_t next = *reinterpret_cast<const uint32_t*>(src8 + dict_pos + 12);
v.x = __funnelshift_r(v.x, v.y, ofs);
v.y = __funnelshift_r(v.y, v.z, ofs);
v.z = __funnelshift_r(v.z, next, ofs);
}
nanos = v.y;
nanos <<= 32;
nanos |= v.x;
// Convert from Julian day at noon to UTC seconds
days = static_cast<int32_t>(v.z);
cudf::duration_D d{
days - 2440588}; // TBD: Should be noon instead of midnight, but this matches pyarrow
if (s->col.ts_clock_rate) {
int64_t secs = duration_cast<cudf::duration_s>(d).count() +
duration_cast<cudf::duration_s>(cudf::duration_ns{nanos}).count();
ts = secs * s->col.ts_clock_rate; // Output to desired clock rate
} else {
ts = duration_cast<cudf::duration_ns>(d).count() + nanos;
}
} else {
ts = 0;

if (dict_pos + 4 >= dict_size) {
*dst = 0;
return;
}
*dst = ts;

uint3 v;
int64_t nanos, days;
v.x = *reinterpret_cast<const uint32_t*>(src8 + dict_pos + 0);
v.y = *reinterpret_cast<const uint32_t*>(src8 + dict_pos + 4);
v.z = *reinterpret_cast<const uint32_t*>(src8 + dict_pos + 8);
if (ofs) {
uint32_t next = *reinterpret_cast<const uint32_t*>(src8 + dict_pos + 12);
v.x = __funnelshift_r(v.x, v.y, ofs);
v.y = __funnelshift_r(v.y, v.z, ofs);
v.z = __funnelshift_r(v.z, next, ofs);
}
nanos = v.y;
nanos <<= 32;
nanos |= v.x;
// Convert from Julian day at noon to UTC seconds
days = static_cast<int32_t>(v.z);
cudf::duration_D d_d{
days - 2440588}; // TBD: Should be noon instead of midnight, but this matches pyarrow

*dst = [&]() {
switch (s->col.ts_clock_rate) {
case 1: // seconds
return duration_cast<duration_s>(d_d).count() +
duration_cast<duration_s>(duration_ns{nanos}).count();
case 1'000: // milliseconds
return duration_cast<duration_ms>(d_d).count() +
duration_cast<duration_ms>(duration_ns{nanos}).count();
case 1'000'000: // microseconds
return duration_cast<duration_us>(d_d).count() +
duration_cast<duration_us>(duration_ns{nanos}).count();
case 1'000'000'000: // nanoseconds
default: return duration_cast<cudf::duration_ns>(d_d).count() + nanos;
}
}();
}

/**
Expand Down
25 changes: 25 additions & 0 deletions cpp/tests/io/orc_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,31 @@ TYPED_TEST(OrcWriterTimestampTypeTest, TimestampsWithNulls)
CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view());
}

// Round-trip check for timestamp values at the top of the int64_t range:
// writes a column whose rows count down from the int64_t maximum to an ORC
// file, reads it back (row index disabled) requesting the fixture's timestamp
// type, and expects the table read to equal the table written.
TYPED_TEST(OrcWriterTimestampTypeTest, TimestampOverflow)
{
  constexpr auto num_rows      = 100;
  constexpr int64_t max_ticks  = std::numeric_limits<int64_t>::max();

  // Row i holds (max_ticks - i); every row is flagged as valid.
  auto values =
    cudf::detail::make_counting_transform_iterator(0, [](auto i) { return max_ticks - i; });
  auto all_valid = cudf::detail::make_counting_transform_iterator(0, [](auto) { return true; });
  using rep_type = typename decltype(values)::value_type;

  column_wrapper<TypeParam, rep_type> input_col(values, values + num_rows, all_valid);
  table_view expected({input_col});

  auto filepath = temp_env->get_temp_filepath("OrcTimestampOverflow.orc");

  // Write the input table to the temporary ORC file.
  cudf_io::orc_writer_options write_opts =
    cudf_io::orc_writer_options::builder(cudf_io::sink_info{filepath}, expected);
  cudf_io::write_orc(write_opts);

  // Read it back, asking the reader to produce the timestamp type under test.
  cudf_io::orc_reader_options read_opts =
    cudf_io::orc_reader_options::builder(cudf_io::source_info{filepath})
      .use_index(false)
      .timestamp_type(this->type());
  auto round_tripped = cudf_io::read_orc(read_opts);

  CUDF_TEST_EXPECT_TABLES_EQUAL(expected, round_tripped.tbl->view());
}

TEST_F(OrcWriterTest, MultiColumn)
{
constexpr auto num_rows = 10;
Expand Down
34 changes: 34 additions & 0 deletions cpp/tests/io/parquet_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,22 @@ struct ParquetWriterChronoTypeTest : public ParquetWriterTest {
auto type() { return cudf::data_type{cudf::type_to_id<T>()}; }
};

// Typed test fixture for timestamp type tests
template <typename T>
struct ParquetWriterTimestampTypeTest : public ParquetWriterTest {
auto type() { return cudf::data_type{cudf::type_to_id<T>()}; }
};

// Declare typed test cases: each `using` names a type list and the following
// TYPED_TEST_CASE binds that list to a typed fixture.
// TODO: Replace with `NumericTypes` when unsigned support is added. Issue #5352
using SupportedTypes = cudf::test::Types<int8_t, int16_t, int32_t, int64_t, bool, float, double>;
TYPED_TEST_CASE(ParquetWriterNumericTypeTest, SupportedTypes);
// Chrono fixture runs over the concatenation of ChronoTypes and DurationTypes.
using SupportedChronoTypes = cudf::test::Concat<cudf::test::ChronoTypes, cudf::test::DurationTypes>;
TYPED_TEST_CASE(ParquetWriterChronoTypeTest, SupportedChronoTypes);
// Timestamp fixture currently runs only for millisecond and microsecond timestamps.
// TODO: debug truncation errors for `timestamp_ns` and overflow errors for `timestamp_s`; see
// issue #9393.
using SupportedTimestampTypes = cudf::test::Types<cudf::timestamp_ms, cudf::timestamp_us>;
TYPED_TEST_CASE(ParquetWriterTimestampTypeTest, SupportedTimestampTypes);

// Base test fixture for chunked writer tests
struct ParquetChunkedWriterTest : public cudf::test::BaseFixture {
Expand Down Expand Up @@ -363,6 +373,30 @@ TYPED_TEST(ParquetWriterChronoTypeTest, ChronosWithNulls)
CUDF_TEST_EXPECT_TABLES_EQUAL(expected->view(), result.tbl->view());
}

// Round-trip check for timestamp values at the top of the int64_t range:
// writes a column whose rows count down from the int64_t maximum to a Parquet
// file, reads it back requesting the fixture's timestamp type, and expects the
// table read to equal the table written.
TYPED_TEST(ParquetWriterTimestampTypeTest, TimestampOverflow)
{
  constexpr int64_t max = std::numeric_limits<int64_t>::max();
  // Row i holds (max - i); every row is flagged as valid.
  auto sequence = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return max - i; });
  auto validity = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return true; });

  constexpr auto num_rows = 100;
  column_wrapper<TypeParam, typename decltype(sequence)::value_type> col(
    sequence, sequence + num_rows, validity);
  table_view expected({col});

  // Use a Parquet-specific name and extension for the temp file; the previous
  // name ("OrcTimestampOverflow.orc") was copied over from the ORC test.
  auto filepath = temp_env->get_temp_filepath("ParquetTimestampOverflow.parquet");
  cudf_io::parquet_writer_options out_opts =
    cudf_io::parquet_writer_options::builder(cudf_io::sink_info{filepath}, expected);
  cudf_io::write_parquet(out_opts);

  // Read the file back, asking the reader to produce the timestamp type under test.
  cudf_io::parquet_reader_options in_opts =
    cudf_io::parquet_reader_options::builder(cudf_io::source_info{filepath})
      .timestamp_type(this->type());
  auto result = cudf_io::read_parquet(in_opts);

  CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view());
}

TEST_F(ParquetWriterTest, MultiColumn)
{
constexpr auto num_rows = 100;
Expand Down