Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding timeAdd with timezone kernel #1700

Closed
wants to merge 18 commits into from
Closed
45 changes: 45 additions & 0 deletions src/main/cpp/src/GpuTimeZoneDBJni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,49 @@ Java_com_nvidia_spark_rapids_jni_GpuTimeZoneDB_convertUTCTimestampColumnToTimeZo
}
CATCH_STD(env, 0);
}

res-life marked this conversation as resolved.
Show resolved Hide resolved
/**
 * JNI entry point for GpuTimeZoneDB.timeAddCS (timestamp Column + duration Scalar).
 *
 * Reinterprets the three native handles as a timestamp column view, an int64
 * numeric scalar and the timezone transitions table view, then forwards them to
 * spark_rapids_jni::time_add together with the timezone index.
 *
 * @param env                JNI environment, used for device selection and error reporting
 * @param input_handle       native handle of the timestamp cudf::column_view
 * @param duration_handle    native handle of the cudf::numeric_scalar<int64_t> duration
 * @param transitions_handle native handle of the transitions cudf::table_view
 * @param tz_index           index of the timezone inside the transitions table
 * @return the released result column pointer encoded as a jlong; the null checks and
 *         the catch macro are given 0 as their fallback value
 */
JNIEXPORT jlong JNICALL
Java_com_nvidia_spark_rapids_jni_GpuTimeZoneDB_timeAddCS(JNIEnv* env,
                                                         jclass,
                                                         jlong input_handle,
                                                         jlong duration_handle,
                                                         jlong transitions_handle,
                                                         jint tz_index)
{
  JNI_NULL_CHECK(env, input_handle, "column is null", 0);
  // Review feedback on this function: the duration handle is dereferenced below,
  // so it needs a null check just like the other two handles.
  JNI_NULL_CHECK(env, duration_handle, "scalar is null", 0);
  JNI_NULL_CHECK(env, transitions_handle, "column is null", 0);
  try {
    cudf::jni::auto_set_device(env);
    auto const input = reinterpret_cast<cudf::column_view const*>(input_handle);
    auto const duration = reinterpret_cast<cudf::numeric_scalar<int64_t> const*>(duration_handle);
    auto const transitions = reinterpret_cast<cudf::table_view const*>(transitions_handle);
    auto const index = static_cast<cudf::size_type>(tz_index);
    // release() hands ownership of the new column to the caller of this JNI method.
    return cudf::jni::ptr_as_jlong(
      spark_rapids_jni::time_add(*input, *duration, *transitions, index).release());
  }
  CATCH_STD(env, 0);
}

/**
 * JNI entry point for GpuTimeZoneDB.timeAddCC (timestamp Column + duration Column).
 *
 * All three handles are null-checked, reinterpreted as cudf views and passed to
 * spark_rapids_jni::time_add; the resulting column is released to the caller as a jlong.
 */
JNIEXPORT jlong JNICALL
Java_com_nvidia_spark_rapids_jni_GpuTimeZoneDB_timeAddCC(JNIEnv* env,
                                                         jclass,
                                                         jlong input_handle,
                                                         jlong duration_handle,
                                                         jlong transitions_handle,
                                                         jint tz_index)
{
  JNI_NULL_CHECK(env, input_handle, "column is null", 0);
  JNI_NULL_CHECK(env, duration_handle, "column is null", 0);
  JNI_NULL_CHECK(env, transitions_handle, "column is null", 0);
  try {
    cudf::jni::auto_set_device(env);

    // View the opaque Java-side handles as the native objects they point to.
    auto const& timestamps = *reinterpret_cast<cudf::column_view const*>(input_handle);
    auto const& intervals  = *reinterpret_cast<cudf::column_view const*>(duration_handle);
    auto const& tz_table   = *reinterpret_cast<cudf::table_view const*>(transitions_handle);

    auto added = spark_rapids_jni::time_add(
      timestamps, intervals, tz_table, static_cast<cudf::size_type>(tz_index));

    // Ownership of the new column moves to the caller of this JNI method.
    return cudf::jni::ptr_as_jlong(added.release());
  }
  CATCH_STD(env, 0);
}
}
225 changes: 225 additions & 0 deletions src/main/cpp/src/timezones.cu
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <cudf/detail/null_mask.hpp>
#include <cudf/lists/list_device_view.cuh>
#include <cudf/lists/lists_column_device_view.cuh>
#include <cudf/scalar/scalar.hpp>
#include <cudf/table/table.hpp>
#include <cudf/types.hpp>

