Skip to content

Commit

Permalink
consolidate to_date and to_timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Jun 28, 2024
1 parent 70b1ce4 commit a3b9d36
Show file tree
Hide file tree
Showing 4 changed files with 224 additions and 212 deletions.
119 changes: 117 additions & 2 deletions datafusion-examples/examples/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,37 @@
// specific language governing permissions and limitations
// under the License.

//! This file contains several examples of how to run queries using DataFusion's
//! DataFrame API:
//!
//! * [`parquet`]: query a single Parquet file
//! * [`to_date_demo`]: use the `to_date` function to convert strings to dates
//! * [`to_timestamp_demo`]: use the `to_timestamp` functions to convert strings to timestamps

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::error::Result;
use datafusion::prelude::*;
use std::fs::File;
use std::io::Write;
use std::sync::Arc;
use arrow::array::{RecordBatch, StringArray};
use tempfile::tempdir;
use datafusion_common::assert_contains;

/// This example demonstrates executing a simple query against an Arrow data source (Parquet) and
/// fetching results, using the DataFrame trait
#[tokio::main]
async fn main() -> Result<()> {
parquet().await?;
to_date_demo().await?;
to_timestamp_demo().await?;

Ok(())
}


/// This example demonstrates executing a simple query against an Arrow data
/// source (Parquet) and fetching results, using the DataFrame trait
async fn parquet() -> Result<()> {
// create local execution context
let ctx = SessionContext::new();

Expand Down Expand Up @@ -109,3 +129,98 @@ async fn example_read_csv_file_with_schema(file_path: &str) -> DataFrame {
// Register a lazy DataFrame by using the context and option provider
ctx.read_csv(file_path, csv_read_option).await.unwrap()
}


/// This example demonstrates how to use the to_date series
/// of functions in the DataFrame API
async fn to_date_demo() -> Result<()> {
// define a schema.
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));

// define data.
let batch = RecordBatch::try_new(
schema,
vec![Arc::new(StringArray::from(vec![
"2020-09-08T13:42:29Z",
"2020-09-08T13:42:29.190855-05:00",
"2020-08-09 12:13:29",
"2020-01-02",
]))],
)?;

// declare a new context. In spark API, this corresponds to a new spark SQLsession
let ctx = SessionContext::new();

// declare a table in memory. In spark API, this corresponds to createDataFrame(...).
ctx.register_batch("t", batch)?;
let df = ctx.table("t").await?;

// use to_date function to convert col 'a' to timestamp type using the default parsing
let df = df.with_column("a", to_date(vec![col("a")]))?;

let df = df.select_columns(&["a"])?;

// print the results
df.show().await?;

Ok(())
}



/// This example demonstrates how to use the to_timestamp series
/// of functions in the DataFrame API
async fn to_timestamp_demo() -> Result<()> {
// define a schema.
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Utf8, false),
Field::new("b", DataType::Utf8, false),
]));

// define data.
let batch = RecordBatch::try_new(
schema,
vec![
Arc::new(StringArray::from(vec![
"2020-09-08T13:42:29Z",
"2020-09-08T13:42:29.190855-05:00",
"2020-08-09 12:13:29",
"2020-01-02",
])),
Arc::new(StringArray::from(vec![
"2020-09-08T13:42:29Z",
"2020-09-08T13:42:29.190855-05:00",
"08-09-2020 13/42/29",
"09-27-2020 13:42:29-05:30",
])),
],
)?;

// declare a new context. In spark API, this corresponds to a new spark SQLsession
let ctx = SessionContext::new();

// declare a table in memory. In spark API, this corresponds to createDataFrame(...).
ctx.register_batch("t", batch)?;
let df = ctx.table("t").await?;

// use to_timestamp function to convert col 'a' to timestamp type using the default parsing
let df = df.with_column("a", to_timestamp(vec![col("a")]))?;
// use to_timestamp_seconds function to convert col 'b' to timestamp(Seconds) type using a list
// of chrono formats (https://docs.rs/chrono/latest/chrono/format/strftime/index.html) to try
let df = df.with_column(
"b",
to_timestamp_seconds(vec![
col("b"),
lit("%+"),
lit("%d-%m-%Y %H/%M/%S"),
lit("%m-%d-%Y %H:%M:%S%#z"),
]),
)?;

let df = df.select_columns(&["a", "b"])?;

// print the results
df.show().await?;

Ok(())
}
119 changes: 107 additions & 12 deletions datafusion-examples/examples/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@

//! This file contains several examples of how to run SQL queries using DataFusion
//!
//! * [`parquet`]: run SQL query against a single Parquet file
//! * [`parquet_multi_files`]: run SQL query against a table backed by multiple Parquet files
//! * [`regexp`]: regular expression functions to manipulate strings
//! * [`to_char`]: to_char function to convert strings to date, time, timestamp and durations
//! * [`parquet_demo`]: run SQL query against a single Parquet file
//! * [`parquet_multi_files_demo`]: run SQL query against a table backed by multiple Parquet files
//! * [`regexp_demo`]: regular expression functions to manipulate strings
//! * [`to_char_demo`]: to_char function to convert strings to date, time, timestamp and durations
//! * [`to_timestamp_demo`]: to_timestamp function to convert strings to timestamps
use arrow::array::{Date32Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};
Expand All @@ -35,16 +36,17 @@ use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<()> {
parquet().await?;
parquet_multi_files().await?;
regexp().await?;
to_char().await?;
parquet_demo().await?;
parquet_multi_files_demo().await?;
regexp_demo().await?;
to_char_demo().await?;
to_timestamp_demo().await?;
Ok(())
}

