Skip to content

Commit

Permalink
Async catalog support (apache#3777)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Dec 13, 2022
1 parent f8a3d58 commit 8c0d1ca
Show file tree
Hide file tree
Showing 31 changed files with 300 additions and 158 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,6 @@ opt-level = 3
overflow-checks = false
panic = 'unwind'
rpath = false

[patch.crates-io]
sqlparser = { git = "https://github.com/tustvold/sqlparser-rs.git", rev = "a3cd766d11845fcaa9ece9c1b048cc8463d92252" }
3 changes: 3 additions & 0 deletions datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,6 @@ object_store = { version = "0.5.0", features = ["aws", "gcp"] }
rustyline = "10.0"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }
url = "2.2"

[patch.crates-io]
sqlparser = { git = "https://github.com/tustvold/sqlparser-rs.git", rev = "a3cd766d11845fcaa9ece9c1b048cc8463d92252" }
2 changes: 1 addition & 1 deletion datafusion/common/src/table_reference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub enum TableReference<'a> {

/// Represents a path to a table that may require further resolution
/// that owns the underlying names
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Hash, PartialEq)]
pub enum OwnedTableReference {
/// An unqualified table reference, e.g. "table"
Bare {
Expand Down
28 changes: 17 additions & 11 deletions datafusion/core/src/catalog/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use arrow::{
datatypes::{DataType, Field, Schema, SchemaRef},
record_batch::RecordBatch,
};
use async_trait::async_trait;
use parking_lot::RwLock;

use datafusion_common::Result;
Expand All @@ -45,12 +46,16 @@ use super::{
schema::SchemaProvider,
};

const INFORMATION_SCHEMA: &str = "information_schema";
/// The name of the information schema
pub const INFORMATION_SCHEMA: &str = "information_schema";
const TABLES: &str = "tables";
const VIEWS: &str = "views";
const COLUMNS: &str = "columns";
const DF_SETTINGS: &str = "df_settings";

/// All information schema tables
pub const INFORMATION_SCHEMA_TABLES: &[&str] = &[TABLES, VIEWS, COLUMNS, DF_SETTINGS];

/// Wraps another [`CatalogProvider`] and adds a "information_schema"
/// schema that can introspect on tables in the catalog_list
pub(crate) struct CatalogWithInformationSchema {
Expand Down Expand Up @@ -132,7 +137,7 @@ struct InformationSchemaConfig {

impl InformationSchemaConfig {
/// Construct the `information_schema.tables` virtual table
fn make_tables(&self, builder: &mut InformationSchemaTablesBuilder) {
async fn make_tables(&self, builder: &mut InformationSchemaTablesBuilder) {
// create a mem table with the names of tables

for catalog_name in self.catalog_list.catalog_names() {
Expand All @@ -142,7 +147,7 @@ impl InformationSchemaConfig {
if schema_name != INFORMATION_SCHEMA {
let schema = catalog.schema(&schema_name).unwrap();
for table_name in schema.table_names() {
let table = schema.table(&table_name).unwrap();
let table = schema.table(&table_name).await.unwrap();
builder.add_table(
&catalog_name,
&schema_name,
Expand Down Expand Up @@ -171,15 +176,15 @@ impl InformationSchemaConfig {
}
}

fn make_views(&self, builder: &mut InformationSchemaViewBuilder) {
async fn make_views(&self, builder: &mut InformationSchemaViewBuilder) {
for catalog_name in self.catalog_list.catalog_names() {
let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

for schema_name in catalog.schema_names() {
if schema_name != INFORMATION_SCHEMA {
let schema = catalog.schema(&schema_name).unwrap();
for table_name in schema.table_names() {
let table = schema.table(&table_name).unwrap();
let table = schema.table(&table_name).await.unwrap();
builder.add_view(
&catalog_name,
&schema_name,
Expand All @@ -193,15 +198,15 @@ impl InformationSchemaConfig {
}

/// Construct the `information_schema.columns` virtual table
fn make_columns(&self, builder: &mut InformationSchemaColumnsBuilder) {
async fn make_columns(&self, builder: &mut InformationSchemaColumnsBuilder) {
for catalog_name in self.catalog_list.catalog_names() {
let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

for schema_name in catalog.schema_names() {
if schema_name != INFORMATION_SCHEMA {
let schema = catalog.schema(&schema_name).unwrap();
for table_name in schema.table_names() {
let table = schema.table(&table_name).unwrap();
let table = schema.table(&table_name).await.unwrap();
for (i, field) in table.schema().fields().iter().enumerate() {
builder.add_column(
&catalog_name,
Expand All @@ -227,6 +232,7 @@ impl InformationSchemaConfig {
}
}

#[async_trait]
impl SchemaProvider for InformationSchemaProvider {
fn as_any(&self) -> &(dyn Any + 'static) {
self
Expand All @@ -241,7 +247,7 @@ impl SchemaProvider for InformationSchemaProvider {
]
}

fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
let config = self.config.clone();
let table: Arc<dyn PartitionStream> = if name.eq_ignore_ascii_case("tables") {
Arc::new(InformationSchemaTables::new(config))
Expand Down Expand Up @@ -305,7 +311,7 @@ impl PartitionStream for InformationSchemaTables {
self.schema.clone(),
// TODO: Stream this
futures::stream::once(async move {
config.make_tables(&mut builder);
config.make_tables(&mut builder).await;
Ok(builder.finish())
}),
))
Expand Down Expand Up @@ -396,7 +402,7 @@ impl PartitionStream for InformationSchemaViews {
self.schema.clone(),
// TODO: Stream this
futures::stream::once(async move {
config.make_views(&mut builder);
config.make_views(&mut builder).await;
Ok(builder.finish())
}),
))
Expand Down Expand Up @@ -510,7 +516,7 @@ impl PartitionStream for InformationSchemaColumns {
self.schema.clone(),
// TODO: Stream this
futures::stream::once(async move {
config.make_columns(&mut builder);
config.make_columns(&mut builder).await;
Ok(builder.finish())
}),
))
Expand Down
4 changes: 3 additions & 1 deletion datafusion/core/src/catalog/listing_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::sync::{Arc, Mutex};
use async_trait::async_trait;

/// A `SchemaProvider` that scans an `ObjectStore` to automatically discover tables
///
Expand Down Expand Up @@ -148,6 +149,7 @@ impl ListingSchemaProvider {
}
}

#[async_trait]
impl SchemaProvider for ListingSchemaProvider {
fn as_any(&self) -> &dyn Any {
self
Expand All @@ -162,7 +164,7 @@ impl SchemaProvider for ListingSchemaProvider {
.collect()
}

fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
self.tables
.lock()
.expect("Can't lock tables")
Expand Down
7 changes: 5 additions & 2 deletions datafusion/core/src/catalog/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
use dashmap::DashMap;
use std::any::Any;
use std::sync::Arc;
use async_trait::async_trait;

use crate::datasource::TableProvider;
use crate::error::{DataFusionError, Result};

/// Represents a schema, comprising a number of named tables.
#[async_trait]
pub trait SchemaProvider: Sync + Send {
/// Returns the schema provider as [`Any`](std::any::Any)
/// so that it can be downcast to a specific implementation.
Expand All @@ -35,7 +37,7 @@ pub trait SchemaProvider: Sync + Send {
fn table_names(&self) -> Vec<String>;

/// Retrieves a specific table from the schema by name, provided it exists.
fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>>;
async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>>;

/// If supported by the implementation, adds a new table to this schema.
/// If a table of the same name existed before, it returns "Table already exists" error.
Expand Down Expand Up @@ -85,6 +87,7 @@ impl Default for MemorySchemaProvider {
}
}

#[async_trait]
impl SchemaProvider for MemorySchemaProvider {
fn as_any(&self) -> &dyn Any {
self
Expand All @@ -97,7 +100,7 @@ impl SchemaProvider for MemorySchemaProvider {
.collect()
}

fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
self.tables.get(name).map(|table| table.value().clone())
}

Expand Down
34 changes: 22 additions & 12 deletions datafusion/core/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -921,7 +921,7 @@ mod tests {
let ctx = SessionContext::new();
ctx.register_batch("t", batch)?;

let df = ctx.table("t")?.select_columns(&["f.c1"])?;
let df = ctx.table("t").await?.select_columns(&["f.c1"])?;

let df_results = df.collect().await?;

Expand Down Expand Up @@ -1040,16 +1040,17 @@ mod tests {
));

// build query with a UDF using DataFrame API
let df = ctx.table("aggregate_test_100")?;
let df = ctx.table("aggregate_test_100").await?;

let f = df.registry();

let df = df.select(vec![f.udf("my_fn")?.call(vec![col("c12")])])?;
let plan = df.plan.clone();

// build query using SQL
let sql_plan =
ctx.create_logical_plan("SELECT my_fn(c12) FROM aggregate_test_100")?;
let sql_plan = ctx
.create_logical_plan("SELECT my_fn(c12) FROM aggregate_test_100")
.await?;

// the two plans should be identical
assert_same_plan(&plan, &sql_plan);
Expand Down Expand Up @@ -1106,7 +1107,7 @@ mod tests {
ctx.register_table("test_table", df_impl.clone())?;

// pull the table out
let table = ctx.table("test_table")?;
let table = ctx.table("test_table").await?;

let group_expr = vec![col("c1")];
let aggr_expr = vec![sum(col("c12"))];
Expand Down Expand Up @@ -1160,13 +1161,13 @@ mod tests {
async fn create_plan(sql: &str) -> Result<LogicalPlan> {
let mut ctx = SessionContext::new();
register_aggregate_csv(&mut ctx, "aggregate_test_100").await?;
ctx.create_logical_plan(sql)
ctx.create_logical_plan(sql).await
}

async fn test_table_with_name(name: &str) -> Result<Arc<DataFrame>> {
let mut ctx = SessionContext::new();
register_aggregate_csv(&mut ctx, name).await?;
ctx.table(name)
ctx.table(name).await
}

async fn test_table() -> Result<Arc<DataFrame>> {
Expand Down Expand Up @@ -1300,8 +1301,15 @@ mod tests {
ctx.register_table("t1", df.clone())?;
ctx.register_table("t2", df)?;
let df = ctx
.table("t1")?
.join(ctx.table("t2")?, JoinType::Inner, &["c1"], &["c1"], None)?
.table("t1")
.await?
.join(
ctx.table("t2").await?,
JoinType::Inner,
&["c1"],
&["c1"],
None,
)?
.sort(vec![
// make the test deterministic
col("t1.c1").sort(true, true),
Expand Down Expand Up @@ -1378,10 +1386,11 @@ mod tests {
)
.await?;

ctx.register_table("t1", ctx.table("test")?)?;
ctx.register_table("t1", ctx.table("test").await?)?;

let df = ctx
.table("t1")?
.table("t1")
.await?
.filter(col("id").eq(lit(1)))?
.select_columns(&["bool_col", "int_col"])?;

Expand Down Expand Up @@ -1462,7 +1471,8 @@ mod tests {
ctx.register_batch("t", batch)?;

let df = ctx
.table("t")?
.table("t")
.await?
// try and create a column with a '.' in it
.with_column("f.c2", lit("hello"))?;

Expand Down
10 changes: 6 additions & 4 deletions datafusion/core/src/datasource/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,12 +431,13 @@ mod tests {
)
.await?;

ctx.register_table("t1", ctx.table("test")?)?;
ctx.register_table("t1", ctx.table("test").await?)?;

ctx.sql("CREATE VIEW t2 as SELECT * FROM t1").await?;

let df = ctx
.table("t2")?
.table("t2")
.await?
.filter(col("id").eq(lit(1)))?
.select_columns(&["bool_col", "int_col"])?;

Expand All @@ -460,12 +461,13 @@ mod tests {
)
.await?;

ctx.register_table("t1", ctx.table("test")?)?;
ctx.register_table("t1", ctx.table("test").await?)?;

ctx.sql("CREATE VIEW t2 as SELECT * FROM t1").await?;

let df = ctx
.table("t2")?
.table("t2")
.await?
.limit(0, Some(10))?
.select_columns(&["bool_col", "int_col"])?;

Expand Down
Loading

0 comments on commit 8c0d1ca

Please sign in to comment.