Expand All @@ -34,6 +35,7 @@ using column = cudf::column;
using column_device_view = cudf::column_device_view;
using column_view = cudf::column_view;
using lists_column_device_view = cudf::detail::lists_column_device_view;
using scalar_i64 = cudf::numeric_scalar<int64_t>;
using size_type = cudf::size_type;
using struct_view = cudf::struct_view;
using table_view = cudf::table_view;
Expand Down Expand Up @@ -118,6 +120,183 @@ auto convert_timestamp_tz(column_view const& input,
return results;
}

/**
 * @brief Device functor that adds a duration to a UTC timestamp on the local
 * (wall-clock) timeline of one timezone and converts the sum back to UTC.
 *
 * The work is done in five steps: look up the UTC->local offset, add the offset and
 * the duration, look up the local->UTC offset for the sum, adjust that offset when the
 * sum lands inside an overlap, and finally subtract it.
 *
 * The duration is interpreted as microseconds (it is wrapped in cudf::duration_us)
 * and cast to the resolution of `timestamp_type` before being added.
 *
 * NOTE(review): the first lookup mirrors the existing UTC->local conversion functor; it
 * is repeated here because `to_local_offset` has to be kept for step 4. Reviewers
 * suggested factoring the shared transition lookup into a common inline helper.
 *
 * @tparam timestamp_type cudf timestamp type of the input and output elements
 */
template <typename timestamp_type>
struct time_add_functor {
  using duration_type = typename timestamp_type::duration;

  // Lists column of transitions. The struct child is read as
  // (0) utc instants, (1) tz instants, (2) utc offsets (int32, used as seconds).
  lists_column_device_view const transitions;

  // Row of `transitions` that holds the list for the timezone in use.
  size_type const tz_index;

  // Duration used by the unary call operator (scalar variant).
  int64_t const duration_scalar;

