Skip to content

Commit

Permalink
Merge branch 'main' into refactor-treenode-rewrite
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-toth committed Feb 5, 2024
2 parents 277cef7 + 840499f commit 2ede5e6
Show file tree
Hide file tree
Showing 37 changed files with 842 additions and 258 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ arrow-flight = { version = "50.0.0", features = ["flight-sql-experimental"] }
arrow-ipc = { version = "50.0.0", default-features = false, features = ["lz4"] }
arrow-ord = { version = "50.0.0", default-features = false }
arrow-schema = { version = "50.0.0", default-features = false }
arrow-string = { version = "50.0.0", default-features = false }
async-trait = "0.1.73"
bigdecimal = "0.4.1"
bytes = "1.4"
Expand Down
21 changes: 11 additions & 10 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ dirs = "4.0.0"
env_logger = "0.9"
futures = "0.3"
mimalloc = { version = "0.1", default-features = false }
object_store = { version = "0.9.0", features = ["aws", "gcp"] }
object_store = { version = "0.9.0", features = ["aws", "gcp", "http"] }
parking_lot = { version = "0.12" }
parquet = { version = "50.0.0", default-features = false }
regex = "1.8"
Expand Down
18 changes: 18 additions & 0 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use datafusion::physical_plan::{collect, execute_stream};
use datafusion::prelude::SessionContext;
use datafusion::sql::{parser::DFParser, sqlparser::dialect::dialect_from_str};

use object_store::http::HttpBuilder;
use object_store::ObjectStore;
use rustyline::error::ReadlineError;
use rustyline::Editor;
Expand Down Expand Up @@ -281,6 +282,11 @@ async fn create_external_table(
let builder = get_gcs_object_store_builder(url, cmd)?;
Arc::new(builder.build()?) as Arc<dyn ObjectStore>
}
"http" | "https" => Arc::new(
HttpBuilder::new()
.with_url(url.origin().ascii_serialization())
.build()?,
) as Arc<dyn ObjectStore>,
_ => {
// for other types, try to get from the object_store_registry
ctx.runtime_env()
Expand Down Expand Up @@ -329,12 +335,24 @@ mod tests {
return plan_err!("LogicalPlan is not a CreateExternalTable");
}

// Ensure the URL is supported by the object store
ctx.runtime_env()
.object_store(ListingTableUrl::parse(location)?)?;

Ok(())
}

/// Checks that `CREATE EXTERNAL TABLE` with a plain `http://` location is
/// accepted, by delegating to `create_external_table_test`.
#[tokio::test]
async fn create_object_store_table_http() -> Result<()> {
    let location = "http://example.com/file.parquet";

    // Expected to succeed: any error from the helper fails the test.
    let statement = format!(
        "CREATE EXTERNAL TABLE test STORED AS PARQUET LOCATION '{location}'"
    );
    create_external_table_test(location, &statement).await?;

    Ok(())
}