/// This example demonstrates executing a simple query against an Arrow data
/// source (Parquet) and fetching results
async fn parquet() -> Result<()> {
async fn parquet_demo() -> Result<()> {
// create local session context
let ctx = SessionContext::new();

Expand Down Expand Up @@ -79,7 +81,7 @@ async fn parquet() -> Result<()> {
/// The query is run twice, once showing how to used `register_listing_table`
/// with an absolute path, and once registering an ObjectStore to use a relative
/// path.
async fn parquet_multi_files() -> Result<()> {
async fn parquet_multi_files_demo() -> Result<()> {
// create local execution context
let ctx = SessionContext::new();

Expand Down Expand Up @@ -167,7 +169,7 @@ async fn parquet_multi_files() -> Result<()> {
///
/// Supported flags can be found at
/// https://docs.rs/regex/latest/regex/#grouping-and-flags
async fn regexp() -> Result<()> {
async fn regexp_demo() -> Result<()> {
let ctx = SessionContext::new();
ctx.register_csv(
"examples",
Expand Down Expand Up @@ -486,7 +488,7 @@ async fn regexp() -> Result<()> {
///
/// This function accepts date, time, timestamp and duration values
/// in the first argument and string values for the second
async fn to_char() -> Result<()> {
async fn to_char_demo() -> Result<()> {
let schema = Arc::new(Schema::new(vec![
Field::new("values", DataType::Date32, false),
Field::new("patterns", DataType::Utf8, false),
Expand Down Expand Up @@ -671,3 +673,96 @@ async fn to_char() -> Result<()> {

Ok(())
}


/// This example demonstrates how to use the to_timestamp series
/// of functions in the DataFrame API as well as via sql.
async fn to_timestamp_demo() -> Result<()> {
// define a schema.
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Utf8, false),
Field::new("b", DataType::Utf8, false),
]));

// define data.
let batch = RecordBatch::try_new(
schema,
vec![
Arc::new(StringArray::from(vec![
"2020-09-08T13:42:29Z",
"2020-09-08T13:42:29.190855-05:00",
"2020-08-09 12:13:29",
"2020-01-02",
])),
Arc::new(StringArray::from(vec![
"2020-09-08T13:42:29Z",
"2020-09-08T13:42:29.190855-05:00",
"08-09-2020 13/42/29",
"09-27-2020 13:42:29-05:30",
])),
],
)?;

// declare a new context. In spark API, this corresponds to a new spark SQLsession
let ctx = SessionContext::new();

// declare a table in memory. In spark API, this corresponds to createDataFrame(...).
ctx.register_batch("t", batch)?;
// use sql to convert col 'a' to timestamp using the default parsing
let df = ctx.sql("select to_timestamp(a) from t").await?;

// print the results
df.show().await?;

// use sql to convert col 'b' to timestamp using a list of chrono formats to try
let df = ctx.sql("select to_timestamp(b, '%+', '%d-%m-%Y %H/%M/%S', '%m-%d-%Y %H:%M:%S%#z') from t").await?;

// print the results
df.show().await?;

// use sql to convert a static string to a timestamp using a list of chrono formats to try
// note that one of the formats is invalid ('%q') but since DataFusion will try all the
// formats until it encounters one that parses the timestamp expression successfully
// no error will be returned
let df = ctx.sql("select to_timestamp_micros('01-14-2023 01:01:30+05:30', '%q', '%d-%m-%Y %H/%M/%S', '%+', '%m-%d-%Y %H:%M:%S%#z')").await?;

// print the results
df.show().await?;

// casting a string to TIMESTAMP will also work for RFC3339 timestamps
let df = ctx
.sql("select to_timestamp_millis(TIMESTAMP '2022-08-03T14:38:50Z')")
.await?;

// print the results
df.show().await?;

// unix timestamps (in seconds) are also supported
let df = ctx.sql("select to_timestamp(1926632005)").await?;

// print the results
df.show().await?;

// use sql to convert a static string to a timestamp using a non-matching chrono format to try
let result = ctx
.sql("select to_timestamp_nanos('01-14-2023 01/01/30', '%d-%m-%Y %H:%M:%S')")
.await?
.collect()
.await;

let expected = "Execution error: Error parsing timestamp from '01-14-2023 01/01/30' using format '%d-%m-%Y %H:%M:%S': input is out of range";
assert_contains!(result.unwrap_err().to_string(), expected);

// note that using arrays for the chrono formats is not supported
let result = ctx
.sql("SELECT to_timestamp('2022-08-03T14:38:50+05:30', make_array('%s', '%q', '%d-%m-%Y %H:%M:%S%#z', '%+'))")
.await?
.collect()
.await;

let expected = "to_timestamp function unsupported data type at index 1: List";
assert_contains!(result.unwrap_err().to_string(), expected);

Ok(())
}

60 changes: 0 additions & 60 deletions datafusion-examples/examples/to_date.rs

This file was deleted.

Loading

0 comments on commit a3b9d36

Please sign in to comment.