diff --git a/Cargo.toml b/Cargo.toml index ee277d67a585..2b854c670349 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -73,7 +73,7 @@ version = "44.0.0" # selectively turn them on if needed, since we can override default-features = true (from false) # for the inherited dependency but cannot do the reverse (override from true to false). # -# See for more detaiils: https://github.com/rust-lang/cargo/issues/11329 +# See for more details: https://github.com/rust-lang/cargo/issues/11329 ahash = { version = "0.8", default-features = false, features = [ "runtime-rng", ] } diff --git a/README.md b/README.md index c2ede4833e9b..e0fc6854ecff 100644 --- a/README.md +++ b/README.md @@ -113,7 +113,7 @@ Default features: - `regex_expressions`: regular expression functions, such as `regexp_match` - `unicode_expressions`: Include unicode aware functions such as `character_length` - `unparser`: enables support to reverse LogicalPlans back into SQL -- `recursive-protection`: uses [recursive](https://docs.rs/recursive/latest/recursive/) for stack overflow protection. +- `recursive_protection`: uses [recursive](https://docs.rs/recursive/latest/recursive/) for stack overflow protection. Optional features: diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 58d27bdce8ad..863bb5181f45 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1543,6 +1543,7 @@ dependencies = [ "indexmap", "itertools", "log", + "recursive", "regex", "regex-syntax", ] diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index f244218b2a5c..054a58b7bc41 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -45,6 +45,7 @@ datafusion = { path = "../datafusion/core", version = "44.0.0", features = [ "datetime_expressions", "encoding_expressions", "parquet", + "recursive_protection", "regex_expressions", "unicode_expressions", "compression", diff --git a/datafusion-cli/src/functions.rs b/datafusion-cli/src/functions.rs index d7ca48d638b7..36e68ec4842b 100644 --- a/datafusion-cli/src/functions.rs +++ b/datafusion-cli/src/functions.rs @@ -360,7 +360,7 @@ impl TableFunctionImpl for ParquetMetadataFunc { Field::new("total_uncompressed_size", DataType::Int64, true), ])); - // construct recordbatch from metadata + // construct record batch from metadata let mut filename_arr = vec![]; let mut row_group_id_arr = vec![]; let mut row_group_num_rows_arr = vec![]; diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md index a155920eadc4..b06148ce267f 100644 --- a/datafusion-examples/README.md +++ b/datafusion-examples/README.md @@ -22,7 +22,7 @@ This crate includes end to end, highly commented examples of how to use various DataFusion APIs to help you get started. -## Prerequisites: +## Prerequisites Run `git submodule update --init` to init test files. diff --git a/datafusion-examples/examples/advanced_parquet_index.rs b/datafusion-examples/examples/advanced_parquet_index.rs index 67b745d4074e..28a3a2f1de09 100644 --- a/datafusion-examples/examples/advanced_parquet_index.rs +++ b/datafusion-examples/examples/advanced_parquet_index.rs @@ -82,7 +82,7 @@ use url::Url; /// Specifically, this example illustrates how to: /// 1. Use [`ParquetFileReaderFactory`] to avoid re-reading parquet metadata on each query /// 2. Use [`PruningPredicate`] for predicate analysis -/// 3. Pass a row group selection to [`ParuetExec`] +/// 3. Pass a row group selection to [`ParquetExec`] /// 4. 
Pass a row selection (within a row group) to [`ParquetExec`] /// /// Note this is a *VERY* low level example for people who want to build their @@ -211,7 +211,7 @@ async fn main() -> Result<()> { // // Note: in order to prune pages, the Page Index must be loaded and the // ParquetExec will load it on demand if not present. To avoid a second IO - // during query, this example loaded the Page Index pre-emptively by setting + // during query, this example loaded the Page Index preemptively by setting // `ArrowReader::with_page_index` in `IndexedFile::try_new` provider.set_use_row_selection(true); println!("** Select data, predicate `id = 950`"); diff --git a/datafusion-examples/examples/analyzer_rule.rs b/datafusion-examples/examples/analyzer_rule.rs index bd067be97b8b..aded64ed4105 100644 --- a/datafusion-examples/examples/analyzer_rule.rs +++ b/datafusion-examples/examples/analyzer_rule.rs @@ -138,7 +138,7 @@ impl AnalyzerRule for RowLevelAccessControl { fn analyze(&self, plan: LogicalPlan, _config: &ConfigOptions) -> Result { // use the TreeNode API to recursively walk the LogicalPlan tree // and all of its children (inputs) - let transfomed_plan = plan.transform(|plan| { + let transformed_plan = plan.transform(|plan| { // This closure is called for each LogicalPlan node // if it is a Scan node, add a filter to remove all managers if is_employee_table_scan(&plan) { @@ -166,7 +166,7 @@ impl AnalyzerRule for RowLevelAccessControl { // // This example does not need the value of either flag, so simply // extract the LogicalPlan "data" - Ok(transfomed_plan.data) + Ok(transformed_plan.data) } fn name(&self) -> &str { diff --git a/datafusion-examples/examples/catalog.rs b/datafusion-examples/examples/catalog.rs index f40f1dfb5a15..655438b78b9f 100644 --- a/datafusion-examples/examples/catalog.rs +++ b/datafusion-examples/examples/catalog.rs @@ -46,11 +46,11 @@ async fn main() -> Result<()> { let ctx = SessionContext::new(); let state = ctx.state(); - let cataloglist = Arc::new(CustomCatalogProviderList::new()); + let catalog_list = Arc::new(CustomCatalogProviderList::new()); // use our custom catalog list for context. each context has a single catalog list. 
// context will by default have [`MemoryCatalogProviderList`] - ctx.register_catalog_list(cataloglist.clone()); + ctx.register_catalog_list(catalog_list.clone()); // initialize our catalog and schemas let catalog = DirCatalog::new(); @@ -81,7 +81,7 @@ async fn main() -> Result<()> { ctx.register_catalog("dircat", Arc::new(catalog)); { // catalog was passed down into our custom catalog list since we override the ctx's default - let catalogs = cataloglist.catalogs.read().unwrap(); + let catalogs = catalog_list.catalogs.read().unwrap(); assert!(catalogs.contains_key("dircat")); }; @@ -144,8 +144,8 @@ impl DirSchema { async fn create(state: &SessionState, opts: DirSchemaOpts<'_>) -> Result> { let DirSchemaOpts { ext, dir, format } = opts; let mut tables = HashMap::new(); - let direntries = std::fs::read_dir(dir).unwrap(); - for res in direntries { + let dir_entries = std::fs::read_dir(dir).unwrap(); + for res in dir_entries { let entry = res.unwrap(); let filename = entry.file_name().to_str().unwrap().to_string(); if !filename.ends_with(ext) { diff --git a/datafusion-examples/examples/expr_api.rs b/datafusion-examples/examples/expr_api.rs index cb0796bdcf73..943e5d5e027c 100644 --- a/datafusion-examples/examples/expr_api.rs +++ b/datafusion-examples/examples/expr_api.rs @@ -53,7 +53,7 @@ use datafusion_optimizer::analyzer::type_coercion::TypeCoercionRewriter; /// 4. Simplify expressions: [`simplify_demo`] /// 5. Analyze predicates for boundary ranges: [`range_analysis_demo`] /// 6. Get the types of the expressions: [`expression_type_demo`] -/// 7. Apply type cocercion to expressions: [`type_coercion_demo`] +/// 7. Apply type coercion to expressions: [`type_coercion_demo`] #[tokio::main] async fn main() -> Result<()> { // The easiest way to do create expressions is to use the @@ -392,7 +392,7 @@ fn type_coercion_demo() -> Result<()> { )?; assert!(physical_expr.evaluate(&batch).is_ok()); - // 4. Apply explict type coercion by manually rewriting the expression + // 4. Apply explicit type coercion by manually rewriting the expression let coerced_expr = expr .transform(|e| { // Only type coerces binary expressions. diff --git a/datafusion-examples/examples/function_factory.rs b/datafusion-examples/examples/function_factory.rs index b2771149aae5..58ffa060ebaa 100644 --- a/datafusion-examples/examples/function_factory.rs +++ b/datafusion-examples/examples/function_factory.rs @@ -36,7 +36,7 @@ use datafusion_expr::{ /// /// Apart from [FunctionFactory], this example covers /// [ScalarUDFImpl::simplify()] which is often used at the same time, to replace -/// a function call with another expression at rutime. +/// a function call with another expression at runtime. /// /// This example is rather simple and does not cover all cases required for a /// real implementation. 
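Reviewer note: the `analyzer_rule.rs` hunk above only renames `transfomed_plan` to `transformed_plan`, but the surrounding `TreeNode::transform` pattern it touches is worth spelling out for anyone reading this change out of context. Below is a minimal, self-contained sketch of that pattern; the `add_trivial_filter` function and its `id > 0` predicate are hypothetical and chosen purely for illustration, not taken from this diff.

```rust
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::Result;
use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder};

/// Sketch of the `TreeNode::transform` pattern used by the analyzer rule above:
/// walk a `LogicalPlan`, optionally rewrite each node, then unwrap the result.
fn add_trivial_filter(plan: LogicalPlan) -> Result<LogicalPlan> {
    let transformed_plan = plan.transform(|plan| {
        if let LogicalPlan::TableScan(_) = &plan {
            // Wrap the scan in a filter (placeholder predicate for illustration only)
            let filtered = LogicalPlanBuilder::from(plan)
                .filter(col("id").gt(lit(0)))?
                .build()?;
            Ok(Transformed::yes(filtered))
        } else {
            // Leave every other node unchanged
            Ok(Transformed::no(plan))
        }
    })?;
    // `Transformed` also carries flags describing whether a rewrite happened;
    // callers that only need the plan read the `data` field, as here.
    Ok(transformed_plan.data)
}
```

The closure returns `Transformed::yes`/`Transformed::no` so the tree-node machinery can track whether a rewrite occurred, which is exactly why the rule in `analyzer_rule.rs` extracts `transformed_plan.data` at the end.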
diff --git a/datafusion-examples/examples/memtable.rs b/datafusion-examples/examples/memtable.rs index 5cce578039e7..bb0b720eff79 100644 --- a/datafusion-examples/examples/memtable.rs +++ b/datafusion-examples/examples/memtable.rs @@ -25,7 +25,7 @@ use std::sync::Arc; use std::time::Duration; use tokio::time::timeout; -/// This example demonstrates executing a simple query against a Memtable +/// This example demonstrates executing a simple query against a [`MemTable`] #[tokio::main] async fn main() -> Result<()> { let mem_table = create_memtable()?; diff --git a/datafusion-examples/examples/optimizer_rule.rs b/datafusion-examples/examples/optimizer_rule.rs index 0f28a1670252..e8a272f28318 100644 --- a/datafusion-examples/examples/optimizer_rule.rs +++ b/datafusion-examples/examples/optimizer_rule.rs @@ -146,7 +146,7 @@ impl MyOptimizerRule { // Closure called for each sub tree match expr { Expr::BinaryExpr(binary_expr) if is_binary_eq(&binary_expr) => { - // destruture the expression + // destructure the expression let BinaryExpr { left, op: _, right } = binary_expr; // rewrite to `my_eq(left, right)` let udf = ScalarUDF::new_from_impl(MyEq::new()); diff --git a/datafusion-examples/examples/plan_to_sql.rs b/datafusion-examples/examples/plan_to_sql.rs index 8ea7c2951223..b5b69093a646 100644 --- a/datafusion-examples/examples/plan_to_sql.rs +++ b/datafusion-examples/examples/plan_to_sql.rs @@ -65,7 +65,7 @@ fn simple_expr_to_sql_demo() -> Result<()> { Ok(()) } -/// DataFusioon can remove parentheses when converting an expression to SQL. +/// DataFusion can remove parentheses when converting an expression to SQL. /// Note that output is intended for humans, not for other SQL engines, /// as difference in precedence rules can cause expressions to be parsed differently. fn simple_expr_to_pretty_sql_demo() -> Result<()> { diff --git a/datafusion-examples/examples/simple_udtf.rs b/datafusion-examples/examples/simple_udtf.rs index f32560ede69d..7cf1ce87690e 100644 --- a/datafusion-examples/examples/simple_udtf.rs +++ b/datafusion-examples/examples/simple_udtf.rs @@ -140,7 +140,7 @@ impl TableFunctionImpl for LocalCsvTableFunc { let limit = exprs .get(1) .map(|expr| { - // try to simpify the expression, so 1+2 becomes 3, for example + // try to simplify the expression, so 1+2 becomes 3, for example let execution_props = ExecutionProps::new(); let info = SimplifyContext::new(&execution_props); let expr = ExprSimplifier::new(info).simplify(expr.clone())?; @@ -173,8 +173,8 @@ fn read_csv_batches(csv_path: impl AsRef) -> Result<(SchemaRef, Vec>(); for using_col in using_columns { let all_matched = columns.iter().all(|c| using_col.contains(c)); - // All matched fields belong to the same using column set, in orther words + // All matched fields belong to the same using column set, in other words // the same join clause. We simply pick the qualifier from the first match. 
if all_matched { return Ok(columns[0].clone()); diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 6e64700bd2e0..942aa308e200 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -904,12 +904,12 @@ pub trait ConfigExtension: ExtensionOptions { pub trait ExtensionOptions: Send + Sync + fmt::Debug + 'static { /// Return `self` as [`Any`] /// - /// This is needed until trait upcasting is stabilised + /// This is needed until trait upcasting is stabilized fn as_any(&self) -> &dyn Any; /// Return `self` as [`Any`] /// - /// This is needed until trait upcasting is stabilised + /// This is needed until trait upcasting is stabilized fn as_any_mut(&mut self) -> &mut dyn Any; /// Return a deep clone of this [`ExtensionOptions`] diff --git a/datafusion/common/src/cse.rs b/datafusion/common/src/cse.rs index f64571b8471e..674d3386171f 100644 --- a/datafusion/common/src/cse.rs +++ b/datafusion/common/src/cse.rs @@ -60,7 +60,7 @@ pub trait Normalizeable { } /// The `NormalizeEq` trait extends `Eq` and `Normalizeable` to provide a method for comparing -/// normlized nodes in optimizations like Common Subexpression Elimination (CSE). +/// normalized nodes in optimizations like Common Subexpression Elimination (CSE). /// /// The `normalize_eq` method ensures that two nodes that are semantically equivalent (after normalization) /// are considered equal in CSE optimization, even if their original forms differ. diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index b5f7b5681eef..ac4d8be8045f 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -955,7 +955,7 @@ pub trait ExprSchema: std::fmt::Debug { /// Returns the column's optional metadata. fn metadata(&self, col: &Column) -> Result<&HashMap>; - /// Return the coulmn's datatype and nullability + /// Return the column's datatype and nullability fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)>; } diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs index 4fac7298c455..1012c4cd2270 100644 --- a/datafusion/common/src/error.rs +++ b/datafusion/common/src/error.rs @@ -115,7 +115,7 @@ pub enum DataFusionError { Execution(String), /// [`JoinError`] during execution of the query. /// - /// This error can unoccur for unjoined tasks, such as execution shutdown. + /// This error can't occur for unjoined tasks, such as execution shutdown. ExecutionJoin(JoinError), /// Error when resources (such as memory of scratch disk space) are exhausted. /// diff --git a/datafusion/common/src/scalar/mod.rs b/datafusion/common/src/scalar/mod.rs index feb3c6f604f0..de14d3a01037 100644 --- a/datafusion/common/src/scalar/mod.rs +++ b/datafusion/common/src/scalar/mod.rs @@ -2216,7 +2216,7 @@ impl ScalarValue { /// /// Errors if `self` is /// - a decimal that fails be converted to a decimal array of size - /// - a `Fixedsizelist` that fails to be concatenated into an array of size + /// - a `FixedsizeList` that fails to be concatenated into an array of size /// - a `List` that fails to be concatenated into an array of size /// - a `Dictionary` that fails be converted to a dictionary array of size pub fn to_array_of_size(&self, size: usize) -> Result { @@ -2925,7 +2925,7 @@ impl ScalarValue { /// preferred over this function if at all possible as they can be /// vectorized and are generally much faster. 
/// - /// This function has a few narrow usescases such as hash table key + /// This function has a few narrow use cases such as hash table key /// comparisons where comparing a single row at a time is necessary. /// /// # Errors @@ -4465,7 +4465,7 @@ mod tests { Ok(()) } - // Verifies that ScalarValue has the same behavior with compute kernal when it overflows. + // Verifies that ScalarValue has the same behavior with compute kernel when it overflows. fn check_scalar_add_overflow(left: ScalarValue, right: ScalarValue) where T: ArrowNumericType, @@ -6150,9 +6150,9 @@ mod tests { &DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())) ); - let newscalar = ScalarValue::try_from_array(&array, 0).unwrap(); + let new_scalar = ScalarValue::try_from_array(&array, 0).unwrap(); assert_eq!( - newscalar.data_type(), + new_scalar.data_type(), DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())) ); } diff --git a/datafusion/common/src/tree_node.rs b/datafusion/common/src/tree_node.rs index d92a2cc34b56..c70389b63177 100644 --- a/datafusion/common/src/tree_node.rs +++ b/datafusion/common/src/tree_node.rs @@ -124,7 +124,7 @@ pub trait TreeNode: Sized { /// TreeNodeVisitor::f_up(ChildNode2) /// TreeNodeVisitor::f_up(ParentNode) /// ``` - #[cfg_attr(feature = "recursive-protection", recursive::recursive)] + #[cfg_attr(feature = "recursive_protection", recursive::recursive)] fn visit<'n, V: TreeNodeVisitor<'n, Node = Self>>( &'n self, visitor: &mut V, @@ -174,7 +174,7 @@ pub trait TreeNode: Sized { /// TreeNodeRewriter::f_up(ChildNode2) /// TreeNodeRewriter::f_up(ParentNode) /// ``` - #[cfg_attr(feature = "recursive-protection", recursive::recursive)] + #[cfg_attr(feature = "recursive_protection", recursive::recursive)] fn rewrite>( self, rewriter: &mut R, @@ -197,7 +197,7 @@ pub trait TreeNode: Sized { &'n self, mut f: F, ) -> Result { - #[cfg_attr(feature = "recursive-protection", recursive::recursive)] + #[cfg_attr(feature = "recursive_protection", recursive::recursive)] fn apply_impl<'n, N: TreeNode, F: FnMut(&'n N) -> Result>( node: &'n N, f: &mut F, @@ -232,7 +232,7 @@ pub trait TreeNode: Sized { self, mut f: F, ) -> Result> { - #[cfg_attr(feature = "recursive-protection", recursive::recursive)] + #[cfg_attr(feature = "recursive_protection", recursive::recursive)] fn transform_down_impl Result>>( node: N, f: &mut F, @@ -256,7 +256,7 @@ pub trait TreeNode: Sized { self, mut f: F, ) -> Result> { - #[cfg_attr(feature = "recursive-protection", recursive::recursive)] + #[cfg_attr(feature = "recursive_protection", recursive::recursive)] fn transform_up_impl Result>>( node: N, f: &mut F, @@ -371,7 +371,7 @@ pub trait TreeNode: Sized { mut f_down: FD, mut f_up: FU, ) -> Result> { - #[cfg_attr(feature = "recursive-protection", recursive::recursive)] + #[cfg_attr(feature = "recursive_protection", recursive::recursive)] fn transform_down_up_impl< N: TreeNode, FD: FnMut(N) -> Result>, @@ -995,11 +995,11 @@ impl< /// construct a temporary container to be able to call `apply_ref_elements` on a /// collection of tree node references. But in that case the container's temporary /// lifetime is different to the lifetime of tree nodes that we put into it. -/// Please find an example usecase in `Expr::apply_children` with the `Expr::Case` case. +/// Please find an example use case in `Expr::apply_children` with the `Expr::Case` case. /// /// Most of the cases we don't need to create a temporary container with /// `TreeNodeRefContainer`, but we can just call `TreeNodeContainer::apply_elements`. 
-/// Please find an example usecase in `Expr::apply_children` with the `Expr::GroupingSet` +/// Please find an example use case in `Expr::apply_children` with the `Expr::GroupingSet` /// case. pub trait TreeNodeRefContainer<'a, T: 'a>: Sized { /// Applies `f` to all elements of the container. @@ -2349,7 +2349,7 @@ pub(crate) mod tests { Ok(()) } - #[cfg(feature = "recursive-protection")] + #[cfg(feature = "recursive_protection")] #[test] fn test_large_tree() { let mut item = TestTreeNode::new_leaf("initial".to_string()); diff --git a/datafusion/common/src/utils/memory.rs b/datafusion/common/src/utils/memory.rs index bb68d59eed59..ab73996fcd8b 100644 --- a/datafusion/common/src/utils/memory.rs +++ b/datafusion/common/src/utils/memory.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! This module provides a function to estimate the memory size of a HashTable prior to alloaction +//! This module provides a function to estimate the memory size of a HashTable prior to allocation use crate::{DataFusionError, Result}; use std::mem::size_of; @@ -79,7 +79,7 @@ pub fn estimate_memory_size(num_elements: usize, fixed_size: usize) -> Result // For the majority of cases hashbrown overestimates the bucket quantity // to keep ~1/8 of them empty. We take this factor into account by // multiplying the number of elements with a fixed ratio of 8/7 (~1.14). - // This formula leads to overallocation for small tables (< 8 elements) + // This formula leads to over-allocation for small tables (< 8 elements) // but should be fine overall. num_elements .checked_mul(8) diff --git a/datafusion/common/src/utils/proxy.rs b/datafusion/common/src/utils/proxy.rs index b32164f682fa..d940677a5fb3 100644 --- a/datafusion/common/src/utils/proxy.rs +++ b/datafusion/common/src/utils/proxy.rs @@ -92,12 +92,12 @@ impl VecAllocExt for Vec { type T = T; fn push_accounted(&mut self, x: Self::T, accounting: &mut usize) { - let prev_capacty = self.capacity(); + let prev_capacity = self.capacity(); self.push(x); let new_capacity = self.capacity(); - if new_capacity > prev_capacty { + if new_capacity > prev_capacity { // capacity changed, so we allocated more - let bump_size = (new_capacity - prev_capacty) * size_of::(); + let bump_size = (new_capacity - prev_capacity) * size_of::(); // Note multiplication should never overflow because `push` would // have panic'd first, but the checked_add could potentially // overflow since accounting could be tracking additional values, and diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index dca40ab3d67a..64ad8f2ba152 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -59,6 +59,7 @@ default = [ "unicode_expressions", "compression", "parquet", + "recursive_protection", ] encoding_expressions = ["datafusion-functions/encoding_expressions"] # Used for testing ONLY: causes all values to hash to the same value (test for collisions) @@ -69,6 +70,13 @@ pyarrow = ["datafusion-common/pyarrow", "parquet"] regex_expressions = [ "datafusion-functions/regex_expressions", ] +recursive_protection = [ + "datafusion-common/recursive_protection", + "datafusion-expr/recursive_protection", + "datafusion-optimizer/recursive_protection", + "datafusion-physical-optimizer/recursive_protection", + "datafusion-sql/recursive_protection", +] serde = ["arrow-schema/serde"] string_expressions = ["datafusion-functions/string_expressions"] unicode_expressions = [ diff --git a/datafusion/core/benches/physical_plan.rs 
b/datafusion/core/benches/physical_plan.rs index 349c2e438195..7d87a37b3b9c 100644 --- a/datafusion/core/benches/physical_plan.rs +++ b/datafusion/core/benches/physical_plan.rs @@ -38,7 +38,7 @@ use datafusion::physical_plan::{ use datafusion::prelude::SessionContext; use datafusion_physical_expr_common::sort_expr::LexOrdering; -// Initialise the operator using the provided record batches and the sort key +// Initialize the operator using the provided record batches and the sort key // as inputs. All record batches must have the same schema. fn sort_preserving_merge_operator( session_ctx: Arc, diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 414d6da7bc9b..60a09301ae0f 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -77,6 +77,9 @@ pub struct DataFrameWriteOptions { /// Sets which columns should be used for hive-style partitioned writes by name. /// Can be set to empty vec![] for non-partitioned writes. partition_by: Vec, + /// Sets which columns should be used for sorting the output by name. + /// Can be set to empty vec![] for non-sorted writes. + sort_by: Vec, } impl DataFrameWriteOptions { @@ -86,6 +89,7 @@ impl DataFrameWriteOptions { insert_op: InsertOp::Append, single_file_output: false, partition_by: vec![], + sort_by: vec![], } } @@ -106,6 +110,12 @@ impl DataFrameWriteOptions { self.partition_by = partition_by; self } + + /// Sets the sort_by columns for output sorting + pub fn with_sort_by(mut self, sort_by: Vec) -> Self { + self.sort_by = sort_by; + self + } } impl Default for DataFrameWriteOptions { @@ -1517,8 +1527,17 @@ impl DataFrame { write_options: DataFrameWriteOptions, ) -> Result, DataFusionError> { let arrow_schema = Schema::from(self.schema()); + + let plan = if write_options.sort_by.is_empty() { + self.plan + } else { + LogicalPlanBuilder::from(self.plan) + .sort(write_options.sort_by)? + .build()? + }; + let plan = LogicalPlanBuilder::insert_into( - self.plan, + plan, table_name.to_owned(), &arrow_schema, write_options.insert_op, @@ -1577,8 +1596,16 @@ impl DataFrame { let file_type = format_as_file_type(format); + let plan = if options.sort_by.is_empty() { + self.plan + } else { + LogicalPlanBuilder::from(self.plan) + .sort(options.sort_by)? + .build()? + }; + let plan = LogicalPlanBuilder::copy_to( - self.plan, + plan, path.into(), file_type, HashMap::new(), @@ -1638,8 +1665,16 @@ impl DataFrame { let file_type = format_as_file_type(format); + let plan = if options.sort_by.is_empty() { + self.plan + } else { + LogicalPlanBuilder::from(self.plan) + .sort(options.sort_by)? + .build()? 
+ }; + let plan = LogicalPlanBuilder::copy_to( - self.plan, + plan, path.into(), file_type, Default::default(), @@ -1940,6 +1975,7 @@ mod tests { use crate::physical_plan::{ColumnarValue, Partitioning, PhysicalExpr}; use crate::test_util::{register_aggregate_csv, test_table, test_table_with_name}; + use crate::prelude::{CsvReadOptions, NdJsonReadOptions, ParquetReadOptions}; use arrow::array::Int32Array; use datafusion_common::{assert_batches_eq, Constraint, Constraints, ScalarValue}; use datafusion_common_runtime::SpawnedTask; @@ -1954,6 +1990,7 @@ mod tests { use datafusion_physical_expr::expressions::Column; use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties}; use sqlparser::ast::NullTreatment; + use tempfile::TempDir; // Get string representation of the plan async fn assert_physical_plan(df: &DataFrame, expected: Vec<&str>) { @@ -3279,7 +3316,7 @@ mod tests { &df_results ); - // check that col with the same name ovwewritten + // check that col with the same name overwritten let df_results_overwrite = df .clone() .with_column("c1", col("c2") + col("c3"))? @@ -3302,7 +3339,7 @@ mod tests { &df_results_overwrite ); - // check that col with the same name ovwewritten using same name as reference + // check that col with the same name overwritten using same name as reference let df_results_overwrite_self = df .clone() .with_column("c2", col("c2") + lit(1))? @@ -4057,4 +4094,237 @@ mod tests { Ok(()) } + + // Test issue: https://github.com/apache/datafusion/issues/13873 + #[tokio::test] + async fn write_parquet_with_order() -> Result<()> { + let tmp_dir = TempDir::new()?; + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Int32, true), + ])); + + let ctx = SessionContext::new(); + let write_df = ctx.read_batch(RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![1, 5, 7, 3, 2])), + Arc::new(Int32Array::from(vec![2, 3, 4, 5, 6])), + ], + )?)?; + + let test_path = tmp_dir.path().join("test.parquet"); + + write_df + .clone() + .write_parquet( + test_path.to_str().unwrap(), + DataFrameWriteOptions::new() + .with_sort_by(vec![col("a").sort(true, true)]), + None, + ) + .await?; + + let ctx = SessionContext::new(); + ctx.register_parquet( + "data", + test_path.to_str().unwrap(), + ParquetReadOptions::default(), + ) + .await?; + + let df = ctx.sql("SELECT * FROM data").await?; + let results = df.collect().await?; + + let df_explain = ctx.sql("explain SELECT a FROM data").await?; + let explain_result = df_explain.collect().await?; + + println!("explain_result {:?}", explain_result); + + assert_batches_eq!( + &[ + "+---+---+", + "| a | b |", + "+---+---+", + "| 1 | 2 |", + "| 2 | 6 |", + "| 3 | 5 |", + "| 5 | 3 |", + "| 7 | 4 |", + "+---+---+", + ], + &results + ); + Ok(()) + } + + // Test issue: https://github.com/apache/datafusion/issues/13873 + #[tokio::test] + async fn write_csv_with_order() -> Result<()> { + let tmp_dir = TempDir::new()?; + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Int32, true), + ])); + + let ctx = SessionContext::new(); + let write_df = ctx.read_batch(RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![1, 5, 7, 3, 2])), + Arc::new(Int32Array::from(vec![2, 3, 4, 5, 6])), + ], + )?)?; + + let test_path = tmp_dir.path().join("test.csv"); + + write_df + .clone() + .write_csv( + test_path.to_str().unwrap(), + DataFrameWriteOptions::new() + 
.with_sort_by(vec![col("a").sort(true, true)]), + None, + ) + .await?; + + let ctx = SessionContext::new(); + ctx.register_csv( + "data", + test_path.to_str().unwrap(), + CsvReadOptions::new().schema(&schema), + ) + .await?; + + let df = ctx.sql("SELECT * FROM data").await?; + let results = df.collect().await?; + + assert_batches_eq!( + &[ + "+---+---+", + "| a | b |", + "+---+---+", + "| 1 | 2 |", + "| 2 | 6 |", + "| 3 | 5 |", + "| 5 | 3 |", + "| 7 | 4 |", + "+---+---+", + ], + &results + ); + Ok(()) + } + + // Test issue: https://github.com/apache/datafusion/issues/13873 + #[tokio::test] + async fn write_json_with_order() -> Result<()> { + let tmp_dir = TempDir::new()?; + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Int32, true), + ])); + + let ctx = SessionContext::new(); + let write_df = ctx.read_batch(RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![1, 5, 7, 3, 2])), + Arc::new(Int32Array::from(vec![2, 3, 4, 5, 6])), + ], + )?)?; + + let test_path = tmp_dir.path().join("test.json"); + + write_df + .clone() + .write_json( + test_path.to_str().unwrap(), + DataFrameWriteOptions::new() + .with_sort_by(vec![col("a").sort(true, true)]), + None, + ) + .await?; + + let ctx = SessionContext::new(); + ctx.register_json( + "data", + test_path.to_str().unwrap(), + NdJsonReadOptions::default().schema(&schema), + ) + .await?; + + let df = ctx.sql("SELECT * FROM data").await?; + let results = df.collect().await?; + + assert_batches_eq!( + &[ + "+---+---+", + "| a | b |", + "+---+---+", + "| 1 | 2 |", + "| 2 | 6 |", + "| 3 | 5 |", + "| 5 | 3 |", + "| 7 | 4 |", + "+---+---+", + ], + &results + ); + Ok(()) + } + + // Test issue: https://github.com/apache/datafusion/issues/13873 + #[tokio::test] + async fn write_table_with_order() -> Result<()> { + let tmp_dir = TempDir::new()?; + let ctx = SessionContext::new(); + let location = tmp_dir.path().join("test_table/"); + + let mut write_df = ctx + .sql("values ('z'), ('x'), ('a'), ('b'), ('c')") + .await + .unwrap(); + + // Ensure the column names and types match the target table + write_df = write_df + .with_column_renamed("column1", "tablecol1") + .unwrap(); + let sql_str = + "create external table data(tablecol1 varchar) stored as parquet location '" + .to_owned() + + location.to_str().unwrap() + + "'"; + + ctx.sql(sql_str.as_str()).await?.collect().await?; + + // This is equivalent to INSERT INTO test. + write_df + .clone() + .write_table( + "data", + DataFrameWriteOptions::new() + .with_sort_by(vec![col("tablecol1").sort(true, true)]), + ) + .await?; + + let df = ctx.sql("SELECT * FROM data").await?; + let results = df.collect().await?; + + assert_batches_eq!( + &[ + "+-----------+", + "| tablecol1 |", + "+-----------+", + "| a |", + "| b |", + "| c |", + "| x |", + "| z |", + "+-----------+", + ], + &results + ); + Ok(()) + } } diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs index 0af68783c41f..1dd4d68fca6b 100644 --- a/datafusion/core/src/dataframe/parquet.rs +++ b/datafusion/core/src/dataframe/parquet.rs @@ -74,8 +74,16 @@ impl DataFrame { let file_type = format_as_file_type(format); + let plan = if options.sort_by.is_empty() { + self.plan + } else { + LogicalPlanBuilder::from(self.plan) + .sort(options.sort_by)? + .build()? 
+ }; + let plan = LogicalPlanBuilder::copy_to( - self.plan, + plan, path.into(), file_type, Default::default(), diff --git a/datafusion/core/src/datasource/default_table_source.rs b/datafusion/core/src/datasource/default_table_source.rs index 5efabd000d68..91c1e0ac97fc 100644 --- a/datafusion/core/src/datasource/default_table_source.rs +++ b/datafusion/core/src/datasource/default_table_source.rs @@ -67,7 +67,7 @@ impl TableSource for DefaultTableSource { } /// Tests whether the table provider can make use of any or all filter expressions - /// to optimise data retrieval. + /// to optimize data retrieval. fn supports_filters_pushdown( &self, filter: &[&Expr], diff --git a/datafusion/core/src/datasource/physical_plan/file_groups.rs b/datafusion/core/src/datasource/physical_plan/file_groups.rs index f9a19f1d9691..f681dfe219b5 100644 --- a/datafusion/core/src/datasource/physical_plan/file_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/file_groups.rs @@ -781,7 +781,7 @@ mod test { assert_partitioned_files(expected, actual); } - /// Asserts that the two groups of `ParititonedFile` are the same + /// Asserts that the two groups of [`PartitionedFile`] are the same /// (PartitionedFile doesn't implement PartialEq) fn assert_partitioned_files( expected: Option>>, diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs index 879c9817a382..4071f9c26b58 100644 --- a/datafusion/core/src/datasource/physical_plan/json.rs +++ b/datafusion/core/src/datasource/physical_plan/json.rs @@ -870,7 +870,7 @@ mod tests { )] #[cfg(feature = "compression")] #[tokio::test] - async fn test_json_with_repartitioing( + async fn test_json_with_repartitioning( file_compression_type: FileCompressionType, ) -> Result<()> { let config = SessionConfig::new() diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index 7573e32f8652..83b544a76e11 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -333,7 +333,7 @@ impl ParquetExecBuilder { /// Set the filter predicate when reading. /// - /// See the "Predicate Pushdown" section of the [`ParquetExec`] documenation + /// See the "Predicate Pushdown" section of the [`ParquetExec`] documentation /// for more details. pub fn with_predicate(mut self, predicate: Arc) -> Self { self.predicate = Some(predicate); @@ -611,7 +611,7 @@ impl ParquetExec { } /// If enabled, the reader will read the page index - /// This is used to optimise filter pushdown + /// This is used to optimize filter pushdown /// via `RowSelector` and `RowFilter` by /// eliminating unnecessary IO and decoding pub fn with_enable_page_index(mut self, enable_page_index: bool) -> Self { diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs index af5ffb9d5743..f6428a693fb1 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs @@ -422,7 +422,7 @@ fn would_column_prevent_pushdown( checker.prevents_pushdown() } -/// Recurses through expr as a trea, finds all `column`s, and checks if any of them would prevent +/// Recurses through expr as a tree, finds all `column`s, and checks if any of them would prevent /// this expression from being predicate pushed down. 
If any of them would, this returns false. /// Otherwise, true. pub fn can_expr_be_pushed_down_with_schemas( @@ -692,7 +692,7 @@ mod test { let mut parquet_reader = parquet_reader_builder.build().expect("building reader"); - // Parquet file is small, we only need 1 recordbatch + // Parquet file is small, we only need 1 record batch let first_rb = parquet_reader .next() .expect("expected record batch") diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_group_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_group_filter.rs index 810f74e8515b..3854f04566ee 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_group_filter.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_group_filter.rs @@ -294,7 +294,7 @@ impl BloomFilterStatistics { } _ => true, }, - // One more parrern matching since not all data types are supported + // One more pattern matching since not all data types are supported // inside of a Dictionary ScalarValue::Dictionary(_, inner) => match inner.as_ref() { ScalarValue::Int32(_) diff --git a/datafusion/core/src/datasource/stream.rs b/datafusion/core/src/datasource/stream.rs index 2cea37fe17e2..768761bb9cf1 100644 --- a/datafusion/core/src/datasource/stream.rs +++ b/datafusion/core/src/datasource/stream.rs @@ -101,7 +101,7 @@ impl FromStr for StreamEncoding { match s.to_ascii_lowercase().as_str() { "csv" => Ok(Self::Csv), "json" => Ok(Self::Json), - _ => plan_err!("Unrecognised StreamEncoding {}", s), + _ => plan_err!("Unrecognized StreamEncoding {}", s), } } } diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 67236c9a6bd2..3455cce132b6 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1903,7 +1903,7 @@ mod tests { #[tokio::test] async fn send_context_to_threads() -> Result<()> { // ensure SessionContexts can be used in a multi-threaded - // environment. Usecase is for concurrent planing. + // environment. Use case is for concurrent planing. let tmp_dir = TempDir::new()?; let partition_count = 4; let ctx = Arc::new(create_ctx(&tmp_dir, partition_count).await?); diff --git a/datafusion/core/src/execution/context/parquet.rs b/datafusion/core/src/execution/context/parquet.rs index 3f23c150be83..be87c7cac1d2 100644 --- a/datafusion/core/src/execution/context/parquet.rs +++ b/datafusion/core/src/execution/context/parquet.rs @@ -281,10 +281,10 @@ mod tests { ) .await; let binding = DataFilePaths::to_urls(&path2).unwrap(); - let expexted_path = binding[0].as_str(); + let expected_path = binding[0].as_str(); assert_eq!( read_df.unwrap_err().strip_backtrace(), - format!("Execution error: File path '{}' does not match the expected extension '.parquet'", expexted_path) + format!("Execution error: File path '{}' does not match the expected extension '.parquet'", expected_path) ); // Read the dataframe from 'output3.parquet.snappy.parquet' with the correct file extension. 
@@ -316,7 +316,7 @@ mod tests { let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum(); assert_eq!(total_rows, 0); - // Read the datafram from doule dot folder; + // Read the dataframe from double dot folder; let read_df = ctx .read_parquet( &path5, diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index ef32e84a7380..c5874deb6ed5 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -125,9 +125,9 @@ pub struct SessionState { session_id: String, /// Responsible for analyzing and rewrite a logical plan before optimization analyzer: Analyzer, - /// Provides support for customising the SQL planner, e.g. to add support for custom operators like `->>` or `?` + /// Provides support for customizing the SQL planner, e.g. to add support for custom operators like `->>` or `?` expr_planners: Vec>, - /// Provides support for customising the SQL type planning + /// Provides support for customizing the SQL type planning type_planner: Option>, /// Responsible for optimizing a logical plan optimizer: Optimizer, diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index a1b18b8bfe8c..e9501bd37a8a 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -182,7 +182,7 @@ //! //! DataFusion is designed to be highly extensible, so you can //! start with a working, full featured engine, and then -//! specialize any behavior for your usecase. For example, +//! specialize any behavior for your use case. For example, //! some projects may add custom [`ExecutionPlan`] operators, or create their own //! query language that directly creates [`LogicalPlan`] rather than using the //! built in SQL planner, [`SqlToRel`]. diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 76c4d668d797..3c8d08ee32d4 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -1252,7 +1252,7 @@ fn ensure_distribution( // to increase parallelism. child = add_roundrobin_on_top(child, target_partitions)?; } - // When inserting hash is necessary to satisy hash requirement, insert hash repartition. + // When inserting hash is necessary to satisfy hash requirement, insert hash repartition. 
if hash_necessary { child = add_hash_on_top(child, exprs.to_vec(), target_partitions)?; @@ -2833,11 +2833,11 @@ pub(crate) mod tests { ], // Should include 7 RepartitionExecs (4 hash, 3 round-robin), 4 SortExecs // Since ordering of the left child is not preserved after SortMergeJoin - // when mode is Right, RgihtSemi, RightAnti, Full + // when mode is Right, RightSemi, RightAnti, Full // - We need to add one additional SortExec after SortMergeJoin in contrast the test cases // when mode is Inner, Left, LeftSemi, LeftAnti // Similarly, since partitioning of the left side is not preserved - // when mode is Right, RgihtSemi, RightAnti, Full + // when mode is Right, RightSemi, RightAnti, Full // - We need to add one additional Hash Repartition after SortMergeJoin in contrast the test // cases when mode is Inner, Left, LeftSemi, LeftAnti _ => vec![ @@ -2885,11 +2885,11 @@ pub(crate) mod tests { ], // Should include 8 RepartitionExecs (4 hash, 8 round-robin), 4 SortExecs // Since ordering of the left child is not preserved after SortMergeJoin - // when mode is Right, RgihtSemi, RightAnti, Full + // when mode is Right, RightSemi, RightAnti, Full // - We need to add one additional SortExec after SortMergeJoin in contrast the test cases // when mode is Inner, Left, LeftSemi, LeftAnti // Similarly, since partitioning of the left side is not preserved - // when mode is Right, RgihtSemi, RightAnti, Full + // when mode is Right, RightSemi, RightAnti, Full // - We need to add one additional Hash Repartition and Roundrobin repartition after // SortMergeJoin in contrast the test cases when mode is Inner, Left, LeftSemi, LeftAnti _ => vec![ diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index 85fe9ecfcdb0..dd8e9d900b7d 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -720,7 +720,7 @@ mod tests { let state = session_ctx.state(); // This file has 4 rules that use tree node, apply these rules as in the - // EnforSorting::optimize implementation + // EnforceSorting::optimize implementation // After these operations tree nodes should be in a consistent state. // This code block makes sure that these rules doesn't violate tree node integrity. { diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs index 009757f3a938..29c6e0078847 100644 --- a/datafusion/core/src/physical_optimizer/join_selection.rs +++ b/datafusion/core/src/physical_optimizer/join_selection.rs @@ -61,7 +61,7 @@ impl JoinSelection { // TODO: We need some performance test for Right Semi/Right Join swap to Left Semi/Left Join in case that the right side is smaller but not much smaller. // TODO: In PrestoSQL, the optimizer flips join sides only if one side is much smaller than the other by more than SIZE_DIFFERENCE_THRESHOLD times, by default is 8 times. /// Checks statistics for join swap. -fn should_swap_join_order( +pub(crate) fn should_swap_join_order( left: &dyn ExecutionPlan, right: &dyn ExecutionPlan, ) -> Result { @@ -108,7 +108,7 @@ fn supports_collect_by_thresholds( } /// Predicate that checks whether the given join type supports input swapping. 
-fn supports_swap(join_type: JoinType) -> bool { +pub(crate) fn supports_swap(join_type: JoinType) -> bool { matches!( join_type, JoinType::Inner @@ -222,7 +222,7 @@ pub fn swap_hash_join( } /// Swaps inputs of `NestedLoopJoinExec` and wraps it into `ProjectionExec` is required -fn swap_nl_join(join: &NestedLoopJoinExec) -> Result> { +pub(crate) fn swap_nl_join(join: &NestedLoopJoinExec) -> Result> { let new_filter = swap_join_filter(join.filter()); let new_join_type = &swap_join_type(*join.join_type()); @@ -359,7 +359,7 @@ impl PhysicalOptimizerRule for JoinSelection { /// `CollectLeft` mode is applicable. Otherwise, it will try to swap the join sides. /// When the `ignore_threshold` is false, this function will also check left /// and right sizes in bytes or rows. -fn try_collect_left( +pub(crate) fn try_collect_left( hash_join: &HashJoinExec, ignore_threshold: bool, threshold_byte_size: usize, @@ -421,7 +421,14 @@ fn try_collect_left( } } -fn partitioned_hash_join(hash_join: &HashJoinExec) -> Result> { +/// Creates a partitioned hash join execution plan, swapping inputs if beneficial. +/// +/// Checks if the join order should be swapped based on the join type and input statistics. +/// If swapping is optimal and supported, creates a swapped partitioned hash join; otherwise, +/// creates a standard partitioned hash join. +pub(crate) fn partitioned_hash_join( + hash_join: &HashJoinExec, +) -> Result> { let left = hash_join.left(); let right = hash_join.right(); if supports_swap(*hash_join.join_type()) && should_swap_join_order(&**left, &**right)? diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs index 3ac40bfb62ea..d2d35c3877c1 100644 --- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs @@ -615,15 +615,15 @@ fn try_embed_projection( /// Collect all column indices from the given projection expressions. fn collect_column_indices(exprs: &[(Arc, String)]) -> Vec { // Collect indices and remove duplicates. - let mut indexs = exprs + let mut indices = exprs .iter() .flat_map(|(expr, _)| collect_columns(expr)) .map(|x| x.index()) .collect::>() .into_iter() .collect::>(); - indexs.sort(); - indexs + indices.sort(); + indices } /// Tries to push `projection` down through `hash_join`. 
If possible, performs the diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs index aa134f28fe5b..b1a6f014380e 100644 --- a/datafusion/core/src/test_util/mod.rs +++ b/datafusion/core/src/test_util/mod.rs @@ -286,7 +286,7 @@ impl DisplayAs for UnboundedExec { DisplayFormatType::Default | DisplayFormatType::Verbose => { write!( f, - "UnboundableExec: unbounded={}", + "UnboundedExec: unbounded={}", self.batch_produce.is_none(), ) } diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index f4f754b11c6d..02fe2d83b3c4 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -1140,7 +1140,7 @@ async fn unnest_fixed_list_drop_nulls() -> Result<()> { } #[tokio::test] -async fn unnest_fixed_list_nonull() -> Result<()> { +async fn unnest_fixed_list_non_null() -> Result<()> { let mut shape_id_builder = UInt32Builder::new(); let mut tags_builder = FixedSizeListBuilder::new(StringBuilder::new(), 2); @@ -2053,9 +2053,9 @@ async fn test_dataframe_placeholder_missing_param_values() -> Result<()> { // Executing LogicalPlans with placeholders that don't have bound values // should fail. let results = df.collect().await; - let err_mesg = results.unwrap_err().strip_backtrace(); + let err_msg = results.unwrap_err().strip_backtrace(); assert_eq!( - err_mesg, + err_msg, "Execution error: Placeholder '$0' was not provided a value for execution." ); @@ -2119,9 +2119,9 @@ async fn test_dataframe_placeholder_column_parameter() -> Result<()> { // Executing LogicalPlans with placeholders that don't have bound values // should fail. let results = df.collect().await; - let err_mesg = results.unwrap_err().strip_backtrace(); + let err_msg = results.unwrap_err().strip_backtrace(); assert_eq!( - err_mesg, + err_msg, "Execution error: Placeholder '$1' was not provided a value for execution." ); @@ -2189,9 +2189,9 @@ async fn test_dataframe_placeholder_like_expression() -> Result<()> { // Executing LogicalPlans with placeholders that don't have bound values // should fail. let results = df.collect().await; - let err_mesg = results.unwrap_err().strip_backtrace(); + let err_msg = results.unwrap_err().strip_backtrace(); assert_eq!( - err_mesg, + err_msg, "Execution error: Placeholder '$1' was not provided a value for execution." ); @@ -2277,12 +2277,12 @@ async fn write_partitioned_parquet_results() -> Result<()> { // Explicitly read the parquet file at c2=123 to verify the physical files are partitioned let partitioned_file = format!("{out_dir}/c2=123", out_dir = out_dir); - let filted_df = ctx + let filter_df = ctx .read_parquet(&partitioned_file, ParquetReadOptions::default()) .await?; // Check that the c2 column is gone and that c1 is abc. - let results = filted_df.collect().await?; + let results = filter_df.collect().await?; let expected = ["+-----+", "| c1 |", "+-----+", "| abc |", "+-----+"]; assert_batches_eq!(expected, &results); diff --git a/datafusion/core/tests/fuzz_cases/equivalence/utils.rs b/datafusion/core/tests/fuzz_cases/equivalence/utils.rs index 881949047bff..e18dab35fc91 100644 --- a/datafusion/core/tests/fuzz_cases/equivalence/utils.rs +++ b/datafusion/core/tests/fuzz_cases/equivalence/utils.rs @@ -184,7 +184,7 @@ fn add_equal_conditions_test() -> Result<()> { assert!(eq_groups.contains(&col_a_expr)); assert!(eq_groups.contains(&col_b_expr)); - // b and c are aliases. Exising equivalence class should expand, + // b and c are aliases. 
Existing equivalence class should expand, // however there shouldn't be any new equivalence class eq_properties.add_equal_conditions(&col_b_expr, &col_c_expr)?; assert_eq!(eq_properties.eq_group().len(), 1); diff --git a/datafusion/core/tests/fuzz_cases/limit_fuzz.rs b/datafusion/core/tests/fuzz_cases/limit_fuzz.rs index c52acdd82764..a82849f4ea92 100644 --- a/datafusion/core/tests/fuzz_cases/limit_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/limit_fuzz.rs @@ -281,7 +281,7 @@ fn i64string_batch<'a>( .unwrap() } -/// Run the TopK test, sorting the input batches with the specified ftch +/// Run the TopK test, sorting the input batches with the specified fetch /// (limit) and compares the results to the expected values. async fn run_limit_test(fetch: usize, data: &SortedData) { let input = data.batches(); diff --git a/datafusion/core/tests/fuzz_cases/sort_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_fuzz.rs index e4acb96f4930..19ffa69f11d3 100644 --- a/datafusion/core/tests/fuzz_cases/sort_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/sort_fuzz.rs @@ -93,7 +93,7 @@ impl SortTest { self } - /// specify that this test should use a memory pool of the specifeid size + /// specify that this test should use a memory pool of the specified size fn with_pool_size(mut self, pool_size: usize) -> Self { self.pool_size = Some(pool_size); self diff --git a/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs index 73f4a569954e..daa282c8fe4a 100644 --- a/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs @@ -261,15 +261,15 @@ mod sp_repartition_fuzz_tests { for ordering in eq_properties.oeq_class().iter() { let err_msg = format!("error in eq properties: {:?}", eq_properties); - let sort_solumns = ordering + let sort_columns = ordering .iter() .map(|sort_expr| sort_expr.evaluate_to_sort_column(&res)) .collect::>>()?; - let orig_columns = sort_solumns + let orig_columns = sort_columns .iter() .map(|sort_column| sort_column.values.clone()) .collect::>(); - let sorted_columns = lexsort(&sort_solumns, None)?; + let sorted_columns = lexsort(&sort_columns, None)?; // Make sure after merging ordering is still valid. assert_eq!(orig_columns.len(), sorted_columns.len(), "{}", err_msg); diff --git a/datafusion/core/tests/macro_hygiene/mod.rs b/datafusion/core/tests/macro_hygiene/mod.rs index 62f24f5198e6..5aff1d5e3296 100644 --- a/datafusion/core/tests/macro_hygiene/mod.rs +++ b/datafusion/core/tests/macro_hygiene/mod.rs @@ -15,9 +15,9 @@ // specific language governing permissions and limitations // under the License. -//! Verifies [Macro Hygene] +//! Verifies [Macro Hygiene] //! -//! [Macro Hygene]: https://en.wikipedia.org/wiki/Hygienic_macro +//! [Macro Hygiene]: https://en.wikipedia.org/wiki/Hygienic_macro mod plan_err { // NO other imports! diff --git a/datafusion/core/tests/parquet/external_access_plan.rs b/datafusion/core/tests/parquet/external_access_plan.rs index fa23f5c699e2..61a9e9b5757c 100644 --- a/datafusion/core/tests/parquet/external_access_plan.rs +++ b/datafusion/core/tests/parquet/external_access_plan.rs @@ -161,7 +161,7 @@ async fn plan_and_filter() { RowGroupAccess::Scan, ])); - // initia + // initial let parquet_metrics = TestFull { access_plan, expected_rows: 0, @@ -274,7 +274,7 @@ struct Test { impl Test { /// Runs the test case, panic'ing on error. 
/// - /// Returns the `MetricsSet` from the ParqeutExec + /// Returns the [`MetricsSet`] from the [`ParquetExec`] async fn run_success(self) -> MetricsSet { let Self { access_plan, diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index 39fd492786bc..5fb0b9852641 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -566,7 +566,7 @@ async fn csv_explain_verbose_plans() { #[tokio::test] async fn explain_analyze_runs_optimizers(#[values("*", "1")] count_expr: &str) { // repro for https://github.com/apache/datafusion/issues/917 - // where EXPLAIN ANALYZE was not correctly running optiimizer + // where EXPLAIN ANALYZE was not correctly running optimizer let ctx = SessionContext::new(); register_alltypes_parquet(&ctx).await; diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 82f73eadba8c..03c4ad7c013e 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -196,7 +196,7 @@ fn populate_csv_partitions( Ok(schema) } -/// Specialised String representation +/// Specialized String representation fn col_str(column: &ArrayRef, row_index: usize) -> String { // NullArray::is_null() does not work on NullArray. // can remove check for DataType::Null when diff --git a/datafusion/core/tests/sql/path_partition.rs b/datafusion/core/tests/sql/path_partition.rs index 975984e5b11f..441af1639d9b 100644 --- a/datafusion/core/tests/sql/path_partition.rs +++ b/datafusion/core/tests/sql/path_partition.rs @@ -183,7 +183,7 @@ async fn parquet_distinct_partition_col() -> Result<()> { max_limit += 1; let last_batch = results .last() - .expect("There shouled be at least one record batch returned"); + .expect("There should be at least one record batch returned"); let last_row_idx = last_batch.num_rows() - 1; let mut min_limit = match ScalarValue::try_from_array(last_batch.column(0), last_row_idx)? 
{ @@ -568,7 +568,7 @@ async fn parquet_overlapping_columns() -> Result<()> { assert!( result.is_err(), - "Dupplicate qualified name should raise error" + "Duplicate qualified name should raise error" ); Ok(()) } diff --git a/datafusion/core/tests/user_defined/user_defined_aggregates.rs b/datafusion/core/tests/user_defined/user_defined_aggregates.rs index 99c00615376f..bf32eef3b011 100644 --- a/datafusion/core/tests/user_defined/user_defined_aggregates.rs +++ b/datafusion/core/tests/user_defined/user_defined_aggregates.rs @@ -724,7 +724,7 @@ impl Accumulator for FirstSelector { } fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - // cast argumets to the appropriate type (DataFusion will type + // cast arguments to the appropriate type (DataFusion will type // check these based on the declared allowed input types) let v = as_primitive_array::(&values[0])?; let t = as_primitive_array::(&values[1])?; diff --git a/datafusion/core/tests/user_defined/user_defined_plan.rs b/datafusion/core/tests/user_defined/user_defined_plan.rs index 77753290c37e..487063642345 100644 --- a/datafusion/core/tests/user_defined/user_defined_plan.rs +++ b/datafusion/core/tests/user_defined/user_defined_plan.rs @@ -482,7 +482,7 @@ impl ExtensionPlanner for TopKPlanner { /// code is not general and is meant as an illustration only struct TopKExec { input: Arc, - /// The maxium number of values + /// The maximum number of values k: usize, cache: PlanProperties, } diff --git a/datafusion/core/tests/user_defined/user_defined_table_functions.rs b/datafusion/core/tests/user_defined/user_defined_table_functions.rs index 39f10ef11ab0..b5f94107dd0b 100644 --- a/datafusion/core/tests/user_defined/user_defined_table_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_table_functions.rs @@ -228,8 +228,8 @@ fn read_csv_batches(csv_path: impl AsRef) -> Result<(SchemaRef, Vec = $0` will infer `$0` to + /// For example, given an expression like ` = $0` will infer `$0` to /// have type `int32`. /// /// Returns transformed expression and flag that is true if expression contains @@ -2263,7 +2263,7 @@ impl Display for SchemaDisplay<'_> { "{}({}{})", func.name(), if *distinct { "DISTINCT " } else { "" }, - schema_name_from_exprs_comma_seperated_without_space(args)? + schema_name_from_exprs_comma_separated_without_space(args)? )?; if let Some(null_treatment) = null_treatment { @@ -2335,7 +2335,7 @@ impl Display for SchemaDisplay<'_> { write!(f, "END") } - // Cast expr is not shown to be consistant with Postgres and Spark + // Cast expr is not shown to be consistent with Postgres and Spark Expr::Cast(Cast { expr, .. }) | Expr::TryCast(TryCast { expr, .. }) => { write!(f, "{}", SchemaDisplay(expr)) } @@ -2465,7 +2465,7 @@ impl Display for SchemaDisplay<'_> { f, "{}({})", fun, - schema_name_from_exprs_comma_seperated_without_space(args)? + schema_name_from_exprs_comma_separated_without_space(args)? )?; if let Some(null_treatment) = null_treatment { @@ -2495,7 +2495,7 @@ impl Display for SchemaDisplay<'_> { /// Internal usage. 
Please call `schema_name_from_exprs` instead // TODO: Use ", " to standardize the formatting of Vec, // -pub(crate) fn schema_name_from_exprs_comma_seperated_without_space( +pub(crate) fn schema_name_from_exprs_comma_separated_without_space( exprs: &[Expr], ) -> Result { schema_name_from_exprs_inner(exprs, ",") @@ -2598,7 +2598,7 @@ impl Display for Expr { Expr::ScalarFunction(fun) => { fmt_function(f, fun.name(), false, &fun.args, true) } - // TODO: use udf's display_name, need to fix the seperator issue, + // TODO: use udf's display_name, need to fix the separator issue, // Expr::ScalarFunction(ScalarFunction { func, args }) => { // write!(f, "{}", func.display_name(args).unwrap()) // } diff --git a/datafusion/expr/src/expr_schema.rs b/datafusion/expr/src/expr_schema.rs index e61904e24918..d5c2ac396eb9 100644 --- a/datafusion/expr/src/expr_schema.rs +++ b/datafusion/expr/src/expr_schema.rs @@ -99,7 +99,7 @@ impl ExprSchemable for Expr { /// expression refers to a column that does not exist in the /// schema, or when the expression is incorrectly typed /// (e.g. `[utf8] + [bool]`). - #[cfg_attr(feature = "recursive-protection", recursive::recursive)] + #[cfg_attr(feature = "recursive_protection", recursive::recursive)] fn get_type(&self, schema: &dyn ExprSchema) -> Result { match self { Expr::Alias(Alias { expr, name, .. }) => match &**expr { diff --git a/datafusion/expr/src/function.rs b/datafusion/expr/src/function.rs index 23ffc83e3549..e0235d32292f 100644 --- a/datafusion/expr/src/function.rs +++ b/datafusion/expr/src/function.rs @@ -69,7 +69,7 @@ pub type StateTypeFunction = /// * 'aggregate_function': [crate::expr::AggregateFunction] for which simplified has been invoked /// * 'info': [crate::simplify::SimplifyInfo] /// -///Cclosure returns simplified [Expr] or an error. +/// Closure returns simplified [Expr] or an error. pub type AggregateFunctionSimplification = Box< dyn Fn( crate::expr::AggregateFunction, diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index e15c0a36a0e9..c7cff3ac26b1 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -155,11 +155,11 @@ impl LogicalPlanBuilder { } // Ensure that the static term and the recursive term have the same number of fields let static_fields_len = self.plan.schema().fields().len(); - let recurive_fields_len = recursive_term.schema().fields().len(); - if static_fields_len != recurive_fields_len { + let recursive_fields_len = recursive_term.schema().fields().len(); + if static_fields_len != recursive_fields_len { return plan_err!( "Non-recursive term and recursive term must have the same number of columns ({} != {})", - static_fields_len, recurive_fields_len + static_fields_len, recursive_fields_len ); } // Ensure that the recursive term has the same field types as the static term @@ -254,7 +254,7 @@ impl LogicalPlanBuilder { if can_cast_types(&data_type, field_type) { } else { return exec_err!( - "type mistmatch and can't cast to got {} and {}", + "type mismatch and can't cast to got {} and {}", data_type, field_type ); @@ -1635,7 +1635,7 @@ pub fn wrap_projection_for_join_if_necessary( .iter() .map(|key| { // The display_name() of cast expression will ignore the cast info, and show the inner expression name. - // If we do not add alais, it will throw same field name error in the schema when adding projection. + // If we do not add alias, it will throw same field name error in the schema when adding projection. 
// For example: // input scan : [a, b, c], // join keys: [cast(a as int)] @@ -1776,7 +1776,7 @@ pub fn get_unnested_columns( let new_field = Arc::new(Field::new( col_name, data_type, // Unnesting may produce NULLs even if the list is not null. - // For example: unnset([1], []) -> 1, null + // For example: unnest([1], []) -> 1, null true, )); let column = Column::from_name(col_name); diff --git a/datafusion/expr/src/logical_plan/ddl.rs b/datafusion/expr/src/logical_plan/ddl.rs index 3efaf296c29c..a433871ef20d 100644 --- a/datafusion/expr/src/logical_plan/ddl.rs +++ b/datafusion/expr/src/logical_plan/ddl.rs @@ -303,7 +303,7 @@ pub struct CreateMemoryTable { pub or_replace: bool, /// Default values for columns pub column_defaults: Vec<(String, Expr)>, - /// Wheter the table is `TableType::Temporary` + /// Whether the table is `TableType::Temporary` pub temporary: bool, } @@ -318,7 +318,7 @@ pub struct CreateView { pub or_replace: bool, /// SQL used to create the view, if available pub definition: Option, - /// Wheter the view is ephemeral + /// Whether the view is ephemeral pub temporary: bool, } diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 6c2b923cf6ad..47d9aac3caf2 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1202,7 +1202,7 @@ impl LogicalPlan { /// # let schema = Schema::new(vec![ /// # Field::new("id", DataType::Int32, false), /// # ]); - /// // Build SELECT * FROM t1 WHRERE id = $1 + /// // Build SELECT * FROM t1 WHERE id = $1 /// let plan = table_scan(Some("t1"), &schema, None).unwrap() /// .filter(col("id").eq(placeholder("$1"))).unwrap() /// .build().unwrap(); @@ -1225,7 +1225,7 @@ impl LogicalPlan { /// ); /// /// // Note you can also used named parameters - /// // Build SELECT * FROM t1 WHRERE id = $my_param + /// // Build SELECT * FROM t1 WHERE id = $my_param /// let plan = table_scan(Some("t1"), &schema, None).unwrap() /// .filter(col("id").eq(placeholder("$my_param"))).unwrap() /// .build().unwrap() @@ -3633,7 +3633,7 @@ digraph { "#; // just test for a few key lines in the output rather than the - // whole thing to make test mainteance easier. + // whole thing to make test maintenance easier. let graphviz = format!("{}", plan.display_graphviz()); assert_eq!(expected_graphviz, graphviz); diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index cdc95b84d837..9a6103afd4b4 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -668,7 +668,7 @@ impl LogicalPlan { /// Visits a plan similarly to [`Self::visit`], including subqueries that /// may appear in expressions such as `IN (SELECT ...)`. - #[cfg_attr(feature = "recursive-protection", recursive::recursive)] + #[cfg_attr(feature = "recursive_protection", recursive::recursive)] pub fn visit_with_subqueries TreeNodeVisitor<'n, Node = Self>>( &self, visitor: &mut V, @@ -687,7 +687,7 @@ impl LogicalPlan { /// Similarly to [`Self::rewrite`], rewrites this node and its inputs using `f`, /// including subqueries that may appear in expressions such as `IN (SELECT /// ...)`. 
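(Aside on the `recursive_protection` gates renamed in the surrounding hunks: below is a minimal, hypothetical sketch of the pattern. The function name and body are illustrative only; the assumed mechanism is the `recursive` crate's `#[recursive]` attribute, which moves deep recursion onto a freshly allocated stack segment instead of overflowing the thread stack.)

```rust
// Illustrative only: mirrors the cfg_attr gating used on the plan/expression
// walkers in this diff. With the `recursive_protection` feature enabled, the
// attribute grows the stack on demand for deeply nested inputs.
#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
fn nesting_depth(level: u32) -> u32 {
    if level == 0 {
        0
    } else {
        // one frame per level; protected against stack overflow when the
        // feature (and thus the attribute) is active
        1 + nesting_depth(level - 1)
    }
}
```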
- #[cfg_attr(feature = "recursive-protection", recursive::recursive)] + #[cfg_attr(feature = "recursive_protection", recursive::recursive)] pub fn rewrite_with_subqueries>( self, rewriter: &mut R, @@ -706,7 +706,7 @@ impl LogicalPlan { &self, mut f: F, ) -> Result { - #[cfg_attr(feature = "recursive-protection", recursive::recursive)] + #[cfg_attr(feature = "recursive_protection", recursive::recursive)] fn apply_with_subqueries_impl< F: FnMut(&LogicalPlan) -> Result, >( @@ -741,7 +741,7 @@ impl LogicalPlan { self, mut f: F, ) -> Result> { - #[cfg_attr(feature = "recursive-protection", recursive::recursive)] + #[cfg_attr(feature = "recursive_protection", recursive::recursive)] fn transform_down_with_subqueries_impl< F: FnMut(LogicalPlan) -> Result>, >( @@ -766,7 +766,7 @@ impl LogicalPlan { self, mut f: F, ) -> Result> { - #[cfg_attr(feature = "recursive-protection", recursive::recursive)] + #[cfg_attr(feature = "recursive_protection", recursive::recursive)] fn transform_up_with_subqueries_impl< F: FnMut(LogicalPlan) -> Result>, >( @@ -794,7 +794,7 @@ impl LogicalPlan { mut f_down: FD, mut f_up: FU, ) -> Result> { - #[cfg_attr(feature = "recursive-protection", recursive::recursive)] + #[cfg_attr(feature = "recursive_protection", recursive::recursive)] fn transform_down_up_with_subqueries_impl< FD: FnMut(LogicalPlan) -> Result>, FU: FnMut(LogicalPlan) -> Result>, diff --git a/datafusion/expr/src/table_source.rs b/datafusion/expr/src/table_source.rs index e9a677de50c1..d62484153f53 100644 --- a/datafusion/expr/src/table_source.rs +++ b/datafusion/expr/src/table_source.rs @@ -55,7 +55,7 @@ pub enum TableProviderFilterPushDown { pub enum TableType { /// An ordinary physical table. Base, - /// A non-materialised table that itself uses a query internally to provide data. + /// A non-materialized table that itself uses a query internally to provide data. View, /// A transient table. Temporary, @@ -99,7 +99,7 @@ pub trait TableSource: Sync + Send { } /// Tests whether the table provider can make use of any or all filter expressions - /// to optimise data retrieval. Only non-volatile expressions are passed to this function. + /// to optimize data retrieval. Only non-volatile expressions are passed to this function. fn supports_filters_pushdown( &self, filters: &[&Expr], diff --git a/datafusion/expr/src/type_coercion/functions.rs b/datafusion/expr/src/type_coercion/functions.rs index 38e7106e6e64..96bb5c4b2d8f 100644 --- a/datafusion/expr/src/type_coercion/functions.rs +++ b/datafusion/expr/src/type_coercion/functions.rs @@ -520,7 +520,7 @@ fn get_valid_types( TypeSignature::Numeric(number) => { function_length_check(current_types.len(), *number)?; - // Find common numeric type amongs given types except string + // Find common numeric type among given types except string let mut valid_type = current_types.first().unwrap().to_owned(); for t in current_types.iter().skip(1) { let logical_data_type: NativeType = t.into(); diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index 83200edfa24c..51c42b5c4c30 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -17,7 +17,7 @@ //! 
[`ScalarUDF`]: Scalar User Defined Functions -use crate::expr::schema_name_from_exprs_comma_seperated_without_space; +use crate::expr::schema_name_from_exprs_comma_separated_without_space; use crate::simplify::{ExprSimplifyResult, SimplifyInfo}; use crate::sort_properties::{ExprProperties, SortProperties}; use crate::{ @@ -436,7 +436,7 @@ pub trait ScalarUDFImpl: Debug + Send + Sync { Ok(format!( "{}({})", self.name(), - schema_name_from_exprs_comma_seperated_without_space(args)? + schema_name_from_exprs_comma_separated_without_space(args)? )) } diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 6f7c5d379260..9d0a2b5b95f6 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -67,7 +67,7 @@ pub fn grouping_set_expr_count(group_expr: &[Expr]) -> Result { "Invalid group by expressions, GroupingSet must be the only expression" ); } - // Groupings sets have an additional interal column for the grouping id + // Groupings sets have an additional integral column for the grouping id Ok(grouping_set.distinct_expr().len() + 1) } else { grouping_set_to_exprlist(group_expr).map(|exprs| exprs.len()) @@ -1112,7 +1112,7 @@ fn split_conjunction_impl<'a>(expr: &'a Expr, mut exprs: Vec<&'a Expr>) -> Vec<& } } -/// Iteratate parts in a conjunctive [`Expr`] such as `A AND B AND C` => `[A, B, C]` +/// Iterate parts in a conjunctive [`Expr`] such as `A AND B AND C` => `[A, B, C]` /// /// See [`split_conjunction_owned`] for more details and an example. pub fn iter_conjunction(expr: &Expr) -> impl Iterator { @@ -1136,7 +1136,7 @@ pub fn iter_conjunction(expr: &Expr) -> impl Iterator { }) } -/// Iteratate parts in a conjunctive [`Expr`] such as `A AND B AND C` => `[A, B, C]` +/// Iterate parts in a conjunctive [`Expr`] such as `A AND B AND C` => `[A, B, C]` /// /// See [`split_conjunction_owned`] for more details and an example. pub fn iter_conjunction_owned(expr: Expr) -> impl Iterator { @@ -1301,7 +1301,7 @@ pub fn conjunction(filters: impl IntoIterator) -> Option { /// col("b").eq(lit(2)), /// ]; /// -/// // use disjuncton to join them together with `OR` +/// // use disjunction to join them together with `OR` /// assert_eq!(disjunction(split), Some(expr)); /// ``` pub fn disjunction(filters: impl IntoIterator) -> Option { diff --git a/datafusion/expr/src/window_frame.rs b/datafusion/expr/src/window_frame.rs index 222914315d70..815d5742afd2 100644 --- a/datafusion/expr/src/window_frame.rs +++ b/datafusion/expr/src/window_frame.rs @@ -274,7 +274,7 @@ impl WindowFrame { Ok(()) } - /// Returns whether the window frame can accept multiple ORDER BY expressons. + /// Returns whether the window frame can accept multiple ORDER BY expressions. pub fn can_accept_multi_orderby(&self) -> bool { match self.units { WindowFrameUnits::Rows => true, diff --git a/datafusion/ffi/src/table_provider.rs b/datafusion/ffi/src/table_provider.rs index 01f7c46106a2..b229d908d10d 100644 --- a/datafusion/ffi/src/table_provider.rs +++ b/datafusion/ffi/src/table_provider.rs @@ -66,7 +66,7 @@ use datafusion::error::Result; /// calls defined on the `FFI_` structs. The second purpose of the `Foreign` /// structs is to contain additional data that may be needed by the traits that /// are implemented on them. Some of these traits require borrowing data which -/// can be far more convienent to be locally stored. +/// can be far more convenient to be locally stored. 
/// /// For example, we have a struct `FFI_TableProvider` to give access to the /// `TableProvider` functions like `table_type()` and `scan()`. If we write a @@ -318,7 +318,7 @@ impl FFI_TableProvider { } } -/// This wrapper struct exists on the reciever side of the FFI interface, so it has +/// This wrapper struct exists on the receiver side of the FFI interface, so it has /// no guarantees about being able to access the data in `private_data`. Any functions /// defined on this struct must only use the stable functions provided in /// FFI_TableProvider to interact with the foreign table provider. @@ -397,7 +397,7 @@ impl TableProvider for ForeignTableProvider { } /// Tests whether the table provider can make use of a filter expression - /// to optimise data retrieval. + /// to optimize data retrieval. fn supports_filters_pushdown( &self, filters: &[&Expr], diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs index 03e4ef557269..aa2f5a586e87 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs @@ -79,7 +79,7 @@ use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator}; /// /// Logical group Current Min/Max value for that group stored /// number as a ScalarValue which points to an -/// indivdually allocated String +/// individually allocated String /// ///``` /// @@ -281,7 +281,7 @@ impl GroupsAccumulatorAdapter { /// See [`Self::allocation_bytes`] for rationale. fn free_allocation(&mut self, size: usize) { // use saturating sub to avoid errors if the accumulators - // report erronious sizes + // report erroneous sizes self.allocation_bytes = self.allocation_bytes.saturating_sub(size) } diff --git a/datafusion/functions-aggregate-common/src/tdigest.rs b/datafusion/functions-aggregate-common/src/tdigest.rs index 13e40a2b9966..378fc8c42bc6 100644 --- a/datafusion/functions-aggregate-common/src/tdigest.rs +++ b/datafusion/functions-aggregate-common/src/tdigest.rs @@ -23,7 +23,7 @@ //! [Facebook's Folly TDigest] implementation. //! //! Alterations include reduction of runtime heap allocations, broader type -//! support, (de-)serialisation support, reduced type conversions and null value +//! support, (de-)serialization support, reduced type conversions and null value //! tolerance. //! //! [TDigest sketch algorithm]: https://arxiv.org/abs/1902.04023 @@ -612,7 +612,7 @@ impl TDigest { ] } - /// Unpack the serialised state of a [`TDigest`] produced by + /// Unpack the serialized state of a [`TDigest`] produced by /// [`Self::to_scalar_state()`]. /// /// # Correctness diff --git a/datafusion/functions-aggregate/src/approx_percentile_cont.rs b/datafusion/functions-aggregate/src/approx_percentile_cont.rs index 61424e8f2445..000c69d9f331 100644 --- a/datafusion/functions-aggregate/src/approx_percentile_cont.rs +++ b/datafusion/functions-aggregate/src/approx_percentile_cont.rs @@ -231,7 +231,7 @@ impl AggregateUDFImpl for ApproxPercentileCont { } #[allow(rustdoc::private_intra_doc_links)] - /// See [`TDigest::to_scalar_state()`] for a description of the serialised + /// See [`TDigest::to_scalar_state()`] for a description of the serialized /// state. 
fn state_fields(&self, args: StateFieldsArgs) -> Result> { Ok(vec![ diff --git a/datafusion/functions-aggregate/src/approx_percentile_cont_with_weight.rs b/datafusion/functions-aggregate/src/approx_percentile_cont_with_weight.rs index 10b9b06f1f94..16dac2c1b8f0 100644 --- a/datafusion/functions-aggregate/src/approx_percentile_cont_with_weight.rs +++ b/datafusion/functions-aggregate/src/approx_percentile_cont_with_weight.rs @@ -172,7 +172,7 @@ impl AggregateUDFImpl for ApproxPercentileContWithWeight { } #[allow(rustdoc::private_intra_doc_links)] - /// See [`TDigest::to_scalar_state()`] for a description of the serialised + /// See [`TDigest::to_scalar_state()`] for a description of the serialized /// state. fn state_fields(&self, args: StateFieldsArgs) -> Result> { self.approx_percentile_cont.state_fields(args) diff --git a/datafusion/functions-aggregate/src/covariance.rs b/datafusion/functions-aggregate/src/covariance.rs index ffbf2ceef052..d4ae27533c6d 100644 --- a/datafusion/functions-aggregate/src/covariance.rs +++ b/datafusion/functions-aggregate/src/covariance.rs @@ -246,7 +246,7 @@ impl AggregateUDFImpl for CovariancePopulation { /// Journal of the American Statistical Association. 69 (348): 859–866. doi:10.2307/2286154. JSTOR 2286154. /// /// Though it is not covered in the original paper but is based on the same idea, as a result the algorithm is online, -/// parallelizable and numerically stable. +/// parallelize and numerically stable. #[derive(Debug)] pub struct CovarianceAccumulator { diff --git a/datafusion/functions-aggregate/src/min_max.rs b/datafusion/functions-aggregate/src/min_max.rs index a0f7634c5fa8..c4e05bd57de6 100644 --- a/datafusion/functions-aggregate/src/min_max.rs +++ b/datafusion/functions-aggregate/src/min_max.rs @@ -372,7 +372,7 @@ macro_rules! typed_min_max_batch_string { ScalarValue::$SCALAR(value) }}; } -// Statically-typed version of min/max(array) -> ScalarValue for binay types. +// Statically-typed version of min/max(array) -> ScalarValue for binary types. macro_rules! 
typed_min_max_batch_binary { ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{ let array = downcast_value!($VALUES, $ARRAYTYPE); diff --git a/datafusion/functions-aggregate/src/min_max/min_max_bytes.rs b/datafusion/functions-aggregate/src/min_max/min_max_bytes.rs index 25499c252191..725b7a29bd47 100644 --- a/datafusion/functions-aggregate/src/min_max/min_max_bytes.rs +++ b/datafusion/functions-aggregate/src/min_max/min_max_bytes.rs @@ -449,7 +449,7 @@ impl MinMaxBytesState { self.min_max.resize(total_num_groups, None); // Minimize value copies by calculating the new min/maxes for each group // in this batch (either the existing min/max or the new input value) - // and updating the owne values in `self.min_maxes` at most once + // and updating the owned values in `self.min_maxes` at most once let mut locations = vec![MinMaxLocation::ExistingMinMax; total_num_groups]; // Figure out the new min value for each group @@ -463,12 +463,12 @@ impl MinMaxBytesState { // previous input value was the min/max, so compare it MinMaxLocation::Input(existing_val) => existing_val, MinMaxLocation::ExistingMinMax => { - let Some(exising_val) = self.min_max[group_index].as_ref() else { + let Some(existing_val) = self.min_max[group_index].as_ref() else { // no existing min/max, so this is the new min/max locations[group_index] = MinMaxLocation::Input(new_val); continue; }; - exising_val.as_ref() + existing_val.as_ref() } }; diff --git a/datafusion/functions-table/src/generate_series.rs b/datafusion/functions-table/src/generate_series.rs index ced43ea8f00c..887daa71ec55 100644 --- a/datafusion/functions-table/src/generate_series.rs +++ b/datafusion/functions-table/src/generate_series.rs @@ -22,7 +22,7 @@ use async_trait::async_trait; use datafusion_catalog::Session; use datafusion_catalog::TableFunctionImpl; use datafusion_catalog::TableProvider; -use datafusion_common::{not_impl_err, plan_err, Result, ScalarValue}; +use datafusion_common::{plan_err, Result, ScalarValue}; use datafusion_expr::{Expr, TableType}; use datafusion_physical_plan::memory::{LazyBatchGenerator, LazyMemoryExec}; use datafusion_physical_plan::ExecutionPlan; @@ -30,28 +30,45 @@ use parking_lot::RwLock; use std::fmt; use std::sync::Arc; -/// Table that generates a series of integers from `start`(inclusive) to `end`(inclusive) +/// Indicates the arguments used for generating a series. +#[derive(Debug, Clone)] +enum GenSeriesArgs { + /// ContainsNull signifies that at least one argument(start, end, step) was null, thus no series will be generated. + ContainsNull, + /// AllNotNullArgs holds the start, end, and step values for generating the series when all arguments are not null. 
+ AllNotNullArgs { start: i64, end: i64, step: i64 }, +} + +/// Table that generates a series of integers from `start`(inclusive) to `end`(inclusive), incrementing by step #[derive(Debug, Clone)] struct GenerateSeriesTable { schema: SchemaRef, - // None if input is Null - start: Option, - // None if input is Null - end: Option, + args: GenSeriesArgs, } -/// Table state that generates a series of integers from `start`(inclusive) to `end`(inclusive) +/// Table state that generates a series of integers from `start`(inclusive) to `end`(inclusive), incrementing by step #[derive(Debug, Clone)] struct GenerateSeriesState { schema: SchemaRef, start: i64, // Kept for display end: i64, + step: i64, batch_size: usize, /// Tracks current position when generating table current: i64, } +impl GenerateSeriesState { + fn reach_end(&self, val: i64) -> bool { + if self.step > 0 { + return val > self.end; + } + + val < self.end + } +} + /// Detail to display for 'Explain' plan impl fmt::Display for GenerateSeriesState { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { @@ -65,19 +82,19 @@ impl fmt::Display for GenerateSeriesState { impl LazyBatchGenerator for GenerateSeriesState { fn generate_next_batch(&mut self) -> Result> { - // Check if we've reached the end - if self.current > self.end { + let mut buf = Vec::with_capacity(self.batch_size); + while buf.len() < self.batch_size && !self.reach_end(self.current) { + buf.push(self.current); + self.current += self.step; + } + let array = Int64Array::from(buf); + + if array.is_empty() { return Ok(None); } - // Construct batch - let batch_end = (self.current + self.batch_size as i64 - 1).min(self.end); - let array = Int64Array::from_iter_values(self.current..=batch_end); let batch = RecordBatch::try_new(self.schema.clone(), vec![Arc::new(array)])?; - // Update current position for next batch - self.current = batch_end + 1; - Ok(Some(batch)) } } @@ -104,39 +121,31 @@ impl TableProvider for GenerateSeriesTable { _limit: Option, ) -> Result> { let batch_size = state.config_options().execution.batch_size; - match (self.start, self.end) { - (Some(start), Some(end)) => { - if start > end { - return plan_err!( - "End value must be greater than or equal to start value" - ); - } - - Ok(Arc::new(LazyMemoryExec::try_new( - self.schema.clone(), - vec![Arc::new(RwLock::new(GenerateSeriesState { - schema: self.schema.clone(), - start, - end, - current: start, - batch_size, - }))], - )?)) - } - _ => { - // Either start or end is None, return a generator that outputs 0 rows - Ok(Arc::new(LazyMemoryExec::try_new( - self.schema.clone(), - vec![Arc::new(RwLock::new(GenerateSeriesState { - schema: self.schema.clone(), - start: 0, - end: 0, - current: 1, - batch_size, - }))], - )?)) - } - } + + let state = match self.args { + // if args have null, then return 0 row + GenSeriesArgs::ContainsNull => GenerateSeriesState { + schema: self.schema.clone(), + start: 0, + end: 0, + step: 1, + current: 1, + batch_size, + }, + GenSeriesArgs::AllNotNullArgs { start, end, step } => GenerateSeriesState { + schema: self.schema.clone(), + start, + end, + step, + current: start, + batch_size, + }, + }; + + Ok(Arc::new(LazyMemoryExec::try_new( + self.schema.clone(), + vec![Arc::new(RwLock::new(state))], + )?)) } } @@ -144,37 +153,58 @@ impl TableProvider for GenerateSeriesTable { pub struct GenerateSeriesFunc {} impl TableFunctionImpl for GenerateSeriesFunc { - // Check input `exprs` type and number. Input validity check (e.g. 
start <= end) - // will be performed in `TableProvider::scan` fn call(&self, exprs: &[Expr]) -> Result> { - // TODO: support 1 or 3 arguments following DuckDB: - // - if exprs.len() == 3 || exprs.len() == 1 { - return not_impl_err!("generate_series does not support 1 or 3 arguments"); + if exprs.is_empty() || exprs.len() > 3 { + return plan_err!("generate_series function requires 1 to 3 arguments"); } - if exprs.len() != 2 { - return plan_err!("generate_series expects 2 arguments"); + let mut normalize_args = Vec::new(); + for expr in exprs { + match expr { + Expr::Literal(ScalarValue::Null) => {} + Expr::Literal(ScalarValue::Int64(Some(n))) => normalize_args.push(*n), + _ => return plan_err!("First argument must be an integer literal"), + }; } - let start = match &exprs[0] { - Expr::Literal(ScalarValue::Null) => None, - Expr::Literal(ScalarValue::Int64(Some(n))) => Some(*n), - _ => return plan_err!("First argument must be an integer literal"), - }; - - let end = match &exprs[1] { - Expr::Literal(ScalarValue::Null) => None, - Expr::Literal(ScalarValue::Int64(Some(n))) => Some(*n), - _ => return plan_err!("Second argument must be an integer literal"), - }; - let schema = Arc::new(Schema::new(vec![Field::new( "value", DataType::Int64, false, )])); - Ok(Arc::new(GenerateSeriesTable { schema, start, end })) + if normalize_args.len() != exprs.len() { + // contain null + return Ok(Arc::new(GenerateSeriesTable { + schema, + args: GenSeriesArgs::ContainsNull, + })); + } + + let (start, end, step) = match &normalize_args[..] { + [end] => (0, *end, 1), + [start, end] => (*start, *end, 1), + [start, end, step] => (*start, *end, *step), + _ => { + return plan_err!("generate_series function requires 1 to 3 arguments"); + } + }; + + if start > end && step > 0 { + return plan_err!("start is bigger than end, but increment is positive: cannot generate infinite series"); + } + + if start < end && step < 0 { + return plan_err!("start is smaller than end, but increment is negative: cannot generate infinite series"); + } + + if step == 0 { + return plan_err!("step cannot be zero"); + } + + Ok(Arc::new(GenerateSeriesTable { + schema, + args: GenSeriesArgs::AllNotNullArgs { start, end, step }, + })) } } diff --git a/datafusion/functions/src/core/greatest.rs b/datafusion/functions/src/core/greatest.rs index e91ec2b0c4d8..654b2a2987c7 100644 --- a/datafusion/functions/src/core/greatest.rs +++ b/datafusion/functions/src/core/greatest.rs @@ -85,7 +85,7 @@ impl GreatestLeastOperator for GreatestFunc { /// Nulls are always considered smaller than any other value fn get_indexes_to_keep(lhs: &dyn Array, rhs: &dyn Array) -> Result { // Fast path: - // If both arrays are not nested, have the same length and no nulls, we can use the faster vectorised kernel + // If both arrays are not nested, have the same length and no nulls, we can use the faster vectorized kernel // - If both arrays are not nested: Nested types, such as lists, are not supported as the null semantics are not well-defined. 
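(For the `generate_series` rework a few hunks above: the sketch below shows how the new one-to-three argument handling could be exercised. Expected values and error behaviour are inferred from the new code, not copied from this PR's tests, so treat it as a hypothetical illustration.)

```rust
// Hypothetical usage sketch for the reworked generate_series table function.
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // one argument: start defaults to 0, step to 1 -> 0, 1, 2, 3
    ctx.sql("SELECT * FROM generate_series(3)").await?.show().await?;
    // two arguments, inclusive bounds -> 1, 2, 3, 4, 5
    ctx.sql("SELECT * FROM generate_series(1, 5)").await?.show().await?;
    // three arguments with an explicit step -> 1, 4, 7, 10
    ctx.sql("SELECT * FROM generate_series(1, 10, 3)").await?.show().await?;
    // start > end with a positive step (or step = 0) is rejected at plan time
    assert!(ctx.sql("SELECT * FROM generate_series(5, 1)").await.is_err());
    Ok(())
}
```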
// - both array does not have any nulls: cmp::gt_eq will return null if any of the input is null while we want to return false in that case if !lhs.data_type().is_nested() diff --git a/datafusion/functions/src/core/least.rs b/datafusion/functions/src/core/least.rs index b9ea65cdb732..085090727773 100644 --- a/datafusion/functions/src/core/least.rs +++ b/datafusion/functions/src/core/least.rs @@ -98,7 +98,7 @@ impl GreatestLeastOperator for LeastFunc { /// Nulls are always considered larger than any other value fn get_indexes_to_keep(lhs: &dyn Array, rhs: &dyn Array) -> Result { // Fast path: - // If both arrays are not nested, have the same length and no nulls, we can use the faster vectorised kernel + // If both arrays are not nested, have the same length and no nulls, we can use the faster vectorized kernel // - If both arrays are not nested: Nested types, such as lists, are not supported as the null semantics are not well-defined. // - both array does not have any nulls: cmp::lt_eq will return null if any of the input is null while we want to return false in that case if !lhs.data_type().is_nested() diff --git a/datafusion/functions/src/core/mod.rs b/datafusion/functions/src/core/mod.rs index ba8255d2e472..76fb4bbe5b47 100644 --- a/datafusion/functions/src/core/mod.rs +++ b/datafusion/functions/src/core/mod.rs @@ -115,8 +115,8 @@ pub fn functions() -> Vec> { // `get_field(my_struct_col, "field_name")`. // // However, it is also exposed directly for use cases such as - // serializing / deserializing plans with the field access desugared to - // calls to `get_field` + // serializing / deserializing plans with the field access desugared to + // calls to [`get_field`] get_field(), coalesce(), greatest(), diff --git a/datafusion/functions/src/core/nullif.rs b/datafusion/functions/src/core/nullif.rs index 0c2d01376de9..7c86047a0243 100644 --- a/datafusion/functions/src/core/nullif.rs +++ b/datafusion/functions/src/core/nullif.rs @@ -215,7 +215,7 @@ mod tests { #[test] // Ensure that arrays with no nulls can also invoke NULLIF() correctly - fn nullif_int32_nonulls() -> Result<()> { + fn nullif_int32_non_nulls() -> Result<()> { let a = Int32Array::from(vec![1, 3, 10, 7, 8, 1, 2, 4, 5]); let a = ColumnarValue::Array(Arc::new(a)); diff --git a/datafusion/functions/src/core/nvl.rs b/datafusion/functions/src/core/nvl.rs index 6c470eca3d46..eb8c9260601f 100644 --- a/datafusion/functions/src/core/nvl.rs +++ b/datafusion/functions/src/core/nvl.rs @@ -216,7 +216,7 @@ mod tests { #[test] // Ensure that arrays with no nulls can also invoke nvl() correctly - fn nvl_int32_nonulls() -> Result<()> { + fn nvl_int32_non_nulls() -> Result<()> { let a = Int32Array::from(vec![1, 3, 10, 7, 8, 1, 2, 4, 5]); let a = ColumnarValue::Array(Arc::new(a)); diff --git a/datafusion/functions/src/datetime/date_bin.rs b/datafusion/functions/src/datetime/date_bin.rs index 49bf00d5c17b..bb3f2177b9a4 100644 --- a/datafusion/functions/src/datetime/date_bin.rs +++ b/datafusion/functions/src/datetime/date_bin.rs @@ -318,7 +318,7 @@ fn to_utc_date_time(nanos: i64) -> DateTime { // Supported intervals: // 1. IntervalDayTime: this means that the stride is in days, hours, minutes, seconds and milliseconds // We will assume month interval won't be converted into this type -// TODO (my next PR): without `INTERVAL` keyword, the stride was converted into ScalarValue::IntervalDayTime somwhere +// TODO (my next PR): without `INTERVAL` keyword, the stride was converted into ScalarValue::IntervalDayTime somewhere // for month interval. 
I need to find that and make it ScalarValue::IntervalMonthDayNano instead // 2. IntervalMonthDayNano fn date_bin_impl( diff --git a/datafusion/functions/src/datetime/date_part.rs b/datafusion/functions/src/datetime/date_part.rs index b43fcb6db706..0f115563c8db 100644 --- a/datafusion/functions/src/datetime/date_part.rs +++ b/datafusion/functions/src/datetime/date_part.rs @@ -105,7 +105,7 @@ impl ScalarUDFImpl for DatePartFunc { } fn return_type(&self, _arg_types: &[DataType]) -> Result { - internal_err!("return_type_from_exprs shoud be called instead") + internal_err!("return_type_from_exprs should be called instead") } fn return_type_from_exprs( diff --git a/datafusion/functions/src/regex/regexpreplace.rs b/datafusion/functions/src/regex/regexpreplace.rs index 9b4a7b04552b..4ed9350e9729 100644 --- a/datafusion/functions/src/regex/regexpreplace.rs +++ b/datafusion/functions/src/regex/regexpreplace.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! Regx expressions +//! Regex expressions use arrow::array::ArrayDataBuilder; use arrow::array::BufferBuilder; use arrow::array::GenericStringArray; diff --git a/datafusion/functions/src/string/mod.rs b/datafusion/functions/src/string/mod.rs index c43aaeccbefe..442c055ac37d 100644 --- a/datafusion/functions/src/string/mod.rs +++ b/datafusion/functions/src/string/mod.rs @@ -145,7 +145,7 @@ pub mod expr_fn { "returns uuid v4 as a string value", ), ( contains, - "Return true if search_string is found within string. treated it like a reglike", + "Return true if search_string is found within string.", )); #[doc = "Removes all characters, spaces by default, from both sides of a string"] diff --git a/datafusion/functions/src/strings.rs b/datafusion/functions/src/strings.rs index a6587a91a9fe..f18573db827f 100644 --- a/datafusion/functions/src/strings.rs +++ b/datafusion/functions/src/strings.rs @@ -189,7 +189,7 @@ impl StringArrayBuilder { self.offsets_buffer.push(next_offset); } - /// Finalise the builder into a concrete [`StringArray`]. + /// Finalize the builder into a concrete [`StringArray`]. /// /// # Panics /// @@ -358,7 +358,7 @@ impl LargeStringArrayBuilder { self.offsets_buffer.push(next_offset); } - /// Finalise the builder into a concrete [`LargeStringArray`]. + /// Finalize the builder into a concrete [`LargeStringArray`]. /// /// # Panics /// diff --git a/datafusion/optimizer/Cargo.toml b/datafusion/optimizer/Cargo.toml index 3032c67682b1..ba0dedc57675 100644 --- a/datafusion/optimizer/Cargo.toml +++ b/datafusion/optimizer/Cargo.toml @@ -36,8 +36,7 @@ name = "datafusion_optimizer" path = "src/lib.rs" [features] -default = ["recursive-protection"] -recursive-protection = ["dep:recursive"] +recursive_protection = ["dep:recursive"] [dependencies] arrow = { workspace = true } diff --git a/datafusion/optimizer/src/analyzer/subquery.rs b/datafusion/optimizer/src/analyzer/subquery.rs index 0d04efbcf36a..7129da85f375 100644 --- a/datafusion/optimizer/src/analyzer/subquery.rs +++ b/datafusion/optimizer/src/analyzer/subquery.rs @@ -128,7 +128,7 @@ fn check_correlations_in_subquery(inner_plan: &LogicalPlan) -> Result<()> { } // Recursively check the unsupported outer references in the sub query plan. 
-#[cfg_attr(feature = "recursive-protection", recursive::recursive)] +#[cfg_attr(feature = "recursive_protection", recursive::recursive)] fn check_inner_plan(inner_plan: &LogicalPlan, can_contain_outer_ref: bool) -> Result<()> { if !can_contain_outer_ref && inner_plan.contains_outer_reference() { return plan_err!("Accessing outer reference columns is not allowed in the plan"); diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index 628c1498f973..89dd4ca60a6a 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -943,7 +943,7 @@ pub fn coerce_union_schema(inputs: &[Arc]) -> Result { ); } - // coerce data type and nullablity for each field + // coerce data type and nullability for each field for (union_datatype, union_nullable, union_field_map, plan_field) in izip!( union_datatypes.iter_mut(), union_nullabilities.iter_mut(), diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index ff75a6a60f4b..4b9a83fd3e4c 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -382,10 +382,10 @@ impl CommonSubexprEliminate { // keep column names and get rid of additional name // preserving logic here. if let Some(aggr_expr) = aggr_expr { - let name_perserver = NamePreserver::new_for_projection(); + let name_preserver = NamePreserver::new_for_projection(); let saved_names = aggr_expr .iter() - .map(|expr| name_perserver.save(expr)) + .map(|expr| name_preserver.save(expr)) .collect::>(); let new_aggr_expr = rewritten_aggr_expr .into_iter() @@ -531,7 +531,7 @@ impl OptimizerRule for CommonSubexprEliminate { None } - #[cfg_attr(feature = "recursive-protection", recursive::recursive)] + #[cfg_attr(feature = "recursive_protection", recursive::recursive)] fn rewrite( &self, plan: LogicalPlan, diff --git a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs index 7fdad5ba4b6e..3e5a85ea02db 100644 --- a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs +++ b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs @@ -1593,7 +1593,7 @@ mod tests { assert_optimized_plan_equal(plan, expected) } - /// Test for correlated exists subquery filter with disjustions + /// Test for correlated exists subquery filter with disjunctions #[test] fn exists_subquery_disjunction() -> Result<()> { let sq = Arc::new( diff --git a/datafusion/optimizer/src/eliminate_cross_join.rs b/datafusion/optimizer/src/eliminate_cross_join.rs index 9a47f437e444..d35572e6d34a 100644 --- a/datafusion/optimizer/src/eliminate_cross_join.rs +++ b/datafusion/optimizer/src/eliminate_cross_join.rs @@ -79,7 +79,7 @@ impl OptimizerRule for EliminateCrossJoin { true } - #[cfg_attr(feature = "recursive-protection", recursive::recursive)] + #[cfg_attr(feature = "recursive_protection", recursive::recursive)] fn rewrite( &self, plan: LogicalPlan, @@ -93,7 +93,7 @@ impl OptimizerRule for EliminateCrossJoin { let parent_predicate = if let LogicalPlan::Filter(filter) = plan { // if input isn't a join that can potentially be rewritten // avoid unwrapping the input - let rewriteable = matches!( + let rewritable = matches!( filter.input.as_ref(), LogicalPlan::Join(Join { join_type: JoinType::Inner, @@ -101,7 +101,7 @@ impl OptimizerRule for EliminateCrossJoin { }) ); - if !rewriteable { + if !rewritable { // 
recursively try to rewrite children return rewrite_children(self, LogicalPlan::Filter(filter), config); } diff --git a/datafusion/optimizer/src/eliminate_group_by_constant.rs b/datafusion/optimizer/src/eliminate_group_by_constant.rs index 035a1d2da229..1213c8ffb368 100644 --- a/datafusion/optimizer/src/eliminate_group_by_constant.rs +++ b/datafusion/optimizer/src/eliminate_group_by_constant.rs @@ -94,7 +94,7 @@ impl OptimizerRule for EliminateGroupByConstant { /// Checks if expression is constant, and can be eliminated from group by. /// /// Intended to be used only within this rule, helper function, which heavily -/// reiles on `SimplifyExpressions` result. +/// relies on `SimplifyExpressions` result. fn is_constant_expression(expr: &Expr) -> bool { match expr { Expr::Alias(e) => is_constant_expression(&e.expr), diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 7c8e4120ea20..b7dd391586a1 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -35,7 +35,7 @@ use datafusion_expr::{ TableScan, Window, }; -use crate::optimize_projections::required_indices::RequiredIndicies; +use crate::optimize_projections::required_indices::RequiredIndices; use crate::utils::NamePreserver; use datafusion_common::tree_node::{ Transformed, TreeNode, TreeNodeContainer, TreeNodeRecursion, @@ -85,7 +85,7 @@ impl OptimizerRule for OptimizeProjections { config: &dyn OptimizerConfig, ) -> Result> { // All output fields are necessary: - let indices = RequiredIndicies::new_for_all_exprs(&plan); + let indices = RequiredIndices::new_for_all_exprs(&plan); optimize_projections(plan, config, indices) } } @@ -109,11 +109,11 @@ impl OptimizerRule for OptimizeProjections { /// columns. /// - `Ok(None)`: Signal that the given logical plan did not require any change. /// - `Err(error)`: An error occurred during the optimization process. -#[cfg_attr(feature = "recursive-protection", recursive::recursive)] +#[cfg_attr(feature = "recursive_protection", recursive::recursive)] fn optimize_projections( plan: LogicalPlan, config: &dyn OptimizerConfig, - indices: RequiredIndicies, + indices: RequiredIndices, ) -> Result> { // Recursively rewrite any nodes that may be able to avoid computation given // their parents' required indices. @@ -176,7 +176,7 @@ fn optimize_projections( let all_exprs_iter = new_group_bys.iter().chain(new_aggr_expr.iter()); let schema = aggregate.input.schema(); let necessary_indices = - RequiredIndicies::new().with_exprs(schema, all_exprs_iter); + RequiredIndices::new().with_exprs(schema, all_exprs_iter); let necessary_exprs = necessary_indices.get_required_exprs(schema); return optimize_projections( @@ -274,7 +274,7 @@ fn optimize_projections( // For other plan node types, calculate indices for columns they use and // try to rewrite their children - let mut child_required_indices: Vec = match &plan { + let mut child_required_indices: Vec = match &plan { LogicalPlan::Sort(_) | LogicalPlan::Filter(_) | LogicalPlan::Repartition(_) @@ -320,7 +320,7 @@ fn optimize_projections( // EXISTS expression), we may not need to require all indices. 
plan.inputs() .into_iter() - .map(RequiredIndicies::new_for_all_exprs) + .map(RequiredIndices::new_for_all_exprs) .collect() } LogicalPlan::Extension(extension) => { @@ -340,7 +340,7 @@ fn optimize_projections( .into_iter() .zip(necessary_children_indices) .map(|(child, necessary_indices)| { - RequiredIndicies::new_from_indices(necessary_indices) + RequiredIndices::new_from_indices(necessary_indices) .with_plan_exprs(&plan, child.schema()) }) .collect::>>()? @@ -379,7 +379,7 @@ fn optimize_projections( LogicalPlan::Unnest(Unnest { dependency_indices, .. }) => { - vec![RequiredIndicies::new_from_indices( + vec![RequiredIndices::new_from_indices( dependency_indices.clone(), )] } @@ -443,7 +443,7 @@ fn optimize_projections( /// - `Ok(Some(Projection))`: Merge was beneficial and successful. Contains the /// merged projection. /// - `Ok(None)`: Signals that merge is not beneficial (and has not taken place). -/// - `Err(error)`: An error occured during the function call. +/// - `Err(error)`: An error occurred during the function call. fn merge_consecutive_projections(proj: Projection) -> Result> { let Projection { expr, @@ -672,9 +672,9 @@ fn outer_columns_helper_multi<'a, 'b>( /// adjusted based on the join type. fn split_join_requirements( left_len: usize, - indices: RequiredIndicies, + indices: RequiredIndices, join_type: &JoinType, -) -> (RequiredIndicies, RequiredIndicies) { +) -> (RequiredIndices, RequiredIndices) { match join_type { // In these cases requirements are split between left/right children: JoinType::Inner @@ -687,10 +687,10 @@ fn split_join_requirements( indices.split_off(left_len) } // All requirements can be re-routed to left child directly. - JoinType::LeftAnti | JoinType::LeftSemi => (indices, RequiredIndicies::new()), + JoinType::LeftAnti | JoinType::LeftSemi => (indices, RequiredIndices::new()), // All requirements can be re-routed to right side directly. // No need to change index, join schema is right child schema. - JoinType::RightSemi | JoinType::RightAnti => (RequiredIndicies::new(), indices), + JoinType::RightSemi | JoinType::RightAnti => (RequiredIndices::new(), indices), } } @@ -741,18 +741,18 @@ fn add_projection_on_top_if_helpful( /// /// - `Ok(Some(LogicalPlan))`: Contains the rewritten projection /// - `Ok(None)`: No rewrite necessary. -/// - `Err(error)`: An error occured during the function call. +/// - `Err(error)`: An error occurred during the function call. fn rewrite_projection_given_requirements( proj: Projection, config: &dyn OptimizerConfig, - indices: &RequiredIndicies, + indices: &RequiredIndices, ) -> Result> { let Projection { expr, input, .. } = proj; let exprs_used = indices.get_at_indices(&expr); let required_indices = - RequiredIndicies::new().with_exprs(input.schema(), exprs_used.iter()); + RequiredIndices::new().with_exprs(input.schema(), exprs_used.iter()); // rewrite the children projection, and if they are changed rewrite the // projection down diff --git a/datafusion/optimizer/src/optimize_projections/required_indices.rs b/datafusion/optimizer/src/optimize_projections/required_indices.rs index 55e8081eaeb2..c1e0885c9b5f 100644 --- a/datafusion/optimizer/src/optimize_projections/required_indices.rs +++ b/datafusion/optimizer/src/optimize_projections/required_indices.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! [`RequiredIndicies`] helper for OptimizeProjection +//! 
[`RequiredIndices`] helper for OptimizeProjection use crate::optimize_projections::outer_columns; use datafusion_common::tree_node::TreeNodeRecursion; @@ -35,7 +35,7 @@ use datafusion_expr::{Expr, LogicalPlan}; /// indices were added `[3, 2, 4, 3, 6, 1]`, the instance would be represented /// by `[1, 2, 3, 4, 6]`. #[derive(Debug, Clone, Default)] -pub(super) struct RequiredIndicies { +pub(super) struct RequiredIndices { /// The indices of the required columns in the indices: Vec, /// If putting a projection above children is beneficial for the parent. @@ -43,7 +43,7 @@ pub(super) struct RequiredIndicies { projection_beneficial: bool, } -impl RequiredIndicies { +impl RequiredIndices { /// Create a new, empty instance pub fn new() -> Self { Self::default() diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs index 2e2c8fb1d6f8..9e7f8eed8a25 100644 --- a/datafusion/optimizer/src/scalar_subquery_to_join.rs +++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs @@ -134,7 +134,7 @@ impl OptimizerRule for ScalarSubqueryToJoin { return Ok(Transformed::no(LogicalPlan::Projection(projection))); } - let mut all_subqueryies = vec![]; + let mut all_subqueries = vec![]; let mut expr_to_rewrite_expr_map = HashMap::new(); let mut subquery_to_expr_map = HashMap::new(); for expr in projection.expr.iter() { @@ -143,15 +143,15 @@ impl OptimizerRule for ScalarSubqueryToJoin { for (subquery, _) in &subqueries { subquery_to_expr_map.insert(subquery.clone(), expr.clone()); } - all_subqueryies.extend(subqueries); + all_subqueries.extend(subqueries); expr_to_rewrite_expr_map.insert(expr, rewrite_exprs); } - if all_subqueryies.is_empty() { + if all_subqueries.is_empty() { return internal_err!("Expected subqueries not found in projection"); } // iterate through all subqueries in predicate, turning each into a left join let mut cur_input = projection.input.as_ref().clone(); - for (subquery, alias) in all_subqueryies { + for (subquery, alias) in all_subqueries { if let Some((optimized_subquery, expr_check_map)) = build_join(&subquery, &cur_input, &alias)? 
{ @@ -879,7 +879,7 @@ mod tests { Ok(()) } - /// Test for correlated scalar subquery filter with disjustions + /// Test for correlated scalar subquery filter with disjunctions #[test] fn scalar_subquery_disjunction() -> Result<()> { let sq = Arc::new( diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs index e3bcb6da8e53..74d2ce0b6be9 100644 --- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs +++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs @@ -3661,7 +3661,7 @@ mod tests { } #[test] - fn test_like_and_ilke() { + fn test_like_and_ilike() { let null = lit(ScalarValue::Utf8(None)); // expr [NOT] [I]LIKE NULL @@ -3931,7 +3931,7 @@ mod tests { } #[test] - fn simplify_common_factor_conjuction_in_disjunction() { + fn simplify_common_factor_conjunction_in_disjunction() { let props = ExecutionProps::new(); let schema = boolean_test_schema(); let simplifier = diff --git a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs index 31e21d08b569..8cba2c88e244 100644 --- a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs +++ b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs @@ -281,7 +281,7 @@ fn is_supported_type(data_type: &DataType) -> bool { || is_supported_dictionary_type(data_type) } -/// Returns true if [[UnwrapCastExprRewriter]] suppors this numeric type +/// Returns true if [[UnwrapCastExprRewriter]] support this numeric type fn is_supported_numeric_type(data_type: &DataType) -> bool { matches!( data_type, diff --git a/datafusion/physical-expr/src/equivalence/mod.rs b/datafusion/physical-expr/src/equivalence/mod.rs index 902e53a7f236..b35d978045d9 100644 --- a/datafusion/physical-expr/src/equivalence/mod.rs +++ b/datafusion/physical-expr/src/equivalence/mod.rs @@ -259,7 +259,7 @@ mod tests { assert!(eq_groups.contains(&col_a_expr)); assert!(eq_groups.contains(&col_b_expr)); - // b and c are aliases. Exising equivalence class should expand, + // b and c are aliases. Existing equivalence class should expand, // however there shouldn't be any new equivalence class eq_properties.add_equal_conditions(&col_b_expr, &col_c_expr)?; assert_eq!(eq_properties.eq_group().len(), 1); diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index f019b2e570ff..a7f27ab73684 100755 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -1408,7 +1408,7 @@ fn construct_prefix_orderings( /// current projection expression. /// /// # Example -/// If `dependences` is `a + b ASC` and the dependency map holds dependencies +/// If `dependencies` is `a + b ASC` and the dependency map holds dependencies /// * `a ASC` --> `[c ASC]` /// * `b ASC` --> `[d DESC]`, /// diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index 938d775a2ad1..2ab53b214d7f 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -404,7 +404,7 @@ impl PhysicalExpr for BinaryExpr { if self.op.eq(&Operator::And) { if interval.eq(&Interval::CERTAINLY_TRUE) { // A certainly true logical conjunction can only derive from possibly - // true operands. Otherwise, we prove infeasability. + // true operands. Otherwise, we prove infeasibility. 
Ok((!left_interval.eq(&Interval::CERTAINLY_FALSE) && !right_interval.eq(&Interval::CERTAINLY_FALSE)) .then(|| vec![Interval::CERTAINLY_TRUE, Interval::CERTAINLY_TRUE])) @@ -444,7 +444,7 @@ impl PhysicalExpr for BinaryExpr { } else if self.op.eq(&Operator::Or) { if interval.eq(&Interval::CERTAINLY_FALSE) { // A certainly false logical conjunction can only derive from certainly - // false operands. Otherwise, we prove infeasability. + // false operands. Otherwise, we prove infeasibility. Ok((!left_interval.eq(&Interval::CERTAINLY_TRUE) && !right_interval.eq(&Interval::CERTAINLY_TRUE)) .then(|| vec![Interval::CERTAINLY_FALSE, Interval::CERTAINLY_FALSE])) diff --git a/datafusion/physical-expr/src/expressions/column.rs b/datafusion/physical-expr/src/expressions/column.rs index 5f6932f6d725..0649cbd65d34 100644 --- a/datafusion/physical-expr/src/expressions/column.rs +++ b/datafusion/physical-expr/src/expressions/column.rs @@ -42,7 +42,7 @@ use datafusion_expr::ColumnarValue; /// /// # Example: /// If the schema is `a`, `b`, `c` the `Column` for `b` would be represented by -/// index 1, since `b` is the second colum in the schema. +/// index 1, since `b` is the second column in the schema. /// /// ``` /// # use datafusion_physical_expr::expressions::Column; diff --git a/datafusion/physical-expr/src/expressions/literal.rs b/datafusion/physical-expr/src/expressions/literal.rs index c594f039ff2f..232f9769b056 100644 --- a/datafusion/physical-expr/src/expressions/literal.rs +++ b/datafusion/physical-expr/src/expressions/literal.rs @@ -113,7 +113,7 @@ mod tests { #[test] fn literal_i32() -> Result<()> { - // create an arbitrary record bacth + // create an arbitrary record batch let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); let a = Int32Array::from(vec![Some(1), None, Some(3), Some(4), Some(5)]); let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)])?; diff --git a/datafusion/physical-expr/src/partitioning.rs b/datafusion/physical-expr/src/partitioning.rs index 98c0c864b9f7..eb7e1ea6282b 100644 --- a/datafusion/physical-expr/src/partitioning.rs +++ b/datafusion/physical-expr/src/partitioning.rs @@ -97,7 +97,7 @@ use std::sync::Arc; /// # Additional Examples /// /// A simple `FileScanExec` might produce one output stream (partition) for each -/// file (note the actual DataFusion file scaners can read individual files in +/// file (note the actual DataFusion file scanners can read individual files in /// parallel, potentially producing multiple partitions per file) /// /// Plans such as `SortPreservingMerge` produce a single output stream diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs index 82c718cfaca3..0ae4115de67a 100644 --- a/datafusion/physical-expr/src/scalar_function.rs +++ b/datafusion/physical-expr/src/scalar_function.rs @@ -233,7 +233,7 @@ pub fn create_physical_expr( // verify that input data types is consistent with function's `TypeSignature` data_types_with_scalar_udf(&input_expr_types, fun)?; - // Since we have arg_types, we dont need args and schema. + // Since we have arg_types, we don't need args and schema. 
let return_type = fun.return_type_from_exprs(args, input_dfschema, &input_expr_types)?; diff --git a/datafusion/physical-expr/src/utils/guarantee.rs b/datafusion/physical-expr/src/utils/guarantee.rs index 2c73df7cfd7d..7afb78b8bf2e 100644 --- a/datafusion/physical-expr/src/utils/guarantee.rs +++ b/datafusion/physical-expr/src/utils/guarantee.rs @@ -808,7 +808,7 @@ mod test { vec![not_in_guarantee("b", [1, 2, 3]), in_guarantee("b", [3, 4])], ); // b IN (1, 2, 3) OR b = 2 - // TODO this should be in_guarantee("b", [1, 2, 3]) but currently we don't support to anylize this kind of disjunction. Only `ColOpLit OR ColOpLit` is supported. + // TODO this should be in_guarantee("b", [1, 2, 3]) but currently we don't support to analyze this kind of disjunction. Only `ColOpLit OR ColOpLit` is supported. test_analyze( col("b") .in_list(vec![lit(1), lit(2), lit(3)], false) diff --git a/datafusion/physical-optimizer/Cargo.toml b/datafusion/physical-optimizer/Cargo.toml index c964ca47e6a0..3454209445dc 100644 --- a/datafusion/physical-optimizer/Cargo.toml +++ b/datafusion/physical-optimizer/Cargo.toml @@ -32,8 +32,7 @@ rust-version = { workspace = true } workspace = true [features] -default = ["recursive-protection"] -recursive-protection = ["dep:recursive"] +recursive_protection = ["dep:recursive"] [dependencies] arrow = { workspace = true } diff --git a/datafusion/physical-optimizer/src/aggregate_statistics.rs b/datafusion/physical-optimizer/src/aggregate_statistics.rs index dffdc49adf09..a00bc4b1d571 100644 --- a/datafusion/physical-optimizer/src/aggregate_statistics.rs +++ b/datafusion/physical-optimizer/src/aggregate_statistics.rs @@ -41,7 +41,7 @@ impl AggregateStatistics { } impl PhysicalOptimizerRule for AggregateStatistics { - #[cfg_attr(feature = "recursive-protection", recursive::recursive)] + #[cfg_attr(feature = "recursive_protection", recursive::recursive)] fn optimize( &self, plan: Arc, @@ -294,7 +294,7 @@ mod tests { let field = &fields[0]; assert_eq!(field.name(), agg.column_name()); assert_eq!(field.data_type(), &DataType::Int64); - // note that nullabiolity differs + // note that nullability differs assert_eq!( as_int64_array(batch.column(0)).unwrap().values(), @@ -377,7 +377,7 @@ mod tests { Arc::clone(&schema), )?; - // We introduce an intermediate optimization step between the partial and final aggregtator + // We introduce an intermediate optimization step between the partial and final aggregator let coalesce = CoalescePartitionsExec::new(Arc::new(partial_agg)); let final_agg = AggregateExec::try_new( @@ -409,7 +409,7 @@ mod tests { Arc::clone(&schema), )?; - // We introduce an intermediate optimization step between the partial and final aggregtator + // We introduce an intermediate optimization step between the partial and final aggregator let coalesce = CoalescePartitionsExec::new(Arc::new(partial_agg)); let final_agg = AggregateExec::try_new( diff --git a/datafusion/physical-optimizer/src/output_requirements.rs b/datafusion/physical-optimizer/src/output_requirements.rs index d5ffaad6d872..e107bb85d7b8 100644 --- a/datafusion/physical-optimizer/src/output_requirements.rs +++ b/datafusion/physical-optimizer/src/output_requirements.rs @@ -44,7 +44,7 @@ use crate::PhysicalOptimizerRule; /// `new_add_mode` and `new_remove_mode`. With this rule, we can keep track of /// the global requirements (ordering and distribution) across rules. 
///
-/// The primary usecase of this node and rule is to specify and preserve the desired output
+/// The primary use case of this node and rule is to specify and preserve the desired output
/// ordering and distribution the entire plan. When sending to a single client, a single partition may
/// be desirable, but when sending to a multi-partitioned writer, keeping multiple partitions may be
/// better.
diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs
index 35a79cbd91ed..8e975e10180f 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs
@@ -352,7 +352,7 @@ where
        let null_buffer = self.nulls.take_n(n);
        let first_remaining_offset = O::as_usize(self.offsets[n]);

-        // Given offests like [0, 2, 4, 5] and n = 1, we expect to get
+        // Given offsets like [0, 2, 4, 5] and n = 1, we expect to get
        // offsets [0, 2, 3]. We first create two offsets for first_n as [0, 2] and the remaining as [2, 4, 5].
        // And we shift the offset starting from 0 for the remaining one, [2, 4, 5] -> [0, 2, 3].
        let mut first_n_offsets = self.offsets.drain(0..n).collect::<Vec<_>>();
diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs
index c261310f56e3..98787d740c20 100644
--- a/datafusion/physical-plan/src/aggregates/row_hash.rs
+++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -137,7 +137,7 @@ struct SkipAggregationProbe {
    // ========================================================================
    // STATES:
    // Fields changes during execution. Can be buffer, or state flags that
-    // influence the exeuction in parent `GroupedHashAggregateStream`
+    // influence the execution in parent `GroupedHashAggregateStream`
    // ========================================================================
    /// Number of processed input rows (updated during probing)
    input_rows: usize,
diff --git a/datafusion/physical-plan/src/analyze.rs b/datafusion/physical-plan/src/analyze.rs
index 1fc3280ceb16..708f006b0d39 100644
--- a/datafusion/physical-plan/src/analyze.rs
+++ b/datafusion/physical-plan/src/analyze.rs
@@ -173,7 +173,7 @@ impl ExecutionPlan for AnalyzeExec {
            );
        }

-        // Create future that computes thefinal output
+        // Create future that computes the final output
        let start = Instant::now();
        let captured_input = Arc::clone(&self.input);
        let captured_schema = Arc::clone(&self.schema);
diff --git a/datafusion/physical-plan/src/coalesce/mod.rs b/datafusion/physical-plan/src/coalesce/mod.rs
index 46875fae94fc..f38876d93ec1 100644
--- a/datafusion/physical-plan/src/coalesce/mod.rs
+++ b/datafusion/physical-plan/src/coalesce/mod.rs
@@ -180,7 +180,7 @@ impl BatchCoalescer {
/// Indicates the state of the [`BatchCoalescer`] buffer after the
/// [`BatchCoalescer::push_batch()`] operation.
///
-/// The caller should take diferent actions, depending on the variant returned.
+/// The caller should take different actions, depending on the variant returned.
pub enum CoalescerState {
    /// Neither the limit nor the target batch size is reached.
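
Editorial aside on the `bytes.rs` hunk above: the corrected comment describes splitting a variable-length offsets buffer after the first `n` groups and rebasing the remainder so it starts at zero again. A standalone sketch of that bookkeeping, using plain `i32` offsets rather than the generic offset type used in the real code, looks roughly like this:

```rust
/// Take the offsets for the first `n` values out of `offsets` and rebase the
/// remaining offsets to start at 0. Offsets like [0, 2, 4, 5] describe three
/// variable-length values; taking n = 1 should yield [0, 2] and leave [0, 2, 3].
fn take_n_offsets(offsets: &mut Vec<i32>, n: usize) -> Vec<i32> {
    // Boundary offset between the first n values and the rest.
    let first_remaining = offsets[n];

    // The first n values keep their offsets, closed by the boundary offset.
    let mut first_n: Vec<i32> = offsets.drain(0..n).collect();
    first_n.push(first_remaining);

    // Shift the leftover offsets so the remaining buffer starts at 0.
    for offset in offsets.iter_mut() {
        *offset -= first_remaining;
    }
    first_n
}

fn main() {
    let mut offsets = vec![0, 2, 4, 5];
    let first_n = take_n_offsets(&mut offsets, 1);
    assert_eq!(first_n, vec![0, 2]);
    assert_eq!(offsets, vec![0, 2, 3]);
}
```
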
///
diff --git a/datafusion/physical-plan/src/display.rs b/datafusion/physical-plan/src/display.rs
index 88b85a85a102..961d2f639897 100644
--- a/datafusion/physical-plan/src/display.rs
+++ b/datafusion/physical-plan/src/display.rs
@@ -457,7 +457,7 @@ pub trait DisplayAs {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result;
}

-/// A newtype wrapper to display `T` implementing`DisplayAs` using the `Default` mode
+/// A new type wrapper to display `T` implementing `DisplayAs` using the `Default` mode
pub struct DefaultDisplay(pub T);

impl fmt::Display for DefaultDisplay {
@@ -466,7 +466,7 @@ impl fmt::Display for DefaultDisplay {
    }
}

-/// A newtype wrapper to display `T` implementing `DisplayAs` using the `Verbose` mode
+/// A new type wrapper to display `T` implementing `DisplayAs` using the `Verbose` mode
pub struct VerboseDisplay(pub T);

impl fmt::Display for VerboseDisplay {
diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs
index 09bb80734401..5f0b229ce92a 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -521,7 +521,7 @@ impl ExecutionPlanProperties for &dyn ExecutionPlan {
/// For unbounded streams, it also tracks whether the operator requires finite memory
/// to process the stream or if memory usage could grow unbounded.
///
-/// Bounedness of the output stream is based on the the boundedness of the input stream and the nature of
+/// Boundedness of the output stream is based on the boundedness of the input stream and the nature of
/// the operator. For example, limit or topk with fetch operator can convert an unbounded stream to a bounded stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Boundedness {
@@ -903,7 +903,7 @@ pub fn execute_stream_partitioned(
/// and context. It then checks if there are any columns in the input that might
/// violate the `not null` constraints specified in the `sink_schema`. If there are
/// such columns, it wraps the resulting stream to enforce the `not null` constraints
-/// by invoking the `check_not_null_contraits` function on each batch of the stream.
+/// by invoking the [`check_not_null_constraints`] function on each batch of the stream.
pub fn execute_input_stream(
    input: Arc<dyn ExecutionPlan>,
    sink_schema: SchemaRef,
diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs
index ef70392a01b7..dabe42ee43a2 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -138,13 +138,13 @@ impl JoinLeftData {
}

#[allow(rustdoc::private_intra_doc_links)]
-/// Join execution plan: Evaluates eqijoin predicates in parallel on multiple
+/// Join execution plan: Evaluates equijoin predicates in parallel on multiple
/// partitions using a hash table and an optional filter list to apply post
/// join.
///
/// # Join Expressions
///
-/// This implementation is optimized for evaluating eqijoin predicates (
+/// This implementation is optimized for evaluating equijoin predicates (
/// ` = `) expressions, which are represented as a list of `Columns`
/// in [`Self::on`].
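
Editorial aside on the `display.rs` hunk above: `DefaultDisplay` and `VerboseDisplay` are wrappers that pin the display mode so a `DisplayAs` value can be printed with ordinary `{}` formatting. The self-contained sketch below re-creates that pattern with simplified signatures and a made-up `Scan` operator; the real trait and wrappers live in `datafusion/physical-plan/src/display.rs` and differ in detail.

```rust
use std::fmt;

/// Simplified stand-in for the mode-aware formatting machinery.
enum DisplayFormatType {
    Default,
    Verbose,
}

trait DisplayAs {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result;
}

/// Wrapper that fixes the mode to `Default` for `{}` formatting.
struct DefaultDisplay<T>(T);

impl<T: DisplayAs> fmt::Display for DefaultDisplay<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.0.fmt_as(DisplayFormatType::Default, f)
    }
}

/// Wrapper that fixes the mode to `Verbose`.
struct VerboseDisplay<T>(T);

impl<T: DisplayAs> fmt::Display for VerboseDisplay<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.0.fmt_as(DisplayFormatType::Verbose, f)
    }
}

/// Hypothetical operator used only for this illustration.
struct Scan {
    table: &'static str,
}

impl DisplayAs for Scan {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match t {
            DisplayFormatType::Default => write!(f, "Scan: {}", self.table),
            DisplayFormatType::Verbose => {
                write!(f, "Scan: table={}, projection=None", self.table)
            }
        }
    }
}

fn main() {
    // Prints "Scan: lineitem", then the verbose form of the same operator.
    println!("{}", DefaultDisplay(Scan { table: "lineitem" }));
    println!("{}", VerboseDisplay(Scan { table: "lineitem" }));
}
```
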
///
@@ -198,7 +198,7 @@ impl JoinLeftData {
///
/// Original build-side data Inserting build-side values into hashmap Concatenated build-side batch
/// ┌───────────────────────────┐
-/// hasmap.insert(row-hash, row-idx + offset) │ idx │
+/// hashmap.insert(row-hash, row-idx + offset) │ idx │
/// ┌───────┐ │ ┌───────┐ │
/// │ Row 1 │ 1) update_hash for batch 3 with offset 0 │ │ Row 6 │ 0 │
/// Batch 1 │ │ - hashmap.insert(Row 7, idx 1) │ Batch 3 │ │ │
@@ -849,7 +849,7 @@ async fn collect_left_input(
            acc.2.build_mem_used.add(batch_size);
            acc.2.build_input_batches.add(1);
            acc.2.build_input_rows.add(batch.num_rows());
-            // Update rowcount
+            // Update row count
            acc.1 += batch.num_rows();
            // Push batch to output
            acc.0.push(batch);
@@ -3490,7 +3490,7 @@ mod tests {
        Ok(())
    }

-    /// Test for parallelised HashJoinExec with PartitionMode::CollectLeft
+    /// Test for parallelized HashJoinExec with PartitionMode::CollectLeft
    #[tokio::test]
    async fn test_collect_left_multiple_partitions_join() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs
index b8cb7b313bc1..438d9818475d 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs
@@ -86,7 +86,7 @@ use futures::{Stream, StreamExt};
/// # Sorting
///
/// Assumes that both the left and right input to the join are pre-sorted. It is not the
-/// responisibility of this execution plan to sort the inputs.
+/// responsibility of this execution plan to sort the inputs.
///
/// # "Streamed" vs "Buffered"
///
@@ -101,7 +101,7 @@ use futures::{Stream, StreamExt};
/// If the memory limit increases beyond the specified value and spilling is enabled,
/// buffered batches could be spilled to disk. If spilling is disabled, the execution
/// will fail under the same conditions. Multiple record batches of buffered could currently reside
-/// in memory/disk during the exectution. The number of buffered batches residing in
+/// in memory/disk during the execution. The number of buffered batches residing in
/// memory/disk depends on the number of rows of buffered input having the same value
/// of join key as that of streamed input rows currently present in memory. Due to pre-sorted inputs,
/// the algorithm understands when it is not needed anymore, and releases the buffered batches
@@ -304,11 +304,10 @@ impl SortMergeJoinExec {
        let output_partitioning =
            symmetric_join_output_partitioning(left, right, &join_type);

-        // TODO: Emission type may be incremental if the input is sorted
        PlanProperties::new(
            eq_properties,
            output_partitioning,
-            EmissionType::Final,
+            EmissionType::Incremental,
            boundedness_from_children([left, right]),
        )
    }
diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs
index 0366c9fa5e46..d792e143046c 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -1645,7 +1645,7 @@ macro_rules! handle_state {
/// Represents the result of a stateful operation.
///
-/// This enumueration indicates whether the state produced a result that is
+/// This enumeration indicates whether the state produced a result that is
/// ready for use (`Ready`) or if the operation requires continuation (`Continue`).
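
Editorial aside on the `hash_join.rs` diagram above: the build side is collected batch by batch, and each row is registered in the hash table under its join-key hash with an index equal to its position in the concatenated build-side batch (local row index plus a running offset). The sketch below shows only that indexing idea, using `std::collections::HashMap` and a single stored index per hash; the real implementation keeps a list of row indices per hash so duplicate keys and collisions are handled.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Insert every build-side row into one map keyed by the hash of its join key,
/// storing the row's index into the concatenated build-side batch
/// (row index within the batch + running offset across batches).
fn build_side_index(batches: &[Vec<i64>]) -> HashMap<u64, usize> {
    let mut map = HashMap::new();
    let mut offset = 0;
    for batch in batches {
        for (row_idx, key) in batch.iter().enumerate() {
            let mut hasher = DefaultHasher::new();
            key.hash(&mut hasher);
            // hashmap.insert(row-hash, row-idx + offset)
            map.insert(hasher.finish(), row_idx + offset);
        }
        offset += batch.len();
    }
    map
}

fn main() {
    // Two build-side batches of join keys.
    let batches = vec![vec![10, 20, 30], vec![40, 50, 60]];
    let index = build_side_index(&batches);

    // Key 60 is row 2 of the second batch, so its global index is 3 (offset) + 2.
    let mut hasher = DefaultHasher::new();
    60_i64.hash(&mut hasher);
    assert_eq!(index.get(&hasher.finish()), Some(&5));
}
```
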
///
/// Variants:
diff --git a/datafusion/physical-plan/src/metrics/builder.rs b/datafusion/physical-plan/src/metrics/builder.rs
index 2037ddb70c2d..dbda0a310ce5 100644
--- a/datafusion/physical-plan/src/metrics/builder.rs
+++ b/datafusion/physical-plan/src/metrics/builder.rs
@@ -50,7 +50,7 @@ pub struct MetricBuilder<'a> {
    /// optional partition number
    partition: Option<usize>,

-    /// arbitrary name=value pairs identifiying this metric
+    /// arbitrary name=value pairs identifying this metric
    labels: Vec