#[tokio::test]
async fn create_object_store_table_s3() -> Result<()> {
let access_key_id = "fake_access_key_id";
Expand Down
46 changes: 46 additions & 0 deletions datafusion-examples/examples/expr_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use arrow::array::{BooleanArray, Int32Array};
use arrow::record_batch::RecordBatch;
use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::common::{DFField, DFSchema};
use datafusion::error::Result;
use datafusion::optimizer::simplify_expressions::{ExprSimplifier, SimplifyContext};
use datafusion::physical_expr::execution_props::ExecutionProps;
Expand All @@ -29,6 +30,7 @@ use datafusion_common::{ScalarValue, ToDFSchema};
use datafusion_expr::expr::BinaryExpr;
use datafusion_expr::interval_arithmetic::Interval;
use datafusion_expr::{ColumnarValue, ExprSchemable, Operator};
use std::collections::HashMap;
use std::sync::Arc;

/// This example demonstrates the DataFusion [`Expr`] API.
Expand All @@ -45,6 +47,7 @@ use std::sync::Arc;
/// 2. Evaluate [`Exprs`] against data: [`evaluate_demo`]
/// 3. Simplify expressions: [`simplify_demo`]
/// 4. Analyze predicates for boundary ranges: [`range_analysis_demo`]
/// 5. Get the types of the expressions: [`expression_type_demo`]
#[tokio::main]
async fn main() -> Result<()> {
// The easiest way to create expressions is to use the
Expand All @@ -68,6 +71,9 @@ async fn main() -> Result<()> {
// See how to analyze ranges in expressions
range_analysis_demo()?;

// See how to determine the data types of expressions
expression_type_demo()?;

Ok(())
}

Expand Down Expand Up @@ -256,3 +262,43 @@ pub fn physical_expr(schema: &Schema, expr: Expr) -> Result<Arc<dyn PhysicalExpr

create_physical_expr(&expr, df_schema.as_ref(), &props)
}

/// Shows how to use `Expr::get_type` to retrieve the [`DataType`] of an
/// expression.
///
/// # Errors
///
/// Returns an error if one of the demo schemas cannot be constructed or if
/// the type of an expression cannot be resolved against its schema.
fn expression_type_demo() -> Result<()> {
    let expr = col("c");

    // To determine the DataType of an expression, DataFusion must know the
    // types of the input expressions. You can provide this information using
    // a schema. In this case we create a schema where the column `c` is of
    // type Utf8 (a String / VARCHAR)
    let schema = DFSchema::new_with_metadata(
        vec![DFField::new_unqualified("c", DataType::Utf8, true)],
        HashMap::new(),
    )?;
    assert_eq!(DataType::Utf8, expr.get_type(&schema)?);

    // Using a schema where the column `c` is of type Int32, the very same
    // expression now resolves to Int32
    let schema = DFSchema::new_with_metadata(
        vec![DFField::new_unqualified("c", DataType::Int32, true)],
        HashMap::new(),
    )?;
    assert_eq!(DataType::Int32, expr.get_type(&schema)?);

    // Get the type of an expression that adds 2 columns. Adding an Int32
    // and Float32 results in Float32 type
    let expr = col("c1") + col("c2");
    let schema = DFSchema::new_with_metadata(
        vec![
            DFField::new_unqualified("c1", DataType::Int32, true),
            DFField::new_unqualified("c2", DataType::Float32, true),
        ],
        HashMap::new(),
    )?;
    assert_eq!(DataType::Float32, expr.get_type(&schema)?);

    Ok(())
}
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool {
| Expr::Sort { .. }
| Expr::WindowFunction { .. }
| Expr::Wildcard { .. }
| Expr::Unnest { .. }
| Expr::Placeholder(_) => {
is_applicable = false;
Ok(TreeNodeRecursion::Stop)
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ fn physical_name(e: &Expr) -> Result<String> {

fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
match e {
Expr::Unnest(_) => {
internal_err!(
"Expr::Unnest should have been converted to LogicalPlan::Unnest"
)
}
Expr::Column(c) => {
if is_first_expr {
Ok(c.name.clone())
Expand Down
11 changes: 7 additions & 4 deletions datafusion/expr/src/accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ use arrow::array::ArrayRef;
use datafusion_common::{internal_err, DataFusionError, Result, ScalarValue};
use std::fmt::Debug;

/// Describes an aggregate functions's state.
/// Tracks an aggregate function's state.
///
/// `Accumulator`s are stateful objects that live throughout the
/// evaluation of multiple rows and aggregate multiple values together
/// into a final output aggregate.
/// `Accumulator`s are stateful objects that implement a single group. They
/// aggregate values from multiple rows together into a final output aggregate.
///
/// [`GroupsAccumulator`] is an additional, more performant (but also more complex) API
/// that manages state for multiple groups at once.
///
/// An accumulator knows how to:
/// * update its state from inputs via [`update_batch`]
Expand All @@ -40,6 +42,7 @@ use std::fmt::Debug;
/// [`state`] and combine the state from multiple accumulators'
/// via [`merge_batch`], as part of efficient multi-phase grouping.
///
/// [`GroupsAccumulator`]: crate::GroupsAccumulator
/// [`update_batch`]: Self::update_batch
/// [`retract_batch`]: Self::retract_batch
/// [`state`]: Self::state
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/built_in_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1463,7 +1463,7 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::Chr => &["chr"],
BuiltinScalarFunction::EndsWith => &["ends_with"],
BuiltinScalarFunction::InitCap => &["initcap"],
BuiltinScalarFunction::InStr => &["instr"],
BuiltinScalarFunction::InStr => &["instr", "position"],
BuiltinScalarFunction::Left => &["left"],
BuiltinScalarFunction::Lower => &["lower"],
BuiltinScalarFunction::Lpad => &["lpad"],
Expand Down
13 changes: 13 additions & 0 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,13 @@ pub enum Expr {
/// A place holder which hold a reference to a qualified field
/// in the outer query, used for correlated sub queries.
OuterReferenceColumn(DataType, Column),
/// Unnest expression
Unnest(Unnest),
}

/// Payload of the [`Expr::Unnest`] variant: the argument expressions of an
/// `UNNEST(...)` expression.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct Unnest {
/// Expressions passed to `UNNEST`.
pub exprs: Vec<Expr>,
}

/// Alias expression
Expand Down Expand Up @@ -917,6 +924,7 @@ impl Expr {
Expr::TryCast { .. } => "TryCast",
Expr::WindowFunction { .. } => "WindowFunction",
Expr::Wildcard { .. } => "Wildcard",
Expr::Unnest { .. } => "Unnest",
}
}

Expand Down Expand Up @@ -1308,6 +1316,7 @@ impl Expr {
| Expr::Negative(..)
| Expr::OuterReferenceColumn(_, _)
| Expr::TryCast(..)
| Expr::Unnest(..)
| Expr::Wildcard { .. }
| Expr::WindowFunction(..)
| Expr::Literal(..)
Expand Down Expand Up @@ -1562,6 +1571,9 @@ impl fmt::Display for Expr {
}
},
Expr::Placeholder(Placeholder { id, .. }) => write!(f, "{id}"),
Expr::Unnest(Unnest { exprs }) => {
write!(f, "UNNEST({exprs:?})")
}
}
}
}
Expand Down Expand Up @@ -1749,6 +1761,7 @@ fn create_name(e: &Expr) -> Result<String> {
}
}
}
Expr::Unnest(Unnest { exprs }) => Ok(format!("UNNEST({exprs:?})")),
Expr::ScalarFunction(fun) => create_function_name(fun.name(), false, &fun.args),
Expr::WindowFunction(WindowFunction {
fun,
Expand Down
12 changes: 11 additions & 1 deletion datafusion/expr/src/expr_rewriter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//! Expression rewriter
use crate::expr::Alias;
use crate::expr::{Alias, Unnest};
use crate::logical_plan::Projection;
use crate::{Expr, ExprSchemable, LogicalPlan, LogicalPlanBuilder};
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
Expand Down Expand Up @@ -77,6 +77,16 @@ pub fn normalize_col_with_schemas_and_ambiguity_check(
schemas: &[&[&DFSchema]],
using_columns: &[HashSet<Column>],
) -> Result<Expr> {
// Normalize column inside Unnest
if let Expr::Unnest(Unnest { exprs }) = expr {
let e = normalize_col_with_schemas_and_ambiguity_check(
exprs[0].clone(),
schemas,
using_columns,
)?;
return Ok(Expr::Unnest(Unnest { exprs: vec![e] }));
}

expr.transform_up(&|expr| {
Ok({
if let Expr::Column(c) = expr {
Expand Down
35 changes: 34 additions & 1 deletion datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use super::{Between, Expr, Like};
use crate::expr::{
AggregateFunction, AggregateFunctionDefinition, Alias, BinaryExpr, Cast,
GetFieldAccess, GetIndexedField, InList, InSubquery, Placeholder, ScalarFunction,
ScalarFunctionDefinition, Sort, TryCast, WindowFunction,
ScalarFunctionDefinition, Sort, TryCast, Unnest, WindowFunction,
};
use crate::field_util::GetFieldAccessSchema;
use crate::type_coercion::binary::get_result_type;
Expand Down Expand Up @@ -58,6 +58,31 @@ impl ExprSchemable for Expr {
///
/// Note: [DFSchema] implements [ExprSchema].
///
/// # Examples
///
/// ## Get the type of an expression that adds 2 columns. Adding an Int32
/// ## and Float32 results in Float32 type
///
/// ```
/// # use arrow::datatypes::DataType;
/// # use datafusion_common::{DFField, DFSchema};
/// # use datafusion_expr::{col, ExprSchemable};
/// # use std::collections::HashMap;
///
/// fn main() {
/// let expr = col("c1") + col("c2");
/// let schema = DFSchema::new_with_metadata(
/// vec![
/// DFField::new_unqualified("c1", DataType::Int32, true),
/// DFField::new_unqualified("c2", DataType::Float32, true),
/// ],
/// HashMap::new(),
/// )
/// .unwrap();
/// assert_eq!("Float32", format!("{}", expr.get_type(&schema).unwrap()));
/// }
/// ```
///
/// # Errors
///
/// This function errors when it is not possible to compute its
Expand All @@ -82,6 +107,13 @@ impl ExprSchemable for Expr {
Expr::Case(case) => case.when_then_expr[0].1.get_type(schema),
Expr::Cast(Cast { data_type, .. })
| Expr::TryCast(TryCast { data_type, .. }) => Ok(data_type.clone()),
Expr::Unnest(Unnest { exprs }) => {
let arg_data_types = exprs
.iter()
.map(|e| e.get_type(schema))
.collect::<Result<Vec<_>>>()?;
Ok(arg_data_types[0].clone())
}
Expr::ScalarFunction(ScalarFunction { func_def, args }) => {
let arg_data_types = args
.iter()
Expand Down Expand Up @@ -250,6 +282,7 @@ impl ExprSchemable for Expr {
| Expr::ScalarFunction(..)
| Expr::WindowFunction { .. }
| Expr::AggregateFunction { .. }
| Expr::Unnest(_)
| Expr::Placeholder(_) => Ok(true),
Expr::IsNull(_)
| Expr::IsNotNull(_)
Expand Down
Loading

0 comments on commit 2ede5e6

Please sign in to comment.