From 533ddcb6f43dc1b11368bdc33924afd87f439e19 Mon Sep 17 00:00:00 2001 From: wiedld Date: Mon, 26 Aug 2024 09:01:02 -0700 Subject: [PATCH] Provide documentation of expose APIs to enable handling of type coercion at UNION plan construction. (#12142) * chore(12105): enable union type-coercion by two approaches, using newly pub interfaces * chore(12105): update documentation to delineate btwn the interfaces involved in type coercion * chore((12105): update union() logical plan construction docs, to address type coercion --- datafusion/expr/src/logical_plan/builder.rs | 12 +++++++++- .../optimizer/src/analyzer/type_coercion.rs | 22 ++++++++++++++----- 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 559908bcfdfa..fdd07da023e0 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -1331,7 +1331,17 @@ pub fn validate_unique_names<'a>( }) } -/// Union two logical plans. +/// Union two [`LogicalPlan`]s. +/// +/// Constructs the UNION plan, but does not perform type-coercion. Therefore the +/// subtree expressions will not be properly typed until the optimizer pass. +/// +/// If a properly typed UNION plan is needed, refer to [`TypeCoercionRewriter::coerce_union`] +/// or alternatively, merge the union input schema using [`coerce_union_schema`] and +/// apply the expression rewrite with [`coerce_plan_expr_for_schema`]. +/// +/// [`TypeCoercionRewriter::coerce_union`]: https://docs.rs/datafusion-optimizer/latest/datafusion_optimizer/analyzer/type_coercion/struct.TypeCoercionRewriter.html#method.coerce_union +/// [`coerce_union_schema`]: https://docs.rs/datafusion-optimizer/latest/datafusion_optimizer/analyzer/type_coercion/fn.coerce_union_schema.html pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result { // Temporarily use the schema from the left input and later rely on the analyzer to // coerce the two schemas into a common one. diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index 68ab2e13005f..315284c50839 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -56,6 +56,8 @@ use datafusion_expr::{ Projection, ScalarUDF, Union, WindowFrame, WindowFrameBound, WindowFrameUnits, }; +/// Performs type coercion by determining the schema +/// and performing the expression rewrites. #[derive(Default)] pub struct TypeCoercion {} @@ -128,16 +130,23 @@ fn analyze_internal( .map_data(|plan| plan.recompute_schema()) } -pub(crate) struct TypeCoercionRewriter<'a> { +/// Rewrite expressions to apply type coercion. +pub struct TypeCoercionRewriter<'a> { pub(crate) schema: &'a DFSchema, } impl<'a> TypeCoercionRewriter<'a> { + /// Create a new [`TypeCoercionRewriter`] with a provided schema + /// representing both the inputs and output of the [`LogicalPlan`] node. fn new(schema: &'a DFSchema) -> Self { Self { schema } } - fn coerce_plan(&mut self, plan: LogicalPlan) -> Result { + /// Coerce the [`LogicalPlan`]. + /// + /// Refer to [`TypeCoercionRewriter::coerce_join`] and [`TypeCoercionRewriter::coerce_union`] + /// for type-coercion approach. + pub fn coerce_plan(&mut self, plan: LogicalPlan) -> Result { match plan { LogicalPlan::Join(join) => self.coerce_join(join), LogicalPlan::Union(union) => Self::coerce_union(union), @@ -153,7 +162,7 @@ impl<'a> TypeCoercionRewriter<'a> { /// /// For example, on_exprs like `t1.a = t2.b AND t1.x = t2.y` will be stored /// as a list of `(t1.a, t2.b), (t1.x, t2.y)` - fn coerce_join(&mut self, mut join: Join) -> Result { + pub fn coerce_join(&mut self, mut join: Join) -> Result { join.on = join .on .into_iter() @@ -176,7 +185,7 @@ impl<'a> TypeCoercionRewriter<'a> { /// Coerce the union’s inputs to a common schema compatible with all inputs. /// This occurs after wildcard expansion and the coercion of the input expressions. - fn coerce_union(union_plan: Union) -> Result { + pub fn coerce_union(union_plan: Union) -> Result { let union_schema = Arc::new(coerce_union_schema(&union_plan.inputs)?); let new_inputs = union_plan .inputs @@ -809,7 +818,10 @@ fn coerce_case_expression(case: Case, schema: &DFSchema) -> Result { } /// Get a common schema that is compatible with all inputs of UNION. -fn coerce_union_schema(inputs: &[Arc]) -> Result { +/// +/// This method presumes that the wildcard expansion is unneeded, or has already +/// been applied. +pub fn coerce_union_schema(inputs: &[Arc]) -> Result { let base_schema = inputs[0].schema(); let mut union_datatypes = base_schema .fields()