Skip to content

Commit

Permalink
Revert async changes but keep deltalake working (apache#3978)
Browse files Browse the repository at this point in the history
  • Loading branch information
Brent Gardner authored and Dandandan committed Nov 5, 2022
1 parent c21f511 commit c3b7508
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 65 deletions.
1 change: 0 additions & 1 deletion datafusion/proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ json = ["pbjson", "pbjson-build", "serde", "serde_json"]

[dependencies]
arrow = "25.0.0"
async-trait = "0.1.41"
datafusion = { path = "../core", version = "13.0.0" }
datafusion-common = { path = "../common", version = "13.0.0" }
datafusion-expr = { path = "../expr", version = "13.0.0" }
Expand Down
2 changes: 1 addition & 1 deletion datafusion/proto/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ async fn main() -> Result<()> {
?;
let plan = ctx.table("t1")?.to_logical_plan()?;
let bytes = logical_plan_to_bytes(&plan)?;
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx).await?;
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip));
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/proto/examples/plan_serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ async fn main() -> Result<()> {
.await?;
let plan = ctx.table("t1")?.to_logical_plan()?;
let bytes = logical_plan_to_bytes(&plan)?;
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx).await?;
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip));
Ok(())
}
25 changes: 10 additions & 15 deletions datafusion/proto/src/bytes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
use crate::logical_plan::{AsLogicalPlan, LogicalExtensionCodec};
use crate::{from_proto::parse_expr, protobuf};
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion::datasource::TableProvider;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::{Expr, Extension, LogicalPlan};
Expand Down Expand Up @@ -136,41 +135,37 @@ pub fn logical_plan_to_bytes_with_extension_codec(

/// Deserialize a LogicalPlan from json
#[cfg(feature = "json")]
pub async fn logical_plan_from_json(
json: &str,
ctx: &SessionContext,
) -> Result<LogicalPlan> {
pub fn logical_plan_from_json(json: &str, ctx: &SessionContext) -> Result<LogicalPlan> {
let back: protobuf::LogicalPlanNode = serde_json::from_str(json)
.map_err(|e| DataFusionError::Plan(format!("Error serializing plan: {}", e)))?;
let extension_codec = DefaultExtensionCodec {};
back.try_into_logical_plan(ctx, &extension_codec).await
back.try_into_logical_plan(ctx, &extension_codec)
}

/// Deserialize a LogicalPlan from bytes
pub async fn logical_plan_from_bytes(
pub fn logical_plan_from_bytes(
bytes: &[u8],
ctx: &SessionContext,
) -> Result<LogicalPlan> {
let extension_codec = DefaultExtensionCodec {};
logical_plan_from_bytes_with_extension_codec(bytes, ctx, &extension_codec).await
logical_plan_from_bytes_with_extension_codec(bytes, ctx, &extension_codec)
}

/// Deserialize a LogicalPlan from bytes
pub async fn logical_plan_from_bytes_with_extension_codec(
pub fn logical_plan_from_bytes_with_extension_codec(
bytes: &[u8],
ctx: &SessionContext,
extension_codec: &dyn LogicalExtensionCodec,
) -> Result<LogicalPlan> {
let protobuf = protobuf::LogicalPlanNode::decode(bytes).map_err(|e| {
DataFusionError::Plan(format!("Error decoding expr as protobuf: {}", e))
})?;
protobuf.try_into_logical_plan(ctx, extension_codec).await
protobuf.try_into_logical_plan(ctx, extension_codec)
}

#[derive(Debug)]
struct DefaultExtensionCodec {}

#[async_trait]
impl LogicalExtensionCodec for DefaultExtensionCodec {
fn try_decode(
&self,
Expand All @@ -189,7 +184,7 @@ impl LogicalExtensionCodec for DefaultExtensionCodec {
))
}

async fn try_decode_table_provider(
fn try_decode_table_provider(
&self,
_buf: &[u8],
_schema: SchemaRef,
Expand Down Expand Up @@ -243,12 +238,12 @@ mod test {
assert_eq!(actual, expected);
}

#[tokio::test]
#[test]
#[cfg(feature = "json")]
async fn json_to_plan() {
fn json_to_plan() {
let input = r#"{"emptyRelation":{}}"#.to_string();
let ctx = SessionContext::new();
let actual = logical_plan_from_json(&input, &ctx).await.unwrap();
let actual = logical_plan_from_json(&input, &ctx).unwrap();
let result = matches!(actual, LogicalPlan::EmptyRelation(_));
assert!(result, "Should parse empty relation");
}
Expand Down
34 changes: 11 additions & 23 deletions datafusion/proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ mod roundtrip_tests {
TimeUnit, UnionMode,
},
};
use async_trait::async_trait;
use datafusion::datasource::datasource::TableProviderFactory;
use datafusion::datasource::TableProvider;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
Expand Down Expand Up @@ -130,8 +129,7 @@ mod roundtrip_tests {
let bytes =
logical_plan_to_bytes_with_extension_codec(&topk_plan, &extension_codec)?;
let logical_round_trip =
logical_plan_from_bytes_with_extension_codec(&bytes, &ctx, &extension_codec)
.await?;
logical_plan_from_bytes_with_extension_codec(&bytes, &ctx, &extension_codec)?;
assert_eq!(
format!("{:?}", topk_plan),
format!("{:?}", logical_round_trip)
Expand All @@ -149,7 +147,6 @@ mod roundtrip_tests {
#[derive(Debug)]
pub struct TestTableProviderCodec {}

#[async_trait]
impl LogicalExtensionCodec for TestTableProviderCodec {
fn try_decode(
&self,
Expand All @@ -172,25 +169,17 @@ mod roundtrip_tests {
))
}

async fn try_decode_table_provider(
fn try_decode_table_provider(
&self,
buf: &[u8],
_schema: SchemaRef,
ctx: &SessionContext,
_ctx: &SessionContext,
) -> Result<Arc<dyn TableProvider>, DataFusionError> {
let msg = TestTableProto::decode(buf).map_err(|_| {
DataFusionError::Internal("Error encoding test table".to_string())
DataFusionError::Internal("Error decoding test table".to_string())
})?;
let factory = ctx
.state
.read()
.runtime_env
.table_factories
.get("testtable")
.expect("Unable to find testtable factory")
.clone();
let provider = (*factory).create(msg.url.as_str()).await?;
Ok(provider)
let provider = TestTableProvider { url: msg.url };
Ok(Arc::new(provider))
}

fn try_encode_table_provider(
Expand Down Expand Up @@ -229,7 +218,7 @@ mod roundtrip_tests {
let scan = ctx.table("t")?.to_logical_plan()?;
let bytes = logical_plan_to_bytes_with_extension_codec(&scan, &codec)?;
let logical_round_trip =
logical_plan_from_bytes_with_extension_codec(&bytes, &ctx, &codec).await?;
logical_plan_from_bytes_with_extension_codec(&bytes, &ctx, &codec)?;
assert_eq!(format!("{:?}", scan), format!("{:?}", logical_round_trip));
Ok(())
}
Expand Down Expand Up @@ -257,7 +246,7 @@ mod roundtrip_tests {
println!("{:?}", plan);

let bytes = logical_plan_to_bytes(&plan)?;
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx).await?;
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip));

Ok(())
Expand All @@ -270,7 +259,7 @@ mod roundtrip_tests {
.await?;
let plan = ctx.table("t1")?.to_logical_plan()?;
let bytes = logical_plan_to_bytes(&plan)?;
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx).await?;
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip));
Ok(())
}
Expand All @@ -284,7 +273,7 @@ mod roundtrip_tests {
.await?;
let plan = ctx.sql("SELECT * FROM view_t1").await?.to_logical_plan()?;
let bytes = logical_plan_to_bytes(&plan)?;
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx).await?;
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip));
Ok(())
}
Expand Down Expand Up @@ -367,7 +356,6 @@ mod roundtrip_tests {
#[derive(Debug)]
pub struct TopKExtensionCodec {}

#[async_trait]
impl LogicalExtensionCodec for TopKExtensionCodec {
fn try_decode(
&self,
Expand Down Expand Up @@ -431,7 +419,7 @@ mod roundtrip_tests {
}
}

async fn try_decode_table_provider(
fn try_decode_table_provider(
&self,
_buf: &[u8],
_schema: SchemaRef,
Expand Down
44 changes: 20 additions & 24 deletions datafusion/proto/src/logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use crate::{
to_proto,
};
use arrow::datatypes::{Schema, SchemaRef};
use async_trait::async_trait;
use datafusion::datasource::TableProvider;
use datafusion::execution::FunctionRegistry;
use datafusion::physical_plan::ExecutionPlan;
Expand Down Expand Up @@ -75,7 +74,6 @@ pub(crate) fn proto_error<S: Into<String>>(message: S) -> DataFusionError {
DataFusionError::Internal(message.into())
}

#[async_trait]
pub trait AsLogicalPlan: Debug + Send + Sync + Clone {
fn try_decode(buf: &[u8]) -> Result<Self, DataFusionError>
where
Expand All @@ -86,7 +84,7 @@ pub trait AsLogicalPlan: Debug + Send + Sync + Clone {
B: BufMut,
Self: Sized;

async fn try_into_logical_plan(
fn try_into_logical_plan(
&self,
ctx: &SessionContext,
extension_codec: &dyn LogicalExtensionCodec,
Expand Down Expand Up @@ -115,7 +113,6 @@ pub trait PhysicalExtensionCodec: Debug + Send + Sync {
) -> Result<(), DataFusionError>;
}

#[async_trait]
pub trait LogicalExtensionCodec: Debug + Send + Sync {
fn try_decode(
&self,
Expand All @@ -130,7 +127,7 @@ pub trait LogicalExtensionCodec: Debug + Send + Sync {
buf: &mut Vec<u8>,
) -> Result<(), DataFusionError>;

async fn try_decode_table_provider(
fn try_decode_table_provider(
&self,
buf: &[u8],
schema: SchemaRef,
Expand All @@ -147,7 +144,6 @@ pub trait LogicalExtensionCodec: Debug + Send + Sync {
#[derive(Debug, Clone)]
pub struct DefaultLogicalExtensionCodec {}

#[async_trait]
impl LogicalExtensionCodec for DefaultLogicalExtensionCodec {
fn try_decode(
&self,
Expand All @@ -170,7 +166,7 @@ impl LogicalExtensionCodec for DefaultLogicalExtensionCodec {
))
}

async fn try_decode_table_provider(
fn try_decode_table_provider(
&self,
_buf: &[u8],
_schema: SchemaRef,
Expand All @@ -196,7 +192,7 @@ impl LogicalExtensionCodec for DefaultLogicalExtensionCodec {
macro_rules! into_logical_plan {
($PB:expr, $CTX:expr, $CODEC:expr) => {{
if let Some(field) = $PB.as_ref() {
field.as_ref().try_into_logical_plan($CTX, $CODEC).await
field.as_ref().try_into_logical_plan($CTX, $CODEC)
} else {
Err(proto_error("Missing required field in protobuf"))
}
Expand Down Expand Up @@ -280,7 +276,6 @@ impl From<JoinConstraint> for protobuf::JoinConstraint {
}
}

#[async_trait]
impl AsLogicalPlan for LogicalPlanNode {
fn try_decode(buf: &[u8]) -> Result<Self, DataFusionError>
where
Expand All @@ -301,7 +296,7 @@ impl AsLogicalPlan for LogicalPlanNode {
})
}

async fn try_into_logical_plan(
fn try_into_logical_plan(
&self,
ctx: &SessionContext,
extension_codec: &dyn LogicalExtensionCodec,
Expand Down Expand Up @@ -487,9 +482,11 @@ impl AsLogicalPlan for LogicalPlanNode {
.iter()
.map(|expr| parse_expr(expr, ctx))
.collect::<Result<Vec<_>, _>>()?;
let provider = extension_codec
.try_decode_table_provider(&scan.custom_table_data, schema, ctx)
.await?;
let provider = extension_codec.try_decode_table_provider(
&scan.custom_table_data,
schema,
ctx,
)?;

LogicalPlanBuilder::scan_with_filters(
&scan.table_name,
Expand Down Expand Up @@ -591,7 +588,7 @@ impl AsLogicalPlan for LogicalPlanNode {
.input.clone().ok_or_else(|| DataFusionError::Internal(String::from(
"Protobuf deserialization error, CreateViewNode has invalid LogicalPlan input.",
)))?
.try_into_logical_plan(ctx, extension_codec).await?;
.try_into_logical_plan(ctx, extension_codec)?;
let definition = if !create_view.definition.is_empty() {
Some(create_view.definition.clone())
} else {
Expand Down Expand Up @@ -714,11 +711,11 @@ impl AsLogicalPlan for LogicalPlanNode {
builder.build()
}
LogicalPlanType::Union(union) => {
let mut input_plans: Vec<LogicalPlan> = vec![];
for i in union.inputs.iter() {
let res = i.try_into_logical_plan(ctx, extension_codec).await?;
input_plans.push(res);
}
let mut input_plans: Vec<LogicalPlan> = union
.inputs
.iter()
.map(|i| i.try_into_logical_plan(ctx, extension_codec))
.collect::<Result<_, DataFusionError>>()?;

if input_plans.len() < 2 {
return Err( DataFusionError::Internal(String::from(
Expand All @@ -742,11 +739,10 @@ impl AsLogicalPlan for LogicalPlanNode {
LogicalPlanBuilder::from(left).cross_join(&right)?.build()
}
LogicalPlanType::Extension(LogicalExtensionNode { node, inputs }) => {
let mut input_plans: Vec<LogicalPlan> = vec![];
for i in inputs.iter() {
let res = i.try_into_logical_plan(ctx, extension_codec).await?;
input_plans.push(res);
}
let input_plans: Vec<LogicalPlan> = inputs
.iter()
.map(|i| i.try_into_logical_plan(ctx, extension_codec))
.collect::<Result<_, DataFusionError>>()?;

let extension_node =
extension_codec.try_decode(node, &input_plans, ctx)?;
Expand Down

0 comments on commit c3b7508

Please sign in to comment.