Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert async changes but keep deltalake working #3978

Merged
merged 5 commits into from
Oct 27, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions datafusion/core/src/datasource/datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,7 @@ pub trait TableProvider: Sync + Send {
pub trait TableProviderFactory: Sync + Send {
/// Create a TableProvider with the given url
async fn create(&self, url: &str) -> Result<Arc<dyn TableProvider>>;

/// Create a TableProvider with the given url and schema
fn with_schema(&self, url: &str, schema: SchemaRef) -> Result<Arc<dyn TableProvider>>;
}
6 changes: 6 additions & 0 deletions datafusion/core/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,12 @@ impl TableProviderFactory for TestTableFactory {
url: url.to_string(),
}))
}

fn with_schema(&self, url: &str, _schema: SchemaRef) -> datafusion_common::Result<Arc<dyn TableProvider>> {
Ok(Arc::new(TestTableProvider {
url: url.to_string(),
}))
}
}

/// TableProvider for testing purposes
Expand Down
1 change: 0 additions & 1 deletion datafusion/proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ json = ["pbjson", "pbjson-build", "serde", "serde_json"]

[dependencies]
arrow = "25.0.0"
async-trait = "0.1.41"
datafusion = { path = "../core", version = "13.0.0" }
datafusion-common = { path = "../common", version = "13.0.0" }
datafusion-expr = { path = "../expr", version = "13.0.0" }
Expand Down
2 changes: 1 addition & 1 deletion datafusion/proto/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ async fn main() -> Result<()> {
?;
let plan = ctx.table("t1")?.to_logical_plan()?;
let bytes = logical_plan_to_bytes(&plan)?;
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx).await?;
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip));
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/proto/examples/plan_serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ async fn main() -> Result<()> {
.await?;
let plan = ctx.table("t1")?.to_logical_plan()?;
let bytes = logical_plan_to_bytes(&plan)?;
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx).await?;
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip));
Ok(())
}
20 changes: 9 additions & 11 deletions datafusion/proto/src/bytes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
use crate::logical_plan::{AsLogicalPlan, LogicalExtensionCodec};
use crate::{from_proto::parse_expr, protobuf};
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion::datasource::TableProvider;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::{Expr, Extension, LogicalPlan};
Expand Down Expand Up @@ -136,41 +135,40 @@ pub fn logical_plan_to_bytes_with_extension_codec(

/// Deserialize a LogicalPlan from json
#[cfg(feature = "json")]
pub async fn logical_plan_from_json(
pub fn logical_plan_from_json(
json: &str,
ctx: &SessionContext,
) -> Result<LogicalPlan> {
let back: protobuf::LogicalPlanNode = serde_json::from_str(json)
.map_err(|e| DataFusionError::Plan(format!("Error serializing plan: {}", e)))?;
let extension_codec = DefaultExtensionCodec {};
back.try_into_logical_plan(ctx, &extension_codec).await
back.try_into_logical_plan(ctx, &extension_codec)
}

/// Deserialize a LogicalPlan from bytes
pub async fn logical_plan_from_bytes(
pub fn logical_plan_from_bytes(
bytes: &[u8],
ctx: &SessionContext,
) -> Result<LogicalPlan> {
let extension_codec = DefaultExtensionCodec {};
logical_plan_from_bytes_with_extension_codec(bytes, ctx, &extension_codec).await
logical_plan_from_bytes_with_extension_codec(bytes, ctx, &extension_codec)
}

/// Deserialize a LogicalPlan from bytes
pub async fn logical_plan_from_bytes_with_extension_codec(
pub fn logical_plan_from_bytes_with_extension_codec(
bytes: &[u8],
ctx: &SessionContext,
extension_codec: &dyn LogicalExtensionCodec,
) -> Result<LogicalPlan> {
let protobuf = protobuf::LogicalPlanNode::decode(bytes).map_err(|e| {
DataFusionError::Plan(format!("Error decoding expr as protobuf: {}", e))
})?;
protobuf.try_into_logical_plan(ctx, extension_codec).await
protobuf.try_into_logical_plan(ctx, extension_codec)
}

#[derive(Debug)]
struct DefaultExtensionCodec {}

#[async_trait]
impl LogicalExtensionCodec for DefaultExtensionCodec {
fn try_decode(
&self,
Expand All @@ -189,7 +187,7 @@ impl LogicalExtensionCodec for DefaultExtensionCodec {
))
}

async fn try_decode_table_provider(
fn try_decode_table_provider(
&self,
_buf: &[u8],
_schema: SchemaRef,
Expand Down Expand Up @@ -243,12 +241,12 @@ mod test {
assert_eq!(actual, expected);
}

#[tokio::test]
#[test]
#[cfg(feature = "json")]
async fn json_to_plan() {
let input = r#"{"emptyRelation":{}}"#.to_string();
let ctx = SessionContext::new();
let actual = logical_plan_from_json(&input, &ctx).await.unwrap();
let actual = logical_plan_from_json(&input, &ctx).unwrap();
let result = matches!(actual, LogicalPlan::EmptyRelation(_));
assert!(result, "Should parse empty relation");
}
Expand Down
22 changes: 9 additions & 13 deletions datafusion/proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ mod roundtrip_tests {
TimeUnit, UnionMode,
},
};
use async_trait::async_trait;
use datafusion::datasource::datasource::TableProviderFactory;
use datafusion::datasource::TableProvider;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
Expand Down Expand Up @@ -130,8 +129,7 @@ mod roundtrip_tests {
let bytes =
logical_plan_to_bytes_with_extension_codec(&topk_plan, &extension_codec)?;
let logical_round_trip =
logical_plan_from_bytes_with_extension_codec(&bytes, &ctx, &extension_codec)
.await?;
logical_plan_from_bytes_with_extension_codec(&bytes, &ctx, &extension_codec)?;
assert_eq!(
format!("{:?}", topk_plan),
format!("{:?}", logical_round_trip)
Expand All @@ -149,7 +147,6 @@ mod roundtrip_tests {
#[derive(Debug)]
pub struct TestTableProviderCodec {}

#[async_trait]
impl LogicalExtensionCodec for TestTableProviderCodec {
fn try_decode(
&self,
Expand All @@ -172,10 +169,10 @@ mod roundtrip_tests {
))
}

async fn try_decode_table_provider(
fn try_decode_table_provider(
&self,
buf: &[u8],
_schema: SchemaRef,
schema: SchemaRef,
ctx: &SessionContext,
) -> Result<Arc<dyn TableProvider>, DataFusionError> {
let msg = TestTableProto::decode(buf).map_err(|_| {
Expand All @@ -189,7 +186,7 @@ mod roundtrip_tests {
.get("testtable")
.expect("Unable to find testtable factory")
.clone();
let provider = (*factory).create(msg.url.as_str()).await?;
let provider = (*factory).with_schema(msg.url.as_str(), schema)?;
Ok(provider)
}

Expand Down Expand Up @@ -229,7 +226,7 @@ mod roundtrip_tests {
let scan = ctx.table("t")?.to_logical_plan()?;
let bytes = logical_plan_to_bytes_with_extension_codec(&scan, &codec)?;
let logical_round_trip =
logical_plan_from_bytes_with_extension_codec(&bytes, &ctx, &codec).await?;
logical_plan_from_bytes_with_extension_codec(&bytes, &ctx, &codec)?;
assert_eq!(format!("{:?}", scan), format!("{:?}", logical_round_trip));
Ok(())
}
Expand Down Expand Up @@ -257,7 +254,7 @@ mod roundtrip_tests {
println!("{:?}", plan);

let bytes = logical_plan_to_bytes(&plan)?;
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx).await?;
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip));

Ok(())
Expand All @@ -270,7 +267,7 @@ mod roundtrip_tests {
.await?;
let plan = ctx.table("t1")?.to_logical_plan()?;
let bytes = logical_plan_to_bytes(&plan)?;
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx).await?;
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip));
Ok(())
}
Expand All @@ -284,7 +281,7 @@ mod roundtrip_tests {
.await?;
let plan = ctx.sql("SELECT * FROM view_t1").await?.to_logical_plan()?;
let bytes = logical_plan_to_bytes(&plan)?;
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx).await?;
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip));
Ok(())
}
Expand Down Expand Up @@ -367,7 +364,6 @@ mod roundtrip_tests {
#[derive(Debug)]
pub struct TopKExtensionCodec {}

#[async_trait]
impl LogicalExtensionCodec for TopKExtensionCodec {
fn try_decode(
&self,
Expand Down Expand Up @@ -431,7 +427,7 @@ mod roundtrip_tests {
}
}

async fn try_decode_table_provider(
fn try_decode_table_provider(
&self,
_buf: &[u8],
_schema: SchemaRef,
Expand Down
39 changes: 16 additions & 23 deletions datafusion/proto/src/logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use crate::{
to_proto,
};
use arrow::datatypes::{Schema, SchemaRef};
use async_trait::async_trait;
use datafusion::datasource::TableProvider;
use datafusion::execution::FunctionRegistry;
use datafusion::physical_plan::ExecutionPlan;
Expand Down Expand Up @@ -75,7 +74,6 @@ pub(crate) fn proto_error<S: Into<String>>(message: S) -> DataFusionError {
DataFusionError::Internal(message.into())
}

#[async_trait]
pub trait AsLogicalPlan: Debug + Send + Sync + Clone {
fn try_decode(buf: &[u8]) -> Result<Self, DataFusionError>
where
Expand All @@ -86,7 +84,7 @@ pub trait AsLogicalPlan: Debug + Send + Sync + Clone {
B: BufMut,
Self: Sized;

async fn try_into_logical_plan(
fn try_into_logical_plan(
&self,
ctx: &SessionContext,
extension_codec: &dyn LogicalExtensionCodec,
Expand Down Expand Up @@ -115,7 +113,6 @@ pub trait PhysicalExtensionCodec: Debug + Send + Sync {
) -> Result<(), DataFusionError>;
}

#[async_trait]
pub trait LogicalExtensionCodec: Debug + Send + Sync {
fn try_decode(
&self,
Expand All @@ -130,7 +127,7 @@ pub trait LogicalExtensionCodec: Debug + Send + Sync {
buf: &mut Vec<u8>,
) -> Result<(), DataFusionError>;

async fn try_decode_table_provider(
fn try_decode_table_provider(
&self,
buf: &[u8],
schema: SchemaRef,
Expand All @@ -147,7 +144,6 @@ pub trait LogicalExtensionCodec: Debug + Send + Sync {
#[derive(Debug, Clone)]
pub struct DefaultLogicalExtensionCodec {}

#[async_trait]
impl LogicalExtensionCodec for DefaultLogicalExtensionCodec {
fn try_decode(
&self,
Expand All @@ -170,7 +166,7 @@ impl LogicalExtensionCodec for DefaultLogicalExtensionCodec {
))
}

async fn try_decode_table_provider(
fn try_decode_table_provider(
&self,
_buf: &[u8],
_schema: SchemaRef,
Expand All @@ -196,7 +192,7 @@ impl LogicalExtensionCodec for DefaultLogicalExtensionCodec {
macro_rules! into_logical_plan {
($PB:expr, $CTX:expr, $CODEC:expr) => {{
if let Some(field) = $PB.as_ref() {
field.as_ref().try_into_logical_plan($CTX, $CODEC).await
field.as_ref().try_into_logical_plan($CTX, $CODEC)
} else {
Err(proto_error("Missing required field in protobuf"))
}
Expand Down Expand Up @@ -280,7 +276,6 @@ impl From<JoinConstraint> for protobuf::JoinConstraint {
}
}

#[async_trait]
impl AsLogicalPlan for LogicalPlanNode {
fn try_decode(buf: &[u8]) -> Result<Self, DataFusionError>
where
Expand All @@ -301,7 +296,7 @@ impl AsLogicalPlan for LogicalPlanNode {
})
}

async fn try_into_logical_plan(
fn try_into_logical_plan(
&self,
ctx: &SessionContext,
extension_codec: &dyn LogicalExtensionCodec,
Expand Down Expand Up @@ -488,8 +483,7 @@ impl AsLogicalPlan for LogicalPlanNode {
.map(|expr| parse_expr(expr, ctx))
.collect::<Result<Vec<_>, _>>()?;
let provider = extension_codec
.try_decode_table_provider(&scan.custom_table_data, schema, ctx)
.await?;
.try_decode_table_provider(&scan.custom_table_data, schema, ctx)?;

LogicalPlanBuilder::scan_with_filters(
&scan.table_name,
Expand Down Expand Up @@ -591,7 +585,7 @@ impl AsLogicalPlan for LogicalPlanNode {
.input.clone().ok_or_else(|| DataFusionError::Internal(String::from(
"Protobuf deserialization error, CreateViewNode has invalid LogicalPlan input.",
)))?
.try_into_logical_plan(ctx, extension_codec).await?;
.try_into_logical_plan(ctx, extension_codec)?;
let definition = if !create_view.definition.is_empty() {
Some(create_view.definition.clone())
} else {
Expand Down Expand Up @@ -714,11 +708,11 @@ impl AsLogicalPlan for LogicalPlanNode {
builder.build()
}
LogicalPlanType::Union(union) => {
let mut input_plans: Vec<LogicalPlan> = vec![];
for i in union.inputs.iter() {
let res = i.try_into_logical_plan(ctx, extension_codec).await?;
input_plans.push(res);
}
let mut input_plans: Vec<LogicalPlan> = union
.inputs
.iter()
.map(|i| i.try_into_logical_plan(ctx, extension_codec))
.collect::<Result<_, DataFusionError>>()?;

if input_plans.len() < 2 {
return Err( DataFusionError::Internal(String::from(
Expand All @@ -742,11 +736,10 @@ impl AsLogicalPlan for LogicalPlanNode {
LogicalPlanBuilder::from(left).cross_join(&right)?.build()
}
LogicalPlanType::Extension(LogicalExtensionNode { node, inputs }) => {
let mut input_plans: Vec<LogicalPlan> = vec![];
for i in inputs.iter() {
let res = i.try_into_logical_plan(ctx, extension_codec).await?;
input_plans.push(res);
}
let input_plans: Vec<LogicalPlan> = inputs
.iter()
.map(|i| i.try_into_logical_plan(ctx, extension_codec))
.collect::<Result<_, DataFusionError>>()?;

let extension_node =
extension_codec.try_decode(node, &input_plans, ctx)?;
Expand Down