  /**
   * @brief Adds `duration` to `timestamp` using the transitions of `tz_index`.
   *
   * @param timestamp input timestamp, read as UTC
   * @param duration  amount to add, in microseconds
   * @return the resulting timestamp converted back to UTC
   */
  __device__ inline timestamp_type plus_with_tz(timestamp_type const& timestamp,
                                                int64_t const& duration) const
  {
    // Adding nothing can never cross a transition.
    if (duration == 0L) { return timestamp; }

    auto const utc_instants = transitions.child().child(0);
    auto const tz_instants  = transitions.child().child(1);
    auto const utc_offsets  = transitions.child().child(2);

    // input epoch seconds
    auto const epoch_seconds_utc = static_cast<int64_t>(
      cuda::std::chrono::duration_cast<cudf::duration_s>(timestamp.time_since_epoch()).count());

    auto const tz_transitions = cudf::list_device_view{transitions, tz_index};
    auto const list_size      = tz_transitions.size();

    auto const transition_times_utc = cudf::device_span<int64_t const>(
      utc_instants.data<int64_t>() + tz_transitions.element_offset(0),
      static_cast<size_t>(list_size));

    auto const transition_times_tz = cudf::device_span<int64_t const>(
      tz_instants.data<int64_t>() + tz_transitions.element_offset(0),
      static_cast<size_t>(list_size));

    // step 1: Binary search on utc to find the correct offset to convert utc to local.
    // NOTE(review): `utc_idx - 1` relies on the first utc instant never being greater
    // than the input (e.g. an INT64_MIN sentinel at the front) -- confirm in GpuTimeZoneDB.
    auto const utc_it = thrust::upper_bound(
      thrust::seq, transition_times_utc.begin(), transition_times_utc.end(), epoch_seconds_utc);
    auto const utc_idx =
      static_cast<size_type>(thrust::distance(transition_times_utc.begin(), utc_it));
    auto const utc_list_offset = tz_transitions.element_offset(utc_idx - 1);
    auto const to_local_offset =
      static_cast<int64_t>(utc_offsets.element<int32_t>(utc_list_offset));

    auto const to_local_offset_duration =
      cuda::std::chrono::duration_cast<duration_type>(cudf::duration_s{to_local_offset});

    auto const duration_typed =
      cuda::std::chrono::duration_cast<duration_type>(cudf::duration_us{duration});

    // step 2: add the duration to the local timestamp
    auto const local_timestamp_res = timestamp + to_local_offset_duration + duration_typed;

    auto const result_epoch_seconds = static_cast<int64_t>(
      cuda::std::chrono::duration_cast<cudf::duration_s>(local_timestamp_res.time_since_epoch())
        .count());

    // step 3: Binary search on local to find the correct offset to convert local to utc
    auto const local_it = thrust::upper_bound(
      thrust::seq, transition_times_tz.begin(), transition_times_tz.end(), result_epoch_seconds);
    auto local_idx =
      static_cast<size_type>(thrust::distance(transition_times_tz.begin(), local_it));

    // In GpuTimeZoneDB, we build the transition list with
    // (utcInstant, utcInstant + OffsetBefore, OffsetAfter) if it is an overlap.
    // The search above therefore ran on utcInstant + OffsetBefore; if the next entry
    // already applies when measured with its own stored offset
    // (utcInstant + offset <= result), step past it as well.
    //
    // The entry at `local_idx` is only inspected when it exists inside this timezone's
    // list (not the first entry, and not one past the end), so neither the span nor the
    // offsets column is read outside the list.
    if (local_idx != 0 && local_idx < list_size) {
      auto const next_utc = transition_times_utc[local_idx];
      // An INT64_MAX instant is treated as a sentinel; adding to it would overflow.
      if (next_utc != INT64_MAX) {
        auto const next_offset = static_cast<int64_t>(
          utc_offsets.element<int32_t>(tz_transitions.element_offset(local_idx)));
        if (next_utc + next_offset <= result_epoch_seconds) { local_idx += 1; }
      }
    }

    auto const local_list_offset = tz_transitions.element_offset(local_idx - 1);
    auto to_utc_offset = static_cast<int64_t>(utc_offsets.element<int32_t>(local_list_offset));
    auto const upper_bound_epoch = transition_times_tz[local_idx - 1];
    auto const upper_bound_utc   = transition_times_utc[local_idx - 1];

    // step 4: if the result is in the overlap, try to select the original offset if possible.
    // Sentinel instants are excluded first so that no arithmetic is performed on
    // INT64_MIN / INT64_MAX (which could overflow).
    if (upper_bound_utc != INT64_MIN && upper_bound_utc != INT64_MAX) {
      // utcInstant + stored offset; equals the stored tz instant for a gap entry.
      auto const overlap_before = upper_bound_utc + to_utc_offset;
      bool const is_gap         = (overlap_before == upper_bound_epoch);
      // Overlap: the ambiguous local range checked here is
      // [utcInstant + stored offset, stored tz instant].
      if (!is_gap && result_epoch_seconds >= overlap_before &&
          result_epoch_seconds <= upper_bound_epoch) {
        // By default, to_utc_offset is the offsetAfter, so the only other valid choice
        // is the earlier offset; use it only when the input was already on that offset.
        auto const early_offset = upper_bound_epoch - upper_bound_utc;
        if (to_local_offset == early_offset) { to_utc_offset = early_offset; }
      }
    }

    auto const to_utc_offset_duration =
      cuda::std::chrono::duration_cast<duration_type>(cudf::duration_s{to_utc_offset});

    // step 5: subtract the offset to convert local to utc
    return local_timestamp_res - to_utc_offset_duration;
  }

  // Scalar variant: every timestamp gets the same `duration_scalar`.
  __device__ timestamp_type operator()(timestamp_type const& timestamp) const
  {
    return plus_with_tz(timestamp, duration_scalar);
  }

  // Column variant: each timestamp is paired with its own interval.
  __device__ timestamp_type operator()(timestamp_type const& timestamp,
                                       int64_t const& interval) const
  {
    return plus_with_tz(timestamp, interval);
  }
};

