Merge remote-tracking branch 'apache/main' into alamb/PR_flow
alamb committed May 22, 2024
2 parents 9002dea + 8cffb68 commit e70f029
Showing 148 changed files with 5,486 additions and 2,463 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
@@ -59,6 +59,9 @@ version = "38.0.0"
# for the inherited dependency but cannot do the reverse (override from true to false).
#
# For more details, see: https://github.com/rust-lang/cargo/issues/11329
ahash = { version = "0.8", default-features = false, features = [
"runtime-rng",
] }
arrow = { version = "51.0.0", features = ["prettyprint"] }
arrow-array = { version = "51.0.0", default-features = false, features = ["chrono-tz"] }
arrow-buffer = { version = "51.0.0", default-features = false }
@@ -93,6 +96,7 @@ doc-comment = "0.3"
env_logger = "0.11"
futures = "0.3"
half = { version = "2.2.1", default-features = false }
hashbrown = { version = "0.14", features = ["raw"] }
indexmap = "2.0.0"
itertools = "0.12"
log = "^0.4"
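For context, the comment block above refers to Cargo's workspace inheritance rule: a member crate can add features on top of a `[workspace.dependencies]` entry, but it cannot turn features off again. A minimal sketch of the pattern this commit adopts for `ahash` and `hashbrown` (abridged from the hunks above; the two sections live in different files):

```toml
# workspace root Cargo.toml
[workspace.dependencies]
ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }
hashbrown = { version = "0.14", features = ["raw"] }

# member crate Cargo.toml: inherit the shared entry; features can be
# added per-crate, but "runtime-rng" can no longer be disabled here
[dependencies]
ahash = { workspace = true }
hashbrown = { workspace = true }
```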
2 changes: 2 additions & 0 deletions datafusion-cli/Cargo.lock


8 changes: 6 additions & 2 deletions datafusion-cli/src/main.rs
@@ -304,7 +304,7 @@ enum ByteUnit {
}

impl ByteUnit {
fn multiplier(&self) -> usize {
fn multiplier(&self) -> u64 {
match self {
ByteUnit::Byte => 1,
ByteUnit::KiB => 1 << 10,
@@ -349,8 +349,12 @@ fn extract_memory_pool_size(size: &str) -> Result<usize, String> {
let unit = byte_suffixes()
.get(suffix)
.ok_or_else(|| format!("Invalid memory pool size '{}'", size))?;
let memory_pool_size = usize::try_from(unit.multiplier())
.ok()
.and_then(|multiplier| num.checked_mul(multiplier))
.ok_or_else(|| format!("Memory pool size '{}' is too large", size))?;

Ok(num * unit.multiplier())
Ok(memory_pool_size)
} else {
Err(format!("Invalid memory pool size '{}'", size))
}
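The fix above replaces an unchecked `num * unit.multiplier()` with a conversion-plus-checked-multiply, so oversized sizes (notably on 32-bit targets, where `usize` is narrower than `u64`) yield an error instead of overflowing. A standalone sketch of the same pattern (function name hypothetical):

```rust
/// Multiply a parsed count by a unit multiplier, rejecting overflow.
/// A sketch of the checked pattern used in the diff above.
fn checked_pool_size(num: usize, multiplier: u64) -> Option<usize> {
    usize::try_from(multiplier) // None if the multiplier exceeds usize::MAX
        .ok()
        .and_then(|m| num.checked_mul(m)) // None if the product overflows
}

fn main() {
    assert_eq!(checked_pool_size(2, 1 << 10), Some(2048)); // e.g. "2K"
    assert_eq!(checked_pool_size(usize::MAX, 1 << 30), None); // would overflow
}
```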
1 change: 1 addition & 0 deletions datafusion-examples/README.md
@@ -63,6 +63,7 @@ cargo run --example csv_sql
- [`parquet_sql.rs`](examples/parquet_sql.rs): Build and run a query plan from a SQL statement against a local Parquet file
- [`parquet_sql_multiple_files.rs`](examples/parquet_sql_multiple_files.rs): Build and run a query plan from a SQL statement against multiple local Parquet files
- [`parquet_exec_visitor.rs`](examples/parquet_exec_visitor.rs): Extract statistics by visiting an `ExecutionPlan` after execution
- [`plan_to_sql.rs`](examples/plan_to_sql.rs): Generate SQL from DataFusion `Expr` and `LogicalPlan`
- [`pruning.rs`](examples/pruning.rs): Use pruning to rule out files based on statistics
- [`query-aws-s3.rs`](examples/external_dependency/query-aws-s3.rs): Configure `object_store` and run a query against files stored in AWS S3
- [`query-http-csv.rs`](examples/query-http-csv.rs): Configure `object_store` and run a query against files via HTTP
28 changes: 12 additions & 16 deletions datafusion-examples/examples/advanced_udf.rs
@@ -15,26 +15,21 @@
// specific language governing permissions and limitations
// under the License.

use datafusion::{
arrow::{
array::{ArrayRef, Float32Array, Float64Array},
datatypes::DataType,
record_batch::RecordBatch,
},
logical_expr::Volatility,
};
use std::any::Any;
use std::sync::Arc;

use arrow::array::{new_null_array, Array, AsArray};
use arrow::array::{
new_null_array, Array, ArrayRef, AsArray, Float32Array, Float64Array,
};
use arrow::compute;
use arrow::datatypes::Float64Type;
use arrow::datatypes::{DataType, Float64Type};
use arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::logical_expr::Volatility;
use datafusion::prelude::*;
use datafusion_common::{internal_err, ScalarValue};
use datafusion_expr::{
ColumnarValue, FuncMonotonicity, ScalarUDF, ScalarUDFImpl, Signature,
};
use std::sync::Arc;
use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
use datafusion_expr::{ColumnarValue, ScalarUDF, ScalarUDFImpl, Signature};

/// This example shows how to use the full ScalarUDFImpl API to implement a user
/// defined function. As in the `simple_udf.rs` example, this struct implements
@@ -186,8 +181,9 @@ impl ScalarUDFImpl for PowUdf {
&self.aliases
}

fn monotonicity(&self) -> Result<Option<FuncMonotonicity>> {
Ok(Some(vec![Some(true)]))
fn output_ordering(&self, input: &[ExprProperties]) -> Result<SortProperties> {
// The POW function preserves the order of its argument.
Ok(input[0].sort_properties)
}
}

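The hunk above migrates the UDF from the removed `monotonicity` API to `output_ordering`, which computes the output's `SortProperties` from the inputs' `ExprProperties`; `pow` simply forwards its first argument's ordering. As a hedged counter-example, here is a sketch of what a hypothetical order-reversing function (such as numeric negation) might return; it assumes `SortProperties::Ordered` wraps Arrow's `SortOptions`:

```rust
use arrow::compute::SortOptions;
use datafusion_common::Result;
use datafusion_expr::sort_properties::{ExprProperties, SortProperties};

// Hypothetical: a NEGATE-like function turns ascending input into descending
// output, so it flips the `descending` flag instead of passing the
// properties through unchanged (as POW does above).
fn negate_output_ordering(input: &[ExprProperties]) -> Result<SortProperties> {
    Ok(match input[0].sort_properties {
        SortProperties::Ordered(opts) => SortProperties::Ordered(SortOptions {
            descending: !opts.descending,
            nulls_first: opts.nulls_first,
        }),
        // Unordered stays unordered; a constant (Singleton) stays constant.
        other => other,
    })
}
```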
13 changes: 7 additions & 6 deletions datafusion-examples/examples/function_factory.rs
@@ -15,17 +15,18 @@
// specific language governing permissions and limitations
// under the License.

use std::result::Result as RResult;
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::context::{
FunctionFactory, RegisterFunction, SessionContext, SessionState,
};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{exec_err, internal_err, DataFusionError};
use datafusion_expr::simplify::ExprSimplifyResult;
use datafusion_expr::simplify::SimplifyInfo;
use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
use datafusion_expr::{CreateFunction, Expr, ScalarUDF, ScalarUDFImpl, Signature};
use std::result::Result as RResult;
use std::sync::Arc;

/// This example shows how to utilize [FunctionFactory] to implement simple
/// SQL-macro like functions using a `CREATE FUNCTION` statement. The same
@@ -156,8 +157,8 @@ impl ScalarUDFImpl for ScalarFunctionWrapper {
&[]
}

fn monotonicity(&self) -> Result<Option<datafusion_expr::FuncMonotonicity>> {
Ok(None)
fn output_ordering(&self, _input: &[ExprProperties]) -> Result<SortProperties> {
Ok(SortProperties::Unordered)
}
}

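`ScalarFunctionWrapper` reports `SortProperties::Unordered` because a SQL-macro body can be arbitrary, so no output ordering can be assumed. For context, a `FunctionFactory` is consulted when planning hits `CREATE FUNCTION`; a hedged sketch of the kind of statement this example is built around (the exact DDL syntax is an assumption, not taken from the hunks shown):

```rust
// Hypothetical DDL: the factory would turn the body `$1 + $2` into an
// expression template and register it as a ScalarUDF at planning time.
let ddl = r#"CREATE FUNCTION better_add(DOUBLE, DOUBLE) RETURNS DOUBLE RETURN $1 + $2"#;
// A SessionContext configured with the factory would then run:
// ctx.sql(ddl).await?;
```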
148 changes: 148 additions & 0 deletions datafusion-examples/examples/plan_to_sql.rs
@@ -0,0 +1,148 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion::error::Result;

use datafusion::prelude::*;
use datafusion::sql::unparser::expr_to_sql;
use datafusion_sql::unparser::dialect::CustomDialect;
use datafusion_sql::unparser::{plan_to_sql, Unparser};

/// This example demonstrates programmatically constructing SQL from the
/// DataFusion [`Expr`] and [`LogicalPlan`] APIs.
///
/// The code in this example shows how to:
/// 1. Create SQL from a variety of [`Expr`]s and [`LogicalPlan`]s: [`main`]
/// 2. Create a simple expression with the fluent API
/// and convert it to SQL: [`simple_expr_to_sql_demo`]
/// 3. Create a simple expression with the fluent API
/// and convert it to SQL without escaping column names: [`simple_expr_to_sql_demo_no_escape`]
/// 4. Create a simple expression with the fluent API
/// and convert it to SQL, escaping column names MySQL style: [`simple_expr_to_sql_demo_escape_mysql_style`]
/// 5. Create a plan for a DataFrame that reads a Parquet file and convert it
/// to SQL: [`simple_plan_to_sql_parquet_dataframe_demo`]
/// 6. Parse SQL into a DataFrame, add a filter, and convert the plan back to
/// SQL: [`round_trip_plan_to_sql_parquet_dataframe_demo`]
#[tokio::main]
async fn main() -> Result<()> {
// Run the expression and plan unparsing demos
simple_expr_to_sql_demo()?;
simple_expr_to_sql_demo_no_escape()?;
simple_expr_to_sql_demo_escape_mysql_style()?;
simple_plan_to_sql_parquet_dataframe_demo().await?;
round_trip_plan_to_sql_parquet_dataframe_demo().await?;
Ok(())
}

/// DataFusion can convert expressions to SQL, escaping column names
/// PostgreSQL style.
fn simple_expr_to_sql_demo() -> Result<()> {
let expr = col("a").lt(lit(5)).or(col("a").eq(lit(8)));
let ast = expr_to_sql(&expr)?;
let sql = format!("{}", ast);
assert_eq!(sql, r#"(("a" < 5) OR ("a" = 8))"#);
Ok(())
}

/// DataFusion can convert expressions to SQL without escaping column names,
/// using a custom dialect and an explicit unparser.
fn simple_expr_to_sql_demo_no_escape() -> Result<()> {
let expr = col("a").lt(lit(5)).or(col("a").eq(lit(8)));
let dialect = CustomDialect::new(None);
let unparser = Unparser::new(&dialect);
let ast = unparser.expr_to_sql(&expr)?;
let sql = format!("{}", ast);
assert_eq!(sql, r#"((a < 5) OR (a = 8))"#);
Ok(())
}

/// DataFusion can convert expressions to SQL, escaping column names MySQL
/// style (backticks), using a custom dialect and an explicit unparser.
fn simple_expr_to_sql_demo_escape_mysql_style() -> Result<()> {
let expr = col("a").lt(lit(5)).or(col("a").eq(lit(8)));
let dialect = CustomDialect::new(Some('`'));
let unparser = Unparser::new(&dialect);
let ast = unparser.expr_to_sql(&expr)?;
let sql = format!("{}", ast);
assert_eq!(sql, r#"((`a` < 5) OR (`a` = 8))"#);
Ok(())
}

/// DataFusion can convert a logical plan created with the DataFrame API
/// (here, reading a Parquet file) to SQL, escaping column names PostgreSQL style.
async fn simple_plan_to_sql_parquet_dataframe_demo() -> Result<()> {
// create local execution context
let ctx = SessionContext::new();

let testdata = datafusion::test_util::parquet_test_data();
let df = ctx
.read_parquet(
&format!("{testdata}/alltypes_plain.parquet"),
ParquetReadOptions::default(),
)
.await?
.select_columns(&["id", "int_col", "double_col", "date_string_col"])?;

let ast = plan_to_sql(df.logical_plan())?;

let sql = format!("{}", ast);

assert_eq!(
sql,
r#"SELECT "?table?"."id", "?table?"."int_col", "?table?"."double_col", "?table?"."date_string_col" FROM "?table?""#
);

Ok(())
}

/// DataFusion can parse SQL into a DataFrame, add a filter, and convert the
/// resulting plan back to SQL.
async fn round_trip_plan_to_sql_parquet_dataframe_demo() -> Result<()> {
// create local execution context
let ctx = SessionContext::new();

let testdata = datafusion::test_util::parquet_test_data();

// register parquet file with the execution context
ctx.register_parquet(
"alltypes_plain",
&format!("{testdata}/alltypes_plain.parquet"),
ParquetReadOptions::default(),
)
.await?;

// create a logical plan from a SQL string and then programmatically add new filters
let df = ctx
.sql(
"SELECT int_col, double_col, CAST(date_string_col as VARCHAR) \
FROM alltypes_plain",
)
.await?
.filter(
col("id")
.gt(lit(1))
.and(col("tinyint_col").lt(col("double_col"))),
)?;

let ast = plan_to_sql(df.logical_plan())?;

let sql = format!("{}", ast);

assert_eq!(
sql,
r#"SELECT "alltypes_plain"."int_col", "alltypes_plain"."double_col", CAST("alltypes_plain"."date_string_col" AS VARCHAR) FROM "alltypes_plain" WHERE (("alltypes_plain"."id" > 1) AND ("alltypes_plain"."tinyint_col" < "alltypes_plain"."double_col"))"#
);

Ok(())
}
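Following the `cargo run --example csv_sql` pattern shown in the examples README above, this new example can presumably be run with `cargo run --example plan_to_sql` from the `datafusion-examples` directory.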
6 changes: 3 additions & 3 deletions datafusion-examples/examples/rewrite_expr.rs
@@ -227,15 +227,15 @@ impl ContextProvider for MyContextProvider {
&self.options
}

fn udfs_names(&self) -> Vec<String> {
fn udf_names(&self) -> Vec<String> {
Vec::new()
}

fn udafs_names(&self) -> Vec<String> {
fn udaf_names(&self) -> Vec<String> {
Vec::new()
}

fn udwfs_names(&self) -> Vec<String> {
fn udwf_names(&self) -> Vec<String> {
Vec::new()
}
}
5 changes: 2 additions & 3 deletions datafusion/common/Cargo.toml
@@ -41,9 +41,7 @@ backtrace = []
pyarrow = ["pyo3", "arrow/pyarrow", "parquet"]

[dependencies]
ahash = { version = "0.8", default-features = false, features = [
"runtime-rng",
] }
ahash = { workspace = true }
apache-avro = { version = "0.16", default-features = false, features = [
"bzip",
"snappy",
@@ -56,6 +54,7 @@ arrow-buffer = { workspace = true }
arrow-schema = { workspace = true }
chrono = { workspace = true }
half = { workspace = true }
hashbrown = { workspace = true }
libc = "0.2.140"
num_cpus = { workspace = true }
object_store = { workspace = true, optional = true }
datafusion/common/src/utils/mod.rs
@@ -17,6 +17,8 @@

//! This module provides the bisect function, which implements binary search.
pub mod proxy;

use crate::error::{_internal_datafusion_err, _internal_err};
use crate::{arrow_datafusion_err, DataFusionError, Result, ScalarValue};
use arrow::array::{ArrayRef, PrimitiveArray};
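Since the module doc above highlights `bisect`, a hedged usage sketch follows; the const-generic `bisect::<true>` (left bisection) form and its exact signature are assumptions based on `datafusion_common::utils`:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::compute::SortOptions;
use datafusion_common::utils::bisect;
use datafusion_common::{Result, ScalarValue};

fn main() -> Result<()> {
    // One sorted column and a one-column target row.
    let col: ArrayRef = Arc::new(Int32Array::from(vec![1, 3, 5, 7]));
    let target = vec![ScalarValue::Int32(Some(5))];
    let opts = vec![SortOptions::default()]; // ascending

    // SIDE = true requests the leftmost insertion point (assumed semantics).
    let idx = bisect::<true>(&[col], &target, &opts)?;
    assert_eq!(idx, 2); // 5 belongs at index 2 of [1, 3, 5, 7]
    Ok(())
}
```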
datafusion/common/src/utils/proxy.rs
@@ -31,7 +31,7 @@ pub trait VecAllocExt {
///
/// # Example:
/// ```
/// # use datafusion_execution::memory_pool::proxy::VecAllocExt;
/// # use datafusion_common::utils::proxy::VecAllocExt;
/// // use allocated to incrementally track how much memory is allocated in the vec
/// let mut allocated = 0;
/// let mut vec = Vec::new();
@@ -49,7 +49,7 @@
/// ```
/// # Example with other allocations:
/// ```
/// # use datafusion_execution::memory_pool::proxy::VecAllocExt;
/// # use datafusion_common::utils::proxy::VecAllocExt;
/// // You can use the same allocated size to track memory allocated by
/// // another source. For example
/// let mut allocated = 27;
@@ -68,7 +68,7 @@
///
/// # Example:
/// ```
/// # use datafusion_execution::memory_pool::proxy::VecAllocExt;
/// # use datafusion_common::utils::proxy::VecAllocExt;
/// let mut vec = Vec::new();
/// // Push data into the vec and the accounting will be updated to reflect
/// // memory allocation
@@ -119,7 +119,7 @@ pub trait RawTableAllocExt {
///
/// # Example:
/// ```
/// # use datafusion_execution::memory_pool::proxy::RawTableAllocExt;
/// # use datafusion_common::utils::proxy::RawTableAllocExt;
/// # use hashbrown::raw::RawTable;
/// let mut table = RawTable::new();
/// let mut allocated = 0;
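The doc-test hunks above only retarget import paths after the proxy module moved from `datafusion_execution::memory_pool` into `datafusion_common::utils`. A hedged end-to-end sketch against the new path, assuming `push_accounted(value, &mut usize)` keeps the signature shown in the docs:

```rust
use datafusion_common::utils::proxy::VecAllocExt;

fn main() {
    let mut allocated = 0; // running total of bytes the vec has allocated
    let mut vec: Vec<u32> = Vec::new();

    // Each push goes through the accounting wrapper; `allocated` only grows
    // when the vec actually reallocates its backing buffer.
    for v in 0..100u32 {
        vec.push_accounted(v, &mut allocated);
    }
    assert_eq!(vec.len(), 100);
    assert!(allocated >= 100 * std::mem::size_of::<u32>());
}
```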
4 changes: 2 additions & 2 deletions datafusion/core/Cargo.toml
@@ -77,7 +77,7 @@ unicode_expressions = [
]

[dependencies]
ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }
ahash = { workspace = true }
apache-avro = { version = "0.16", optional = true }
arrow = { workspace = true }
arrow-array = { workspace = true }
@@ -111,7 +111,7 @@ flate2 = { version = "1.0.24", optional = true }
futures = { workspace = true }
glob = "0.3.0"
half = { workspace = true }
hashbrown = { version = "0.14", features = ["raw"] }
hashbrown = { workspace = true }
indexmap = { workspace = true }
itertools = { workspace = true }
log = { workspace = true }
