diff --git a/datafusion/proto/Cargo.toml b/datafusion/proto/Cargo.toml index ef5d8211248a..ff2af33b22d8 100644 --- a/datafusion/proto/Cargo.toml +++ b/datafusion/proto/Cargo.toml @@ -41,7 +41,6 @@ json = ["pbjson", "pbjson-build", "serde", "serde_json"] [dependencies] arrow = "25.0.0" -async-trait = "0.1.41" datafusion = { path = "../core", version = "13.0.0" } datafusion-common = { path = "../common", version = "13.0.0" } datafusion-expr = { path = "../expr", version = "13.0.0" } diff --git a/datafusion/proto/README.md b/datafusion/proto/README.md index 8c8962e506a6..a3878447e042 100644 --- a/datafusion/proto/README.md +++ b/datafusion/proto/README.md @@ -63,7 +63,7 @@ async fn main() -> Result<()> { ?; let plan = ctx.table("t1")?.to_logical_plan()?; let bytes = logical_plan_to_bytes(&plan)?; - let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx).await?; + let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?; assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip)); Ok(()) } diff --git a/datafusion/proto/examples/plan_serde.rs b/datafusion/proto/examples/plan_serde.rs index eed372476fff..d98d88b2a46a 100644 --- a/datafusion/proto/examples/plan_serde.rs +++ b/datafusion/proto/examples/plan_serde.rs @@ -26,7 +26,7 @@ async fn main() -> Result<()> { .await?; let plan = ctx.table("t1")?.to_logical_plan()?; let bytes = logical_plan_to_bytes(&plan)?; - let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx).await?; + let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?; assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip)); Ok(()) } diff --git a/datafusion/proto/src/bytes/mod.rs b/datafusion/proto/src/bytes/mod.rs index 5695bf50686a..7c8b94e5b358 100644 --- a/datafusion/proto/src/bytes/mod.rs +++ b/datafusion/proto/src/bytes/mod.rs @@ -19,7 +19,6 @@ use crate::logical_plan::{AsLogicalPlan, LogicalExtensionCodec}; use crate::{from_proto::parse_expr, protobuf}; use arrow::datatypes::SchemaRef; -use async_trait::async_trait; use datafusion::datasource::TableProvider; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::{Expr, Extension, LogicalPlan}; @@ -136,27 +135,24 @@ pub fn logical_plan_to_bytes_with_extension_codec( /// Deserialize a LogicalPlan from json #[cfg(feature = "json")] -pub async fn logical_plan_from_json( - json: &str, - ctx: &SessionContext, -) -> Result { +pub fn logical_plan_from_json(json: &str, ctx: &SessionContext) -> Result { let back: protobuf::LogicalPlanNode = serde_json::from_str(json) .map_err(|e| DataFusionError::Plan(format!("Error serializing plan: {}", e)))?; let extension_codec = DefaultExtensionCodec {}; - back.try_into_logical_plan(ctx, &extension_codec).await + back.try_into_logical_plan(ctx, &extension_codec) } /// Deserialize a LogicalPlan from bytes -pub async fn logical_plan_from_bytes( +pub fn logical_plan_from_bytes( bytes: &[u8], ctx: &SessionContext, ) -> Result { let extension_codec = DefaultExtensionCodec {}; - logical_plan_from_bytes_with_extension_codec(bytes, ctx, &extension_codec).await + logical_plan_from_bytes_with_extension_codec(bytes, ctx, &extension_codec) } /// Deserialize a LogicalPlan from bytes -pub async fn logical_plan_from_bytes_with_extension_codec( +pub fn logical_plan_from_bytes_with_extension_codec( bytes: &[u8], ctx: &SessionContext, extension_codec: &dyn LogicalExtensionCodec, @@ -164,13 +160,12 @@ pub async fn logical_plan_from_bytes_with_extension_codec( let protobuf = protobuf::LogicalPlanNode::decode(bytes).map_err(|e| { DataFusionError::Plan(format!("Error decoding expr as protobuf: {}", e)) })?; - protobuf.try_into_logical_plan(ctx, extension_codec).await + protobuf.try_into_logical_plan(ctx, extension_codec) } #[derive(Debug)] struct DefaultExtensionCodec {} -#[async_trait] impl LogicalExtensionCodec for DefaultExtensionCodec { fn try_decode( &self, @@ -189,7 +184,7 @@ impl LogicalExtensionCodec for DefaultExtensionCodec { )) } - async fn try_decode_table_provider( + fn try_decode_table_provider( &self, _buf: &[u8], _schema: SchemaRef, @@ -243,12 +238,12 @@ mod test { assert_eq!(actual, expected); } - #[tokio::test] + #[test] #[cfg(feature = "json")] - async fn json_to_plan() { + fn json_to_plan() { let input = r#"{"emptyRelation":{}}"#.to_string(); let ctx = SessionContext::new(); - let actual = logical_plan_from_json(&input, &ctx).await.unwrap(); + let actual = logical_plan_from_json(&input, &ctx).unwrap(); let result = matches!(actual, LogicalPlan::EmptyRelation(_)); assert!(result, "Should parse empty relation"); } diff --git a/datafusion/proto/src/lib.rs b/datafusion/proto/src/lib.rs index 1552542a1071..ded96184d7e1 100644 --- a/datafusion/proto/src/lib.rs +++ b/datafusion/proto/src/lib.rs @@ -59,7 +59,6 @@ mod roundtrip_tests { TimeUnit, UnionMode, }, }; - use async_trait::async_trait; use datafusion::datasource::datasource::TableProviderFactory; use datafusion::datasource::TableProvider; use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; @@ -130,8 +129,7 @@ mod roundtrip_tests { let bytes = logical_plan_to_bytes_with_extension_codec(&topk_plan, &extension_codec)?; let logical_round_trip = - logical_plan_from_bytes_with_extension_codec(&bytes, &ctx, &extension_codec) - .await?; + logical_plan_from_bytes_with_extension_codec(&bytes, &ctx, &extension_codec)?; assert_eq!( format!("{:?}", topk_plan), format!("{:?}", logical_round_trip) @@ -149,7 +147,6 @@ mod roundtrip_tests { #[derive(Debug)] pub struct TestTableProviderCodec {} - #[async_trait] impl LogicalExtensionCodec for TestTableProviderCodec { fn try_decode( &self, @@ -172,25 +169,17 @@ mod roundtrip_tests { )) } - async fn try_decode_table_provider( + fn try_decode_table_provider( &self, buf: &[u8], _schema: SchemaRef, - ctx: &SessionContext, + _ctx: &SessionContext, ) -> Result, DataFusionError> { let msg = TestTableProto::decode(buf).map_err(|_| { - DataFusionError::Internal("Error encoding test table".to_string()) + DataFusionError::Internal("Error decoding test table".to_string()) })?; - let factory = ctx - .state - .read() - .runtime_env - .table_factories - .get("testtable") - .expect("Unable to find testtable factory") - .clone(); - let provider = (*factory).create(msg.url.as_str()).await?; - Ok(provider) + let provider = TestTableProvider { url: msg.url }; + Ok(Arc::new(provider)) } fn try_encode_table_provider( @@ -229,7 +218,7 @@ mod roundtrip_tests { let scan = ctx.table("t")?.to_logical_plan()?; let bytes = logical_plan_to_bytes_with_extension_codec(&scan, &codec)?; let logical_round_trip = - logical_plan_from_bytes_with_extension_codec(&bytes, &ctx, &codec).await?; + logical_plan_from_bytes_with_extension_codec(&bytes, &ctx, &codec)?; assert_eq!(format!("{:?}", scan), format!("{:?}", logical_round_trip)); Ok(()) } @@ -257,7 +246,7 @@ mod roundtrip_tests { println!("{:?}", plan); let bytes = logical_plan_to_bytes(&plan)?; - let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx).await?; + let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?; assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip)); Ok(()) @@ -270,7 +259,7 @@ mod roundtrip_tests { .await?; let plan = ctx.table("t1")?.to_logical_plan()?; let bytes = logical_plan_to_bytes(&plan)?; - let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx).await?; + let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?; assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip)); Ok(()) } @@ -284,7 +273,7 @@ mod roundtrip_tests { .await?; let plan = ctx.sql("SELECT * FROM view_t1").await?.to_logical_plan()?; let bytes = logical_plan_to_bytes(&plan)?; - let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx).await?; + let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?; assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip)); Ok(()) } @@ -367,7 +356,6 @@ mod roundtrip_tests { #[derive(Debug)] pub struct TopKExtensionCodec {} - #[async_trait] impl LogicalExtensionCodec for TopKExtensionCodec { fn try_decode( &self, @@ -431,7 +419,7 @@ mod roundtrip_tests { } } - async fn try_decode_table_provider( + fn try_decode_table_provider( &self, _buf: &[u8], _schema: SchemaRef, diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs index b9f34ff02265..278130b06fee 100644 --- a/datafusion/proto/src/logical_plan.rs +++ b/datafusion/proto/src/logical_plan.rs @@ -26,7 +26,6 @@ use crate::{ to_proto, }; use arrow::datatypes::{Schema, SchemaRef}; -use async_trait::async_trait; use datafusion::datasource::TableProvider; use datafusion::execution::FunctionRegistry; use datafusion::physical_plan::ExecutionPlan; @@ -75,7 +74,6 @@ pub(crate) fn proto_error>(message: S) -> DataFusionError { DataFusionError::Internal(message.into()) } -#[async_trait] pub trait AsLogicalPlan: Debug + Send + Sync + Clone { fn try_decode(buf: &[u8]) -> Result where @@ -86,7 +84,7 @@ pub trait AsLogicalPlan: Debug + Send + Sync + Clone { B: BufMut, Self: Sized; - async fn try_into_logical_plan( + fn try_into_logical_plan( &self, ctx: &SessionContext, extension_codec: &dyn LogicalExtensionCodec, @@ -115,7 +113,6 @@ pub trait PhysicalExtensionCodec: Debug + Send + Sync { ) -> Result<(), DataFusionError>; } -#[async_trait] pub trait LogicalExtensionCodec: Debug + Send + Sync { fn try_decode( &self, @@ -130,7 +127,7 @@ pub trait LogicalExtensionCodec: Debug + Send + Sync { buf: &mut Vec, ) -> Result<(), DataFusionError>; - async fn try_decode_table_provider( + fn try_decode_table_provider( &self, buf: &[u8], schema: SchemaRef, @@ -147,7 +144,6 @@ pub trait LogicalExtensionCodec: Debug + Send + Sync { #[derive(Debug, Clone)] pub struct DefaultLogicalExtensionCodec {} -#[async_trait] impl LogicalExtensionCodec for DefaultLogicalExtensionCodec { fn try_decode( &self, @@ -170,7 +166,7 @@ impl LogicalExtensionCodec for DefaultLogicalExtensionCodec { )) } - async fn try_decode_table_provider( + fn try_decode_table_provider( &self, _buf: &[u8], _schema: SchemaRef, @@ -196,7 +192,7 @@ impl LogicalExtensionCodec for DefaultLogicalExtensionCodec { macro_rules! into_logical_plan { ($PB:expr, $CTX:expr, $CODEC:expr) => {{ if let Some(field) = $PB.as_ref() { - field.as_ref().try_into_logical_plan($CTX, $CODEC).await + field.as_ref().try_into_logical_plan($CTX, $CODEC) } else { Err(proto_error("Missing required field in protobuf")) } @@ -280,7 +276,6 @@ impl From for protobuf::JoinConstraint { } } -#[async_trait] impl AsLogicalPlan for LogicalPlanNode { fn try_decode(buf: &[u8]) -> Result where @@ -301,7 +296,7 @@ impl AsLogicalPlan for LogicalPlanNode { }) } - async fn try_into_logical_plan( + fn try_into_logical_plan( &self, ctx: &SessionContext, extension_codec: &dyn LogicalExtensionCodec, @@ -487,9 +482,11 @@ impl AsLogicalPlan for LogicalPlanNode { .iter() .map(|expr| parse_expr(expr, ctx)) .collect::, _>>()?; - let provider = extension_codec - .try_decode_table_provider(&scan.custom_table_data, schema, ctx) - .await?; + let provider = extension_codec.try_decode_table_provider( + &scan.custom_table_data, + schema, + ctx, + )?; LogicalPlanBuilder::scan_with_filters( &scan.table_name, @@ -591,7 +588,7 @@ impl AsLogicalPlan for LogicalPlanNode { .input.clone().ok_or_else(|| DataFusionError::Internal(String::from( "Protobuf deserialization error, CreateViewNode has invalid LogicalPlan input.", )))? - .try_into_logical_plan(ctx, extension_codec).await?; + .try_into_logical_plan(ctx, extension_codec)?; let definition = if !create_view.definition.is_empty() { Some(create_view.definition.clone()) } else { @@ -714,11 +711,11 @@ impl AsLogicalPlan for LogicalPlanNode { builder.build() } LogicalPlanType::Union(union) => { - let mut input_plans: Vec = vec![]; - for i in union.inputs.iter() { - let res = i.try_into_logical_plan(ctx, extension_codec).await?; - input_plans.push(res); - } + let mut input_plans: Vec = union + .inputs + .iter() + .map(|i| i.try_into_logical_plan(ctx, extension_codec)) + .collect::>()?; if input_plans.len() < 2 { return Err( DataFusionError::Internal(String::from( @@ -742,11 +739,10 @@ impl AsLogicalPlan for LogicalPlanNode { LogicalPlanBuilder::from(left).cross_join(&right)?.build() } LogicalPlanType::Extension(LogicalExtensionNode { node, inputs }) => { - let mut input_plans: Vec = vec![]; - for i in inputs.iter() { - let res = i.try_into_logical_plan(ctx, extension_codec).await?; - input_plans.push(res); - } + let input_plans: Vec = inputs + .iter() + .map(|i| i.try_into_logical_plan(ctx, extension_codec)) + .collect::>()?; let extension_node = extension_codec.try_decode(node, &input_plans, ctx)?;