From f8a3d584c8a392574347ebab97b26c07b054e93a Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>
Date: Tue, 13 Dec 2022 20:37:40 +0000
Subject: [PATCH] Lazy system tables (#4606)

---
 .../core/src/catalog/information_schema.rs  | 521 ++++++++++--------
 datafusion/core/src/datasource/mod.rs        |   1 +
 datafusion/core/src/datasource/streaming.rs  |  93 ++++
 datafusion/core/src/physical_plan/mod.rs     |   1 +
 .../core/src/physical_plan/streaming.rs      | 124 +++++
 5 files changed, 516 insertions(+), 224 deletions(-)
 create mode 100644 datafusion/core/src/datasource/streaming.rs
 create mode 100644 datafusion/core/src/physical_plan/streaming.rs

diff --git a/datafusion/core/src/catalog/information_schema.rs b/datafusion/core/src/catalog/information_schema.rs
index 957cac53a93b..f7ef6b93dea9 100644
--- a/datafusion/core/src/catalog/information_schema.rs
+++ b/datafusion/core/src/catalog/information_schema.rs
@@ -20,29 +20,31 @@
 //! [Information Schema]

 use std::{
-    any,
+    any::Any,
     sync::{Arc, Weak},
 };

-use parking_lot::RwLock;
-
 use arrow::{
     array::{StringBuilder, UInt64Builder},
-    datatypes::{DataType, Field, Schema},
+    datatypes::{DataType, Field, Schema, SchemaRef},
     record_batch::RecordBatch,
 };
+use parking_lot::RwLock;
+
 use datafusion_common::Result;

-use crate::datasource::{MemTable, TableProvider};
+use crate::config::ConfigOptions;
+use crate::datasource::streaming::{PartitionStream, StreamingTable};
+use crate::datasource::TableProvider;
 use crate::logical_expr::TableType;
+use crate::physical_plan::stream::RecordBatchStreamAdapter;
+use crate::physical_plan::SendableRecordBatchStream;

 use super::{
     catalog::{CatalogList, CatalogProvider},
     schema::SchemaProvider,
 };
-use crate::config::ConfigOptions;
-
 const INFORMATION_SCHEMA: &str = "information_schema";
 const TABLES: &str = "tables";
 const VIEWS: &str = "views";
@@ -73,7 +75,7 @@ impl CatalogWithInformationSchema {
 }

 impl CatalogProvider for CatalogWithInformationSchema {
-    fn as_any(&self) -> &dyn any::Any {
+    fn as_any(&self) -> &dyn Any {
         self
     }

@@ -90,8 +92,10 @@ impl CatalogProvider for CatalogWithInformationSchema {
         Weak::upgrade(&self.catalog_list).and_then(|catalog_list| {
             Weak::upgrade(&self.config_options).map(|config_options| {
                 Arc::new(InformationSchemaProvider {
-                    catalog_list,
-                    config_options,
+                    config: InformationSchemaConfig {
+                        catalog_list,
+                        config_options,
+                    },
                 }) as Arc<dyn SchemaProvider>
             })
         })
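For orientation, the `information_schema` tables this patch makes lazy are opt-in on a `SessionContext`. A minimal usage sketch, assuming the public DataFusion API of this era and a tokio runtime (the SQL and output handling are illustrative):

```rust
use datafusion::error::Result;
use datafusion::prelude::{SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    // information_schema is disabled unless explicitly enabled
    let config = SessionConfig::new().with_information_schema(true);
    let ctx = SessionContext::with_config(config);

    // After this patch the rows below are computed when the query runs,
    // so providers registered later still show up on the next query.
    ctx.sql("SELECT table_catalog, table_schema, table_name, table_type FROM information_schema.tables")
        .await?
        .show()
        .await?;
    Ok(())
}
```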
@@ -117,15 +121,19 @@ impl CatalogProvider for CatalogWithInformationSchema {
 /// providers, they will appear the next time the `information_schema`
 /// table is queried.
 struct InformationSchemaProvider {
+    config: InformationSchemaConfig,
+}
+
+#[derive(Clone)]
+struct InformationSchemaConfig {
     catalog_list: Arc<dyn CatalogList>,
     config_options: Arc<RwLock<ConfigOptions>>,
 }

-impl InformationSchemaProvider {
+impl InformationSchemaConfig {
     /// Construct the `information_schema.tables` virtual table
-    fn make_tables(&self) -> Arc<dyn TableProvider> {
+    fn make_tables(&self, builder: &mut InformationSchemaTablesBuilder) {
         // create a mem table with the names of tables
-        let mut builder = InformationSchemaTablesBuilder::new();
-
         for catalog_name in self.catalog_list.catalog_names() {
             let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

@@ -161,15 +169,9 @@ impl InformationSchemaProvider {
                 TableType::View,
             );
         }
-
-        let mem_table: MemTable = builder.into();
-
-        Arc::new(mem_table)
     }

-    fn make_views(&self) -> Arc<dyn TableProvider> {
-        let mut builder = InformationSchemaViewBuilder::new();
-
+    fn make_views(&self, builder: &mut InformationSchemaViewBuilder) {
         for catalog_name in self.catalog_list.catalog_names() {
             let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

@@ -188,15 +190,10 @@ impl InformationSchemaProvider {
                 }
             }
         }
-
-        let mem_table: MemTable = builder.into();
-        Arc::new(mem_table)
     }

     /// Construct the `information_schema.columns` virtual table
-    fn make_columns(&self) -> Arc<dyn TableProvider> {
-        let mut builder = InformationSchemaColumnsBuilder::new();
-
+    fn make_columns(&self, builder: &mut InformationSchemaColumnsBuilder) {
         for catalog_name in self.catalog_list.catalog_names() {
             let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

@@ -220,47 +217,47 @@ impl InformationSchemaProvider {
                 }
             }
         }
-
-        let mem_table: MemTable = builder.into();
-
-        Arc::new(mem_table)
     }

     /// Construct the `information_schema.df_settings` virtual table
-    fn make_df_settings(&self) -> Arc<dyn TableProvider> {
-        let mut builder = InformationSchemaDfSettingsBuilder::new();
-
+    fn make_df_settings(&self, builder: &mut InformationSchemaDfSettingsBuilder) {
         for (name, setting) in self.config_options.read().options() {
             builder.add_setting(name, setting.to_string());
         }
-
-        let mem_table: MemTable = builder.into();
-
-        Arc::new(mem_table)
     }
 }

 impl SchemaProvider for InformationSchemaProvider {
-    fn as_any(&self) -> &(dyn any::Any + 'static) {
+    fn as_any(&self) -> &(dyn Any + 'static) {
         self
     }

     fn table_names(&self) -> Vec<String> {
-        vec![TABLES.to_string(), VIEWS.to_string(), COLUMNS.to_string()]
+        vec![
+            TABLES.to_string(),
+            VIEWS.to_string(),
+            COLUMNS.to_string(),
+            DF_SETTINGS.to_string(),
+        ]
     }

     fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
-        if name.eq_ignore_ascii_case("tables") {
-            Some(self.make_tables())
+        let config = self.config.clone();
+        let table: Arc<dyn PartitionStream> = if name.eq_ignore_ascii_case("tables") {
+            Arc::new(InformationSchemaTables::new(config))
         } else if name.eq_ignore_ascii_case("columns") {
-            Some(self.make_columns())
+            Arc::new(InformationSchemaColumns::new(config))
         } else if name.eq_ignore_ascii_case("views") {
-            Some(self.make_views())
+            Arc::new(InformationSchemaViews::new(config))
         } else if name.eq_ignore_ascii_case("df_settings") {
-            Some(self.make_df_settings())
+            Arc::new(InformationSchemaDfSettings::new(config))
         } else {
-            None
-        }
+            return None;
+        };
+
+        Some(Arc::new(
+            StreamingTable::try_new(table.schema().clone(), vec![table]).unwrap(),
+        ))
     }

     fn table_exist(&self, name: &str) -> bool {
@@ -268,10 +265,58 @@ impl SchemaProvider for InformationSchemaProvider {
     }
 }
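The pivotal change above: `table()` no longer materializes a `MemTable` eagerly; it wraps a lazily-evaluated `PartitionStream` in a `StreamingTable`, deferring all catalog walking to execution time. A standalone sketch of the same one-shot stream pattern the `execute` implementations below rely on (the `lazy_one_shot` name and column values are invented for illustration):

```rust
use std::sync::Arc;

use arrow::array::StringArray;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::SendableRecordBatchStream;

/// Nothing is computed until the returned stream is first polled
fn lazy_one_shot(schema: SchemaRef) -> SendableRecordBatchStream {
    Box::pin(RecordBatchStreamAdapter::new(
        schema.clone(),
        futures::stream::once(async move {
            // This body runs at execution time, mirroring make_tables()
            let names = StringArray::from(vec!["tables", "views"]);
            Ok(RecordBatch::try_new(schema, vec![Arc::new(names)]).unwrap())
        }),
    ))
}

fn example_schema() -> SchemaRef {
    Arc::new(Schema::new(vec![Field::new(
        "table_name",
        DataType::Utf8,
        false,
    )]))
}
```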
+struct InformationSchemaTables {
+    schema: SchemaRef,
+    config: InformationSchemaConfig,
+}
+
+impl InformationSchemaTables {
+    fn new(config: InformationSchemaConfig) -> Self {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("table_catalog", DataType::Utf8, false),
+            Field::new("table_schema", DataType::Utf8, false),
+            Field::new("table_name", DataType::Utf8, false),
+            Field::new("table_type", DataType::Utf8, false),
+        ]));
+
+        Self { schema, config }
+    }
+
+    fn builder(&self) -> InformationSchemaTablesBuilder {
+        InformationSchemaTablesBuilder {
+            catalog_names: StringBuilder::new(),
+            schema_names: StringBuilder::new(),
+            table_names: StringBuilder::new(),
+            table_types: StringBuilder::new(),
+            schema: self.schema.clone(),
+        }
+    }
+}
+
+impl PartitionStream for InformationSchemaTables {
+    fn schema(&self) -> &SchemaRef {
+        &self.schema
+    }
+
+    fn execute(&self) -> SendableRecordBatchStream {
+        let mut builder = self.builder();
+        let config = self.config.clone();
+        Box::pin(RecordBatchStreamAdapter::new(
+            self.schema.clone(),
+            // TODO: Stream this
+            futures::stream::once(async move {
+                config.make_tables(&mut builder);
+                Ok(builder.finish())
+            }),
+        ))
+    }
+}
+
 /// Builds the `information_schema.TABLES` table row by row
 ///
 /// Columns are based on <https://www.postgresql.org/docs/current/infoschema-tables.html>
 struct InformationSchemaTablesBuilder {
+    schema: SchemaRef,
     catalog_names: StringBuilder,
     schema_names: StringBuilder,
     table_names: StringBuilder,
@@ -279,15 +324,6 @@ struct InformationSchemaTablesBuilder {
 }

 impl InformationSchemaTablesBuilder {
-    fn new() -> Self {
-        Self {
-            catalog_names: StringBuilder::new(),
-            schema_names: StringBuilder::new(),
-            table_names: StringBuilder::new(),
-            table_types: StringBuilder::new(),
-        }
-    }
-
     fn add_table(
         &mut self,
         catalog_name: impl AsRef<str>,
@@ -305,37 +341,65 @@ impl InformationSchemaTablesBuilder {
             TableType::Temporary => "LOCAL TEMPORARY",
         });
     }
+
+    fn finish(&mut self) -> RecordBatch {
+        RecordBatch::try_new(
+            self.schema.clone(),
+            vec![
+                Arc::new(self.catalog_names.finish()),
+                Arc::new(self.schema_names.finish()),
+                Arc::new(self.table_names.finish()),
+                Arc::new(self.table_types.finish()),
+            ],
+        )
+        .unwrap()
+    }
+}
+
+struct InformationSchemaViews {
+    schema: SchemaRef,
+    config: InformationSchemaConfig,
 }

-impl From<InformationSchemaTablesBuilder> for MemTable {
-    fn from(value: InformationSchemaTablesBuilder) -> MemTable {
-        let schema = Schema::new(vec![
+impl InformationSchemaViews {
+    fn new(config: InformationSchemaConfig) -> Self {
+        let schema = Arc::new(Schema::new(vec![
             Field::new("table_catalog", DataType::Utf8, false),
             Field::new("table_schema", DataType::Utf8, false),
             Field::new("table_name", DataType::Utf8, false),
-            Field::new("table_type", DataType::Utf8, false),
-        ]);
-
-        let InformationSchemaTablesBuilder {
-            mut catalog_names,
-            mut schema_names,
-            mut table_names,
-            mut table_types,
-        } = value;
-
-        let schema = Arc::new(schema);
-        let batch = RecordBatch::try_new(
-            schema.clone(),
-            vec![
-                Arc::new(catalog_names.finish()),
-                Arc::new(schema_names.finish()),
-                Arc::new(table_names.finish()),
-                Arc::new(table_types.finish()),
-            ],
-        )
-        .unwrap();
+            Field::new("definition", DataType::Utf8, true),
+        ]));
+
+        Self { schema, config }
+    }

-        MemTable::try_new(schema, vec![vec![batch]]).unwrap()
+    fn builder(&self) -> InformationSchemaViewBuilder {
+        InformationSchemaViewBuilder {
+            catalog_names: StringBuilder::new(),
+            schema_names: StringBuilder::new(),
+            table_names: StringBuilder::new(),
+            definitions: StringBuilder::new(),
+            schema: self.schema.clone(),
+        }
+    }
+}
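Each virtual table now threads its `SchemaRef` into its builder, so `finish()` can assemble the `RecordBatch` directly from the accumulated arrays. The underlying arrow builder pattern in isolation (a sketch; columns and rows are illustrative):

```rust
use std::sync::Arc;

use arrow::array::StringBuilder;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("table_catalog", DataType::Utf8, false),
        Field::new("table_name", DataType::Utf8, false),
    ]));

    // Accumulate rows column by column, then finish each builder into an array
    let mut catalogs = StringBuilder::new();
    let mut names = StringBuilder::new();
    for (catalog, name) in [("datafusion", "tables"), ("datafusion", "views")] {
        catalogs.append_value(catalog);
        names.append_value(name);
    }

    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(catalogs.finish()), Arc::new(names.finish())],
    )
    .unwrap();
    assert_eq!(batch.num_rows(), 2);
}
```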
+impl PartitionStream for InformationSchemaViews {
+    fn schema(&self) -> &SchemaRef {
+        &self.schema
+    }
+
+    fn execute(&self) -> SendableRecordBatchStream {
+        let mut builder = self.builder();
+        let config = self.config.clone();
+        Box::pin(RecordBatchStreamAdapter::new(
+            self.schema.clone(),
+            // TODO: Stream this
+            futures::stream::once(async move {
+                config.make_views(&mut builder);
+                Ok(builder.finish())
+            }),
+        ))
+    }
+}

@@ -343,6 +407,7 @@ impl From<InformationSchemaTablesBuilder> for MemTable {
 ///
 /// Columns are based on <https://www.postgresql.org/docs/current/infoschema-views.html>
 struct InformationSchemaViewBuilder {
+    schema: SchemaRef,
     catalog_names: StringBuilder,
     schema_names: StringBuilder,
     table_names: StringBuilder,
@@ -350,15 +415,6 @@ struct InformationSchemaViewBuilder {
 }

 impl InformationSchemaViewBuilder {
-    fn new() -> Self {
-        Self {
-            catalog_names: StringBuilder::new(),
-            schema_names: StringBuilder::new(),
-            table_names: StringBuilder::new(),
-            definitions: StringBuilder::new(),
-        }
-    }
-
     fn add_view(
         &mut self,
         catalog_name: impl AsRef<str>,
@@ -372,68 +428,56 @@ impl InformationSchemaViewBuilder {
         self.table_names.append_value(table_name.as_ref());
         self.definitions.append_option(definition.as_ref());
     }
-}

-impl From<InformationSchemaViewBuilder> for MemTable {
-    fn from(value: InformationSchemaViewBuilder) -> Self {
-        let schema = Schema::new(vec![
-            Field::new("table_catalog", DataType::Utf8, false),
-            Field::new("table_schema", DataType::Utf8, false),
-            Field::new("table_name", DataType::Utf8, false),
-            Field::new("definition", DataType::Utf8, true),
-        ]);
-
-        let InformationSchemaViewBuilder {
-            mut catalog_names,
-            mut schema_names,
-            mut table_names,
-            mut definitions,
-        } = value;
-
-        let schema = Arc::new(schema);
-        let batch = RecordBatch::try_new(
-            schema.clone(),
+    fn finish(&mut self) -> RecordBatch {
+        RecordBatch::try_new(
+            self.schema.clone(),
             vec![
-                Arc::new(catalog_names.finish()),
-                Arc::new(schema_names.finish()),
-                Arc::new(table_names.finish()),
-                Arc::new(definitions.finish()),
+                Arc::new(self.catalog_names.finish()),
+                Arc::new(self.schema_names.finish()),
+                Arc::new(self.table_names.finish()),
+                Arc::new(self.definitions.finish()),
             ],
         )
-        .unwrap();
-
-        MemTable::try_new(schema, vec![vec![batch]]).unwrap()
+        .unwrap()
     }
 }
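Note that `definition` is the one nullable column in `information_schema.views`, and `add_view` mirrors that with `append_option`. The NULL handling on its own (illustrative values):

```rust
use arrow::array::{Array, StringBuilder};

fn main() {
    let mut definitions = StringBuilder::new();
    // Views created from SQL carry a definition; others yield NULL
    definitions.append_option(Some("SELECT * FROM t"));
    definitions.append_option(None::<&str>);

    let array = definitions.finish();
    assert_eq!(array.len(), 2);
    assert!(array.is_null(1));
}
```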
Field::new("numeric_precision_radix", DataType::UInt64, true), + Field::new("numeric_scale", DataType::UInt64, true), + Field::new("datetime_precision", DataType::UInt64, true), + Field::new("interval_type", DataType::Utf8, true), + ])); + + Self { schema, config } + } + + fn builder(&self) -> InformationSchemaColumnsBuilder { // StringBuilder requires providing an initial capacity, so // pick 10 here arbitrarily as this is not performance // critical code and the number of tables is unavailable here. let default_capacity = 10; - Self { + + InformationSchemaColumnsBuilder { catalog_names: StringBuilder::new(), schema_names: StringBuilder::new(), table_names: StringBuilder::new(), @@ -449,9 +493,53 @@ impl InformationSchemaColumnsBuilder { numeric_scales: UInt64Builder::with_capacity(default_capacity), datetime_precisions: UInt64Builder::with_capacity(default_capacity), interval_types: StringBuilder::new(), + schema: self.schema.clone(), } } +} + +impl PartitionStream for InformationSchemaColumns { + fn schema(&self) -> &SchemaRef { + &self.schema + } + + fn execute(&self) -> SendableRecordBatchStream { + let mut builder = self.builder(); + let config = self.config.clone(); + Box::pin(RecordBatchStreamAdapter::new( + self.schema.clone(), + // TODO: Stream this + futures::stream::once(async move { + config.make_columns(&mut builder); + Ok(builder.finish()) + }), + )) + } +} + +/// Builds the `information_schema.COLUMNS` table row by row +/// +/// Columns are based on +struct InformationSchemaColumnsBuilder { + schema: SchemaRef, + catalog_names: StringBuilder, + schema_names: StringBuilder, + table_names: StringBuilder, + column_names: StringBuilder, + ordinal_positions: UInt64Builder, + column_defaults: StringBuilder, + is_nullables: StringBuilder, + data_types: StringBuilder, + character_maximum_lengths: UInt64Builder, + character_octet_lengths: UInt64Builder, + numeric_precisions: UInt64Builder, + numeric_precision_radixes: UInt64Builder, + numeric_scales: UInt64Builder, + datetime_precisions: UInt64Builder, + interval_types: StringBuilder, +} +impl InformationSchemaColumnsBuilder { #[allow(clippy::too_many_arguments)] fn add_column( &mut self, @@ -547,111 +635,96 @@ impl InformationSchemaColumnsBuilder { self.datetime_precisions.append_option(None); self.interval_types.append_null(); } -} -impl From for MemTable { - fn from(value: InformationSchemaColumnsBuilder) -> MemTable { - let schema = Schema::new(vec![ - Field::new("table_catalog", DataType::Utf8, false), - Field::new("table_schema", DataType::Utf8, false), - Field::new("table_name", DataType::Utf8, false), - Field::new("column_name", DataType::Utf8, false), - Field::new("ordinal_position", DataType::UInt64, false), - Field::new("column_default", DataType::Utf8, true), - Field::new("is_nullable", DataType::Utf8, false), - Field::new("data_type", DataType::Utf8, false), - Field::new("character_maximum_length", DataType::UInt64, true), - Field::new("character_octet_length", DataType::UInt64, true), - Field::new("numeric_precision", DataType::UInt64, true), - Field::new("numeric_precision_radix", DataType::UInt64, true), - Field::new("numeric_scale", DataType::UInt64, true), - Field::new("datetime_precision", DataType::UInt64, true), - Field::new("interval_type", DataType::Utf8, true), - ]); - - let InformationSchemaColumnsBuilder { - mut catalog_names, - mut schema_names, - mut table_names, - mut column_names, - mut ordinal_positions, - mut column_defaults, - mut is_nullables, - mut data_types, - mut character_maximum_lengths, - mut 
+
+/// Builds the `information_schema.COLUMNS` table row by row
+///
+/// Columns are based on <https://www.postgresql.org/docs/current/infoschema-columns.html>
+struct InformationSchemaColumnsBuilder {
+    schema: SchemaRef,
+    catalog_names: StringBuilder,
+    schema_names: StringBuilder,
+    table_names: StringBuilder,
+    column_names: StringBuilder,
+    ordinal_positions: UInt64Builder,
+    column_defaults: StringBuilder,
+    is_nullables: StringBuilder,
+    data_types: StringBuilder,
+    character_maximum_lengths: UInt64Builder,
+    character_octet_lengths: UInt64Builder,
+    numeric_precisions: UInt64Builder,
+    numeric_precision_radixes: UInt64Builder,
+    numeric_scales: UInt64Builder,
+    datetime_precisions: UInt64Builder,
+    interval_types: StringBuilder,
+}

 impl InformationSchemaColumnsBuilder {
     #[allow(clippy::too_many_arguments)]
     fn add_column(
         &mut self,
@@ -547,111 +635,96 @@ impl InformationSchemaColumnsBuilder {
         self.datetime_precisions.append_option(None);
         self.interval_types.append_null();
     }
-}

-impl From<InformationSchemaColumnsBuilder> for MemTable {
-    fn from(value: InformationSchemaColumnsBuilder) -> MemTable {
-        let schema = Schema::new(vec![
-            Field::new("table_catalog", DataType::Utf8, false),
-            Field::new("table_schema", DataType::Utf8, false),
-            Field::new("table_name", DataType::Utf8, false),
-            Field::new("column_name", DataType::Utf8, false),
-            Field::new("ordinal_position", DataType::UInt64, false),
-            Field::new("column_default", DataType::Utf8, true),
-            Field::new("is_nullable", DataType::Utf8, false),
-            Field::new("data_type", DataType::Utf8, false),
-            Field::new("character_maximum_length", DataType::UInt64, true),
-            Field::new("character_octet_length", DataType::UInt64, true),
-            Field::new("numeric_precision", DataType::UInt64, true),
-            Field::new("numeric_precision_radix", DataType::UInt64, true),
-            Field::new("numeric_scale", DataType::UInt64, true),
-            Field::new("datetime_precision", DataType::UInt64, true),
-            Field::new("interval_type", DataType::Utf8, true),
-        ]);
-
-        let InformationSchemaColumnsBuilder {
-            mut catalog_names,
-            mut schema_names,
-            mut table_names,
-            mut column_names,
-            mut ordinal_positions,
-            mut column_defaults,
-            mut is_nullables,
-            mut data_types,
-            mut character_maximum_lengths,
-            mut character_octet_lengths,
-            mut numeric_precisions,
-            mut numeric_precision_radixes,
-            mut numeric_scales,
-            mut datetime_precisions,
-            mut interval_types,
-        } = value;
-
-        let schema = Arc::new(schema);
-        let batch = RecordBatch::try_new(
-            schema.clone(),
+    fn finish(&mut self) -> RecordBatch {
+        RecordBatch::try_new(
+            self.schema.clone(),
             vec![
-                Arc::new(catalog_names.finish()),
-                Arc::new(schema_names.finish()),
-                Arc::new(table_names.finish()),
-                Arc::new(column_names.finish()),
-                Arc::new(ordinal_positions.finish()),
-                Arc::new(column_defaults.finish()),
-                Arc::new(is_nullables.finish()),
-                Arc::new(data_types.finish()),
-                Arc::new(character_maximum_lengths.finish()),
-                Arc::new(character_octet_lengths.finish()),
-                Arc::new(numeric_precisions.finish()),
-                Arc::new(numeric_precision_radixes.finish()),
-                Arc::new(numeric_scales.finish()),
-                Arc::new(datetime_precisions.finish()),
-                Arc::new(interval_types.finish()),
+                Arc::new(self.catalog_names.finish()),
+                Arc::new(self.schema_names.finish()),
+                Arc::new(self.table_names.finish()),
+                Arc::new(self.column_names.finish()),
+                Arc::new(self.ordinal_positions.finish()),
+                Arc::new(self.column_defaults.finish()),
+                Arc::new(self.is_nullables.finish()),
+                Arc::new(self.data_types.finish()),
+                Arc::new(self.character_maximum_lengths.finish()),
+                Arc::new(self.character_octet_lengths.finish()),
+                Arc::new(self.numeric_precisions.finish()),
+                Arc::new(self.numeric_precision_radixes.finish()),
+                Arc::new(self.numeric_scales.finish()),
+                Arc::new(self.datetime_precisions.finish()),
+                Arc::new(self.interval_types.finish()),
             ],
         )
-        .unwrap();
-
-        MemTable::try_new(schema, vec![vec![batch]]).unwrap()
+        .unwrap()
     }
 }

-struct InformationSchemaDfSettingsBuilder {
-    names: StringBuilder,
-    settings: StringBuilder,
+struct InformationSchemaDfSettings {
+    schema: SchemaRef,
+    config: InformationSchemaConfig,
 }

-impl InformationSchemaDfSettingsBuilder {
-    fn new() -> Self {
-        Self {
+impl InformationSchemaDfSettings {
+    fn new(config: InformationSchemaConfig) -> Self {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("name", DataType::Utf8, false),
+            Field::new("setting", DataType::Utf8, false),
+        ]));
+
+        Self { schema, config }
+    }
+
+    fn builder(&self) -> InformationSchemaDfSettingsBuilder {
+        InformationSchemaDfSettingsBuilder {
             names: StringBuilder::new(),
             settings: StringBuilder::new(),
+            schema: self.schema.clone(),
         }
     }
+}

-    fn add_setting(&mut self, name: impl AsRef<str>, setting: impl AsRef<str>) {
-        self.names.append_value(name.as_ref());
-        self.settings.append_value(setting.as_ref());
+impl PartitionStream for InformationSchemaDfSettings {
+    fn schema(&self) -> &SchemaRef {
+        &self.schema
+    }
+
+    fn execute(&self) -> SendableRecordBatchStream {
+        let mut builder = self.builder();
+        let config = self.config.clone();
+        Box::pin(RecordBatchStreamAdapter::new(
+            self.schema.clone(),
+            // TODO: Stream this
+            futures::stream::once(async move {
+                // create a mem table with the names of tables
+                config.make_df_settings(&mut builder);
+                Ok(builder.finish())
+            }),
+        ))
     }
 }

-impl From<InformationSchemaDfSettingsBuilder> for MemTable {
-    fn from(value: InformationSchemaDfSettingsBuilder) -> MemTable {
-        let schema = Schema::new(vec![
-            Field::new("name", DataType::Utf8, false),
-            Field::new("setting", DataType::Utf8, false),
-        ]);
+struct InformationSchemaDfSettingsBuilder {
+    schema: SchemaRef,
+    names: StringBuilder,
+    settings: StringBuilder,
+}

-        let InformationSchemaDfSettingsBuilder {
-            mut names,
-            mut settings,
-        } = value;
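`information_schema.df_settings` follows the same lazy pattern, snapshotting `ConfigOptions` when the plan executes rather than when the table name is resolved. A usage sketch (the filter is illustrative; assumes information_schema is enabled as shown earlier):

```rust
use datafusion::error::Result;
use datafusion::prelude::{SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::with_config(
        SessionConfig::new().with_information_schema(true),
    );

    // Each row is a (name, setting) pair drawn from the current ConfigOptions
    ctx.sql("SELECT name, setting FROM information_schema.df_settings WHERE name LIKE 'datafusion.execution.%'")
        .await?
        .show()
        .await?;
    Ok(())
}
```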
+impl InformationSchemaDfSettingsBuilder {
+    fn add_setting(&mut self, name: impl AsRef<str>, setting: impl AsRef<str>) {
+        self.names.append_value(name.as_ref());
+        self.settings.append_value(setting.as_ref());
+    }

-        let schema = Arc::new(schema);
-        let batch = RecordBatch::try_new(
-            schema.clone(),
-            vec![Arc::new(names.finish()), Arc::new(settings.finish())],
+    fn finish(&mut self) -> RecordBatch {
+        RecordBatch::try_new(
+            self.schema.clone(),
+            vec![
+                Arc::new(self.names.finish()),
+                Arc::new(self.settings.finish()),
+            ],
         )
-        .unwrap();
-
-        MemTable::try_new(schema, vec![vec![batch]]).unwrap()
+        .unwrap()
     }
 }
diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs
index fc3e8f2d2913..8610607b660f 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/mod.rs
@@ -26,6 +26,7 @@ pub mod listing;
 pub mod listing_table_factory;
 pub mod memory;
 pub mod object_store;
+pub mod streaming;
 pub mod view;

 use futures::Stream;
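The new `datasource/streaming.rs` below introduces the `PartitionStream` trait that the information schema tables implement. A hedged sketch of a user-defined implementation (the `RangePartition` type, its `value` column, and `range_schema` are invented for illustration):

```rust
use std::sync::Arc;

use arrow::array::UInt64Array;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion::datasource::streaming::PartitionStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::SendableRecordBatchStream;

/// Yields one batch of consecutive integers, produced only at execution time
struct RangePartition {
    schema: SchemaRef,
    start: u64,
    end: u64,
}

impl PartitionStream for RangePartition {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn execute(&self) -> SendableRecordBatchStream {
        let schema = self.schema.clone();
        let (start, end) = (self.start, self.end);
        Box::pin(RecordBatchStreamAdapter::new(
            schema.clone(),
            futures::stream::once(async move {
                let values = UInt64Array::from_iter_values(start..end);
                Ok(RecordBatch::try_new(schema, vec![Arc::new(values)]).unwrap())
            }),
        ))
    }
}

fn range_schema() -> SchemaRef {
    Arc::new(Schema::new(vec![Field::new("value", DataType::UInt64, false)]))
}
```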
diff --git a/datafusion/core/src/datasource/streaming.rs b/datafusion/core/src/datasource/streaming.rs
new file mode 100644
index 000000000000..88de34efad01
--- /dev/null
+++ b/datafusion/core/src/datasource/streaming.rs
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A simplified [`TableProvider`] for streaming partitioned datasets
+
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::datatypes::SchemaRef;
+use async_trait::async_trait;
+
+use datafusion_common::{DataFusionError, Result};
+use datafusion_expr::{Expr, TableType};
+
+use crate::datasource::TableProvider;
+use crate::execution::context::SessionState;
+use crate::physical_plan::streaming::StreamingTableExec;
+use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
+
+/// A partition that can be converted into a [`SendableRecordBatchStream`]
+pub trait PartitionStream: Send + Sync {
+    /// Returns the schema of this partition
+    fn schema(&self) -> &SchemaRef;
+
+    /// Returns a stream yielding this partition's values
+    fn execute(&self) -> SendableRecordBatchStream;
+}
+
+/// A [`TableProvider`] that streams a set of [`PartitionStream`]
+pub struct StreamingTable {
+    schema: SchemaRef,
+    partitions: Vec<Arc<dyn PartitionStream>>,
+}
+
+impl StreamingTable {
+    /// Try to create a new [`StreamingTable`] returning an error if the schema is incorrect
+    pub fn try_new(
+        schema: SchemaRef,
+        partitions: Vec<Arc<dyn PartitionStream>>,
+    ) -> Result<Self> {
+        if !partitions.iter().all(|x| schema.contains(x.schema())) {
+            return Err(DataFusionError::Plan(
+                "Mismatch between schema and batches".to_string(),
+            ));
+        }
+
+        Ok(Self { schema, partitions })
+    }
+}
+
+#[async_trait]
+impl TableProvider for StreamingTable {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::View
+    }
+
+    async fn scan(
+        &self,
+        _ctx: &SessionState,
+        projection: Option<&Vec<usize>>,
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // TODO: push limit down
+        Ok(Arc::new(StreamingTableExec::try_new(
+            self.schema.clone(),
+            self.partitions.clone(),
+            projection,
+        )?))
+    }
+}
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index 2b4cd63c60b6..aa365ea45092 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -648,6 +648,7 @@ pub mod repartition;
 pub mod rewrite;
 pub mod sorts;
 pub mod stream;
+pub mod streaming;
 pub mod udaf;
 pub mod union;
 pub mod values;
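Building on the hypothetical `RangePartition` sketch above, a `StreamingTable` registers like any other provider; `try_new` rejects partitions whose schema does not match the table schema:

```rust
use std::sync::Arc;

use datafusion::datasource::streaming::{PartitionStream, StreamingTable};
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

async fn register_range(ctx: &SessionContext) -> Result<()> {
    let schema = range_schema();
    let partition: Arc<dyn PartitionStream> = Arc::new(RangePartition {
        schema: schema.clone(),
        start: 0,
        end: 1_000,
    });

    // Fails if any partition's schema disagrees with the table schema
    let table = StreamingTable::try_new(schema, vec![partition])?;
    ctx.register_table("range", Arc::new(table))?;
    Ok(())
}
```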
diff --git a/datafusion/core/src/physical_plan/streaming.rs b/datafusion/core/src/physical_plan/streaming.rs
new file mode 100644
index 000000000000..a6ab51bb18ad
--- /dev/null
+++ b/datafusion/core/src/physical_plan/streaming.rs
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for streaming [`PartitionStream`]
+
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::datatypes::SchemaRef;
+use async_trait::async_trait;
+use futures::stream::StreamExt;
+
+use datafusion_common::{DataFusionError, Result, Statistics};
+use datafusion_physical_expr::PhysicalSortExpr;
+
+use crate::datasource::streaming::PartitionStream;
+use crate::execution::context::TaskContext;
+use crate::physical_plan::stream::RecordBatchStreamAdapter;
+use crate::physical_plan::{ExecutionPlan, Partitioning, SendableRecordBatchStream};
+
+/// An [`ExecutionPlan`] for [`PartitionStream`]
+pub struct StreamingTableExec {
+    partitions: Vec<Arc<dyn PartitionStream>>,
+    projection: Option<Arc<[usize]>>,
+    projected_schema: SchemaRef,
+}
+
+impl StreamingTableExec {
+    /// Try to create a new [`StreamingTableExec`] returning an error if the schema is incorrect
+    pub fn try_new(
+        schema: SchemaRef,
+        partitions: Vec<Arc<dyn PartitionStream>>,
+        projection: Option<&Vec<usize>>,
+    ) -> Result<Self> {
+        if !partitions.iter().all(|x| schema.contains(x.schema())) {
+            return Err(DataFusionError::Plan(
+                "Mismatch between schema and batches".to_string(),
+            ));
+        }
+
+        let projected_schema = match projection {
+            Some(p) => Arc::new(schema.project(p)?),
+            None => schema,
+        };
+
+        Ok(Self {
+            partitions,
+            projected_schema,
+            projection: projection.cloned().map(Into::into),
+        })
+    }
+}
+
+impl std::fmt::Debug for StreamingTableExec {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("LazyMemTableExec").finish_non_exhaustive()
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for StreamingTableExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.projected_schema.clone()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(self.partitions.len())
+    }
+
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        None
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Err(DataFusionError::Internal(format!(
+            "Children cannot be replaced in {:?}",
+            self
+        )))
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        _context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let stream = self.partitions[partition].execute();
+        Ok(match self.projection.clone() {
+            Some(projection) => Box::pin(RecordBatchStreamAdapter::new(
+                self.projected_schema.clone(),
+                stream.map(move |x| x.and_then(|b| b.project(projection.as_ref()))),
+            )),
+            None => stream,
+        })
+    }
+
+    fn statistics(&self) -> Statistics {
+        Default::default()
+    }
+}
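The projection path in `execute` above applies `RecordBatch::project` to every batch as it flows through the stream. The batch-level operation in isolation (illustrative data):

```rust
use std::sync::Arc;

use arrow::array::{StringArray, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() -> arrow::error::Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("name", DataType::Utf8, false),
        Field::new("value", DataType::UInt64, false),
    ]));
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(StringArray::from(vec!["a", "b"])),
            Arc::new(UInt64Array::from(vec![1u64, 2])),
        ],
    )?;

    // Keep only column 1 ("value"), as StreamingTableExec does per batch
    let projected = batch.project(&[1])?;
    assert_eq!(projected.num_columns(), 1);
    Ok(())
}
```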