diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 27d783cd89b5..6271d8af3786 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -48,6 +48,7 @@ use datafusion_common::{
     DEFAULT_PARQUET_EXTENSION,
 };
 use datafusion_common_runtime::SpawnedTask;
+use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::expressions::{MaxAccumulator, MinAccumulator};
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
@@ -749,9 +750,13 @@ impl DataSink for ParquetSink {
                     parquet_props.writer_options().clone(),
                 )
                 .await?;
+                let mut reservation =
+                    MemoryConsumer::new(format!("ParquetSink[{}]", path))
+                        .register(context.memory_pool());
                 file_write_tasks.spawn(async move {
                     while let Some(batch) = rx.recv().await {
                         writer.write(&batch).await?;
+                        reservation.try_resize(writer.memory_size())?;
                     }
                     let file_metadata = writer
                         .close()
@@ -771,6 +776,7 @@ impl DataSink for ParquetSink {
                 let schema = self.get_writer_schema();
                 let props = parquet_props.clone();
                 let parallel_options_clone = parallel_options.clone();
+                let pool = Arc::clone(context.memory_pool());
                 file_write_tasks.spawn(async move {
                     let file_metadata = output_single_parquet_file_parallelized(
                         writer,
@@ -778,6 +784,7 @@ impl DataSink for ParquetSink {
                         schema,
                         props.writer_options(),
                         parallel_options_clone,
+                        pool,
                     )
                     .await?;
                     Ok((path, file_metadata))
@@ -818,14 +825,16 @@ impl DataSink for ParquetSink {
 async fn column_serializer_task(
     mut rx: Receiver<ArrowLeafColumn>,
     mut writer: ArrowColumnWriter,
-) -> Result<ArrowColumnWriter> {
+    mut reservation: MemoryReservation,
+) -> Result<(ArrowColumnWriter, MemoryReservation)> {
     while let Some(col) = rx.recv().await {
         writer.write(&col)?;
+        reservation.try_resize(writer.memory_size())?;
     }
-    Ok(writer)
+    Ok((writer, reservation))
 }
 
-type ColumnWriterTask = SpawnedTask<Result<ArrowColumnWriter>>;
+type ColumnWriterTask = SpawnedTask<Result<(ArrowColumnWriter, MemoryReservation)>>;
 type ColSender = Sender<ArrowLeafColumn>;
 
 /// Spawns a parallel serialization task for each column
@@ -835,6 +844,7 @@ fn spawn_column_parallel_row_group_writer(
     schema: Arc<Schema>,
     parquet_props: Arc<WriterProperties>,
     max_buffer_size: usize,
+    pool: &Arc<dyn MemoryPool>,
 ) -> Result<(Vec<ColumnWriterTask>, Vec<ColSender>)> {
     let schema_desc = arrow_to_parquet_schema(&schema)?;
     let col_writers = get_column_writers(&schema_desc, &parquet_props, &schema)?;
@@ -848,7 +858,13 @@ fn spawn_column_parallel_row_group_writer(
             mpsc::channel::<ArrowLeafColumn>(max_buffer_size);
         col_array_channels.push(send_array);
 
-        let task = SpawnedTask::spawn(column_serializer_task(recieve_array, writer));
+        let reservation =
+            MemoryConsumer::new("ParquetSink(ArrowColumnWriter)").register(pool);
+        let task = SpawnedTask::spawn(column_serializer_task(
+            recieve_array,
+            writer,
+            reservation,
+        ));
         col_writer_tasks.push(task);
     }
 
@@ -864,7 +880,7 @@ struct ParallelParquetWriterOptions {
 
 /// This is the return type of calling [ArrowColumnWriter].close() on each column
 /// i.e. the Vec of encoded columns which can be appended to a row group
-type RBStreamSerializeResult = Result<(Vec<ArrowColumnChunk>, usize)>;
+type RBStreamSerializeResult = Result<(Vec<ArrowColumnChunk>, MemoryReservation, usize)>;
 
 /// Sends the ArrowArrays in passed [RecordBatch] through the channels to their respective
 /// parallel column serializers.
@@ -877,12 +893,12 @@ async fn send_arrays_to_col_writers(
     let mut next_channel = 0;
     for (array, field) in rb.columns().iter().zip(schema.fields()) {
         for c in compute_leaves(field, array)? {
-            col_array_channels[next_channel]
-                .send(c)
-                .await
-                .map_err(|_| {
-                    DataFusionError::Internal("Unable to send array to writer!".into())
-                })?;
+            // Do not surface error from closed channel (means something
+            // else hit an error, and the plan is shutting down).
+            if col_array_channels[next_channel].send(c).await.is_err() {
+                return Ok(());
+            }
+
             next_channel += 1;
         }
     }
@@ -895,16 +911,22 @@
 fn spawn_rg_join_and_finalize_task(
     column_writer_tasks: Vec<ColumnWriterTask>,
     rg_rows: usize,
+    pool: &Arc<dyn MemoryPool>,
 ) -> SpawnedTask<RBStreamSerializeResult> {
+    let mut rg_reservation =
+        MemoryConsumer::new("ParquetSink(SerializedRowGroupWriter)").register(pool);
+
     SpawnedTask::spawn(async move {
         let num_cols = column_writer_tasks.len();
         let mut finalized_rg = Vec::with_capacity(num_cols);
         for task in column_writer_tasks.into_iter() {
-            let writer = task.join_unwind().await?;
+            let (writer, _col_reservation) = task.join_unwind().await?;
+            let encoded_size = writer.get_estimated_total_bytes();
+            rg_reservation.grow(encoded_size);
             finalized_rg.push(writer.close()?);
         }
 
-        Ok((finalized_rg, rg_rows))
+        Ok((finalized_rg, rg_reservation, rg_rows))
     })
 }
 
@@ -922,6 +944,7 @@ fn spawn_parquet_parallel_serialization_task(
     schema: Arc<Schema>,
     writer_props: Arc<WriterProperties>,
     parallel_options: ParallelParquetWriterOptions,
+    pool: Arc<dyn MemoryPool>,
 ) -> SpawnedTask<Result<()>> {
     SpawnedTask::spawn(async move {
         let max_buffer_rb = parallel_options.max_buffered_record_batches_per_stream;
@@ -931,6 +954,7 @@ fn spawn_parquet_parallel_serialization_task(
                 schema.clone(),
                 writer_props.clone(),
                 max_buffer_rb,
+                &pool,
             )?;
         let mut current_rg_rows = 0;
 
@@ -957,13 +981,14 @@ fn spawn_parquet_parallel_serialization_task(
                     let finalize_rg_task = spawn_rg_join_and_finalize_task(
                         column_writer_handles,
                         max_row_group_rows,
+                        &pool,
                     );
 
-                    serialize_tx.send(finalize_rg_task).await.map_err(|_| {
-                        DataFusionError::Internal(
-                            "Unable to send closed RG to concat task!".into(),
-                        )
-                    })?;
+                    // Do not surface error from closed channel (means something
+                    // else hit an error, and the plan is shutting down).
+                    if serialize_tx.send(finalize_rg_task).await.is_err() {
+                        return Ok(());
+                    }
 
                     current_rg_rows = 0;
                     rb = rb.slice(rows_left, rb.num_rows() - rows_left);
@@ -973,6 +998,7 @@ fn spawn_parquet_parallel_serialization_task(
                             schema.clone(),
                             writer_props.clone(),
                             max_buffer_rb,
+                            &pool,
                         )?;
                 }
             }
@@ -981,14 +1007,17 @@ fn spawn_parquet_parallel_serialization_task(
         drop(col_array_channels);
         // Handle leftover rows as final rowgroup, which may be smaller than max_row_group_rows
        if current_rg_rows > 0 {
-            let finalize_rg_task =
-                spawn_rg_join_and_finalize_task(column_writer_handles, current_rg_rows);
+            let finalize_rg_task = spawn_rg_join_and_finalize_task(
+                column_writer_handles,
+                current_rg_rows,
+                &pool,
+            );
-            serialize_tx.send(finalize_rg_task).await.map_err(|_| {
-                DataFusionError::Internal(
-                    "Unable to send closed RG to concat task!".into(),
-                )
-            })?;
+            // Do not surface error from closed channel (means something
+            // else hit an error, and the plan is shutting down).
+            if serialize_tx.send(finalize_rg_task).await.is_err() {
+                return Ok(());
+            }
         }
         Ok(())
     })
 }
@@ -1002,9 +1031,13 @@ async fn concatenate_parallel_row_groups(
     schema: Arc<Schema>,
     writer_props: Arc<WriterProperties>,
     mut object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
+    pool: Arc<dyn MemoryPool>,
 ) -> Result<FileMetaData> {
     let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);
 
+    let mut file_reservation =
+        MemoryConsumer::new("ParquetSink(SerializedFileWriter)").register(&pool);
+
     let schema_desc = arrow_to_parquet_schema(schema.as_ref())?;
     let mut parquet_writer = SerializedFileWriter::new(
         merged_buff.clone(),
@@ -1015,15 +1048,20 @@ async fn concatenate_parallel_row_groups(
     while let Some(task) = serialize_rx.recv().await {
         let result = task.join_unwind().await;
         let mut rg_out = parquet_writer.next_row_group()?;
-        let (serialized_columns, _cnt) = result?;
+        let (serialized_columns, mut rg_reservation, _cnt) = result?;
         for chunk in serialized_columns {
             chunk.append_to_row_group(&mut rg_out)?;
+            rg_reservation.free();
+
             let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap();
+            file_reservation.try_resize(buff_to_flush.len())?;
+
             if buff_to_flush.len() > BUFFER_FLUSH_BYTES {
                 object_store_writer
                     .write_all(buff_to_flush.as_slice())
                     .await?;
                 buff_to_flush.clear();
+                file_reservation.try_resize(buff_to_flush.len())?; // will set to zero
             }
         }
         rg_out.close()?;
@@ -1034,6 +1072,7 @@ async fn concatenate_parallel_row_groups(
 
     object_store_writer.write_all(final_buff.as_slice()).await?;
     object_store_writer.shutdown().await?;
+    file_reservation.free();
 
     Ok(file_metadata)
 }
@@ -1048,6 +1087,7 @@ async fn output_single_parquet_file_parallelized(
     output_schema: Arc<Schema>,
     parquet_props: &WriterProperties,
     parallel_options: ParallelParquetWriterOptions,
+    pool: Arc<dyn MemoryPool>,
 ) -> Result<FileMetaData> {
     let max_rowgroups = parallel_options.max_parallel_row_groups;
     // Buffer size of this channel limits maximum number of RowGroups being worked on in parallel
@@ -1061,12 +1101,14 @@ async fn output_single_parquet_file_parallelized(
         output_schema.clone(),
         arc_props.clone(),
         parallel_options,
+        Arc::clone(&pool),
     );
     let file_metadata = concatenate_parallel_row_groups(
         serialize_rx,
         output_schema.clone(),
         arc_props.clone(),
         object_store_writer,
+        pool,
     )
     .await?;
 
@@ -1158,8 +1200,10 @@ mod tests {
     use super::super::test_util::scan_format;
     use crate::datasource::listing::{ListingTableUrl, PartitionedFile};
     use crate::physical_plan::collect;
+    use crate::test_util::bounded_stream;
     use std::fmt::{Display, Formatter};
     use std::sync::atomic::{AtomicUsize, Ordering};
+    use std::time::Duration;
 
     use super::*;
 
@@ -2177,4 +2221,105 @@ mod tests {
         Ok(())
     }
+
+    #[tokio::test]
+    async fn parquet_sink_write_memory_reservation() -> Result<()> {
+        async fn test_memory_reservation(global: ParquetOptions) -> Result<()> {
+            let field_a = Field::new("a", DataType::Utf8, false);
+            let field_b = Field::new("b", DataType::Utf8, false);
+            let schema = Arc::new(Schema::new(vec![field_a, field_b]));
+            let object_store_url = ObjectStoreUrl::local_filesystem();
+
+            let file_sink_config = FileSinkConfig {
+                object_store_url: object_store_url.clone(),
+                file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
+                table_paths: vec![ListingTableUrl::parse("file:///")?],
+                output_schema: schema.clone(),
+                table_partition_cols: vec![],
+                overwrite: true,
+                keep_partition_by_columns: false,
+            };
+            let parquet_sink = Arc::new(ParquetSink::new(
+                file_sink_config,
+                TableParquetOptions {
+                    key_value_metadata: std::collections::HashMap::from([
+                        ("my-data".to_string(), Some("stuff".to_string())),
+                        ("my-data-bool-key".to_string(), None),
+                    ]),
+                    global,
+                    ..Default::default()
+                },
+            ));
+
+            // create data
+            let col_a: ArrayRef = Arc::new(StringArray::from(vec!["foo", "bar"]));
+            let col_b: ArrayRef = Arc::new(StringArray::from(vec!["baz", "baz"]));
+            let batch =
+                RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)]).unwrap();
+
+            // create task context
+            let task_context = build_ctx(object_store_url.as_ref());
+            assert_eq!(
+                task_context.memory_pool().reserved(),
+                0,
+                "no bytes are reserved yet"
+            );
+
+            let mut write_task = parquet_sink.write_all(
+                Box::pin(RecordBatchStreamAdapter::new(
+                    schema,
+                    bounded_stream(batch, 1000),
+                )),
+                &task_context,
+            );
+
+            // incrementally poll and check for memory reservation
+            let mut reserved_bytes = 0;
+            while futures::poll!(&mut write_task).is_pending() {
+                reserved_bytes += task_context.memory_pool().reserved();
+                tokio::time::sleep(Duration::from_micros(1)).await;
+            }
+            assert!(
+                reserved_bytes > 0,
+                "should have bytes reserved during write"
+            );
+            assert_eq!(
+                task_context.memory_pool().reserved(),
+                0,
+                "no leaking byte reservation"
+            );
+
+            Ok(())
+        }
+
+        let write_opts = ParquetOptions {
+            allow_single_file_parallelism: false,
+            ..Default::default()
+        };
+        test_memory_reservation(write_opts)
+            .await
+            .expect("should track for non-parallel writes");
+
+        let row_parallel_write_opts = ParquetOptions {
+            allow_single_file_parallelism: true,
+            maximum_parallel_row_group_writers: 10,
+            maximum_buffered_record_batches_per_stream: 1,
+            ..Default::default()
+        };
+        test_memory_reservation(row_parallel_write_opts)
+            .await
+            .expect("should track for row-parallel writes");
+
+        let col_parallel_write_opts = ParquetOptions {
+            allow_single_file_parallelism: true,
+            maximum_parallel_row_group_writers: 1,
+            maximum_buffered_record_batches_per_stream: 2,
+            ..Default::default()
+        };
+        test_memory_reservation(col_parallel_write_opts)
+            .await
+            .expect("should track for column-parallel writes");
+
+        Ok(())
+    }
 }
diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs
index 059fa8fc6da7..ba0509f3f51a 100644
--- a/datafusion/core/src/test_util/mod.rs
+++ b/datafusion/core/src/test_util/mod.rs
@@ -366,3 +366,39 @@ pub fn register_unbounded_file_with_ordering(
     ctx.register_table(table_name, Arc::new(StreamTable::new(Arc::new(config))))?;
     Ok(())
 }
+
+struct BoundedStream {
+    limit: usize,
+    count: usize,
+    batch: RecordBatch,
+}
+
+impl Stream for BoundedStream {
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        if self.count >= self.limit {
+            return Poll::Ready(None);
+        }
+        self.count += 1;
+        Poll::Ready(Some(Ok(self.batch.clone())))
+    }
+}
+
+impl RecordBatchStream for BoundedStream {
+    fn schema(&self) -> SchemaRef {
+        self.batch.schema()
+    }
+}
+
+/// Creates a bounded stream for testing purposes.
+pub fn bounded_stream(batch: RecordBatch, limit: usize) -> SendableRecordBatchStream { + Box::pin(BoundedStream { + count: 0, + limit, + batch, + }) +} diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index f61ee5d9ab98..7ef24609e238 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -31,6 +31,7 @@ use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr}; use futures::StreamExt; use std::any::Any; use std::sync::{Arc, OnceLock}; +use tokio::fs::File; use datafusion::datasource::streaming::StreamingTable; use datafusion::datasource::{MemTable, TableProvider}; @@ -323,6 +324,30 @@ async fn oom_recursive_cte() { .await } +#[tokio::test] +async fn oom_parquet_sink() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.into_path().join("test.parquet"); + let _ = File::create(path.clone()).await.unwrap(); + + TestCase::new() + .with_query(format!( + " + COPY (select * from t) + TO '{}' + STORED AS PARQUET OPTIONS (compression 'uncompressed'); + ", + path.to_string_lossy() + )) + .with_expected_errors(vec![ + "Failed to allocate additional", + "for ParquetSink(ArrowColumnWriter)", + ]) + .with_memory_limit(200_000) + .run() + .await +} + /// Run the query with the specified memory limit, /// and verifies the expected errors are returned #[derive(Clone, Debug)] diff --git a/datafusion/functions/src/datetime/mod.rs b/datafusion/functions/src/datetime/mod.rs index 9c2f80856bf8..a7e9827d6ca6 100644 --- a/datafusion/functions/src/datetime/mod.rs +++ b/datafusion/functions/src/datetime/mod.rs @@ -32,6 +32,7 @@ pub mod make_date; pub mod now; pub mod to_char; pub mod to_date; +pub mod to_local_time; pub mod to_timestamp; pub mod to_unixtime; @@ -50,6 +51,7 @@ make_udf_function!( make_udf_function!(now::NowFunc, NOW, now); make_udf_function!(to_char::ToCharFunc, TO_CHAR, to_char); make_udf_function!(to_date::ToDateFunc, TO_DATE, to_date); +make_udf_function!(to_local_time::ToLocalTimeFunc, TO_LOCAL_TIME, to_local_time); make_udf_function!(to_unixtime::ToUnixtimeFunc, TO_UNIXTIME, to_unixtime); make_udf_function!(to_timestamp::ToTimestampFunc, TO_TIMESTAMP, to_timestamp); make_udf_function!( @@ -108,7 +110,13 @@ pub mod expr_fn { ),( now, "returns the current timestamp in nanoseconds, using the same value for all instances of now() in same statement", - ),( + ), + ( + to_local_time, + "converts a timezone-aware timestamp to local time (with no offset or timezone information), i.e. strips off the timezone from the timestamp", + args, + ), + ( to_unixtime, "converts a string and optional formats to a Unixtime", args, @@ -277,6 +285,7 @@ pub fn functions() -> Vec> { now(), to_char(), to_date(), + to_local_time(), to_unixtime(), to_timestamp(), to_timestamp_seconds(), diff --git a/datafusion/functions/src/datetime/to_local_time.rs b/datafusion/functions/src/datetime/to_local_time.rs new file mode 100644 index 000000000000..c84d1015bd7e --- /dev/null +++ b/datafusion/functions/src/datetime/to_local_time.rs @@ -0,0 +1,564 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::ops::Add;
+use std::sync::Arc;
+
+use arrow::array::timezone::Tz;
+use arrow::array::{Array, ArrayRef, PrimitiveBuilder};
+use arrow::datatypes::DataType::Timestamp;
+use arrow::datatypes::{
+    ArrowTimestampType, DataType, TimestampMicrosecondType, TimestampMillisecondType,
+    TimestampNanosecondType, TimestampSecondType,
+};
+use arrow::datatypes::{
+    TimeUnit,
+    TimeUnit::{Microsecond, Millisecond, Nanosecond, Second},
+};
+
+use chrono::{DateTime, MappedLocalTime, Offset, TimeDelta, TimeZone, Utc};
+use datafusion_common::cast::as_primitive_array;
+use datafusion_common::{exec_err, DataFusionError, Result, ScalarValue};
+use datafusion_expr::TypeSignature::Exact;
+use datafusion_expr::{
+    ColumnarValue, ScalarUDFImpl, Signature, Volatility, TIMEZONE_WILDCARD,
+};
+
+/// A UDF function that converts a timezone-aware timestamp to local time (with no offset or
+/// timezone information). In other words, this function strips off the timezone from the timestamp,
+/// while keeping the display value of the timestamp the same.
+#[derive(Debug)]
+pub struct ToLocalTimeFunc {
+    signature: Signature,
+}
+
+impl Default for ToLocalTimeFunc {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl ToLocalTimeFunc {
+    pub fn new() -> Self {
+        let base_sig = |array_type: TimeUnit| {
+            [
+                Exact(vec![Timestamp(array_type, None)]),
+                Exact(vec![Timestamp(array_type, Some(TIMEZONE_WILDCARD.into()))]),
+            ]
+        };
+
+        let full_sig = [Nanosecond, Microsecond, Millisecond, Second]
+            .into_iter()
+            .flat_map(base_sig)
+            .collect::<Vec<_>>();
+
+        Self {
+            signature: Signature::one_of(full_sig, Volatility::Immutable),
+        }
+    }
+
+    fn to_local_time(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
+        if args.len() != 1 {
+            return exec_err!(
+                "to_local_time function requires 1 argument, got {}",
+                args.len()
+            );
+        }
+
+        let time_value = &args[0];
+        let arg_type = time_value.data_type();
+        match arg_type {
+            DataType::Timestamp(_, None) => {
+                // if no timezone specified, just return the input
+                Ok(time_value.clone())
+            }
+            // If the timestamp has a timezone, adjust the underlying time value. The current
+            // time value is stored as i64 in UTC, even though the timezone may not be in UTC.
+            // Therefore, we need to adjust the time value to the local time. See
+            // [`adjust_to_local_time`] for more details.
+            //
+            // Then remove the timezone in return type, i.e. return None
+            DataType::Timestamp(_, Some(timezone)) => {
+                let tz: Tz = timezone.parse()?;
+
+                match time_value {
+                    ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
+                        Some(ts),
+                        Some(_),
+                    )) => {
+                        let adjusted_ts =
+                            adjust_to_local_time::<TimestampNanosecondType>(*ts, tz)?;
+                        Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
+                            Some(adjusted_ts),
+                            None,
+                        )))
+                    }
+                    ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
+                        Some(ts),
+                        Some(_),
+                    )) => {
+                        let adjusted_ts =
+                            adjust_to_local_time::<TimestampMicrosecondType>(*ts, tz)?;
+                        Ok(ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
+                            Some(adjusted_ts),
+                            None,
+                        )))
+                    }
+                    ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
+                        Some(ts),
+                        Some(_),
+                    )) => {
+                        let adjusted_ts =
+                            adjust_to_local_time::<TimestampMillisecondType>(*ts, tz)?;
+                        Ok(ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
+                            Some(adjusted_ts),
+                            None,
+                        )))
+                    }
+                    ColumnarValue::Scalar(ScalarValue::TimestampSecond(
+                        Some(ts),
+                        Some(_),
+                    )) => {
+                        let adjusted_ts =
+                            adjust_to_local_time::<TimestampSecondType>(*ts, tz)?;
+                        Ok(ColumnarValue::Scalar(ScalarValue::TimestampSecond(
+                            Some(adjusted_ts),
+                            None,
+                        )))
+                    }
+                    ColumnarValue::Array(array) => {
+                        fn transform_array<T: ArrowTimestampType>(
+                            array: &ArrayRef,
+                            tz: Tz,
+                        ) -> Result<ColumnarValue> {
+                            let mut builder = PrimitiveBuilder::<T>::new();
+
+                            let primitive_array = as_primitive_array::<T>(array)?;
+                            for ts_opt in primitive_array.iter() {
+                                match ts_opt {
+                                    None => builder.append_null(),
+                                    Some(ts) => {
+                                        let adjusted_ts: i64 =
+                                            adjust_to_local_time::<T>(ts, tz)?;
+                                        builder.append_value(adjusted_ts)
+                                    }
+                                }
+                            }
+
+                            Ok(ColumnarValue::Array(Arc::new(builder.finish())))
+                        }
+
+                        match array.data_type() {
+                            Timestamp(_, None) => {
+                                // if no timezone specified, just return the input
+                                Ok(time_value.clone())
+                            }
+                            Timestamp(Nanosecond, Some(_)) => {
+                                transform_array::<TimestampNanosecondType>(array, tz)
+                            }
+                            Timestamp(Microsecond, Some(_)) => {
+                                transform_array::<TimestampMicrosecondType>(array, tz)
+                            }
+                            Timestamp(Millisecond, Some(_)) => {
+                                transform_array::<TimestampMillisecondType>(array, tz)
+                            }
+                            Timestamp(Second, Some(_)) => {
+                                transform_array::<TimestampSecondType>(array, tz)
+                            }
+                            _ => {
+                                exec_err!("to_local_time function requires timestamp argument in array, got {:?}", array.data_type())
+                            }
+                        }
+                    }
+                    _ => {
+                        exec_err!(
+                            "to_local_time function requires timestamp argument, got {:?}",
+                            time_value.data_type()
+                        )
+                    }
+                }
+            }
+            _ => {
+                exec_err!(
+                    "to_local_time function requires timestamp argument, got {:?}",
+                    arg_type
+                )
+            }
+        }
+    }
+}
+
+/// This function converts a timestamp with a timezone to a timestamp without a timezone.
+/// The display value of the adjusted timestamp remains the same, but the underlying timestamp
+/// representation is adjusted according to the relative timezone offset to UTC.
+///
+/// This function uses chrono to handle daylight saving time changes.
+/// +/// For example, +/// +/// ```text +/// '2019-03-31T01:00:00Z'::timestamp at time zone 'Europe/Brussels' +/// ``` +/// +/// is displayed as follows in datafusion-cli: +/// +/// ```text +/// 2019-03-31T01:00:00+01:00 +/// ``` +/// +/// and is represented in DataFusion as: +/// +/// ```text +/// TimestampNanosecond(Some(1_553_990_400_000_000_000), Some("Europe/Brussels")) +/// ``` +/// +/// To strip off the timezone while keeping the display value the same, we need to +/// adjust the underlying timestamp with the timezone offset value using `adjust_to_local_time()` +/// +/// ```text +/// adjust_to_local_time(1_553_990_400_000_000_000, "Europe/Brussels") --> 1_553_994_000_000_000_000 +/// ``` +/// +/// The difference between `1_553_990_400_000_000_000` and `1_553_994_000_000_000_000` is +/// `3600_000_000_000` ns, which corresponds to 1 hour. This matches with the timezone +/// offset for "Europe/Brussels" for this date. +/// +/// Note that the offset varies with daylight savings time (DST), which makes this tricky! For +/// example, timezone "Europe/Brussels" has a 2-hour offset during DST and a 1-hour offset +/// when DST ends. +/// +/// Consequently, DataFusion can represent the timestamp in local time (with no offset or +/// timezone information) as +/// +/// ```text +/// TimestampNanosecond(Some(1_553_994_000_000_000_000), None) +/// ``` +/// +/// which is displayed as follows in datafusion-cli: +/// +/// ```text +/// 2019-03-31T01:00:00 +/// ``` +/// +/// See `test_adjust_to_local_time()` for example +fn adjust_to_local_time(ts: i64, tz: Tz) -> Result { + fn convert_timestamp(ts: i64, converter: F) -> Result> + where + F: Fn(i64) -> MappedLocalTime>, + { + match converter(ts) { + MappedLocalTime::Ambiguous(earliest, latest) => exec_err!( + "Ambiguous timestamp. Do you mean {:?} or {:?}", + earliest, + latest + ), + MappedLocalTime::None => exec_err!( + "The local time does not exist because there is a gap in the local time." + ), + MappedLocalTime::Single(date_time) => Ok(date_time), + } + } + + let date_time = match T::UNIT { + Nanosecond => Utc.timestamp_nanos(ts), + Microsecond => convert_timestamp(ts, |ts| Utc.timestamp_micros(ts))?, + Millisecond => convert_timestamp(ts, |ts| Utc.timestamp_millis_opt(ts))?, + Second => convert_timestamp(ts, |ts| Utc.timestamp_opt(ts, 0))?, + }; + + let offset_seconds: i64 = tz + .offset_from_utc_datetime(&date_time.naive_utc()) + .fix() + .local_minus_utc() as i64; + + let adjusted_date_time = date_time.add( + // This should not fail under normal circumstances as the + // maximum possible offset is 26 hours (93,600 seconds) + TimeDelta::try_seconds(offset_seconds) + .ok_or(DataFusionError::Internal("Offset seconds should be less than i64::MAX / 1_000 or greater than -i64::MAX / 1_000".to_string()))?, + ); + + // convert the naive datetime back to i64 + match T::UNIT { + Nanosecond => adjusted_date_time.timestamp_nanos_opt().ok_or( + DataFusionError::Internal( + "Failed to convert DateTime to timestamp in nanosecond. This error may occur if the date is out of range. 
The supported date ranges are between 1677-09-21T00:12:43.145224192 and 2262-04-11T23:47:16.854775807".to_string(), + ), + ), + Microsecond => Ok(adjusted_date_time.timestamp_micros()), + Millisecond => Ok(adjusted_date_time.timestamp_millis()), + Second => Ok(adjusted_date_time.timestamp()), + } +} + +impl ScalarUDFImpl for ToLocalTimeFunc { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "to_local_time" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> Result { + if arg_types.len() != 1 { + return exec_err!( + "to_local_time function requires 1 argument, got {:?}", + arg_types.len() + ); + } + + match &arg_types[0] { + Timestamp(Nanosecond, _) => Ok(Timestamp(Nanosecond, None)), + Timestamp(Microsecond, _) => Ok(Timestamp(Microsecond, None)), + Timestamp(Millisecond, _) => Ok(Timestamp(Millisecond, None)), + Timestamp(Second, _) => Ok(Timestamp(Second, None)), + _ => exec_err!( + "The to_local_time function can only accept timestamp as the arg, got {:?}", arg_types[0] + ), + } + } + + fn invoke(&self, args: &[ColumnarValue]) -> Result { + if args.len() != 1 { + return exec_err!( + "to_local_time function requires 1 argument, got {:?}", + args.len() + ); + } + + self.to_local_time(args) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use arrow::array::{types::TimestampNanosecondType, TimestampNanosecondArray}; + use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos; + use arrow::datatypes::{DataType, TimeUnit}; + use chrono::NaiveDateTime; + use datafusion_common::ScalarValue; + use datafusion_expr::{ColumnarValue, ScalarUDFImpl}; + + use super::{adjust_to_local_time, ToLocalTimeFunc}; + + #[test] + fn test_adjust_to_local_time() { + let timestamp_str = "2020-03-31T13:40:00"; + let tz: arrow::array::timezone::Tz = + "America/New_York".parse().expect("Invalid timezone"); + + let timestamp = timestamp_str + .parse::() + .unwrap() + .and_local_timezone(tz) // this is in a local timezone + .unwrap() + .timestamp_nanos_opt() + .unwrap(); + + let expected_timestamp = timestamp_str + .parse::() + .unwrap() + .and_utc() // this is in UTC + .timestamp_nanos_opt() + .unwrap(); + + let res = adjust_to_local_time::(timestamp, tz).unwrap(); + assert_eq!(res, expected_timestamp); + } + + #[test] + fn test_to_local_time_scalar() { + let timezone = Some("Europe/Brussels".into()); + let timestamps_with_timezone = vec![ + ( + ScalarValue::TimestampNanosecond( + Some(1_123_123_000_000_000_000), + timezone.clone(), + ), + ScalarValue::TimestampNanosecond(Some(1_123_130_200_000_000_000), None), + ), + ( + ScalarValue::TimestampMicrosecond( + Some(1_123_123_000_000_000), + timezone.clone(), + ), + ScalarValue::TimestampMicrosecond(Some(1_123_130_200_000_000), None), + ), + ( + ScalarValue::TimestampMillisecond( + Some(1_123_123_000_000), + timezone.clone(), + ), + ScalarValue::TimestampMillisecond(Some(1_123_130_200_000), None), + ), + ( + ScalarValue::TimestampSecond(Some(1_123_123_000), timezone), + ScalarValue::TimestampSecond(Some(1_123_130_200), None), + ), + ]; + + for (input, expected) in timestamps_with_timezone { + test_to_local_time_helper(input, expected); + } + } + + #[test] + fn test_timezone_with_daylight_savings() { + let timezone_str = "America/New_York"; + let tz: arrow::array::timezone::Tz = + timezone_str.parse().expect("Invalid timezone"); + + // Test data: + // ( + // the string display of the input timestamp, + // the i64 representation of the timestamp before 
adjustment in nanosecond, + // the i64 representation of the timestamp after adjustment in nanosecond, + // ) + let test_cases = vec![ + ( + // DST time + "2020-03-31T13:40:00", + 1_585_676_400_000_000_000, + 1_585_662_000_000_000_000, + ), + ( + // End of DST + "2020-11-04T14:06:40", + 1_604_516_800_000_000_000, + 1_604_498_800_000_000_000, + ), + ]; + + for ( + input_timestamp_str, + expected_input_timestamp, + expected_adjusted_timestamp, + ) in test_cases + { + let input_timestamp = input_timestamp_str + .parse::() + .unwrap() + .and_local_timezone(tz) // this is in a local timezone + .unwrap() + .timestamp_nanos_opt() + .unwrap(); + assert_eq!(input_timestamp, expected_input_timestamp); + + let expected_timestamp = input_timestamp_str + .parse::() + .unwrap() + .and_utc() // this is in UTC + .timestamp_nanos_opt() + .unwrap(); + assert_eq!(expected_timestamp, expected_adjusted_timestamp); + + let input = ScalarValue::TimestampNanosecond( + Some(input_timestamp), + Some(timezone_str.into()), + ); + let expected = + ScalarValue::TimestampNanosecond(Some(expected_timestamp), None); + test_to_local_time_helper(input, expected) + } + } + + fn test_to_local_time_helper(input: ScalarValue, expected: ScalarValue) { + let res = ToLocalTimeFunc::new() + .invoke(&[ColumnarValue::Scalar(input)]) + .unwrap(); + match res { + ColumnarValue::Scalar(res) => { + assert_eq!(res, expected); + } + _ => panic!("unexpected return type"), + } + } + + #[test] + fn test_to_local_time_timezones_array() { + let cases = [ + ( + vec![ + "2020-09-08T00:00:00", + "2020-09-08T01:00:00", + "2020-09-08T02:00:00", + "2020-09-08T03:00:00", + "2020-09-08T04:00:00", + ], + None::>, + vec![ + "2020-09-08T00:00:00", + "2020-09-08T01:00:00", + "2020-09-08T02:00:00", + "2020-09-08T03:00:00", + "2020-09-08T04:00:00", + ], + ), + ( + vec![ + "2020-09-08T00:00:00", + "2020-09-08T01:00:00", + "2020-09-08T02:00:00", + "2020-09-08T03:00:00", + "2020-09-08T04:00:00", + ], + Some("+01:00".into()), + vec![ + "2020-09-08T00:00:00", + "2020-09-08T01:00:00", + "2020-09-08T02:00:00", + "2020-09-08T03:00:00", + "2020-09-08T04:00:00", + ], + ), + ]; + + cases.iter().for_each(|(source, _tz_opt, expected)| { + let input = source + .iter() + .map(|s| Some(string_to_timestamp_nanos(s).unwrap())) + .collect::(); + let right = expected + .iter() + .map(|s| Some(string_to_timestamp_nanos(s).unwrap())) + .collect::(); + let result = ToLocalTimeFunc::new() + .invoke(&[ColumnarValue::Array(Arc::new(input))]) + .unwrap(); + if let ColumnarValue::Array(result) = result { + assert_eq!( + result.data_type(), + &DataType::Timestamp(TimeUnit::Nanosecond, None) + ); + let left = arrow::array::cast::as_primitive_array::< + TimestampNanosecondType, + >(&result); + assert_eq!(left, &right); + } else { + panic!("unexpected column type"); + } + }); + } +} diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index fa432ad76de5..664fc93a762a 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -424,8 +424,10 @@ fn push_down_all_join( } } + let mut on_filter_join_conditions = vec![]; + let (on_left_preserved, on_right_preserved) = on_lr_is_preserved(join.join_type)?; + if !on_filter.is_empty() { - let (on_left_preserved, on_right_preserved) = on_lr_is_preserved(join.join_type)?; for on in on_filter { if on_left_preserved && can_pushdown_join_predicate(&on, left_schema)? 
{ left_push.push(on) @@ -434,7 +436,7 @@ fn push_down_all_join( { right_push.push(on) } else { - join_conditions.push(on) + on_filter_join_conditions.push(on) } } } @@ -450,6 +452,21 @@ fn push_down_all_join( right_push.extend(extract_or_clauses_for_join(&join_conditions, right_schema)); } + // For predicates from join filter, we should check with if a join side is preserved + // in term of join filtering. + if on_left_preserved { + left_push.extend(extract_or_clauses_for_join( + &on_filter_join_conditions, + left_schema, + )); + } + if on_right_preserved { + right_push.extend(extract_or_clauses_for_join( + &on_filter_join_conditions, + right_schema, + )); + } + if let Some(predicate) = conjunction(left_push) { join.left = Arc::new(LogicalPlan::Filter(Filter::try_new(predicate, join.left)?)); } @@ -459,6 +476,7 @@ fn push_down_all_join( } // Add any new join conditions as the non join predicates + join_conditions.extend(on_filter_join_conditions); join.filter = conjunction(join_conditions); // wrap the join on the filter whose predicates must be kept, if any diff --git a/datafusion/sqllogictest/test_files/join.slt b/datafusion/sqllogictest/test_files/join.slt index 6732d3e9108b..3c89109145d7 100644 --- a/datafusion/sqllogictest/test_files/join.slt +++ b/datafusion/sqllogictest/test_files/join.slt @@ -793,3 +793,196 @@ DROP TABLE companies statement ok DROP TABLE leads + +#### +## Test ON clause predicates are not pushed past join for OUTER JOINs +#### + + +# create tables +statement ok +CREATE TABLE employees(emp_id INT, name VARCHAR); + +statement ok +CREATE TABLE department(emp_id INT, dept_name VARCHAR); + +statement ok +INSERT INTO employees (emp_id, name) VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Carol'); + +statement ok +INSERT INTO department (emp_id, dept_name) VALUES (1, 'HR'), (3, 'Engineering'), (4, 'Sales'); + +# Can not push the ON filter below an OUTER JOIN +query TT +EXPLAIN SELECT e.emp_id, e.name, d.dept_name +FROM employees AS e +LEFT JOIN department AS d +ON (e.name = 'Alice' OR e.name = 'Bob'); +---- +logical_plan +01)Left Join: Filter: e.name = Utf8("Alice") OR e.name = Utf8("Bob") +02)--SubqueryAlias: e +03)----TableScan: employees projection=[emp_id, name] +04)--SubqueryAlias: d +05)----TableScan: department projection=[dept_name] +physical_plan +01)ProjectionExec: expr=[emp_id@1 as emp_id, name@2 as name, dept_name@0 as dept_name] +02)--NestedLoopJoinExec: join_type=Right, filter=name@0 = Alice OR name@0 = Bob +03)----MemoryExec: partitions=1, partition_sizes=[1] +04)----MemoryExec: partitions=1, partition_sizes=[1] + +query ITT +SELECT e.emp_id, e.name, d.dept_name +FROM employees AS e +LEFT JOIN department AS d +ON (e.name = 'Alice' OR e.name = 'Bob'); +---- +1 Alice HR +2 Bob HR +1 Alice Engineering +2 Bob Engineering +1 Alice Sales +2 Bob Sales +3 Carol NULL + +# neither RIGHT OUTER JOIN +query ITT +SELECT e.emp_id, e.name, d.dept_name +FROM department AS d +RIGHT JOIN employees AS e +ON (e.name = 'Alice' OR e.name = 'Bob'); +---- +1 Alice HR +2 Bob HR +1 Alice Engineering +2 Bob Engineering +1 Alice Sales +2 Bob Sales +3 Carol NULL + +# neither FULL OUTER JOIN +query ITT +SELECT e.emp_id, e.name, d.dept_name +FROM department AS d +FULL JOIN employees AS e +ON (e.name = 'Alice' OR e.name = 'Bob'); +---- +1 Alice HR +2 Bob HR +1 Alice Engineering +2 Bob Engineering +1 Alice Sales +2 Bob Sales +3 Carol NULL + +query ITT +SELECT e.emp_id, e.name, d.dept_name +FROM employees e +LEFT JOIN department d +ON (e.name = 'NotExist1' OR e.name = 'NotExist2'); +---- +1 Alice 
NULL +2 Bob NULL +3 Carol NULL + +query ITT +SELECT e.emp_id, e.name, d.dept_name +FROM employees e +LEFT JOIN department d +ON (e.name = 'Alice' OR e.name = 'NotExist'); +---- +1 Alice HR +1 Alice Engineering +1 Alice Sales +2 Bob NULL +3 Carol NULL + +# Can push the ON filter below the JOIN for INNER JOIN (expect to see a filter below the join) +query TT +EXPLAIN SELECT e.emp_id, e.name, d.dept_name +FROM employees AS e +JOIN department AS d +ON (e.name = 'Alice' OR e.name = 'Bob'); +---- +logical_plan +01)CrossJoin: +02)--SubqueryAlias: e +03)----Filter: employees.name = Utf8("Alice") OR employees.name = Utf8("Bob") +04)------TableScan: employees projection=[emp_id, name] +05)--SubqueryAlias: d +06)----TableScan: department projection=[dept_name] +physical_plan +01)CrossJoinExec +02)--CoalesceBatchesExec: target_batch_size=8192 +03)----FilterExec: name@1 = Alice OR name@1 = Bob +04)------MemoryExec: partitions=1, partition_sizes=[1] +05)--MemoryExec: partitions=1, partition_sizes=[1] + +# expect no row for Carol +query ITT +SELECT e.emp_id, e.name, d.dept_name +FROM employees AS e +JOIN department AS d +ON (e.name = 'Alice' OR e.name = 'Bob'); +---- +1 Alice HR +1 Alice Engineering +1 Alice Sales +2 Bob HR +2 Bob Engineering +2 Bob Sales + +# OR conditions on Filter (not join filter) +query ITT +SELECT e.emp_id, e.name, d.dept_name +FROM employees AS e +LEFT JOIN department AS d +ON e.emp_id = d.emp_id +WHERE (e.name = 'Alice' OR e.name = 'Carol'); +---- +1 Alice HR +3 Carol Engineering + +# Push down OR conditions on Filter through LEFT JOIN if possible +query TT +EXPLAIN SELECT e.emp_id, e.name, d.dept_name +FROM employees AS e +LEFT JOIN department AS d +ON e.emp_id = d.emp_id +WHERE ((dept_name != 'Engineering' AND e.name = 'Alice') OR (name != 'Alice' AND e.name = 'Carol')); +---- +logical_plan +01)Filter: d.dept_name != Utf8("Engineering") AND e.name = Utf8("Alice") OR e.name != Utf8("Alice") AND e.name = Utf8("Carol") +02)--Projection: e.emp_id, e.name, d.dept_name +03)----Left Join: e.emp_id = d.emp_id +04)------SubqueryAlias: e +05)--------Filter: employees.name = Utf8("Alice") OR employees.name != Utf8("Alice") AND employees.name = Utf8("Carol") +06)----------TableScan: employees projection=[emp_id, name] +07)------SubqueryAlias: d +08)--------TableScan: department projection=[emp_id, dept_name] +physical_plan +01)CoalesceBatchesExec: target_batch_size=8192 +02)--FilterExec: dept_name@2 != Engineering AND name@1 = Alice OR name@1 != Alice AND name@1 = Carol +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +04)------CoalesceBatchesExec: target_batch_size=8192 +05)--------HashJoinExec: mode=CollectLeft, join_type=Left, on=[(emp_id@0, emp_id@0)], projection=[emp_id@0, name@1, dept_name@3] +06)----------CoalesceBatchesExec: target_batch_size=8192 +07)------------FilterExec: name@1 = Alice OR name@1 != Alice AND name@1 = Carol +08)--------------MemoryExec: partitions=1, partition_sizes=[1] +09)----------MemoryExec: partitions=1, partition_sizes=[1] + +query ITT +SELECT e.emp_id, e.name, d.dept_name +FROM employees AS e +LEFT JOIN department AS d +ON e.emp_id = d.emp_id +WHERE ((dept_name != 'Engineering' AND e.name = 'Alice') OR (name != 'Alice' AND e.name = 'Carol')); +---- +1 Alice HR +3 Carol Engineering + +statement ok +DROP TABLE employees + +statement ok +DROP TABLE department diff --git a/datafusion/sqllogictest/test_files/timestamps.slt b/datafusion/sqllogictest/test_files/timestamps.slt index 2216dbfa5fd5..f4e492649b9f 100644 --- 
a/datafusion/sqllogictest/test_files/timestamps.slt +++ b/datafusion/sqllogictest/test_files/timestamps.slt @@ -2844,3 +2844,180 @@ select arrow_cast('2024-06-17T13:00:00', 'Timestamp(Nanosecond, Some("UTC"))') - query error select arrow_cast('2024-06-17T13:00:00', 'Timestamp(Nanosecond, Some("+00:00"))') - arrow_cast('2024-06-17T12:00:00', 'Timestamp(Microsecond, Some("+01:00"))'); + +########## +## Test to_local_time function +########## + +# invalid number of arguments -- no argument +statement error +select to_local_time(); + +# invalid number of arguments -- more than 1 argument +statement error +select to_local_time('2024-04-01T00:00:20Z'::timestamp, 'some string'); + +# invalid argument data type +statement error DataFusion error: Execution error: The to_local_time function can only accept timestamp as the arg, got Utf8 +select to_local_time('2024-04-01T00:00:20Z'); + +# invalid timezone +statement error DataFusion error: Arrow error: Parser error: Invalid timezone "Europe/timezone": failed to parse timezone +select to_local_time('2024-04-01T00:00:20Z'::timestamp AT TIME ZONE 'Europe/timezone'); + +# valid query +query P +select to_local_time('2024-04-01T00:00:20Z'::timestamp); +---- +2024-04-01T00:00:20 + +query P +select to_local_time('2024-04-01T00:00:20Z'::timestamp AT TIME ZONE '+05:00'); +---- +2024-04-01T00:00:20 + +query P +select to_local_time('2024-04-01T00:00:20Z'::timestamp AT TIME ZONE 'Europe/Brussels'); +---- +2024-04-01T00:00:20 + +query PTPT +select + time, + arrow_typeof(time) as type, + to_local_time(time) as to_local_time, + arrow_typeof(to_local_time(time)) as to_local_time_type +from ( + select '2024-04-01T00:00:20Z'::timestamp AT TIME ZONE 'Europe/Brussels' as time +); +---- +2024-04-01T00:00:20+02:00 Timestamp(Nanosecond, Some("Europe/Brussels")) 2024-04-01T00:00:20 Timestamp(Nanosecond, None) + +# use to_local_time() in date_bin() +query P +select date_bin(interval '1 day', to_local_time('2024-04-01T00:00:20Z'::timestamp AT TIME ZONE 'Europe/Brussels')); +---- +2024-04-01T00:00:00 + +query P +select date_bin(interval '1 day', to_local_time('2024-04-01T00:00:20Z'::timestamp AT TIME ZONE 'Europe/Brussels')) AT TIME ZONE 'Europe/Brussels'; +---- +2024-04-01T00:00:00+02:00 + +# test using to_local_time() on array values +statement ok +create table t AS +VALUES + ('2024-01-01T00:00:01Z'), + ('2024-02-01T00:00:01Z'), + ('2024-03-01T00:00:01Z'), + ('2024-04-01T00:00:01Z'), + ('2024-05-01T00:00:01Z'), + ('2024-06-01T00:00:01Z'), + ('2024-07-01T00:00:01Z'), + ('2024-08-01T00:00:01Z'), + ('2024-09-01T00:00:01Z'), + ('2024-10-01T00:00:01Z'), + ('2024-11-01T00:00:01Z'), + ('2024-12-01T00:00:01Z') +; + +statement ok +create view t_utc as +select column1::timestamp AT TIME ZONE 'UTC' as "column1" +from t; + +statement ok +create view t_timezone as +select column1::timestamp AT TIME ZONE 'Europe/Brussels' as "column1" +from t; + +query PPT +select column1, to_local_time(column1::timestamp), arrow_typeof(to_local_time(column1::timestamp)) from t_utc; +---- +2024-01-01T00:00:01Z 2024-01-01T00:00:01 Timestamp(Nanosecond, None) +2024-02-01T00:00:01Z 2024-02-01T00:00:01 Timestamp(Nanosecond, None) +2024-03-01T00:00:01Z 2024-03-01T00:00:01 Timestamp(Nanosecond, None) +2024-04-01T00:00:01Z 2024-04-01T00:00:01 Timestamp(Nanosecond, None) +2024-05-01T00:00:01Z 2024-05-01T00:00:01 Timestamp(Nanosecond, None) +2024-06-01T00:00:01Z 2024-06-01T00:00:01 Timestamp(Nanosecond, None) +2024-07-01T00:00:01Z 2024-07-01T00:00:01 Timestamp(Nanosecond, None) +2024-08-01T00:00:01Z 
2024-08-01T00:00:01 Timestamp(Nanosecond, None) +2024-09-01T00:00:01Z 2024-09-01T00:00:01 Timestamp(Nanosecond, None) +2024-10-01T00:00:01Z 2024-10-01T00:00:01 Timestamp(Nanosecond, None) +2024-11-01T00:00:01Z 2024-11-01T00:00:01 Timestamp(Nanosecond, None) +2024-12-01T00:00:01Z 2024-12-01T00:00:01 Timestamp(Nanosecond, None) + +query PPT +select column1, to_local_time(column1), arrow_typeof(to_local_time(column1)) from t_utc; +---- +2024-01-01T00:00:01Z 2024-01-01T00:00:01 Timestamp(Nanosecond, None) +2024-02-01T00:00:01Z 2024-02-01T00:00:01 Timestamp(Nanosecond, None) +2024-03-01T00:00:01Z 2024-03-01T00:00:01 Timestamp(Nanosecond, None) +2024-04-01T00:00:01Z 2024-04-01T00:00:01 Timestamp(Nanosecond, None) +2024-05-01T00:00:01Z 2024-05-01T00:00:01 Timestamp(Nanosecond, None) +2024-06-01T00:00:01Z 2024-06-01T00:00:01 Timestamp(Nanosecond, None) +2024-07-01T00:00:01Z 2024-07-01T00:00:01 Timestamp(Nanosecond, None) +2024-08-01T00:00:01Z 2024-08-01T00:00:01 Timestamp(Nanosecond, None) +2024-09-01T00:00:01Z 2024-09-01T00:00:01 Timestamp(Nanosecond, None) +2024-10-01T00:00:01Z 2024-10-01T00:00:01 Timestamp(Nanosecond, None) +2024-11-01T00:00:01Z 2024-11-01T00:00:01 Timestamp(Nanosecond, None) +2024-12-01T00:00:01Z 2024-12-01T00:00:01 Timestamp(Nanosecond, None) + +query PPT +select column1, to_local_time(column1), arrow_typeof(to_local_time(column1)) from t_timezone; +---- +2024-01-01T00:00:01+01:00 2024-01-01T00:00:01 Timestamp(Nanosecond, None) +2024-02-01T00:00:01+01:00 2024-02-01T00:00:01 Timestamp(Nanosecond, None) +2024-03-01T00:00:01+01:00 2024-03-01T00:00:01 Timestamp(Nanosecond, None) +2024-04-01T00:00:01+02:00 2024-04-01T00:00:01 Timestamp(Nanosecond, None) +2024-05-01T00:00:01+02:00 2024-05-01T00:00:01 Timestamp(Nanosecond, None) +2024-06-01T00:00:01+02:00 2024-06-01T00:00:01 Timestamp(Nanosecond, None) +2024-07-01T00:00:01+02:00 2024-07-01T00:00:01 Timestamp(Nanosecond, None) +2024-08-01T00:00:01+02:00 2024-08-01T00:00:01 Timestamp(Nanosecond, None) +2024-09-01T00:00:01+02:00 2024-09-01T00:00:01 Timestamp(Nanosecond, None) +2024-10-01T00:00:01+02:00 2024-10-01T00:00:01 Timestamp(Nanosecond, None) +2024-11-01T00:00:01+01:00 2024-11-01T00:00:01 Timestamp(Nanosecond, None) +2024-12-01T00:00:01+01:00 2024-12-01T00:00:01 Timestamp(Nanosecond, None) + +# combine to_local_time() with date_bin() +query P +select date_bin(interval '1 day', to_local_time(column1)) AT TIME ZONE 'Europe/Brussels' as date_bin from t_utc; +---- +2024-01-01T00:00:00+01:00 +2024-02-01T00:00:00+01:00 +2024-03-01T00:00:00+01:00 +2024-04-01T00:00:00+02:00 +2024-05-01T00:00:00+02:00 +2024-06-01T00:00:00+02:00 +2024-07-01T00:00:00+02:00 +2024-08-01T00:00:00+02:00 +2024-09-01T00:00:00+02:00 +2024-10-01T00:00:00+02:00 +2024-11-01T00:00:00+01:00 +2024-12-01T00:00:00+01:00 + +query P +select date_bin(interval '1 day', to_local_time(column1)) AT TIME ZONE 'Europe/Brussels' as date_bin from t_timezone; +---- +2024-01-01T00:00:00+01:00 +2024-02-01T00:00:00+01:00 +2024-03-01T00:00:00+01:00 +2024-04-01T00:00:00+02:00 +2024-05-01T00:00:00+02:00 +2024-06-01T00:00:00+02:00 +2024-07-01T00:00:00+02:00 +2024-08-01T00:00:00+02:00 +2024-09-01T00:00:00+02:00 +2024-10-01T00:00:00+02:00 +2024-11-01T00:00:00+01:00 +2024-12-01T00:00:00+01:00 + +statement ok +drop table t; + +statement ok +drop view t_utc; + +statement ok +drop view t_timezone;
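
A minimal, self-contained sketch (not part of this patch) of the `MemoryConsumer`/`MemoryReservation` pattern the `ParquetSink` changes rely on, assuming the `datafusion_execution::memory_pool` API used above. `GreedyMemoryPool` is just one concrete pool picked here for illustration, and the byte counts are arbitrary.

```rust
use std::sync::Arc;

use datafusion_common::Result;
use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};

fn main() -> Result<()> {
    // A pool with a hard limit, similar in spirit to the 200_000 byte limit
    // used by the oom_parquet_sink test above.
    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024 * 1024));

    // Each writer registers its own consumer, as ParquetSink does per ArrowColumnWriter.
    let mut reservation =
        MemoryConsumer::new("ParquetSink(ArrowColumnWriter)").register(&pool);

    // As buffered bytes grow, the reservation is resized; this returns an error
    // once the pool limit would be exceeded, which the writer surfaces to the plan.
    reservation.try_resize(512 * 1024)?;
    assert_eq!(pool.reserved(), 512 * 1024);

    // Freeing (or dropping) the reservation returns the bytes to the pool, which is
    // what the "no leaking byte reservation" assertion in the sink test checks.
    reservation.free();
    assert_eq!(pool.reserved(), 0);
    Ok(())
}
```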
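
For completeness, a hedged usage sketch of the new `to_local_time` function through SQL (also not part of the patch). It assumes the function is available from the default `SessionContext`, as the registration in `datafusion/functions/src/datetime/mod.rs` above suggests; the query and expected output are taken from the `timestamps.slt` additions.

```rust
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Same query as in timestamps.slt: the timezone is stripped while the
    // wall-clock value is kept, so this prints 2024-04-01T00:00:20.
    ctx.sql(
        "select to_local_time('2024-04-01T00:00:20Z'::timestamp AT TIME ZONE 'Europe/Brussels')",
    )
    .await?
    .show()
    .await?;

    Ok(())
}
```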