Merge remote-tracking branch 'apache/main' into issue6561
alamb committed Jun 11, 2023
2 parents e02f754 + d024f37 commit 0947cfd
Showing 35 changed files with 1,211 additions and 1,125 deletions.
4 changes: 2 additions & 2 deletions datafusion-cli/src/exec.rs
@@ -244,13 +244,13 @@ mod tests {

     async fn create_external_table_test(location: &str, sql: &str) -> Result<()> {
         let ctx = SessionContext::new();
-        let plan = ctx.state().create_logical_plan(&sql).await?;
+        let plan = ctx.state().create_logical_plan(sql).await?;

         match &plan {
             LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) => {
                 create_external_table(&ctx, cmd).await?;
             }
-            _ => assert!(false),
+            _ => unreachable!(),
         };

         ctx.runtime_env()
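
Both forms panic when reached, but `unreachable!()` states the intent directly, takes an optional message, and avoids clippy's `assertions_on_constants` lint. A minimal standalone sketch of the idiom (not taken from this commit):

// A match arm that cannot be hit: prefer unreachable!() over assert!(false),
// optionally with a message explaining why the arm is impossible.
fn classify(n: i32) -> &'static str {
    match n.signum() {
        -1 => "negative",
        0 => "zero",
        1 => "positive",
        // i32::signum only ever returns -1, 0, or 1
        _ => unreachable!("signum returned a value outside -1..=1"),
    }
}

fn main() {
    assert_eq!(classify(-5), "negative");
    assert_eq!(classify(7), "positive");
}
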
2 changes: 1 addition & 1 deletion datafusion-cli/src/object_storage.rs
@@ -57,7 +57,7 @@ pub async fn get_s3_object_store_builder(
         .ok_or_else(|| {
             DataFusionError::ObjectStore(object_store::Error::Generic {
                 store: "S3",
-                source: format!("Failed to get S3 credentials from environment")
+                source: "Failed to get S3 credentials from environment".to_string()
                     .into(),
             })
         })?
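
This is the pattern clippy's `useless_format` lint flags: `format!` with a bare string literal routes the text through the formatting machinery just to produce a `String`, while `.to_string()` expresses the same conversion directly. A small sketch of the equivalence:

fn main() {
    // format! with no placeholders still invokes the formatter...
    let via_format = format!("Failed to get S3 credentials from environment");
    // ...while to_string() performs the same conversion directly.
    let via_to_string = "Failed to get S3 credentials from environment".to_string();
    assert_eq!(via_format, via_to_string);
}
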
110 changes: 110 additions & 0 deletions datafusion/common/src/display.rs
@@ -0,0 +1,110 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Types for plan display
use std::{
    fmt::{self, Display, Formatter},
    sync::Arc,
};

/// Represents which type of plan, when storing multiple
/// for use in EXPLAIN plans
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum PlanType {
    /// The initial LogicalPlan provided to DataFusion
    InitialLogicalPlan,
    /// The LogicalPlan which results from applying an analyzer pass
    AnalyzedLogicalPlan {
        /// The name of the analyzer which produced this plan
        analyzer_name: String,
    },
    /// The LogicalPlan after all analyzer passes have been applied
    FinalAnalyzedLogicalPlan,
    /// The LogicalPlan which results from applying an optimizer pass
    OptimizedLogicalPlan {
        /// The name of the optimizer which produced this plan
        optimizer_name: String,
    },
    /// The final, fully optimized LogicalPlan that was converted to a physical plan
    FinalLogicalPlan,
    /// The initial physical plan, prepared for execution
    InitialPhysicalPlan,
    /// The ExecutionPlan which results from applying an optimizer pass
    OptimizedPhysicalPlan {
        /// The name of the optimizer which produced this plan
        optimizer_name: String,
    },
    /// The final, fully optimized physical plan which would be executed
    FinalPhysicalPlan,
}

impl Display for PlanType {
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        match self {
            PlanType::InitialLogicalPlan => write!(f, "initial_logical_plan"),
            PlanType::AnalyzedLogicalPlan { analyzer_name } => {
                write!(f, "logical_plan after {analyzer_name}")
            }
            PlanType::FinalAnalyzedLogicalPlan => write!(f, "analyzed_logical_plan"),
            PlanType::OptimizedLogicalPlan { optimizer_name } => {
                write!(f, "logical_plan after {optimizer_name}")
            }
            PlanType::FinalLogicalPlan => write!(f, "logical_plan"),
            PlanType::InitialPhysicalPlan => write!(f, "initial_physical_plan"),
            PlanType::OptimizedPhysicalPlan { optimizer_name } => {
                write!(f, "physical_plan after {optimizer_name}")
            }
            PlanType::FinalPhysicalPlan => write!(f, "physical_plan"),
        }
    }
}

/// Represents some sort of execution plan, in String form
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StringifiedPlan {
    /// An identifier of what type of plan this string represents
    pub plan_type: PlanType,
    /// The string representation of the plan
    pub plan: Arc<String>,
}

impl StringifiedPlan {
    /// Create a new Stringified plan of `plan_type` with string
    /// representation `plan`
    pub fn new(plan_type: PlanType, plan: impl Into<String>) -> Self {
        StringifiedPlan {
            plan_type,
            plan: Arc::new(plan.into()),
        }
    }

    /// Returns true if this plan should be displayed. Generally
    /// `verbose_mode = true` will display all available plans
    pub fn should_display(&self, verbose_mode: bool) -> bool {
        match self.plan_type {
            PlanType::FinalLogicalPlan | PlanType::FinalPhysicalPlan => true,
            _ => verbose_mode,
        }
    }
}

/// Trait for something that can be formatted as a stringified plan
pub trait ToStringifiedPlan {
    /// Create a stringified plan with the specified type
    fn to_stringified(&self, plan_type: PlanType) -> StringifiedPlan;
}
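
A usage sketch of the types this new module exports, derived only from the code above (it assumes the `datafusion-common` crate as a dependency and is not part of the commit): the `Display` impl on `PlanType` yields the row label used in EXPLAIN output, and `should_display` gates intermediate plans behind verbose mode.

use datafusion_common::display::{PlanType, StringifiedPlan};

fn main() {
    let stringified = StringifiedPlan::new(
        PlanType::OptimizedLogicalPlan {
            optimizer_name: "push_down_filter".to_string(),
        },
        "Filter: t.x > Int32(5)\n  TableScan: t",
    );

    // Display on PlanType produces the EXPLAIN row label.
    assert_eq!(
        stringified.plan_type.to_string(),
        "logical_plan after push_down_filter"
    );

    // Intermediate optimizer output is shown only in verbose mode...
    assert!(!stringified.should_display(false));
    assert!(stringified.should_display(true));
    // ...while the final plans always display.
    assert!(StringifiedPlan::new(PlanType::FinalLogicalPlan, "plan").should_display(false));
}
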
1 change: 1 addition & 0 deletions datafusion/common/src/lib.rs
@@ -20,6 +20,7 @@ mod column
 pub mod config;
 pub mod delta;
 mod dfschema;
+pub mod display;
 mod error;
 mod join_type;
 pub mod parsers;
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/file_format/options.rs
@@ -512,7 +512,7 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_> {
 #[async_trait]
 impl ReadOptions<'_> for AvroReadOptions<'_> {
     fn to_listing_options(&self, config: &SessionConfig) -> ListingOptions {
-        let file_format = AvroFormat::default();
+        let file_format = AvroFormat;

         ListingOptions::new(Arc::new(file_format))
             .with_file_extension(self.file_extension)
@@ -535,7 +535,7 @@ impl ReadOptions<'_> for AvroReadOptions<'_> {
 #[async_trait]
 impl ReadOptions<'_> for ArrowReadOptions<'_> {
     fn to_listing_options(&self, config: &SessionConfig) -> ListingOptions {
-        let file_format = ArrowFormat::default();
+        let file_format = ArrowFormat;

         ListingOptions::new(Arc::new(file_format))
             .with_file_extension(self.file_extension)
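
`AvroFormat` and `ArrowFormat` are unit structs, so the value itself is the constructor and `::default()` adds nothing; clippy's `default_constructed_unit_structs` lint flags the longer form. A standalone sketch with a stand-in unit struct:

// Stand-in for a unit struct such as AvroFormat or ArrowFormat.
#[derive(Debug, Default, PartialEq)]
struct AvroFormat;

fn main() {
    let via_default = AvroFormat::default();
    let direct = AvroFormat; // identical value, no method call
    assert_eq!(via_default, direct);
}
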
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/listing/table.rs
@@ -133,8 +133,8 @@ impl ListingTableConfig {
             .map_err(|_| DataFusionError::Internal(err_msg))?;

         let file_format: Arc<dyn FileFormat> = match file_type {
-            FileType::ARROW => Arc::new(ArrowFormat::default()),
-            FileType::AVRO => Arc::new(AvroFormat::default()),
+            FileType::ARROW => Arc::new(ArrowFormat),
+            FileType::AVRO => Arc::new(AvroFormat),
             FileType::CSV => Arc::new(
                 CsvFormat::default().with_file_compression_type(file_compression_type),
             ),
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/listing_table_factory.rs
@@ -78,11 +78,11 @@ impl TableProviderFactory for ListingTableFactory {
                 .with_file_compression_type(file_compression_type),
             ),
             FileType::PARQUET => Arc::new(ParquetFormat::default()),
-            FileType::AVRO => Arc::new(AvroFormat::default()),
+            FileType::AVRO => Arc::new(AvroFormat),
             FileType::JSON => Arc::new(
                 JsonFormat::default().with_file_compression_type(file_compression_type),
             ),
-            FileType::ARROW => Arc::new(ArrowFormat::default()),
+            FileType::ARROW => Arc::new(ArrowFormat),
         };

         let (provided_schema, table_partition_cols) = if cmd.schema.fields().is_empty() {
36 changes: 20 additions & 16 deletions datafusion/core/src/datasource/memory.rs
@@ -18,6 +18,7 @@

 //! [`MemTable`] for querying `Vec<RecordBatch>` by DataFusion.
 use futures::StreamExt;
+use log::debug;
 use std::any::Any;
 use std::fmt::{self, Debug, Display};
 use std::sync::Arc;
@@ -55,23 +56,26 @@ pub struct MemTable {
 impl MemTable {
     /// Create a new in-memory table from the provided schema and record batches
     pub fn try_new(schema: SchemaRef, partitions: Vec<Vec<RecordBatch>>) -> Result<Self> {
-        if partitions
-            .iter()
-            .flatten()
-            .all(|batches| schema.contains(&batches.schema()))
-        {
-            Ok(Self {
-                schema,
-                batches: partitions
-                    .into_iter()
-                    .map(|e| Arc::new(RwLock::new(e)))
-                    .collect::<Vec<_>>(),
-            })
-        } else {
-            Err(DataFusionError::Plan(
-                "Mismatch between schema and batches".to_string(),
-            ))
-        }
+        for batches in partitions.iter().flatten() {
+            let batches_schema = batches.schema();
+            if !schema.contains(&batches_schema) {
+                debug!(
+                    "mem table schema does not contain batches schema. \
+                    Target_schema: {schema:?}. Batches Schema: {batches_schema:?}"
+                );
+                return Err(DataFusionError::Plan(
+                    "Mismatch between schema and batches".to_string(),
+                ));
+            }
+        }
+
+        Ok(Self {
+            schema,
+            batches: partitions
+                .into_iter()
+                .map(|e| Arc::new(RwLock::new(e)))
+                .collect::<Vec<_>>(),
+        })
     }

     /// Create a mem table by reading from another data source
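
The rewrite trades the `Iterator::all` one-liner for an explicit loop so the first mismatching schema stays in scope long enough to be logged before the error is returned. A self-contained sketch of the pattern, using a stand-in `Schema` type with plain equality in place of arrow's `Schema::contains`, and assuming the `log` crate:

use log::debug;

#[derive(Debug, PartialEq)]
struct Schema(Vec<String>); // stand-in for arrow's SchemaRef

fn validate(target: &Schema, partitions: &[Vec<Schema>]) -> Result<(), String> {
    // The explicit loop (rather than `.all(..)`) keeps the offending
    // schema available for the debug! call before bailing out.
    for batch_schema in partitions.iter().flatten() {
        if batch_schema != target {
            debug!(
                "mem table schema does not contain batches schema. \
                 Target_schema: {target:?}. Batches Schema: {batch_schema:?}"
            );
            return Err("Mismatch between schema and batches".to_string());
        }
    }
    Ok(())
}

fn main() {
    let target = Schema(vec!["a".into()]);
    assert!(validate(&target, &[vec![Schema(vec!["a".into()])]]).is_ok());
    assert!(validate(&target, &[vec![Schema(vec!["b".into()])]]).is_err());
}
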
16 changes: 12 additions & 4 deletions datafusion/core/src/datasource/streaming.rs
@@ -25,6 +25,7 @@ use async_trait::async_trait;

 use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::{Expr, TableType};
+use log::debug;

 use crate::datasource::TableProvider;
 use crate::execution::context::{SessionState, TaskContext};
@@ -53,10 +54,17 @@ impl StreamingTable {
         schema: SchemaRef,
         partitions: Vec<Arc<dyn PartitionStream>>,
     ) -> Result<Self> {
-        if !partitions.iter().all(|x| schema.contains(x.schema())) {
-            return Err(DataFusionError::Plan(
-                "Mismatch between schema and batches".to_string(),
-            ));
+        for x in partitions.iter() {
+            let partition_schema = x.schema();
+            if !schema.contains(partition_schema) {
+                debug!(
+                    "target schema does not contain partition schema. \
+                    Target_schema: {schema:?}. Partition Schema: {partition_schema:?}"
+                );
+                return Err(DataFusionError::Plan(
+                    "Mismatch between schema and batches".to_string(),
+                ));
+            }
         }

         Ok(Self {
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/analyze.rs
@@ -209,7 +209,7 @@ fn create_output_batch(
     plan_builder.append_value(total_rows.to_string());

     type_builder.append_value("Duration");
-    plan_builder.append_value(format!("{:?}", duration));
+    plan_builder.append_value(format!("{duration:?}"));
 }

 RecordBatch::try_new(
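
The positional `format!("{:?}", duration)` and the captured-identifier `format!("{duration:?}")` render identically; the latter has been available since Rust 1.58 and is what clippy's `uninlined_format_args` lint suggests. A one-line check:

use std::time::Duration;

fn main() {
    let duration = Duration::from_millis(1500);
    // Positional and captured forms produce the same output.
    assert_eq!(format!("{:?}", duration), format!("{duration:?}"));
}
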
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/display.rs
@@ -21,7 +21,7 @@

 use std::fmt;

-use crate::logical_expr::{StringifiedPlan, ToStringifiedPlan};
+use datafusion_common::display::{StringifiedPlan, ToStringifiedPlan};

 use super::{accept, ExecutionPlan, ExecutionPlanVisitor};
7 changes: 3 additions & 4 deletions datafusion/core/src/physical_plan/explain.rs
@@ -20,12 +20,11 @@

 use std::any::Any;
 use std::sync::Arc;

+use datafusion_common::display::StringifiedPlan;
+
 use datafusion_common::{DataFusionError, Result};

-use crate::{
-    logical_expr::StringifiedPlan,
-    physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning, Statistics},
-};
+use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning, Statistics};
 use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
 use log::trace;
4 changes: 3 additions & 1 deletion datafusion/core/src/physical_plan/planner.rs
@@ -32,8 +32,10 @@ use crate::logical_expr::{
 };
 use crate::logical_expr::{
     CrossJoin, Expr, LogicalPlan, Partitioning as LogicalPartitioning, PlanType,
-    Repartition, ToStringifiedPlan, Union, UserDefinedLogicalNode,
+    Repartition, Union, UserDefinedLogicalNode,
 };
+use datafusion_common::display::ToStringifiedPlan;
+
 use crate::logical_expr::{Limit, Values};
 use crate::physical_expr::create_physical_expr;
 use crate::physical_optimizer::optimizer::PhysicalOptimizerRule;
16 changes: 12 additions & 4 deletions datafusion/core/src/physical_plan/streaming.rs
@@ -26,6 +26,7 @@ use futures::stream::StreamExt;

 use datafusion_common::{DataFusionError, Result, Statistics};
 use datafusion_physical_expr::PhysicalSortExpr;
+use log::debug;

 use crate::datasource::streaming::PartitionStream;
 use crate::physical_plan::stream::RecordBatchStreamAdapter;
@@ -48,10 +49,17 @@ impl StreamingTableExec {
         projection: Option<&Vec<usize>>,
         infinite: bool,
     ) -> Result<Self> {
-        if !partitions.iter().all(|x| schema.contains(x.schema())) {
-            return Err(DataFusionError::Plan(
-                "Mismatch between schema and batches".to_string(),
-            ));
+        for x in partitions.iter() {
+            let partition_schema = x.schema();
+            if !schema.contains(partition_schema) {
+                debug!(
+                    "target schema does not contain partition schema. \
+                    Target_schema: {schema:?}. Partition Schema: {partition_schema:?}"
+                );
+                return Err(DataFusionError::Plan(
+                    "Mismatch between schema and batches".to_string(),
+                ));
+            }
         }

         let projected_schema = match projection {
3 changes: 1 addition & 2 deletions datafusion/core/tests/sql/timestamp.rs
@@ -565,7 +565,6 @@ async fn timestamp_sub_interval_days() -> Result<()> {
 }

 #[tokio::test]
-#[ignore] // https://github.com/apache/arrow-datafusion/issues/3327
 async fn timestamp_add_interval_months() -> Result<()> {
     let ctx = SessionContext::new();

@@ -576,7 +575,7 @@ async fn timestamp_add_interval_months() -> Result<()> {
     let res1 = actual[0][0].as_str();
     let res2 = actual[0][1].as_str();

-    let format = "%Y-%m-%d %H:%M:%S%.6f";
+    let format = "%Y-%m-%dT%H:%M:%S%.6fZ";
     let t1_naive = chrono::NaiveDateTime::parse_from_str(res1, format).unwrap();
     let t2_naive = chrono::NaiveDateTime::parse_from_str(res2, format).unwrap();
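
The expected format moves from a space-separated layout to an RFC 3339 style with a literal `T` separator and trailing `Z`, presumably matching how the timestamps are now rendered (the `#[ignore]` for issue 3327 is dropped in the same change). A sketch of what the two chrono format strings accept, assuming the chrono crate the test already uses:

use chrono::NaiveDateTime;

fn main() {
    // New format: literal 'T' and 'Z', with %.6f fixing six fractional digits.
    let format = "%Y-%m-%dT%H:%M:%S%.6fZ";

    let parsed =
        NaiveDateTime::parse_from_str("2023-06-11T01:02:03.000004Z", format).unwrap();
    assert_eq!(parsed.to_string(), "2023-06-11 01:02:03.000004");

    // The old space-separated format no longer matches such a string.
    let old_format = "%Y-%m-%d %H:%M:%S%.6f";
    assert!(NaiveDateTime::parse_from_str("2023-06-11T01:02:03.000004Z", old_format).is_err());
}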