From 0ad44a07f2c5bae8d56009f1a67d89f997b51fac Mon Sep 17 00:00:00 2001 From: Brent Gardner Date: Wed, 26 Oct 2022 14:43:54 -0600 Subject: [PATCH 1/5] Revert async changes but keep deltalake working --- datafusion/core/src/datasource/datasource.rs | 3 ++ datafusion/core/src/test_util.rs | 6 +++ datafusion/proto/Cargo.toml | 1 - datafusion/proto/README.md | 2 +- datafusion/proto/examples/plan_serde.rs | 2 +- datafusion/proto/src/bytes/mod.rs | 20 +++++----- datafusion/proto/src/lib.rs | 22 +++++------ datafusion/proto/src/logical_plan.rs | 39 ++++++++------------ 8 files changed, 45 insertions(+), 50 deletions(-) diff --git a/datafusion/core/src/datasource/datasource.rs b/datafusion/core/src/datasource/datasource.rs index 38e3193c4935..3f07201851bf 100644 --- a/datafusion/core/src/datasource/datasource.rs +++ b/datafusion/core/src/datasource/datasource.rs @@ -87,4 +87,7 @@ pub trait TableProvider: Sync + Send { pub trait TableProviderFactory: Sync + Send { /// Create a TableProvider with the given url async fn create(&self, url: &str) -> Result>; + + /// Create a TableProvider with the given url and schema + fn with_schema(&self, url: &str, schema: SchemaRef) -> Result>; } diff --git a/datafusion/core/src/test_util.rs b/datafusion/core/src/test_util.rs index a080d4bab344..8d5c1fafd46d 100644 --- a/datafusion/core/src/test_util.rs +++ b/datafusion/core/src/test_util.rs @@ -290,6 +290,12 @@ impl TableProviderFactory for TestTableFactory { url: url.to_string(), })) } + + fn with_schema(&self, url: &str, _schema: SchemaRef) -> datafusion_common::Result> { + Ok(Arc::new(TestTableProvider { + url: url.to_string(), + })) + } } /// TableProvider for testing purposes diff --git a/datafusion/proto/Cargo.toml b/datafusion/proto/Cargo.toml index ef5d8211248a..ff2af33b22d8 100644 --- a/datafusion/proto/Cargo.toml +++ b/datafusion/proto/Cargo.toml @@ -41,7 +41,6 @@ json = ["pbjson", "pbjson-build", "serde", "serde_json"] [dependencies] arrow = "25.0.0" -async-trait = "0.1.41" datafusion = { path = "../core", version = "13.0.0" } datafusion-common = { path = "../common", version = "13.0.0" } datafusion-expr = { path = "../expr", version = "13.0.0" } diff --git a/datafusion/proto/README.md b/datafusion/proto/README.md index 8c8962e506a6..a3878447e042 100644 --- a/datafusion/proto/README.md +++ b/datafusion/proto/README.md @@ -63,7 +63,7 @@ async fn main() -> Result<()> { ?; let plan = ctx.table("t1")?.to_logical_plan()?; let bytes = logical_plan_to_bytes(&plan)?; - let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx).await?; + let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?; assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip)); Ok(()) } diff --git a/datafusion/proto/examples/plan_serde.rs b/datafusion/proto/examples/plan_serde.rs index eed372476fff..d98d88b2a46a 100644 --- a/datafusion/proto/examples/plan_serde.rs +++ b/datafusion/proto/examples/plan_serde.rs @@ -26,7 +26,7 @@ async fn main() -> Result<()> { .await?; let plan = ctx.table("t1")?.to_logical_plan()?; let bytes = logical_plan_to_bytes(&plan)?; - let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx).await?; + let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?; assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip)); Ok(()) } diff --git a/datafusion/proto/src/bytes/mod.rs b/datafusion/proto/src/bytes/mod.rs index 5695bf50686a..c463dff53f8f 100644 --- a/datafusion/proto/src/bytes/mod.rs +++ b/datafusion/proto/src/bytes/mod.rs @@ -19,7 +19,6 @@ use crate::logical_plan::{AsLogicalPlan, LogicalExtensionCodec}; use crate::{from_proto::parse_expr, protobuf}; use arrow::datatypes::SchemaRef; -use async_trait::async_trait; use datafusion::datasource::TableProvider; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::{Expr, Extension, LogicalPlan}; @@ -136,27 +135,27 @@ pub fn logical_plan_to_bytes_with_extension_codec( /// Deserialize a LogicalPlan from json #[cfg(feature = "json")] -pub async fn logical_plan_from_json( +pub fn logical_plan_from_json( json: &str, ctx: &SessionContext, ) -> Result { let back: protobuf::LogicalPlanNode = serde_json::from_str(json) .map_err(|e| DataFusionError::Plan(format!("Error serializing plan: {}", e)))?; let extension_codec = DefaultExtensionCodec {}; - back.try_into_logical_plan(ctx, &extension_codec).await + back.try_into_logical_plan(ctx, &extension_codec) } /// Deserialize a LogicalPlan from bytes -pub async fn logical_plan_from_bytes( +pub fn logical_plan_from_bytes( bytes: &[u8], ctx: &SessionContext, ) -> Result { let extension_codec = DefaultExtensionCodec {}; - logical_plan_from_bytes_with_extension_codec(bytes, ctx, &extension_codec).await + logical_plan_from_bytes_with_extension_codec(bytes, ctx, &extension_codec) } /// Deserialize a LogicalPlan from bytes -pub async fn logical_plan_from_bytes_with_extension_codec( +pub fn logical_plan_from_bytes_with_extension_codec( bytes: &[u8], ctx: &SessionContext, extension_codec: &dyn LogicalExtensionCodec, @@ -164,13 +163,12 @@ pub async fn logical_plan_from_bytes_with_extension_codec( let protobuf = protobuf::LogicalPlanNode::decode(bytes).map_err(|e| { DataFusionError::Plan(format!("Error decoding expr as protobuf: {}", e)) })?; - protobuf.try_into_logical_plan(ctx, extension_codec).await + protobuf.try_into_logical_plan(ctx, extension_codec) } #[derive(Debug)] struct DefaultExtensionCodec {} -#[async_trait] impl LogicalExtensionCodec for DefaultExtensionCodec { fn try_decode( &self, @@ -189,7 +187,7 @@ impl LogicalExtensionCodec for DefaultExtensionCodec { )) } - async fn try_decode_table_provider( + fn try_decode_table_provider( &self, _buf: &[u8], _schema: SchemaRef, @@ -243,12 +241,12 @@ mod test { assert_eq!(actual, expected); } - #[tokio::test] + #[test] #[cfg(feature = "json")] async fn json_to_plan() { let input = r#"{"emptyRelation":{}}"#.to_string(); let ctx = SessionContext::new(); - let actual = logical_plan_from_json(&input, &ctx).await.unwrap(); + let actual = logical_plan_from_json(&input, &ctx).unwrap(); let result = matches!(actual, LogicalPlan::EmptyRelation(_)); assert!(result, "Should parse empty relation"); } diff --git a/datafusion/proto/src/lib.rs b/datafusion/proto/src/lib.rs index 1552542a1071..a5ba791236dd 100644 --- a/datafusion/proto/src/lib.rs +++ b/datafusion/proto/src/lib.rs @@ -59,7 +59,6 @@ mod roundtrip_tests { TimeUnit, UnionMode, }, }; - use async_trait::async_trait; use datafusion::datasource::datasource::TableProviderFactory; use datafusion::datasource::TableProvider; use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; @@ -130,8 +129,7 @@ mod roundtrip_tests { let bytes = logical_plan_to_bytes_with_extension_codec(&topk_plan, &extension_codec)?; let logical_round_trip = - logical_plan_from_bytes_with_extension_codec(&bytes, &ctx, &extension_codec) - .await?; + logical_plan_from_bytes_with_extension_codec(&bytes, &ctx, &extension_codec)?; assert_eq!( format!("{:?}", topk_plan), format!("{:?}", logical_round_trip) @@ -149,7 +147,6 @@ mod roundtrip_tests { #[derive(Debug)] pub struct TestTableProviderCodec {} - #[async_trait] impl LogicalExtensionCodec for TestTableProviderCodec { fn try_decode( &self, @@ -172,10 +169,10 @@ mod roundtrip_tests { )) } - async fn try_decode_table_provider( + fn try_decode_table_provider( &self, buf: &[u8], - _schema: SchemaRef, + schema: SchemaRef, ctx: &SessionContext, ) -> Result, DataFusionError> { let msg = TestTableProto::decode(buf).map_err(|_| { @@ -189,7 +186,7 @@ mod roundtrip_tests { .get("testtable") .expect("Unable to find testtable factory") .clone(); - let provider = (*factory).create(msg.url.as_str()).await?; + let provider = (*factory).with_schema(msg.url.as_str(), schema)?; Ok(provider) } @@ -229,7 +226,7 @@ mod roundtrip_tests { let scan = ctx.table("t")?.to_logical_plan()?; let bytes = logical_plan_to_bytes_with_extension_codec(&scan, &codec)?; let logical_round_trip = - logical_plan_from_bytes_with_extension_codec(&bytes, &ctx, &codec).await?; + logical_plan_from_bytes_with_extension_codec(&bytes, &ctx, &codec)?; assert_eq!(format!("{:?}", scan), format!("{:?}", logical_round_trip)); Ok(()) } @@ -257,7 +254,7 @@ mod roundtrip_tests { println!("{:?}", plan); let bytes = logical_plan_to_bytes(&plan)?; - let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx).await?; + let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?; assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip)); Ok(()) @@ -270,7 +267,7 @@ mod roundtrip_tests { .await?; let plan = ctx.table("t1")?.to_logical_plan()?; let bytes = logical_plan_to_bytes(&plan)?; - let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx).await?; + let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?; assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip)); Ok(()) } @@ -284,7 +281,7 @@ mod roundtrip_tests { .await?; let plan = ctx.sql("SELECT * FROM view_t1").await?.to_logical_plan()?; let bytes = logical_plan_to_bytes(&plan)?; - let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx).await?; + let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?; assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip)); Ok(()) } @@ -367,7 +364,6 @@ mod roundtrip_tests { #[derive(Debug)] pub struct TopKExtensionCodec {} - #[async_trait] impl LogicalExtensionCodec for TopKExtensionCodec { fn try_decode( &self, @@ -431,7 +427,7 @@ mod roundtrip_tests { } } - async fn try_decode_table_provider( + fn try_decode_table_provider( &self, _buf: &[u8], _schema: SchemaRef, diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs index b9f34ff02265..2c1ecc85713b 100644 --- a/datafusion/proto/src/logical_plan.rs +++ b/datafusion/proto/src/logical_plan.rs @@ -26,7 +26,6 @@ use crate::{ to_proto, }; use arrow::datatypes::{Schema, SchemaRef}; -use async_trait::async_trait; use datafusion::datasource::TableProvider; use datafusion::execution::FunctionRegistry; use datafusion::physical_plan::ExecutionPlan; @@ -75,7 +74,6 @@ pub(crate) fn proto_error>(message: S) -> DataFusionError { DataFusionError::Internal(message.into()) } -#[async_trait] pub trait AsLogicalPlan: Debug + Send + Sync + Clone { fn try_decode(buf: &[u8]) -> Result where @@ -86,7 +84,7 @@ pub trait AsLogicalPlan: Debug + Send + Sync + Clone { B: BufMut, Self: Sized; - async fn try_into_logical_plan( + fn try_into_logical_plan( &self, ctx: &SessionContext, extension_codec: &dyn LogicalExtensionCodec, @@ -115,7 +113,6 @@ pub trait PhysicalExtensionCodec: Debug + Send + Sync { ) -> Result<(), DataFusionError>; } -#[async_trait] pub trait LogicalExtensionCodec: Debug + Send + Sync { fn try_decode( &self, @@ -130,7 +127,7 @@ pub trait LogicalExtensionCodec: Debug + Send + Sync { buf: &mut Vec, ) -> Result<(), DataFusionError>; - async fn try_decode_table_provider( + fn try_decode_table_provider( &self, buf: &[u8], schema: SchemaRef, @@ -147,7 +144,6 @@ pub trait LogicalExtensionCodec: Debug + Send + Sync { #[derive(Debug, Clone)] pub struct DefaultLogicalExtensionCodec {} -#[async_trait] impl LogicalExtensionCodec for DefaultLogicalExtensionCodec { fn try_decode( &self, @@ -170,7 +166,7 @@ impl LogicalExtensionCodec for DefaultLogicalExtensionCodec { )) } - async fn try_decode_table_provider( + fn try_decode_table_provider( &self, _buf: &[u8], _schema: SchemaRef, @@ -196,7 +192,7 @@ impl LogicalExtensionCodec for DefaultLogicalExtensionCodec { macro_rules! into_logical_plan { ($PB:expr, $CTX:expr, $CODEC:expr) => {{ if let Some(field) = $PB.as_ref() { - field.as_ref().try_into_logical_plan($CTX, $CODEC).await + field.as_ref().try_into_logical_plan($CTX, $CODEC) } else { Err(proto_error("Missing required field in protobuf")) } @@ -280,7 +276,6 @@ impl From for protobuf::JoinConstraint { } } -#[async_trait] impl AsLogicalPlan for LogicalPlanNode { fn try_decode(buf: &[u8]) -> Result where @@ -301,7 +296,7 @@ impl AsLogicalPlan for LogicalPlanNode { }) } - async fn try_into_logical_plan( + fn try_into_logical_plan( &self, ctx: &SessionContext, extension_codec: &dyn LogicalExtensionCodec, @@ -488,8 +483,7 @@ impl AsLogicalPlan for LogicalPlanNode { .map(|expr| parse_expr(expr, ctx)) .collect::, _>>()?; let provider = extension_codec - .try_decode_table_provider(&scan.custom_table_data, schema, ctx) - .await?; + .try_decode_table_provider(&scan.custom_table_data, schema, ctx)?; LogicalPlanBuilder::scan_with_filters( &scan.table_name, @@ -591,7 +585,7 @@ impl AsLogicalPlan for LogicalPlanNode { .input.clone().ok_or_else(|| DataFusionError::Internal(String::from( "Protobuf deserialization error, CreateViewNode has invalid LogicalPlan input.", )))? - .try_into_logical_plan(ctx, extension_codec).await?; + .try_into_logical_plan(ctx, extension_codec)?; let definition = if !create_view.definition.is_empty() { Some(create_view.definition.clone()) } else { @@ -714,11 +708,11 @@ impl AsLogicalPlan for LogicalPlanNode { builder.build() } LogicalPlanType::Union(union) => { - let mut input_plans: Vec = vec![]; - for i in union.inputs.iter() { - let res = i.try_into_logical_plan(ctx, extension_codec).await?; - input_plans.push(res); - } + let mut input_plans: Vec = union + .inputs + .iter() + .map(|i| i.try_into_logical_plan(ctx, extension_codec)) + .collect::>()?; if input_plans.len() < 2 { return Err( DataFusionError::Internal(String::from( @@ -742,11 +736,10 @@ impl AsLogicalPlan for LogicalPlanNode { LogicalPlanBuilder::from(left).cross_join(&right)?.build() } LogicalPlanType::Extension(LogicalExtensionNode { node, inputs }) => { - let mut input_plans: Vec = vec![]; - for i in inputs.iter() { - let res = i.try_into_logical_plan(ctx, extension_codec).await?; - input_plans.push(res); - } + let input_plans: Vec = inputs + .iter() + .map(|i| i.try_into_logical_plan(ctx, extension_codec)) + .collect::>()?; let extension_node = extension_codec.try_decode(node, &input_plans, ctx)?; From 6162fdb8c16582cfd4ad2ed2b8bb705395900223 Mon Sep 17 00:00:00 2001 From: Brent Gardner Date: Wed, 26 Oct 2022 15:55:52 -0600 Subject: [PATCH 2/5] Working magically --- datafusion/core/src/datasource/datasource.rs | 3 --- datafusion/core/src/test_util.rs | 6 ------ 2 files changed, 9 deletions(-) diff --git a/datafusion/core/src/datasource/datasource.rs b/datafusion/core/src/datasource/datasource.rs index 3f07201851bf..38e3193c4935 100644 --- a/datafusion/core/src/datasource/datasource.rs +++ b/datafusion/core/src/datasource/datasource.rs @@ -87,7 +87,4 @@ pub trait TableProvider: Sync + Send { pub trait TableProviderFactory: Sync + Send { /// Create a TableProvider with the given url async fn create(&self, url: &str) -> Result>; - - /// Create a TableProvider with the given url and schema - fn with_schema(&self, url: &str, schema: SchemaRef) -> Result>; } diff --git a/datafusion/core/src/test_util.rs b/datafusion/core/src/test_util.rs index 8d5c1fafd46d..a080d4bab344 100644 --- a/datafusion/core/src/test_util.rs +++ b/datafusion/core/src/test_util.rs @@ -290,12 +290,6 @@ impl TableProviderFactory for TestTableFactory { url: url.to_string(), })) } - - fn with_schema(&self, url: &str, _schema: SchemaRef) -> datafusion_common::Result> { - Ok(Arc::new(TestTableProvider { - url: url.to_string(), - })) - } } /// TableProvider for testing purposes From 476b40c5b2be15b52597ca1c392ed33ca6c1a41b Mon Sep 17 00:00:00 2001 From: Brent Gardner Date: Wed, 26 Oct 2022 16:09:21 -0600 Subject: [PATCH 3/5] fmt --- datafusion/proto/src/bytes/mod.rs | 5 +---- datafusion/proto/src/logical_plan.rs | 7 +++++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/datafusion/proto/src/bytes/mod.rs b/datafusion/proto/src/bytes/mod.rs index c463dff53f8f..87aaed8e2d0d 100644 --- a/datafusion/proto/src/bytes/mod.rs +++ b/datafusion/proto/src/bytes/mod.rs @@ -135,10 +135,7 @@ pub fn logical_plan_to_bytes_with_extension_codec( /// Deserialize a LogicalPlan from json #[cfg(feature = "json")] -pub fn logical_plan_from_json( - json: &str, - ctx: &SessionContext, -) -> Result { +pub fn logical_plan_from_json(json: &str, ctx: &SessionContext) -> Result { let back: protobuf::LogicalPlanNode = serde_json::from_str(json) .map_err(|e| DataFusionError::Plan(format!("Error serializing plan: {}", e)))?; let extension_codec = DefaultExtensionCodec {}; diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs index 2c1ecc85713b..278130b06fee 100644 --- a/datafusion/proto/src/logical_plan.rs +++ b/datafusion/proto/src/logical_plan.rs @@ -482,8 +482,11 @@ impl AsLogicalPlan for LogicalPlanNode { .iter() .map(|expr| parse_expr(expr, ctx)) .collect::, _>>()?; - let provider = extension_codec - .try_decode_table_provider(&scan.custom_table_data, schema, ctx)?; + let provider = extension_codec.try_decode_table_provider( + &scan.custom_table_data, + schema, + ctx, + )?; LogicalPlanBuilder::scan_with_filters( &scan.table_name, From 3e13fcd855848d9e4661635ad5bf02f4fa5ee35b Mon Sep 17 00:00:00 2001 From: Brent Gardner Date: Wed, 26 Oct 2022 16:29:22 -0600 Subject: [PATCH 4/5] Fix some stuff --- datafusion/proto/src/bytes/mod.rs | 2 +- datafusion/proto/src/lib.rs | 14 +++----------- 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/datafusion/proto/src/bytes/mod.rs b/datafusion/proto/src/bytes/mod.rs index 87aaed8e2d0d..7c8b94e5b358 100644 --- a/datafusion/proto/src/bytes/mod.rs +++ b/datafusion/proto/src/bytes/mod.rs @@ -240,7 +240,7 @@ mod test { #[test] #[cfg(feature = "json")] - async fn json_to_plan() { + fn json_to_plan() { let input = r#"{"emptyRelation":{}}"#.to_string(); let ctx = SessionContext::new(); let actual = logical_plan_from_json(&input, &ctx).unwrap(); diff --git a/datafusion/proto/src/lib.rs b/datafusion/proto/src/lib.rs index a5ba791236dd..db64d6abfbe5 100644 --- a/datafusion/proto/src/lib.rs +++ b/datafusion/proto/src/lib.rs @@ -173,21 +173,13 @@ mod roundtrip_tests { &self, buf: &[u8], schema: SchemaRef, - ctx: &SessionContext, + _ctx: &SessionContext, ) -> Result, DataFusionError> { let msg = TestTableProto::decode(buf).map_err(|_| { DataFusionError::Internal("Error encoding test table".to_string()) })?; - let factory = ctx - .state - .read() - .runtime_env - .table_factories - .get("testtable") - .expect("Unable to find testtable factory") - .clone(); - let provider = (*factory).with_schema(msg.url.as_str(), schema)?; - Ok(provider) + let provider = TestTableProvider { url: msg.url }; + Ok(Arc::new(provider)) } fn try_encode_table_provider( From e9bac28f0c3e8ab51954176d9fe699b0fc4d084a Mon Sep 17 00:00:00 2001 From: Brent Gardner Date: Wed, 26 Oct 2022 16:57:14 -0600 Subject: [PATCH 5/5] clippy --- datafusion/proto/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/proto/src/lib.rs b/datafusion/proto/src/lib.rs index db64d6abfbe5..ded96184d7e1 100644 --- a/datafusion/proto/src/lib.rs +++ b/datafusion/proto/src/lib.rs @@ -172,11 +172,11 @@ mod roundtrip_tests { fn try_decode_table_provider( &self, buf: &[u8], - schema: SchemaRef, + _schema: SchemaRef, _ctx: &SessionContext, ) -> Result, DataFusionError> { let msg = TestTableProto::decode(buf).map_err(|_| { - DataFusionError::Internal("Error encoding test table".to_string()) + DataFusionError::Internal("Error decoding test table".to_string()) })?; let provider = TestTableProvider { url: msg.url }; Ok(Arc::new(provider))