Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/main' into alamb/external_parque…
Browse files Browse the repository at this point in the history
…t_index
  • Loading branch information
alamb committed May 23, 2024
2 parents 460c419 + 8f3084a commit d5542b1
Show file tree
Hide file tree
Showing 55 changed files with 2,696 additions and 1,203 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ object_store = { version = "0.9.1", default-features = false }
parking_lot = "0.12"
parquet = { version = "51.0.0", default-features = false, features = ["arrow", "async", "object_store"] }
rand = "0.8"
regex = "1.8"
rstest = "0.19.0"
serde_json = "1"
sqlparser = { version = "0.45.0", features = ["visitor"] }
Expand Down
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ pub(crate) async fn register_object_store_and_config_extensions(
let store = get_object_store(&ctx.state(), scheme, url, &table_options).await?;

// Register the retrieved object store in the session context's runtime environment
ctx.runtime_env().register_object_store(url, store);
ctx.register_object_store(url, store);

Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ async fn main() -> Result<()> {
let path = format!("s3://{bucket_name}");
let s3_url = Url::parse(&path).unwrap();
let arc_s3 = Arc::new(s3);
ctx.runtime_env()
.register_object_store(&s3_url, arc_s3.clone());
ctx.register_object_store(&s3_url, arc_s3.clone());

let path = format!("s3://{bucket_name}/test_data/");
let file_format = ParquetFormat::default().with_enable_pruning(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ async fn main() -> Result<()> {

let path = format!("s3://{bucket_name}");
let s3_url = Url::parse(&path).unwrap();
ctx.runtime_env()
.register_object_store(&s3_url, Arc::new(s3));
ctx.register_object_store(&s3_url, Arc::new(s3));

// cannot query the parquet files from this bucket because the path contains a whitespace
// and we don't support that yet
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/parquet_sql_multiple_files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let local_fs = Arc::new(LocalFileSystem::default());

let u = url::Url::parse("file://./")?;
ctx.runtime_env().register_object_store(&u, local_fs);
ctx.register_object_store(&u, local_fs);

// Register a listing table - this will use all files in the directory as data sources
// for the query
Expand Down
76 changes: 34 additions & 42 deletions datafusion-examples/examples/plan_to_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,48 +22,44 @@ use datafusion::sql::unparser::expr_to_sql;
use datafusion_sql::unparser::dialect::CustomDialect;
use datafusion_sql::unparser::{plan_to_sql, Unparser};

/// This example demonstrates the programmatic construction of
/// SQL using the DataFusion Expr [`Expr`] and LogicalPlan [`LogicalPlan`] API.
/// This example demonstrates the programmatic construction of SQL strings using
/// the DataFusion Expr [`Expr`] and LogicalPlan [`LogicalPlan`] API.
///
///
/// The code in this example shows how to:
/// 1. Create SQL from a variety of Expr and LogicalPlan: [`main`]`
/// 2. Create a simple expression [`Exprs`] with fluent API
/// and convert to sql: [`simple_expr_to_sql_demo`]
/// 3. Create a simple expression [`Exprs`] with fluent API
/// and convert to sql without escaping column names: [`simple_expr_to_sql_demo_no_escape`]
/// 4. Create a simple expression [`Exprs`] with fluent API
/// and convert to sql escaping column names a MySQL style: [`simple_expr_to_sql_demo_escape_mysql_style`]
///
/// 1. [`simple_expr_to_sql_demo`]: Create a simple expression [`Exprs`] with
/// fluent API and convert to sql suitable for passing to another database
///
/// 2. [`simple_expr_to_sql_demo_no_escape`] Create a simple expression
/// [`Exprs`] with fluent API and convert to sql without escaping column names
/// more suitable for displaying to humans.
///
/// 3. [`simple_expr_to_sql_demo_escape_mysql_style`]" Create a simple
/// expression [`Exprs`] with fluent API and convert to sql escaping column
/// names in MySQL style.
///
/// 4. [`simple_plan_to_sql_demo`]: Create a simple logical plan using the
/// DataFrames API and convert to sql string.
///
/// 5. [`round_trip_plan_to_sql_demo`]: Create a logical plan from a SQL string, modify it using the
/// DataFrames API and convert it back to a sql string.
#[tokio::main]
async fn main() -> Result<()> {
// See how to evaluate expressions
simple_expr_to_sql_demo()?;
simple_expr_to_sql_demo_no_escape()?;
simple_expr_to_sql_demo_escape_mysql_style()?;
simple_plan_to_sql_parquest_dataframe_demo().await?;
round_trip_plan_to_sql_parquest_dataframe_demo().await?;
simple_plan_to_sql_demo().await?;
round_trip_plan_to_sql_demo().await?;
Ok(())
}

/// DataFusion can convert expressions to SQL, using column name escaping
/// PostgreSQL style.
fn simple_expr_to_sql_demo() -> Result<()> {
let expr = col("a").lt(lit(5)).or(col("a").eq(lit(8)));
let ast = expr_to_sql(&expr)?;
let sql = format!("{}", ast);
assert_eq!(sql, r#"(("a" < 5) OR ("a" = 8))"#);
Ok(())
}

/// DataFusion can convert expressions to SQL without escaping column names using
/// using a custom dialect and an explicit unparser
fn simple_expr_to_sql_demo_no_escape() -> Result<()> {
let expr = col("a").lt(lit(5)).or(col("a").eq(lit(8)));
let dialect = CustomDialect::new(None);
let unparser = Unparser::new(&dialect);
let ast = unparser.expr_to_sql(&expr)?;
let sql = format!("{}", ast);
let sql = expr_to_sql(&expr)?.to_string();
assert_eq!(sql, r#"((a < 5) OR (a = 8))"#);
Ok(())
}
Expand All @@ -74,16 +70,14 @@ fn simple_expr_to_sql_demo_escape_mysql_style() -> Result<()> {
let expr = col("a").lt(lit(5)).or(col("a").eq(lit(8)));
let dialect = CustomDialect::new(Some('`'));
let unparser = Unparser::new(&dialect);
let ast = unparser.expr_to_sql(&expr)?;
let sql = format!("{}", ast);
let sql = unparser.expr_to_sql(&expr)?.to_string();
assert_eq!(sql, r#"((`a` < 5) OR (`a` = 8))"#);
Ok(())
}

/// DataFusion can convert a logic plan created using the DataFrames API to read from a parquet file
/// to SQL, using column name escaping PostgreSQL style.
async fn simple_plan_to_sql_parquest_dataframe_demo() -> Result<()> {
// create local execution context
async fn simple_plan_to_sql_demo() -> Result<()> {
let ctx = SessionContext::new();

let testdata = datafusion::test_util::parquet_test_data();
Expand All @@ -95,21 +89,20 @@ async fn simple_plan_to_sql_parquest_dataframe_demo() -> Result<()> {
.await?
.select_columns(&["id", "int_col", "double_col", "date_string_col"])?;

let ast = plan_to_sql(df.logical_plan())?;

let sql = format!("{}", ast);
// Convert the data frame to a SQL string
let sql = plan_to_sql(df.logical_plan())?.to_string();

assert_eq!(
sql,
r#"SELECT "?table?"."id", "?table?"."int_col", "?table?"."double_col", "?table?"."date_string_col" FROM "?table?""#
r#"SELECT "?table?".id, "?table?".int_col, "?table?".double_col, "?table?".date_string_col FROM "?table?""#
);

Ok(())
}

// DataFusion could parse a SQL into a DataFrame, adding a Filter, and converting that back to sql.
async fn round_trip_plan_to_sql_parquest_dataframe_demo() -> Result<()> {
// create local execution context
/// DataFusion can also be used to parse SQL, programmatically modify the query
/// (in this case adding a filter) and then and converting back to SQL.
async fn round_trip_plan_to_sql_demo() -> Result<()> {
let ctx = SessionContext::new();

let testdata = datafusion::test_util::parquet_test_data();
Expand All @@ -124,24 +117,23 @@ async fn round_trip_plan_to_sql_parquest_dataframe_demo() -> Result<()> {

// create a logical plan from a SQL string and then programmatically add new filters
let df = ctx
// Use SQL to read some data from the parquet file
.sql(
"SELECT int_col, double_col, CAST(date_string_col as VARCHAR) \
FROM alltypes_plain",
)
.await?
// Add id > 1 and tinyint_col < double_col filter
.filter(
col("id")
.gt(lit(1))
.and(col("tinyint_col").lt(col("double_col"))),
)?;

let ast = plan_to_sql(df.logical_plan())?;

let sql = format!("{}", ast);

let sql = plan_to_sql(df.logical_plan())?.to_string();
assert_eq!(
sql,
r#"SELECT "alltypes_plain"."int_col", "alltypes_plain"."double_col", CAST("alltypes_plain"."date_string_col" AS VARCHAR) FROM "alltypes_plain" WHERE (("alltypes_plain"."id" > 1) AND ("alltypes_plain"."tinyint_col" < "alltypes_plain"."double_col"))"#
r#"SELECT alltypes_plain.int_col, alltypes_plain.double_col, CAST(alltypes_plain.date_string_col AS VARCHAR) FROM alltypes_plain WHERE ((alltypes_plain.id > 1) AND (alltypes_plain.tinyint_col < alltypes_plain.double_col))"#
);

Ok(())
Expand Down
3 changes: 1 addition & 2 deletions datafusion-examples/examples/query-http-csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ async fn main() -> Result<()> {
.with_url(base_url.clone())
.build()
.unwrap();
ctx.runtime_env()
.register_object_store(&base_url, Arc::new(http_store));
ctx.register_object_store(&base_url, Arc::new(http_store));

// register csv file with the execution context
ctx.register_csv(
Expand Down
6 changes: 5 additions & 1 deletion datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ postgres-protocol = "0.6.4"
postgres-types = { version = "0.2.4", features = ["derive", "with-chrono-0_4"] }
rand = { workspace = true, features = ["small_rng"] }
rand_distr = "0.4.3"
regex = "1.5.4"
regex = { workspace = true }
rstest = { workspace = true }
rust_decimal = { version = "1.27.0", features = ["tokio-pg"] }
serde_json = { workspace = true }
Expand Down Expand Up @@ -209,3 +209,7 @@ name = "sort"
[[bench]]
harness = false
name = "topk_aggregate"

[[bench]]
harness = false
name = "parquet_statistic"
Loading

0 comments on commit d5542b1

Please sign in to comment.