From 27e8ab3bcf8a5e97c121d6a40fc4d118a0ed44fa Mon Sep 17 00:00:00 2001 From: Nuttiiya Seekhao Date: Wed, 20 Sep 2023 14:36:32 -0700 Subject: [PATCH 1/5] Encode all join conditions in a single expression field --- .../substrait/src/logical_plan/consumer.rs | 64 ++++++++++++++----- .../substrait/src/logical_plan/producer.rs | 31 +++++++-- 2 files changed, 72 insertions(+), 23 deletions(-) diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs index 32b8f8ea547f..32224ac553a8 100644 --- a/datafusion/substrait/src/logical_plan/consumer.rs +++ b/datafusion/substrait/src/logical_plan/consumer.rs @@ -129,6 +129,17 @@ fn scalar_function_type_from_str(name: &str) -> Result { } } +#[inline(always)] +fn make_mixed_join_condition( + join_filter: Option, + predicate: &Expr, +) -> Option { + match &join_filter { + Some(filter) => Some(filter.clone().and(predicate.clone())), + None => Some(predicate.clone()), + } +} + /// Convert Substrait Plan to DataFusion DataFrame pub async fn from_substrait_plan( ctx: &mut SessionContext, @@ -342,7 +353,7 @@ pub async fn from_substrait_rel( // certain join types such as semi and anti joins let in_join_schema = left.schema().join(right.schema())?; // Parse post join filter if exists - let join_filter = match &join.post_join_filter { + let mut join_filter = match &join.post_join_filter { Some(filter) => { let parsed_filter = from_substrait_rex(filter, &in_join_schema, extensions).await?; @@ -351,41 +362,62 @@ pub async fn from_substrait_rel( None => None, }; // If join expression exists, parse the `on` condition expression, build join and return - // Otherwise, build join with koin filter, without join keys + // Otherwise, build join with only the filter, without join keys match &join.expression.as_ref() { Some(expr) => { let on = from_substrait_rex(expr, &in_join_schema, extensions).await?; let predicates = split_conjunction(&on); // TODO: collect only one null_eq_null - let join_exprs: Vec<(Column, Column, bool)> = predicates - .iter() - .map(|p| match p { + // The predicates can contain both equal and non-equal ops. + // Since as of datafusion 31.0.0, the equal and non equal join conditions are in separate fields, + // we extract each part as follows: + // - If an equal op is encountered, add the left column, right column and is_null_equal_nulls to `join_on` vector + // - Otherwise we add the expression to join_filters (use conjunction if filter already exists) + let mut join_on: Vec<(Column, Column, bool)> = vec![]; + for p in predicates { + match p { Expr::BinaryExpr(BinaryExpr { left, op, right }) => { match (left.as_ref(), right.as_ref()) { (Expr::Column(l), Expr::Column(r)) => match op { - Operator::Eq => Ok((l.clone(), r.clone(), false)), + Operator::Eq => { + join_on.push((l.clone(), r.clone(), false)) + } Operator::IsNotDistinctFrom => { - Ok((l.clone(), r.clone(), true)) + join_on.push((l.clone(), r.clone(), true)) + } + _ => { + // If the operator is not a form of an equal operator, add this expression to filter + join_filter = + make_mixed_join_condition(join_filter, p); } - _ => plan_err!("invalid join condition op"), }, - _ => plan_err!("invalid join condition expression"), + _ => { + // If this is not a `col op col` expression, then `and` the expression to filter + join_filter = + make_mixed_join_condition(join_filter, p); + } } } - _ => plan_err!( - "Non-binary expression is not supported in join condition" - ), - }) - .collect::>>()?; + _ => { + // If this is not a binary expression, then `and` the expression to filter + join_filter = make_mixed_join_condition(join_filter, p); + } + } + } + let (left_cols, right_cols, null_eq_nulls): (Vec<_>, Vec<_>, Vec<_>) = - itertools::multiunzip(join_exprs); + itertools::multiunzip(join_on); left.join_detailed( right.build()?, join_type, (left_cols, right_cols), join_filter, - null_eq_nulls[0], + if null_eq_nulls.is_empty() { + false // if no equal condition, default null_eq_nulls to false + } else { + null_eq_nulls[0] + }, )? .build() } diff --git a/datafusion/substrait/src/logical_plan/producer.rs b/datafusion/substrait/src/logical_plan/producer.rs index e17b022f3b53..eb855a9c14c6 100644 --- a/datafusion/substrait/src/logical_plan/producer.rs +++ b/datafusion/substrait/src/logical_plan/producer.rs @@ -277,14 +277,15 @@ pub fn to_substrait_rel( // parse filter if exists let in_join_schema = join.left.schema().join(join.right.schema())?; let join_filter = match &join.filter { - Some(filter) => Some(Box::new(to_substrait_rex( + Some(filter) => Some(to_substrait_rex( filter, &Arc::new(in_join_schema), 0, extension_info, - )?)), + )?), None => None, }; + // map the left and right columns to binary expressions in the form `l = r` // build a single expression for the ON condition, such as `l.a = r.a AND l.b = r.b` let eq_op = if join.null_equals_null { @@ -292,15 +293,31 @@ pub fn to_substrait_rel( } else { Operator::Eq }; - - let join_expr = to_substrait_join_expr( + let join_on = to_substrait_join_expr( &join.on, eq_op, join.left.schema(), join.right.schema(), extension_info, - )? - .map(Box::new); + )?; + + // create conjunction between `join_on` and `join_filter` to embed all join conditions, + // whether equal or non-equal in a single expression + let join_expr = match &join_on { + Some(on_expr) => match &join_filter { + Some(filter) => Some(Box::new(make_binary_op_scalar_func( + on_expr, + filter, + Operator::And, + extension_info, + ))), + None => join_on.map(Box::new), // the join expression will only contain `join_on` if filter doesn't exist + }, + None => match &join_filter { + Some(_) => join_filter.map(Box::new), // the join expression will only contain `join_file` if the `on` condition doesn't exist + None => None, + }, + }; Ok(Box::new(Rel { rel_type: Some(RelType::Join(Box::new(JoinRel { @@ -309,7 +326,7 @@ pub fn to_substrait_rel( right: Some(right), r#type: join_type as i32, expression: join_expr, - post_join_filter: join_filter, + post_join_filter: None, advanced_extension: None, }))), })) From 301414ce1a7931383e3e7cbed76a5669595ef5f6 Mon Sep 17 00:00:00 2001 From: Nuttiiya Seekhao Date: Thu, 21 Sep 2023 13:00:34 -0700 Subject: [PATCH 2/5] Removed all references to post_join_filter --- .../substrait/src/logical_plan/consumer.rs | 30 ++++++------------- 1 file changed, 9 insertions(+), 21 deletions(-) diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs index 32224ac553a8..91d843304e52 100644 --- a/datafusion/substrait/src/logical_plan/consumer.rs +++ b/datafusion/substrait/src/logical_plan/consumer.rs @@ -342,6 +342,12 @@ pub async fn from_substrait_rel( } } Some(RelType::Join(join)) => { + if join.post_join_filter.is_some() { + return not_impl_err!( + "JoinRel with post_join_filter is not yet supported" + ); + } + let left = LogicalPlanBuilder::from( from_substrait_rel(ctx, join.left.as_ref().unwrap(), extensions).await?, ); @@ -352,15 +358,7 @@ pub async fn from_substrait_rel( // The join condition expression needs full input schema and not the output schema from join since we lose columns from // certain join types such as semi and anti joins let in_join_schema = left.schema().join(right.schema())?; - // Parse post join filter if exists - let mut join_filter = match &join.post_join_filter { - Some(filter) => { - let parsed_filter = - from_substrait_rex(filter, &in_join_schema, extensions).await?; - Some(parsed_filter.as_ref().clone()) - } - None => None, - }; + let mut join_filter: Option = None; // If join expression exists, parse the `on` condition expression, build join and return // Otherwise, build join with only the filter, without join keys match &join.expression.as_ref() { @@ -373,7 +371,7 @@ pub async fn from_substrait_rel( // Since as of datafusion 31.0.0, the equal and non equal join conditions are in separate fields, // we extract each part as follows: // - If an equal op is encountered, add the left column, right column and is_null_equal_nulls to `join_on` vector - // - Otherwise we add the expression to join_filters (use conjunction if filter already exists) + // - Otherwise we add the expression to join_filter (use conjunction if filter already exists) let mut join_on: Vec<(Column, Column, bool)> = vec![]; for p in predicates { match p { @@ -421,17 +419,7 @@ pub async fn from_substrait_rel( )? .build() } - None => match &join_filter { - Some(_) => left - .join( - right.build()?, - join_type, - (Vec::::new(), Vec::::new()), - join_filter, - )? - .build(), - None => plan_err!("Join without join keys require a valid filter"), - }, + None => plan_err!("JoinRel without join condition is not allowed"), } } Some(RelType::Read(read)) => match &read.as_ref().read_type { From cf651fda69c134b47702a5ee2f9a429ae99b656c Mon Sep 17 00:00:00 2001 From: Nuttiiya Seekhao Date: Thu, 5 Oct 2023 16:17:44 -0700 Subject: [PATCH 3/5] Simplify from_substrait_rel() --- .../substrait/src/logical_plan/consumer.rs | 113 +++++++++--------- 1 file changed, 56 insertions(+), 57 deletions(-) diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs index 91d843304e52..e241ec201e2d 100644 --- a/datafusion/substrait/src/logical_plan/consumer.rs +++ b/datafusion/substrait/src/logical_plan/consumer.rs @@ -18,13 +18,14 @@ use async_recursion::async_recursion; use datafusion::arrow::datatypes::{DataType, Field, TimeUnit}; use datafusion::common::{not_impl_err, DFField, DFSchema, DFSchemaRef}; +use datafusion::logical_expr::utils::{can_hash, find_valid_equijoin_key_pair}; use datafusion::logical_expr::{ aggregate_function, window_function::find_df_window_func, BinaryExpr, BuiltinScalarFunction, Case, Expr, LogicalPlan, Operator, }; use datafusion::logical_expr::{ - expr, Cast, Extension, GroupingSet, Like, LogicalPlanBuilder, WindowFrameBound, - WindowFrameUnits, + expr, Cast, ExprSchemable, Extension, GroupingSet, Like, LogicalPlanBuilder, + WindowFrameBound, WindowFrameUnits, }; use datafusion::prelude::JoinType; use datafusion::sql::TableReference; @@ -129,15 +130,49 @@ fn scalar_function_type_from_str(name: &str) -> Result { } } -#[inline(always)] -fn make_mixed_join_condition( - join_filter: Option, - predicate: &Expr, -) -> Option { - match &join_filter { - Some(filter) => Some(filter.clone().and(predicate.clone())), - None => Some(predicate.clone()), +fn split_eq_and_noneq_join_predicate_with_nulls_equality( + filter: &Expr, +) -> (Vec<(Column, Column)>, bool, Option) { + let exprs = split_conjunction(filter); + + let mut accum_join_keys: Vec<(Column, Column)> = vec![]; + let mut accum_filters: Vec = vec![]; + let mut nulls_equal_nulls = false; + + for expr in exprs { + match expr { + Expr::BinaryExpr(binary_expr) => match binary_expr { + x @ (BinaryExpr { + left, + op: Operator::Eq, + right, + } + | BinaryExpr { + left, + op: Operator::IsNotDistinctFrom, + right, + }) => { + nulls_equal_nulls = match x.op { + Operator::Eq => false, + Operator::IsNotDistinctFrom => true, + _ => unreachable!(), + }; + + match (left.as_ref(), right.as_ref()) { + (Expr::Column(l), Expr::Column(r)) => { + accum_join_keys.push((l.clone(), r.clone())); + } + _ => accum_filters.push(expr.clone()), + } + } + _ => accum_filters.push(expr.clone()), + }, + _ => accum_filters.push(expr.clone()), + } } + + let join_filter = accum_filters.into_iter().reduce(Expr::and); + (accum_join_keys, nulls_equal_nulls, join_filter) } /// Convert Substrait Plan to DataFusion DataFrame @@ -348,7 +383,7 @@ pub async fn from_substrait_rel( ); } - let left = LogicalPlanBuilder::from( + let left: LogicalPlanBuilder = LogicalPlanBuilder::from( from_substrait_rel(ctx, join.left.as_ref().unwrap(), extensions).await?, ); let right = LogicalPlanBuilder::from( @@ -358,64 +393,28 @@ pub async fn from_substrait_rel( // The join condition expression needs full input schema and not the output schema from join since we lose columns from // certain join types such as semi and anti joins let in_join_schema = left.schema().join(right.schema())?; - let mut join_filter: Option = None; + // If join expression exists, parse the `on` condition expression, build join and return // Otherwise, build join with only the filter, without join keys match &join.expression.as_ref() { Some(expr) => { let on = from_substrait_rex(expr, &in_join_schema, extensions).await?; - let predicates = split_conjunction(&on); - // TODO: collect only one null_eq_null - // The predicates can contain both equal and non-equal ops. - // Since as of datafusion 31.0.0, the equal and non equal join conditions are in separate fields, - // we extract each part as follows: - // - If an equal op is encountered, add the left column, right column and is_null_equal_nulls to `join_on` vector + // The join expression can contain both equal and non-equal ops. + // As of datafusion 31.0.0, the equal and non equal join conditions are in separate fields. + // So we extract each part as follows: + // - If an Eq or IsNotDistinctFrom op is encountered, add the left column, right column and is_null_equal_nulls to `join_ons` vector // - Otherwise we add the expression to join_filter (use conjunction if filter already exists) - let mut join_on: Vec<(Column, Column, bool)> = vec![]; - for p in predicates { - match p { - Expr::BinaryExpr(BinaryExpr { left, op, right }) => { - match (left.as_ref(), right.as_ref()) { - (Expr::Column(l), Expr::Column(r)) => match op { - Operator::Eq => { - join_on.push((l.clone(), r.clone(), false)) - } - Operator::IsNotDistinctFrom => { - join_on.push((l.clone(), r.clone(), true)) - } - _ => { - // If the operator is not a form of an equal operator, add this expression to filter - join_filter = - make_mixed_join_condition(join_filter, p); - } - }, - _ => { - // If this is not a `col op col` expression, then `and` the expression to filter - join_filter = - make_mixed_join_condition(join_filter, p); - } - } - } - _ => { - // If this is not a binary expression, then `and` the expression to filter - join_filter = make_mixed_join_condition(join_filter, p); - } - } - } - - let (left_cols, right_cols, null_eq_nulls): (Vec<_>, Vec<_>, Vec<_>) = - itertools::multiunzip(join_on); + let (join_ons, nulls_equal_nulls, join_filter) = + split_eq_and_noneq_join_predicate_with_nulls_equality(&on); + let (left_cols, right_cols): (Vec<_>, Vec<_>) = + itertools::multiunzip(join_ons); left.join_detailed( right.build()?, join_type, (left_cols, right_cols), join_filter, - if null_eq_nulls.is_empty() { - false // if no equal condition, default null_eq_nulls to false - } else { - null_eq_nulls[0] - }, + nulls_equal_nulls, )? .build() } From 590e7a9f9f7a77865c9dce289b7a1fde2d6cb5fc Mon Sep 17 00:00:00 2001 From: Nuttiiya Seekhao Date: Fri, 6 Oct 2023 11:22:07 -0700 Subject: [PATCH 4/5] Clippy fix --- datafusion/substrait/src/logical_plan/consumer.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs index e241ec201e2d..c8cd3fd53211 100644 --- a/datafusion/substrait/src/logical_plan/consumer.rs +++ b/datafusion/substrait/src/logical_plan/consumer.rs @@ -18,13 +18,13 @@ use async_recursion::async_recursion; use datafusion::arrow::datatypes::{DataType, Field, TimeUnit}; use datafusion::common::{not_impl_err, DFField, DFSchema, DFSchemaRef}; -use datafusion::logical_expr::utils::{can_hash, find_valid_equijoin_key_pair}; + use datafusion::logical_expr::{ aggregate_function, window_function::find_df_window_func, BinaryExpr, BuiltinScalarFunction, Case, Expr, LogicalPlan, Operator, }; use datafusion::logical_expr::{ - expr, Cast, ExprSchemable, Extension, GroupingSet, Like, LogicalPlanBuilder, + expr, Cast, Extension, GroupingSet, Like, LogicalPlanBuilder, WindowFrameBound, WindowFrameUnits, }; use datafusion::prelude::JoinType; From b48cc0f8af97d85bf8081cf762e6f6991e710f80 Mon Sep 17 00:00:00 2001 From: Nuttiiya Seekhao Date: Fri, 6 Oct 2023 14:45:01 -0700 Subject: [PATCH 5/5] Added test to ensure that Substrait plans produced from DF do not contain a post_join_filter --- .../substrait/src/logical_plan/consumer.rs | 4 +- .../substrait/src/logical_plan/producer.rs | 4 +- .../tests/cases/roundtrip_logical_plan.rs | 118 +++++++++++++++++- 3 files changed, 118 insertions(+), 8 deletions(-) diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs index c8cd3fd53211..c4545da8d563 100644 --- a/datafusion/substrait/src/logical_plan/consumer.rs +++ b/datafusion/substrait/src/logical_plan/consumer.rs @@ -24,8 +24,8 @@ use datafusion::logical_expr::{ BuiltinScalarFunction, Case, Expr, LogicalPlan, Operator, }; use datafusion::logical_expr::{ - expr, Cast, Extension, GroupingSet, Like, LogicalPlanBuilder, - WindowFrameBound, WindowFrameUnits, + expr, Cast, Extension, GroupingSet, Like, LogicalPlanBuilder, WindowFrameBound, + WindowFrameUnits, }; use datafusion::prelude::JoinType; use datafusion::sql::TableReference; diff --git a/datafusion/substrait/src/logical_plan/producer.rs b/datafusion/substrait/src/logical_plan/producer.rs index eb855a9c14c6..fcd30913318a 100644 --- a/datafusion/substrait/src/logical_plan/producer.rs +++ b/datafusion/substrait/src/logical_plan/producer.rs @@ -314,7 +314,7 @@ pub fn to_substrait_rel( None => join_on.map(Box::new), // the join expression will only contain `join_on` if filter doesn't exist }, None => match &join_filter { - Some(_) => join_filter.map(Box::new), // the join expression will only contain `join_file` if the `on` condition doesn't exist + Some(_) => join_filter.map(Box::new), // the join expression will only contain `join_filter` if the `on` condition doesn't exist None => None, }, }; @@ -325,7 +325,7 @@ pub fn to_substrait_rel( left: Some(left), right: Some(right), r#type: join_type as i32, - expression: join_expr, + expression: join_expr.clone(), post_join_filter: None, advanced_extension: None, }))), diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs index f4d74ae42681..807cb1edec16 100644 --- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs @@ -23,15 +23,18 @@ use std::hash::Hash; use std::sync::Arc; use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit}; -use datafusion::common::{DFSchema, DFSchemaRef}; -use datafusion::error::Result; +use datafusion::common::{not_impl_err, plan_err, DFSchema, DFSchemaRef}; +use datafusion::error::{DataFusionError, Result}; use datafusion::execution::context::SessionState; use datafusion::execution::registry::SerializerRegistry; use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::logical_expr::{Extension, LogicalPlan, UserDefinedLogicalNode}; use datafusion::optimizer::simplify_expressions::expr_simplifier::THRESHOLD_INLINE_INLIST; use datafusion::prelude::*; + use substrait::proto::extensions::simple_extension_declaration::MappingType; +use substrait::proto::rel::RelType; +use substrait::proto::{plan_rel, Plan, Rel}; struct MockSerializerRegistry; @@ -378,12 +381,15 @@ async fn roundtrip_inner_join() -> Result<()> { #[tokio::test] async fn roundtrip_non_equi_inner_join() -> Result<()> { - roundtrip("SELECT data.a FROM data JOIN data2 ON data.a <> data2.a").await + roundtrip_verify_post_join_filter( + "SELECT data.a FROM data JOIN data2 ON data.a <> data2.a", + ) + .await } #[tokio::test] async fn roundtrip_non_equi_join() -> Result<()> { - roundtrip( + roundtrip_verify_post_join_filter( "SELECT data.a FROM data, data2 WHERE data.a = data2.a AND data.e > data2.a", ) .await @@ -615,6 +621,91 @@ async fn extension_logical_plan() -> Result<()> { Ok(()) } +fn check_post_join_filters(rel: &Rel) -> Result<()> { + // search for target_rel and field value in proto + match &rel.rel_type { + Some(RelType::Join(join)) => { + // check if join filter is None + if join.post_join_filter.is_some() { + plan_err!( + "DataFusion generated Susbtrait plan cannot have post_join_filter in JoinRel" + ) + } else { + // recursively check JoinRels + match check_post_join_filters(join.left.as_ref().unwrap().as_ref()) { + Err(e) => Err(e), + Ok(_) => { + check_post_join_filters(join.right.as_ref().unwrap().as_ref()) + } + } + } + } + Some(RelType::Project(p)) => { + check_post_join_filters(p.input.as_ref().unwrap().as_ref()) + } + Some(RelType::Filter(filter)) => { + check_post_join_filters(filter.input.as_ref().unwrap().as_ref()) + } + Some(RelType::Fetch(fetch)) => { + check_post_join_filters(fetch.input.as_ref().unwrap().as_ref()) + } + Some(RelType::Sort(sort)) => { + check_post_join_filters(sort.input.as_ref().unwrap().as_ref()) + } + Some(RelType::Aggregate(agg)) => { + check_post_join_filters(agg.input.as_ref().unwrap().as_ref()) + } + Some(RelType::Set(set)) => { + for input in &set.inputs { + match check_post_join_filters(input) { + Err(e) => return Err(e), + Ok(_) => continue, + } + } + Ok(()) + } + Some(RelType::ExtensionSingle(ext)) => { + check_post_join_filters(ext.input.as_ref().unwrap().as_ref()) + } + Some(RelType::ExtensionMulti(ext)) => { + for input in &ext.inputs { + match check_post_join_filters(input) { + Err(e) => return Err(e), + Ok(_) => continue, + } + } + Ok(()) + } + Some(RelType::ExtensionLeaf(_)) | Some(RelType::Read(_)) => Ok(()), + _ => not_impl_err!( + "Unsupported RelType: {:?} in post join filter check", + rel.rel_type + ), + } +} + +async fn verify_post_join_filter_value(proto: Box) -> Result<()> { + for relation in &proto.relations { + match relation.rel_type.as_ref() { + Some(rt) => match rt { + plan_rel::RelType::Rel(rel) => match check_post_join_filters(rel) { + Err(e) => return Err(e), + Ok(_) => continue, + }, + plan_rel::RelType::Root(root) => { + match check_post_join_filters(root.input.as_ref().unwrap()) { + Err(e) => return Err(e), + Ok(_) => continue, + } + } + }, + None => return plan_err!("Cannot parse plan relation: None"), + } + } + + Ok(()) +} + async fn assert_expected_plan(sql: &str, expected_plan_str: &str) -> Result<()> { let mut ctx = create_context().await?; let df = ctx.sql(sql).await?; @@ -683,6 +774,25 @@ async fn roundtrip(sql: &str) -> Result<()> { Ok(()) } +async fn roundtrip_verify_post_join_filter(sql: &str) -> Result<()> { + let mut ctx = create_context().await?; + let df = ctx.sql(sql).await?; + let plan = df.into_optimized_plan()?; + let proto = to_substrait_plan(&plan, &ctx)?; + let plan2 = from_substrait_plan(&mut ctx, &proto).await?; + let plan2 = ctx.state().optimize(&plan2)?; + + println!("{plan:#?}"); + println!("{plan2:#?}"); + + let plan1str = format!("{plan:?}"); + let plan2str = format!("{plan2:?}"); + assert_eq!(plan1str, plan2str); + + // verify that the join filters are None + verify_post_join_filter_value(proto).await +} + async fn roundtrip_all_types(sql: &str) -> Result<()> { let mut ctx = create_all_type_context().await?; let df = ctx.sql(sql).await?;