Skip to content

Commit

Permalink
Add OR REPLACE and more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
matthewmturner committed May 10, 2022
1 parent 8047909 commit 48c4349
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 18 deletions.
190 changes: 173 additions & 17 deletions datafusion/core/src/datasource/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ use crate::{
physical_plan::ExecutionPlan,
};

use crate::datasource::{
datasource::TableProviderFilterPushDown, TableProvider, TableType,
};
use crate::datasource::{TableProvider, TableType};

/// An implementation of `TableProvider` that uses the object store
/// or file system listing capability to get the list of files.
Expand Down Expand Up @@ -76,42 +74,40 @@ impl TableProvider for ViewTable {

async fn scan(
&self,
projection: &Option<Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
_projection: &Option<Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
self.context.create_physical_plan(&self.logical_plan).await
}

fn supports_filter_pushdown(
&self,
filter: &Expr,
) -> Result<TableProviderFilterPushDown> {
Ok(TableProviderFilterPushDown::Unsupported)
}
}

#[cfg(test)]
mod tests {
use crate::assert_batches_eq;
use crate::{assert_batches_eq, execution::context::SessionConfig};

use super::*;

#[tokio::test]
async fn query_view() -> Result<()> {
let session_ctx = SessionContext::new();
let session_ctx = SessionContext::with_config(
SessionConfig::new().with_information_schema(true),
);

session_ctx
.sql("CREATE TABLE abc AS VALUES (1,2,3)")
.sql("CREATE TABLE abc AS VALUES (1,2,3), (4,5,6)")
.await?
.collect()
.await?;

let view_sql = "CREATE VIEW xyz AS SELECT * FROM abc";
session_ctx.sql(view_sql).await?.collect().await?;

let results = session_ctx.sql("SELECT * FROM information_schema.tables WHERE table_type='VIEW' AND table_name = 'xyz'").await?.collect().await?;
assert_eq!(results[0].num_rows(), 1);

let results = session_ctx
.sql("SELECT * FROM abc")
.sql("SELECT * FROM xyz")
.await?
.collect()
.await?;
Expand All @@ -121,11 +117,171 @@ mod tests {
"| column1 | column2 | column3 |",
"+---------+---------+---------+",
"| 1 | 2 | 3 |",
"| 4 | 5 | 6 |",
"+---------+---------+---------+",
];

assert_batches_eq!(expected, &results);

Ok(())
}

#[tokio::test]
async fn query_view_with_projection() -> Result<()> {
let session_ctx = SessionContext::with_config(
SessionConfig::new().with_information_schema(true),
);

session_ctx
.sql("CREATE TABLE abc AS VALUES (1,2,3), (4,5,6)")
.await?
.collect()
.await?;

let view_sql = "CREATE VIEW xyz AS SELECT column1, column2 FROM abc";
session_ctx.sql(view_sql).await?.collect().await?;

let results = session_ctx.sql("SELECT * FROM information_schema.tables WHERE table_type='VIEW' AND table_name = 'xyz'").await?.collect().await?;
assert_eq!(results[0].num_rows(), 1);

let results = session_ctx
.sql("SELECT column1 FROM xyz")
.await?
.collect()
.await?;

let expected = vec![
"+---------+",
"| column1 |",
"+---------+",
"| 1 |",
"| 4 |",
"+---------+",
];

assert_batches_eq!(expected, &results);

Ok(())
}

#[tokio::test]
async fn query_view_plan() -> Result<()> {
let session_ctx = SessionContext::with_config(
SessionConfig::new().with_information_schema(true),
);

session_ctx
.sql("CREATE TABLE abc AS VALUES (1,2,3), (4,5,6)")
.await?
.collect()
.await?;

let view_sql = "CREATE VIEW xyz AS SELECT * FROM abc";
session_ctx.sql(view_sql).await?.collect().await?;

let results = session_ctx
.sql("EXPLAIN CREATE VIEW xyz AS SELECT * FROM abc")
.await?
.collect()
.await?;

let expected = vec![
"+---------------+--------------------------------------------------------+",
"| plan_type | plan |",
"+---------------+--------------------------------------------------------+",
"| logical_plan | CreateView: \"xyz\" |",
"| | Projection: #abc.column1, #abc.column2, #abc.column3 |",
"| | TableScan: abc projection=Some([0, 1, 2]) |",
"| physical_plan | EmptyExec: produce_one_row=false |",
"| | |",
"+---------------+--------------------------------------------------------+",
];

assert_batches_eq!(expected, &results);

let results = session_ctx
.sql("EXPLAIN CREATE VIEW xyz AS SELECT * FROM abc WHERE column2 = 5")
.await?
.collect()
.await?;

let expected = vec![
"+---------------+--------------------------------------------------------+",
"| plan_type | plan |",
"+---------------+--------------------------------------------------------+",
"| logical_plan | CreateView: \"xyz\" |",
"| | Projection: #abc.column1, #abc.column2, #abc.column3 |",
"| | Filter: #abc.column2 = Int64(5) |",
"| | TableScan: abc projection=Some([0, 1, 2]) |",
"| physical_plan | EmptyExec: produce_one_row=false |",
"| | |",
"+---------------+--------------------------------------------------------+",
];

assert_batches_eq!(expected, &results);

let results = session_ctx
.sql("EXPLAIN CREATE VIEW xyz AS SELECT column1, column2 FROM abc WHERE column2 = 5")
.await?
.collect()
.await?;

let expected = vec![
"+---------------+----------------------------------------------+",
"| plan_type | plan |",
"+---------------+----------------------------------------------+",
"| logical_plan | CreateView: \"xyz\" |",
"| | Projection: #abc.column1, #abc.column2 |",
"| | Filter: #abc.column2 = Int64(5) |",
"| | TableScan: abc projection=Some([0, 1]) |",
"| physical_plan | EmptyExec: produce_one_row=false |",
"| | |",
"+---------------+----------------------------------------------+",
];

assert_batches_eq!(expected, &results);

Ok(())
}

#[tokio::test]
async fn create_or_replace_view() -> Result<()> {
let session_ctx = SessionContext::with_config(
SessionConfig::new().with_information_schema(true),
);

session_ctx
.sql("CREATE TABLE abc AS VALUES (1,2,3), (4,5,6)")
.await?
.collect()
.await?;

let view_sql = "CREATE VIEW xyz AS SELECT * FROM abc";
session_ctx.sql(view_sql).await?.collect().await?;

let view_sql = "CREATE OR REPLACE VIEW xyz AS SELECT column1 FROM abc";
session_ctx.sql(view_sql).await?.collect().await?;

let results = session_ctx.sql("SELECT * FROM information_schema.tables WHERE table_type='VIEW' AND table_name = 'xyz'").await?.collect().await?;
assert_eq!(results[0].num_rows(), 1);

let results = session_ctx
.sql("SELECT * FROM xyz")
.await?
.collect()
.await?;

let expected = vec![
"+---------+",
"| column1 |",
"+---------+",
"| 1 |",
"| 4 |",
"+---------+",
];

assert_batches_eq!(expected, &results);

Ok(())
}
}
7 changes: 6 additions & 1 deletion datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,12 @@ impl SessionContext {

match (or_replace, view) {
(true, Ok(_)) => {
let plan = LogicalPlanBuilder::empty(false).build()?;
self.deregister_table(name.as_str())?;
let plan = self.optimize(&input)?;
let table =
Arc::new(ViewTable::try_new(self.clone(), plan.clone())?);

self.register_table(name.as_str(), table)?;
Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
}
(_, Err(_)) => {
Expand Down

0 comments on commit 48c4349

Please sign in to comment.