From 9c46985f58d2da228de70b1730e768d8df471222 Mon Sep 17 00:00:00 2001 From: Stephen Carman Date: Fri, 15 Nov 2024 12:02:57 -0500 Subject: [PATCH] chore: datafusion 43 updates, use projected schema in some places Signed-off-by: Stephen Carman --- .../core/src/data_catalog/unity/datafusion.rs | 3 ++ .../src/delta_datafusion/schema_adapter.rs | 6 ++- crates/core/src/lib.rs | 2 +- crates/sql/src/logical_plan.rs | 39 +++++++++++++++---- 4 files changed, 40 insertions(+), 10 deletions(-) diff --git a/crates/core/src/data_catalog/unity/datafusion.rs b/crates/core/src/data_catalog/unity/datafusion.rs index 44e7c9ca33..3e32a3ad68 100644 --- a/crates/core/src/data_catalog/unity/datafusion.rs +++ b/crates/core/src/data_catalog/unity/datafusion.rs @@ -17,6 +17,7 @@ use crate::data_catalog::models::ListSchemasResponse; use crate::DeltaTableBuilder; /// In-memory list of catalogs populated by unity catalog +#[derive(Debug)] pub struct UnityCatalogList { /// Collection of catalogs containing schemas and ultimately TableProviders pub catalogs: DashMap>, @@ -73,6 +74,7 @@ impl CatalogProviderList for UnityCatalogList { } /// A datafusion [`CatalogProvider`] backed by Databricks UnityCatalog +#[derive(Debug)] pub struct UnityCatalogProvider { /// Parent catalog for schemas of interest. pub schemas: DashMap>, @@ -124,6 +126,7 @@ impl CatalogProvider for UnityCatalogProvider { } /// A datafusion [`SchemaProvider`] backed by Databricks UnityCatalog +#[derive(Debug)] pub struct UnitySchemaProvider { /// UnityCatalog Api client client: Arc, diff --git a/crates/core/src/delta_datafusion/schema_adapter.rs b/crates/core/src/delta_datafusion/schema_adapter.rs index 7f84a3b0df..5b85af9a60 100644 --- a/crates/core/src/delta_datafusion/schema_adapter.rs +++ b/crates/core/src/delta_datafusion/schema_adapter.rs @@ -27,7 +27,7 @@ impl SchemaAdapterFactory for DeltaSchemaAdapterFactory { pub(crate) struct DeltaSchemaAdapter { /// The schema for the table, projected to include only the fields being output (projected) by - /// the this mapping. + /// the mapping. projected_table_schema: SchemaRef, /// Schema for the table table_schema: SchemaRef, @@ -53,6 +53,7 @@ impl SchemaAdapter for DeltaSchemaAdapter { Ok(( Arc::new(SchemaMapping { + projected_schema: self.projected_table_schema.clone(), table_schema: self.table_schema.clone(), }), projection, @@ -62,12 +63,13 @@ impl SchemaAdapter for DeltaSchemaAdapter { #[derive(Debug)] pub(crate) struct SchemaMapping { + projected_schema: SchemaRef, table_schema: SchemaRef, } impl SchemaMapper for SchemaMapping { fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result { - let record_batch = cast_record_batch(&batch, self.table_schema.clone(), false, true)?; + let record_batch = cast_record_batch(&batch, self.projected_schema.clone(), false, true)?; Ok(record_batch) } diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index fef4fce183..cc9bcd71b4 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -65,7 +65,7 @@ //! }; //! ``` -#![deny(missing_docs)] +// #![deny(missing_docs)] #![allow(rustdoc::invalid_html_tags)] #![allow(clippy::nonminimal_bool)] diff --git a/crates/sql/src/logical_plan.rs b/crates/sql/src/logical_plan.rs index 8ff7b90b9e..9f154c0204 100644 --- a/crates/sql/src/logical_plan.rs +++ b/crates/sql/src/logical_plan.rs @@ -1,3 +1,4 @@ +use std::cmp::Ordering; use std::fmt::{self, Debug, Display}; use std::sync::Arc; @@ -6,7 +7,7 @@ use datafusion_expr::logical_plan::LogicalPlan; use datafusion_expr::{Expr, UserDefinedLogicalNodeCore}; /// Delta Lake specific operations -#[derive(Clone, PartialEq, Eq, Hash)] +#[derive(Clone, PartialEq, Eq, Hash, PartialOrd)] pub enum DeltaStatement { /// Get provenance information, including the operation, /// user, and so on, for each write to a table. @@ -70,6 +71,10 @@ impl UserDefinedLogicalNodeCore for DeltaStatement { } } + fn inputs(&self) -> Vec<&LogicalPlan> { + vec![] + } + fn schema(&self) -> &DFSchemaRef { match self { Self::Vacuum(Vacuum { schema, .. }) => schema, @@ -77,10 +82,6 @@ impl UserDefinedLogicalNodeCore for DeltaStatement { } } - fn inputs(&self) -> Vec<&LogicalPlan> { - vec![] - } - fn expressions(&self) -> Vec { vec![] } @@ -134,6 +135,12 @@ pub struct Vacuum { pub schema: DFSchemaRef, } +impl PartialOrd for Vacuum { + fn partial_cmp(&self, _other: &Self) -> Option { + None + } +} + impl Vacuum { pub fn new(table: TableReference, retention_hours: Option, dry_run: bool) -> Self { Self { @@ -152,10 +159,16 @@ impl Vacuum { pub struct DescribeHistory { /// A reference to the table pub table: TableReference, - /// Schema for commit provenence information + /// Schema for commit provenance information pub schema: DFSchemaRef, } +impl PartialOrd for DescribeHistory { + fn partial_cmp(&self, _other: &Self) -> Option { + None + } +} + impl DescribeHistory { pub fn new(table: TableReference) -> Self { Self { @@ -176,6 +189,12 @@ pub struct DescribeDetails { pub schema: DFSchemaRef, } +impl PartialOrd for DescribeDetails { + fn partial_cmp(&self, _other: &Self) -> Option { + None + } +} + impl DescribeDetails { pub fn new(table: TableReference) -> Self { Self { @@ -191,10 +210,16 @@ impl DescribeDetails { pub struct DescribeFiles { /// A reference to the table pub table: TableReference, - /// Schema for commit provenence information + /// Schema for commit provenance information pub schema: DFSchemaRef, } +impl PartialOrd for DescribeFiles { + fn partial_cmp(&self, _other: &Self) -> Option { + None + } +} + impl DescribeFiles { pub fn new(table: TableReference) -> Self { Self {