/**
 * @brief Adds one scalar duration to every timestamp of `input` for the timezone at
 * `tz_index`.
 *
 * An invalid (null) scalar yields an all-null column of the input's type and size.
 * Otherwise the result copies the input's null mask and each element is computed by
 * `time_add_functor`.
 *
 * @tparam timestamp_type cudf timestamp type matching `input.type()`
 * @param input       timestamps to add to
 * @param duration    scalar duration (microseconds, as interpreted by the functor)
 * @param transitions table whose column 0 is the lists column of transitions
 * @param tz_index    row of the transitions column for the timezone
 * @param stream      CUDA stream used for the scalar reads, copies and the kernel
 * @param mr          memory resource for the returned column
 */
template <typename timestamp_type>
auto time_add_with_tz(column_view const& input,
                      scalar_i64 const& duration,
                      table_view const& transitions,
                      size_type tz_index,
                      rmm::cuda_stream_view stream,
                      rmm::mr::device_memory_resource* mr)
{
  // Handle the null scalar first so that no device view is built for nothing.
  // The scalar is read on `stream`, the same stream the rest of this function uses.
  if (!duration.is_valid(stream)) {
    // return a column of nulls
    return cudf::make_timestamp_column(
      input.type(), input.size(), cudf::mask_state::ALL_NULL, stream, mr);
  }

  // get the fixed transitions
  auto const ft_cdv_ptr        = column_device_view::create(transitions.column(0), stream);
  auto const fixed_transitions = lists_column_device_view{*ft_cdv_ptr};

  auto results = cudf::make_timestamp_column(input.type(),
                                             input.size(),
                                             cudf::detail::copy_bitmask(input, stream, mr),
                                             input.null_count(),
                                             stream,
                                             mr);

  thrust::transform(
    rmm::exec_policy(stream),
    input.begin<timestamp_type>(),
    input.end<timestamp_type>(),
    results->mutable_view().begin<timestamp_type>(),
    time_add_functor<timestamp_type>{fixed_transitions, tz_index, duration.value(stream)});

  return results;
}

/**
 * @brief Adds a per-row duration to every timestamp of `input` for the timezone at
 * `tz_index`.
 *
 * The result's null mask is the bitwise AND of the masks of `input` and `duration`.
 * Each element is computed by the binary call operator of `time_add_functor`.
 *
 * @tparam timestamp_type cudf timestamp type matching `input.type()`
 * @param input       timestamps to add to
 * @param duration    column of durations, read as int64 (microseconds, as interpreted
 *                    by the functor)
 * @param transitions table whose column 0 is the lists column of transitions
 * @param tz_index    row of the transitions column for the timezone
 * @param stream      CUDA stream used for allocations, copies and the kernel
 * @param mr          memory resource for the returned column
 */
template <typename timestamp_type>
auto time_add_with_tz(column_view const& input,
                      column_view const& duration,
                      table_view const& transitions,
                      size_type tz_index,
                      rmm::cuda_stream_view stream,
                      rmm::mr::device_memory_resource* mr)
{
  // get the fixed transitions
  auto const ft_cdv_ptr        = column_device_view::create(transitions.column(0), stream);
  auto const fixed_transitions = lists_column_device_view{*ft_cdv_ptr};

  auto [null_mask, null_count] =
    cudf::detail::bitmask_and(cudf::table_view{{input, duration}}, stream, mr);

  // The mask handed to the result column is allocated from the caller's `mr`, like the
  // rest of the returned column, instead of the current device resource.
  auto results = cudf::make_timestamp_column(input.type(),
                                             input.size(),
                                             rmm::device_buffer(null_mask, stream, mr),
                                             null_count,
                                             stream,
                                             mr);

  // The functor's scalar member is unused by the binary call operator, hence 0.
  thrust::transform(rmm::exec_policy(stream),
                    input.begin<timestamp_type>(),
                    input.end<timestamp_type>(),
                    duration.begin<int64_t>(),
                    results->mutable_view().begin<timestamp_type>(),
                    time_add_functor<timestamp_type>{fixed_transitions, tz_index, 0L});

  return results;
}

} // namespace

