Skip to content

Commit

Permalink
Move physical_plan::file_format to datasource::plan (#6516)
Browse files Browse the repository at this point in the history
* Move `physical_plan::file_format` to `datasource::plan`

* fix doclinks
  • Loading branch information
alamb authored Jun 6, 2023
1 parent 39ee59a commit 786f222
Show file tree
Hide file tree
Showing 44 changed files with 83 additions and 86 deletions.
9 changes: 4 additions & 5 deletions datafusion-examples/examples/csv_opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@ use std::{sync::Arc, vec};
use datafusion::{
assert_batches_eq,
datasource::{
file_format::file_type::FileCompressionType, listing::PartitionedFile,
file_format::file_type::FileCompressionType,
listing::PartitionedFile,
object_store::ObjectStoreUrl,
physical_plan::{CsvConfig, CsvOpener, FileScanConfig, FileStream},
},
error::Result,
physical_plan::{
file_format::{CsvConfig, CsvOpener, FileScanConfig, FileStream},
metrics::ExecutionPlanMetricsSet,
},
physical_plan::metrics::ExecutionPlanMetricsSet,
test_util::aggr_test_schema,
};
use futures::StreamExt;
Expand Down
9 changes: 4 additions & 5 deletions datafusion-examples/examples/json_opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,13 @@ use arrow_schema::{DataType, Field, Schema};
use datafusion::{
assert_batches_eq,
datasource::{
file_format::file_type::FileCompressionType, listing::PartitionedFile,
file_format::file_type::FileCompressionType,
listing::PartitionedFile,
object_store::ObjectStoreUrl,
physical_plan::{FileScanConfig, FileStream, JsonOpener},
},
error::Result,
physical_plan::{
file_format::{FileScanConfig, FileStream, JsonOpener},
metrics::ExecutionPlanMetricsSet,
},
physical_plan::metrics::ExecutionPlanMetricsSet,
};
use futures::StreamExt;
use object_store::ObjectStore;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use crate::arrow::datatypes::Schema;
use crate::arrow::datatypes::SchemaRef;
use crate::arrow::record_batch::RecordBatch;
use crate::arrow::util::pretty;
use crate::datasource::physical_plan::{plan_to_csv, plan_to_json, plan_to_parquet};
use crate::datasource::{provider_as_source, MemTable, TableProvider};
use crate::error::Result;
use crate::execution::{
Expand All @@ -48,7 +49,6 @@ use crate::logical_expr::{
col, utils::find_window_exprs, Expr, JoinType, LogicalPlan, LogicalPlanBuilder,
Partitioning, TableType,
};
use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet};
use crate::physical_plan::SendableRecordBatchStream;
use crate::physical_plan::{collect, collect_partitioned};
use crate::physical_plan::{execute_stream, execute_stream_partitioned, ExecutionPlan};
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/file_format/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
//! Works with files following the [Arrow IPC format](https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format)
use crate::datasource::file_format::FileFormat;
use crate::datasource::physical_plan::{ArrowExec, FileScanConfig};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::file_format::{ArrowExec, FileScanConfig};
use crate::physical_plan::ExecutionPlan;
use arrow::ipc::reader::FileReader;
use arrow_schema::{Schema, SchemaRef};
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/file_format/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ use object_store::{GetResult, ObjectMeta, ObjectStore};

use super::FileFormat;
use crate::avro_to_arrow::read_avro_schema_from_reader;
use crate::datasource::physical_plan::{AvroExec, FileScanConfig};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::file_format::{AvroExec, FileScanConfig};
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::Statistics;

Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ use crate::datasource::file_format::{
AbortMode, AbortableWrite, AsyncPutWriter, BatchSerializer, MultiPart,
DEFAULT_SCHEMA_INFER_MAX_RECORD,
};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::file_format::{
use crate::datasource::physical_plan::{
CsvExec, FileGroupDisplay, FileMeta, FileScanConfig, FileSinkConfig,
};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::insert::{DataSink, InsertExec};
use crate::physical_plan::Statistics;
use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ use super::FileFormat;
use super::FileScanConfig;
use crate::datasource::file_format::file_type::FileCompressionType;
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
use crate::datasource::physical_plan::NdJsonExec;
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::file_format::NdJsonExec;
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::Statistics;

Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/file_format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ use std::task::{Context, Poll};
use std::{fmt, mem};

use crate::arrow::datatypes::SchemaRef;
use crate::datasource::physical_plan::{FileScanConfig, FileSinkConfig};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::file_format::{FileScanConfig, FileSinkConfig};
use crate::physical_plan::{ExecutionPlan, Statistics};

use arrow_array::RecordBatch;
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ use crate::arrow::array::{
use crate::arrow::datatypes::DataType;
use crate::config::ConfigOptions;

use crate::datasource::physical_plan::{ParquetExec, SchemaAdapter};
use crate::datasource::{create_max_min_accs, get_col_stats};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
use crate::physical_plan::file_format::{ParquetExec, SchemaAdapter};
use crate::physical_plan::{Accumulator, ExecutionPlan, Statistics};

/// The default file extension of parquet files
Expand Down Expand Up @@ -379,7 +379,7 @@ fn summarize_min_max(
/// This component is a subject to **change** in near future and is exposed for low level integrations
/// through [`ParquetFileReaderFactory`].
///
/// [`ParquetFileReaderFactory`]: crate::physical_plan::file_format::ParquetFileReaderFactory
/// [`ParquetFileReaderFactory`]: crate::datasource::physical_plan::ParquetFileReaderFactory
pub async fn fetch_parquet_metadata(
store: &dyn ObjectStore,
meta: &ObjectMeta,
Expand Down Expand Up @@ -623,7 +623,7 @@ mod tests {
use super::*;

use crate::datasource::file_format::parquet::test_util::store_parquet;
use crate::physical_plan::file_format::get_scan_files;
use crate::datasource::physical_plan::get_scan_files;
use crate::physical_plan::metrics::MetricValue;
use crate::prelude::{SessionConfig, SessionContext};
use arrow::array::{Array, ArrayRef, StringArray};
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/listing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ pub struct PartitionedFile {
/// You may use [`wrap_partition_value_in_dict`] to wrap them if you have used [`wrap_partition_type_in_dict`] to wrap the column type.
///
///
/// [`wrap_partition_type_in_dict`]: crate::physical_plan::file_format::wrap_partition_type_in_dict
/// [`wrap_partition_value_in_dict`]: crate::physical_plan::file_format::wrap_partition_value_in_dict
/// [`wrap_partition_type_in_dict`]: crate::datasource::physical_plan::wrap_partition_type_in_dict
/// [`wrap_partition_value_in_dict`]: crate::datasource::physical_plan::wrap_partition_value_in_dict
/// [`table_partition_cols`]: table::ListingOptions::table_partition_cols
pub partition_values: Vec<ScalarValue>,
/// An optional file range for a more fine-grained parallel execution
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use object_store::path::Path;
use object_store::ObjectMeta;

use crate::datasource::file_format::file_type::{FileCompressionType, FileType};
use crate::datasource::physical_plan::{FileScanConfig, FileSinkConfig};
use crate::datasource::{
file_format::{
arrow::ArrowFormat, avro::AvroFormat, csv::CsvFormat, json::JsonFormat,
Expand All @@ -44,7 +45,6 @@ use crate::datasource::{
};
use crate::logical_expr::TableProviderFilterPushDown;
use crate::physical_plan;
use crate::physical_plan::file_format::{FileScanConfig, FileSinkConfig};
use crate::{
error::{DataFusionError, Result},
execution::context::SessionState,
Expand Down Expand Up @@ -357,7 +357,7 @@ impl ListingOptions {
/// ```
///
/// [Hive Partitioning]: https://docs.cloudera.com/HDPDocuments/HDP2/HDP-2.1.3/bk_system-admin-guide/content/hive_partitioned_tables.html
/// [`wrap_partition_type_in_dict`]: crate::physical_plan::file_format::wrap_partition_type_in_dict
/// [`wrap_partition_type_in_dict`]: crate::datasource::physical_plan::wrap_partition_type_in_dict
pub fn with_table_partition_cols(
mut self,
table_partition_cols: Vec<(String, DataType)>,
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub mod file_format;
pub mod listing;
pub mod listing_table_factory;
pub mod memory;
pub mod physical_plan;
pub mod streaming;
pub mod view;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
// under the License.

//! Execution plan for reading Arrow files
use crate::error::Result;
use crate::physical_plan::file_format::{
use crate::datasource::physical_plan::{
FileMeta, FileOpenFuture, FileOpener, FileScanConfig,
};
use crate::error::Result;
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ impl ExecutionPlan for AvroExec {
#[cfg(feature = "avro")]
mod private {
use super::*;
use crate::physical_plan::file_format::file_stream::{FileOpenFuture, FileOpener};
use crate::physical_plan::file_format::FileMeta;
use crate::datasource::physical_plan::file_stream::{FileOpenFuture, FileOpener};
use crate::datasource::physical_plan::FileMeta;
use bytes::Buf;
use futures::StreamExt;
use object_store::{GetResult, ObjectStore};
Expand Down Expand Up @@ -222,7 +222,7 @@ mod tests {
use crate::datasource::file_format::{avro::AvroFormat, FileFormat};
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::physical_plan::file_format::chunked_store::ChunkedStore;
use crate::datasource::physical_plan::chunked_store::ChunkedStore;
use crate::prelude::SessionContext;
use crate::scalar::ScalarValue;
use crate::test::object_store::local_unpartitioned_file;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
//! Execution plan for reading CSV files
use crate::datasource::file_format::file_type::FileCompressionType;
use crate::datasource::physical_plan::file_stream::{
FileOpenFuture, FileOpener, FileStream,
};
use crate::datasource::physical_plan::FileMeta;
use crate::error::{DataFusionError, Result};
use crate::physical_plan::common::AbortOnDropSingle;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::file_format::file_stream::{
FileOpenFuture, FileOpener, FileStream,
};
use crate::physical_plan::file_format::FileMeta;
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan,
Expand Down Expand Up @@ -369,7 +369,7 @@ pub async fn plan_to_csv(
mod tests {
use super::*;
use crate::datasource::file_format::file_type::FileType;
use crate::physical_plan::file_format::chunked_store::ChunkedStore;
use crate::datasource::physical_plan::chunked_store::ChunkedStore;
use crate::prelude::*;
use crate::test::{partitioned_csv_config, partitioned_file_groups};
use crate::test_util::{aggr_test_schema_with_missing_col, arrow_test_data};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ use std::task::{Context, Poll};
use std::time::Instant;

use crate::datasource::listing::PartitionedFile;
use crate::error::Result;
use crate::physical_plan::file_format::{
use crate::datasource::physical_plan::{
FileMeta, FileScanConfig, PartitionColumnProjector,
};
use crate::error::Result;
use crate::physical_plan::metrics::{
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time,
};
Expand Down Expand Up @@ -524,7 +524,7 @@ mod tests {
use super::*;
use crate::datasource::file_format::BatchSerializer;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::physical_plan::file_format::FileMeta;
use crate::datasource::physical_plan::FileMeta;
use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
use crate::prelude::SessionContext;
use crate::{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@

//! Execution plan for reading line-delimited JSON files
use crate::datasource::file_format::file_type::FileCompressionType;
use crate::datasource::physical_plan::file_stream::{
FileOpenFuture, FileOpener, FileStream,
};
use crate::datasource::physical_plan::FileMeta;
use crate::error::{DataFusionError, Result};
use crate::physical_plan::common::AbortOnDropSingle;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::file_format::file_stream::{
FileOpenFuture, FileOpener, FileStream,
};
use crate::physical_plan::file_format::FileMeta;
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan,
Expand Down Expand Up @@ -308,8 +308,8 @@ mod tests {
use crate::datasource::file_format::{json::JsonFormat, FileFormat};
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::datasource::physical_plan::chunked_store::ChunkedStore;
use crate::execution::context::SessionState;
use crate::physical_plan::file_format::chunked_store::ChunkedStore;
use crate::prelude::NdJsonReadOptions;
use crate::prelude::*;
use crate::test::partitioned_file_groups;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,12 @@

//! Execution plan for reading Parquet files
use fmt::Debug;
use std::any::Any;
use std::cmp::min;
use std::fmt;
use std::fs;
use std::ops::Range;
use std::sync::Arc;

use crate::physical_plan::file_format::file_stream::{
use crate::datasource::physical_plan::file_stream::{
FileOpenFuture, FileOpener, FileStream,
};
use crate::physical_plan::file_format::parquet::page_filter::PagePruningPredicate;
use crate::datasource::physical_plan::{
parquet::page_filter::PagePruningPredicate, FileMeta, FileScanConfig, SchemaAdapter,
};
use crate::{
config::ConfigOptions,
datasource::listing::FileRange,
Expand All @@ -37,13 +31,19 @@ use crate::{
physical_optimizer::pruning::PruningPredicate,
physical_plan::{
common::AbortOnDropSingle,
expressions::PhysicalSortExpr,
file_format::{FileMeta, FileScanConfig, SchemaAdapter},
metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan,
Partitioning, SendableRecordBatchStream, Statistics,
},
};
use datafusion_physical_expr::PhysicalSortExpr;
use fmt::Debug;
use std::any::Any;
use std::cmp::min;
use std::fmt;
use std::fs;
use std::ops::Range;
use std::sync::Arc;

use arrow::datatypes::{DataType, SchemaRef};
use arrow::error::ArrowError;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ use parquet::{
};
use std::sync::Arc;

use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
use crate::physical_plan::file_format::parquet::{
use crate::datasource::physical_plan::parquet::{
from_bytes_to_i128, parquet_to_arrow_decimal_type,
};
use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};

use super::metrics::ParquetFileMetrics;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use parquet::file::{
metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics,
};

use crate::physical_plan::file_format::parquet::{
use crate::datasource::physical_plan::parquet::{
from_bytes_to_i128, parquet_to_arrow_decimal_type,
};
use crate::{
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
use crate::physical_optimizer::repartition::Repartition;

use crate::config::ConfigOptions;
use crate::datasource::physical_plan::{plan_to_csv, plan_to_json, plan_to_parquet};
use crate::execution::{runtime_env::RuntimeEnv, FunctionRegistry};
use crate::physical_optimizer::dist_enforcement::EnforceDistribution;
use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet};
use crate::physical_plan::planner::DefaultPhysicalPlanner;
use crate::physical_plan::udaf::AggregateUDF;
use crate::physical_plan::udf::ScalarUDF;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,11 @@ mod tests {
use super::*;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::datasource::physical_plan::{FileScanConfig, ParquetExec};
use crate::physical_plan::aggregates::{
AggregateExec, AggregateMode, PhysicalGroupBy,
};
use crate::physical_plan::expressions::lit;
use crate::physical_plan::file_format::{FileScanConfig, ParquetExec};
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::{displayable, Partitioning, Statistics};

Expand Down
Loading

0 comments on commit 786f222

Please sign in to comment.