Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement TableProvider for DataFrameImpl #1699

Merged
merged 18 commits into from
Jan 30, 2022
Merged
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions datafusion/src/execution/dataframe_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,27 @@

//! Implementation of DataFrame API.

use std::any::Any;
use std::sync::{Arc, Mutex};

use crate::arrow::datatypes::Schema;
use crate::arrow::datatypes::SchemaRef;
use crate::arrow::record_batch::RecordBatch;
use crate::error::Result;
use crate::execution::context::{ExecutionContext, ExecutionContextState};
use crate::logical_plan::{
col, DFSchema, Expr, FunctionRegistry, JoinType, LogicalPlan, LogicalPlanBuilder,
Partitioning,
};
use crate::scalar::ScalarValue;
use crate::{
dataframe::*,
physical_plan::{collect, collect_partitioned},
};

use crate::arrow::util::pretty;
use crate::datasource::TableProvider;
use crate::datasource::TableType;
use crate::physical_plan::{
execute_stream, execute_stream_partitioned, ExecutionPlan, SendableRecordBatchStream,
};
Expand Down Expand Up @@ -62,6 +68,59 @@ impl DataFrameImpl {
}
}

#[async_trait]
impl TableProvider for DataFrameImpl {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps we can add TableProvider as one of the trait bounds for the Dataframe trait so the trait object can be used to register tables without casting to a concrete type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you give an example of what you mean?

It won't be possible to register Arc<dyn DataFrame>s directly even if DataFrame has a trait bound of TableProvider, because that would require trait upcasting IIUC, which is still an unstable feature.

Copy link
Contributor

@alamb alamb Jan 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think @houqp was suggesting changing

trait DataFrame {
...
}

To something like

trait DataFrame:: TableProvider {
}

And then impl TableProvider for DataFrame (rather than DataFrameImpl) -- which would mean that any &dyn DataFrame could be used as a table provider

If this is interesting, I am happy to file a ticket describing it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting, I don't think you can impl a trait for a trait (since the introduction of the dyn requirement) but we might be able to do a blanket impl like

impl<D: DataFrame> TableProvider for D {
    ...
}

Along with the trait bound, I think this will work to allow any DataFrame implementation to be used as a table provider.

The main question for me is: given that the scan method returns a physical plan, do I need to add a new DataFrame method to get a physical plan? Or is there another way to get that with existing APIs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I may have a prototype, I'll push something up here in a bit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think there's any way to achieve this behavior

        let df = test_table().await?.select_columns(&["c1", "c12"])?;
        let mut ctx = ExecutionContext::new();

        // register a dataframe as a table
        ctx.register_table("test_table", df)?;

without trait_upcasting 😞, even with a blanket impl plus an additional trait bound. I'll put up a draft PR.

fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
let schema: Schema = self.plan.schema().as_ref().into();
Arc::new(schema)
}

fn table_type(&self) -> TableType {
TableType::View
}

async fn scan(
&self,
projection: &Option<Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let expr = projection
.as_ref()
// construct projections
.map_or_else(
|| Ok(Arc::new(Self::new(self.ctx_state.clone(), &self.plan)) as Arc<_>),
|projection| {
let schema = TableProvider::schema(self).project(projection)?;
let names = schema
.fields()
.iter()
.map(|field| field.name().as_str())
.collect::<Vec<_>>();
self.select_columns(names.as_slice())
cpcloud marked this conversation as resolved.
Show resolved Hide resolved
},
)?
// add predicates, otherwise use `true` as the predicate
.filter(filters.iter().cloned().fold(
Expr::Literal(ScalarValue::Boolean(Some(true))),
|acc, new| acc.and(new),
))?;
// add a limit if given
Self::new(
self.ctx_state.clone(),
&limit
.map_or_else(|| Ok(expr.clone()), |n| expr.limit(n))?
.to_logical_plan(),
)
.create_physical_plan()
.await
}
}

#[async_trait]
impl DataFrame for DataFrameImpl {
/// Apply a projection based on a list of column names
Expand Down Expand Up @@ -488,6 +547,44 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn register_table() -> Result<()> {
let df = test_table().await?.select_columns(&["c1", "c12"])?;
let mut ctx = ExecutionContext::new();
let df_impl = DataFrameImpl::new(ctx.state.clone(), &df.to_logical_plan());

// register a dataframe as a table
ctx.register_table("test_table", Arc::new(df_impl))?;

// pull the table out and compare the plans
let table = ctx.table("test_table")?;

// check that we correctly read from the table
let df_results = &table
cpcloud marked this conversation as resolved.
Show resolved Hide resolved
.aggregate(vec![col("c1")], vec![sum(col("c12"))])?
.collect()
.await?;
let table_results = &table
.aggregate(vec![col("c1")], vec![sum(col("c12"))])?
.collect()
.await?;

let expected_lines = vec![
"+----+---------------------+",
"| c1 | SUM(test_table.c12) |",
"+----+---------------------+",
"| a | 10.238448667882977 |",
"| b | 7.797734760124923 |",
"| c | 13.860958726523545 |",
"| d | 8.793968289758968 |",
"| e | 10.206140546981722 |",
"+----+---------------------+",
];

assert_batches_sorted_eq!(expected_lines, df_results);
assert_batches_sorted_eq!(expected_lines, table_results);
Ok(())
cpcloud marked this conversation as resolved.
Show resolved Hide resolved
}
/// Compare the formatted string representation of two plans for equality
fn assert_same_plan(plan1: &LogicalPlan, plan2: &LogicalPlan) {
assert_eq!(format!("{:?}", plan1), format!("{:?}", plan2));
Expand Down