namespace spark_rapids_jni {
Expand Down Expand Up @@ -163,4 +342,50 @@ std::unique_ptr<column> convert_utc_timestamp_to_timezone(column_view const& inp
return convert_timestamp(input, transitions, tz_index, false, stream, mr);
}

std::unique_ptr<column> time_add(column_view const& input,
scalar_i64 const& duration,
table_view const& transitions,
size_type tz_index,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
auto const type = input.type().id();

switch (type) {
case cudf::type_id::TIMESTAMP_SECONDS:
revans2 marked this conversation as resolved.
Show resolved Hide resolved
return time_add_with_tz<cudf::timestamp_s>(
input, duration, transitions, tz_index, stream, mr);
case cudf::type_id::TIMESTAMP_MILLISECONDS:
return time_add_with_tz<cudf::timestamp_ms>(
input, duration, transitions, tz_index, stream, mr);
case cudf::type_id::TIMESTAMP_MICROSECONDS:
return time_add_with_tz<cudf::timestamp_us>(
input, duration, transitions, tz_index, stream, mr);
default: CUDF_FAIL("Unsupported timestamp unit for timezone conversion");
}
}

std::unique_ptr<column> time_add(column_view const& input,
column_view const& duration,
revans2 marked this conversation as resolved.
Show resolved Hide resolved
table_view const& transitions,
size_type tz_index,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
auto const type = input.type().id();

switch (type) {
case cudf::type_id::TIMESTAMP_SECONDS:
revans2 marked this conversation as resolved.
Show resolved Hide resolved
return time_add_with_tz<cudf::timestamp_s>(
input, duration, transitions, tz_index, stream, mr);
case cudf::type_id::TIMESTAMP_MILLISECONDS:
return time_add_with_tz<cudf::timestamp_ms>(
input, duration, transitions, tz_index, stream, mr);
case cudf::type_id::TIMESTAMP_MICROSECONDS:
return time_add_with_tz<cudf::timestamp_us>(
input, duration, transitions, tz_index, stream, mr);
default: CUDF_FAIL("Unsupported timestamp unit for timezone conversion");
}
}

} // namespace spark_rapids_jni
31 changes: 30 additions & 1 deletion src/main/cpp/src/timezones.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,33 @@ std::unique_ptr<cudf::column> convert_utc_timestamp_to_timezone(
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

} // namespace spark_rapids_jni
/**
 * @brief Add a single interval to every timestamp of a column.
 *
 * The transition rules are enclosed in a table, and the index corresponding to the
 * specific timezone is given.
 *
 * Per the implementation in timezones.cu, an invalid (null) `duration` produces an
 * all-null column, the value is interpreted as microseconds, and only second,
 * millisecond and microsecond timestamp columns are accepted.
 *
 * @param input the column of input timestamps in UTC
 * @param duration the scalar interval to add to every input timestamp
 * @param transitions the table of transitions for all timezones
 * @param tz_index the index of the row in `transitions` corresponding to the specific timezone
 * @param stream CUDA stream used for device memory operations and kernel launches.
 * @param mr Device memory resource used to allocate the returned timestamp column's memory
 * @return a new timestamp column with the same type and size as `input`
 */
std::unique_ptr<cudf::column> time_add(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: duration parameter has 2 types: numeric_scalar and column_view.
Extract a template for duration parameter to combine the two time_add functions to one function.

cudf::column_view const& input,
cudf::numeric_scalar<int64_t> const& duration,
cudf::table_view const& transitions,
cudf::size_type tz_index,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
 * @brief Add a column of intervals, row by row, to a column of timestamps.
 *
 * The transition rules are enclosed in a table, and the index corresponding to the
 * specific timezone is given.
 *
 * Per the implementation in timezones.cu, the null mask of the result is the bitwise
 * AND of the null masks of `input` and `intervals`, interval values are interpreted as
 * microseconds, and only second, millisecond and microsecond timestamp columns are
 * accepted.
 *
 * @param input the column of input timestamps in UTC
 * @param intervals the column of intervals to add to the input timestamps
 * @param transitions the table of transitions for all timezones
 * @param tz_index the index of the row in `transitions` corresponding to the specific timezone
 * @param stream CUDA stream used for device memory operations and kernel launches.
 * @param mr Device memory resource used to allocate the returned timestamp column's memory
 * @return a new timestamp column with the same type and size as `input`
 */
std::unique_ptr<cudf::column> time_add(
cudf::column_view const& input,
cudf::column_view const& intervals,
cudf::table_view const& transitions,
cudf::size_type tz_index,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

} // namespace spark_rapids_jni
Loading