From bd59fbbf83bbc685b2b13ad2c99b26821c9acf1d Mon Sep 17 00:00:00 2001 From: Jia-Xuan Liu Date: Fri, 11 Oct 2024 21:42:32 +0800 Subject: [PATCH 1/3] unparse Sort with push down limit --- datafusion/sql/src/unparser/plan.rs | 19 +++++++++++++++---- datafusion/sql/tests/cases/plan_to_sql.rs | 20 ++++++++++++++++++++ 2 files changed, 35 insertions(+), 4 deletions(-) diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index c4fcbb2d6458..4730349a2742 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -21,10 +21,7 @@ use datafusion_common::{ tree_node::{TransformedResult, TreeNode}, Column, DataFusionError, Result, TableReference, }; -use datafusion_expr::{ - expr::Alias, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan, - LogicalPlanBuilder, Projection, SortExpr, -}; +use datafusion_expr::{expr::Alias, Distinct, Expr, JoinConstraint, JoinType, Limit, LogicalPlan, LogicalPlanBuilder, Projection, Sort, SortExpr}; use sqlparser::ast::{self, Ident, SetExpr}; use std::sync::Arc; @@ -352,6 +349,20 @@ impl Unparser<'_> { ) } LogicalPlan::Sort(sort) => { + if let Some(fetch) = sort.fetch { + let new_sort = Arc::new(LogicalPlan::Sort(Sort { + expr: sort.expr.clone(), + input: Arc::clone(&sort.input), + fetch: None, + })); + let limit = Arc::new(LogicalPlan::Limit(Limit { skip: 0, fetch: Some(fetch), input: new_sort })); + return self.select_to_sql_recursively( + &limit, + query, + select, + relation, + ); + } // Sort can be top-level plan for derived table if select.already_projected() { return self.derive(plan, relation); diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index 903d4e28520b..aff9f99c8cd3 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -841,6 +841,26 @@ fn test_table_scan_pushdown() -> Result<()> { Ok(()) } +#[test] +fn test_sort_with_push_down_fetch() -> Result<()> { + let schema = Schema::new(vec![ + Field::new("id", DataType::Utf8, false), + Field::new("age", DataType::Utf8, false), + ]); + + let plan = table_scan(Some("t1"), &schema, None)? + .project(vec![col("id"), col("age")])? + .sort_with_limit(vec![col("age").sort(true, true)], Some(10))? + .build()?; + + let sql = plan_to_sql(&plan)?; + assert_eq!( + format!("{}", sql), + "SELECT t1.id, t1.age FROM t1 ORDER BY t1.age ASC NULLS FIRST LIMIT 10" + ); + Ok(()) +} + #[test] fn test_interval_lhs_eq() { sql_round_trip( From 69a171a49ab8cf9f83bbc08b66f2873a9d62afb7 Mon Sep 17 00:00:00 2001 From: Jia-Xuan Liu Date: Fri, 11 Oct 2024 22:01:01 +0800 Subject: [PATCH 2/3] cargo fmt --- datafusion/sql/src/unparser/plan.rs | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index 4730349a2742..418e983b8030 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -21,7 +21,10 @@ use datafusion_common::{ tree_node::{TransformedResult, TreeNode}, Column, DataFusionError, Result, TableReference, }; -use datafusion_expr::{expr::Alias, Distinct, Expr, JoinConstraint, JoinType, Limit, LogicalPlan, LogicalPlanBuilder, Projection, Sort, SortExpr}; +use datafusion_expr::{ + expr::Alias, Distinct, Expr, JoinConstraint, JoinType, Limit, LogicalPlan, + LogicalPlanBuilder, Projection, Sort, SortExpr, +}; use sqlparser::ast::{self, Ident, SetExpr}; use std::sync::Arc; @@ -355,13 +358,13 @@ impl Unparser<'_> { input: Arc::clone(&sort.input), fetch: None, })); - let limit = Arc::new(LogicalPlan::Limit(Limit { skip: 0, fetch: Some(fetch), input: new_sort })); - return self.select_to_sql_recursively( - &limit, - query, - select, - relation, - ); + let limit = Arc::new(LogicalPlan::Limit(Limit { + skip: 0, + fetch: Some(fetch), + input: new_sort, + })); + return self + .select_to_sql_recursively(&limit, query, select, relation); } // Sort can be top-level plan for derived table if select.already_projected() { From d98169fc33b25a3f37b08e742247f056c60fe0fb Mon Sep 17 00:00:00 2001 From: Jia-Xuan Liu Date: Sat, 12 Oct 2024 13:44:45 +0800 Subject: [PATCH 3/3] set query limit directly --- datafusion/sql/src/unparser/plan.rs | 24 ++++++++---------------- 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index 418e983b8030..d150f0e532c6 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -22,8 +22,8 @@ use datafusion_common::{ Column, DataFusionError, Result, TableReference, }; use datafusion_expr::{ - expr::Alias, Distinct, Expr, JoinConstraint, JoinType, Limit, LogicalPlan, - LogicalPlanBuilder, Projection, Sort, SortExpr, + expr::Alias, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan, + LogicalPlanBuilder, Projection, SortExpr, }; use sqlparser::ast::{self, Ident, SetExpr}; use std::sync::Arc; @@ -352,25 +352,17 @@ impl Unparser<'_> { ) } LogicalPlan::Sort(sort) => { - if let Some(fetch) = sort.fetch { - let new_sort = Arc::new(LogicalPlan::Sort(Sort { - expr: sort.expr.clone(), - input: Arc::clone(&sort.input), - fetch: None, - })); - let limit = Arc::new(LogicalPlan::Limit(Limit { - skip: 0, - fetch: Some(fetch), - input: new_sort, - })); - return self - .select_to_sql_recursively(&limit, query, select, relation); - } // Sort can be top-level plan for derived table if select.already_projected() { return self.derive(plan, relation); } if let Some(query_ref) = query { + if let Some(fetch) = sort.fetch { + query_ref.limit(Some(ast::Expr::Value(ast::Value::Number( + fetch.to_string(), + false, + )))); + } query_ref.order_by(self.sorts_to_sql(sort.expr.clone())?); } else { return internal_err!(