
Commit

Merge remote-tracking branch 'apache/main' into alamb/schema_adapter_factory_example
alamb committed Oct 24, 2024
2 parents b065efe + 2322933 commit f9242e5
Showing 114 changed files with 2,272 additions and 1,507 deletions.
5 changes: 5 additions & 0 deletions README.md
@@ -134,3 +134,8 @@ For example, given the releases `1.78.0`, `1.79.0`, `1.80.0`, `1.80.1` and `1.81
If a hotfix is released for the minimum supported Rust version (MSRV), the MSRV will be the minor version with all hotfixes, even if it surpasses the four-month window.

We enforce this policy using an [MSRV CI Check](https://github.com/search?q=repo%3Aapache%2Fdatafusion+rust-version+language%3ATOML+path%3A%2F%5ECargo.toml%2F&type=code)

## DataFusion API evolution policy

Public methods in Apache DataFusion may evolve as part of the API lifecycle.
Deprecated methods will be phased out in accordance with the [policy](https://datafusion.apache.org/library-user-guide/api-health.html), ensuring the API remains stable and healthy.
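The deprecation cycle relies on Rust's built-in `#[deprecated]` attribute; the removals later in this diff (for example `DataFrame::unnest_column`, deprecated since 37.0.0) show the end of that cycle. A minimal self-contained sketch of the pattern, using hypothetical type and method names rather than real DataFusion APIs:

```rust
pub struct Api;

impl Api {
    /// Old entry point, kept for a deprecation cycle before removal.
    #[deprecated(since = "37.0.0", note = "use `new_method` instead")]
    pub fn old_method(&self) -> u32 {
        // Forward to the replacement so both stay behaviorally
        // identical until the deprecated method is removed.
        self.new_method()
    }

    /// Replacement entry point.
    pub fn new_method(&self) -> u32 {
        42
    }
}

fn main() {
    let api = Api;
    // Still compiles, but emits a deprecation warning unless silenced.
    #[allow(deprecated)]
    let _ = api.old_method();
    let _ = api.new_method();
}
```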
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default.

6 changes: 3 additions & 3 deletions datafusion-examples/examples/sql_analysis.rs
@@ -39,7 +39,7 @@ fn total_join_count(plan: &LogicalPlan) -> usize {
// We can use the TreeNode API to walk over a LogicalPlan.
plan.apply(|node| {
// if we encounter a join, we update the running count
if matches!(node, LogicalPlan::Join(_) | LogicalPlan::CrossJoin(_)) {
if matches!(node, LogicalPlan::Join(_)) {
total += 1;
}
Ok(TreeNodeRecursion::Continue)
@@ -89,7 +89,7 @@ fn count_trees(plan: &LogicalPlan) -> (usize, Vec<usize>) {
while let Some(node) = to_visit.pop() {
// if we encounter a join, we know we're at the root of the tree
// count this tree and recurse on its inputs
if matches!(node, LogicalPlan::Join(_) | LogicalPlan::CrossJoin(_)) {
if matches!(node, LogicalPlan::Join(_)) {
let (group_count, inputs) = count_tree(node);
total += group_count;
groups.push(group_count);
@@ -151,7 +151,7 @@ fn count_tree(join: &LogicalPlan) -> (usize, Vec<&LogicalPlan>) {
}

// count any join we encounter
if matches!(node, LogicalPlan::Join(_) | LogicalPlan::CrossJoin(_)) {
if matches!(node, LogicalPlan::Join(_)) {
total += 1;
Ok(TreeNodeRecursion::Continue)
} else {
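Context for the three hunks above: this merge removes the dedicated `CrossJoin` logical plan variant, so cross joins are now represented as a `Join` with no join conditions and a single `Join` arm covers both cases. A condensed sketch of the counting idiom, assuming `datafusion-common` and `datafusion-expr` as dependencies:

```rust
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::Result;
use datafusion_expr::LogicalPlan;

/// Walk a plan with the TreeNode API and count joins; after this
/// change a lone `Join` arm also matches former `CrossJoin` nodes.
fn total_join_count(plan: &LogicalPlan) -> Result<usize> {
    let mut total = 0;
    plan.apply(|node| {
        if matches!(node, LogicalPlan::Join(_)) {
            total += 1;
        }
        Ok(TreeNodeRecursion::Continue)
    })?;
    Ok(total)
}
```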
46 changes: 31 additions & 15 deletions datafusion/common/src/cse.rs
@@ -121,9 +121,17 @@ impl<'n, N: HashNode> Identifier<'n, N> {
/// ```
type IdArray<'n, N> = Vec<(usize, Option<Identifier<'n, N>>)>;

/// A map that contains the number of normal and conditional occurrences of [`TreeNode`]s
/// by their identifiers.
type NodeStats<'n, N> = HashMap<Identifier<'n, N>, (usize, usize)>;
#[derive(PartialEq, Eq)]
/// Describes how many times a node is evaluated. A node is considered common if it
/// is surely evaluated at least twice, or surely evaluated once and also evaluated
/// conditionally.
enum NodeEvaluation {
SurelyOnce,
ConditionallyAtLeastOnce,
Common,
}

/// A map that contains the evaluation stats of [`TreeNode`]s by their identifiers.
type NodeStats<'n, N> = HashMap<Identifier<'n, N>, NodeEvaluation>;

/// A map that contains the common [`TreeNode`]s and their alias by their identifiers,
/// extracted during the second, rewriting traversal.
@@ -331,16 +339,24 @@ impl<'n, N: TreeNode + HashNode + Eq, C: CSEController<Node = N>> TreeNodeVisito
self.id_array[down_index].0 = self.up_index;
if is_valid && !self.controller.is_ignored(node) {
self.id_array[down_index].1 = Some(node_id);
let (count, conditional_count) =
self.node_stats.entry(node_id).or_insert((0, 0));
if self.conditional {
*conditional_count += 1;
} else {
*count += 1;
}
if *count > 1 || (*count == 1 && *conditional_count > 0) {
self.found_common = true;
}
self.node_stats
.entry(node_id)
.and_modify(|evaluation| {
if *evaluation == NodeEvaluation::SurelyOnce
|| *evaluation == NodeEvaluation::ConditionallyAtLeastOnce
&& !self.conditional
{
*evaluation = NodeEvaluation::Common;
self.found_common = true;
}
})
.or_insert_with(|| {
if self.conditional {
NodeEvaluation::ConditionallyAtLeastOnce
} else {
NodeEvaluation::SurelyOnce
}
});
}
self.visit_stack
.push(VisitRecord::NodeItem(node_id, is_valid));
@@ -383,8 +399,8 @@ impl<N: TreeNode + Eq, C: CSEController<Node = N>> TreeNodeRewriter

// Handle nodes with identifiers only
if let Some(node_id) = node_id {
let (count, conditional_count) = self.node_stats.get(&node_id).unwrap();
if *count > 1 || *count == 1 && *conditional_count > 0 {
let evaluation = self.node_stats.get(&node_id).unwrap();
if *evaluation == NodeEvaluation::Common {
// step the index to skip all sub-nodes (which have smaller series numbers).
while self.down_index < self.id_array.len()
&& self.id_array[self.down_index].0 < up_index
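The `and_modify`/`or_insert` pair above implements a small state machine over `NodeEvaluation`. A self-contained model of its transitions, written for illustration rather than taken from DataFusion's internals:

```rust
/// States mirroring the `NodeEvaluation` enum in the diff above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum NodeEvaluation {
    SurelyOnce,
    ConditionallyAtLeastOnce,
    Common,
}

/// Fold one more sighting of a node into its evaluation state.
/// `conditional` is true when the node occurs in a branch that may
/// not execute (e.g. the right side of a short-circuiting operator).
fn observe(state: Option<NodeEvaluation>, conditional: bool) -> NodeEvaluation {
    use NodeEvaluation::*;
    match (state, conditional) {
        // First sighting: remember how the node was evaluated.
        (None, false) => SurelyOnce,
        (None, true) => ConditionallyAtLeastOnce,
        // A surely-evaluated node becomes common on any repeat.
        (Some(SurelyOnce), _) => Common,
        // A conditionally seen node becomes common only once an
        // unconditional sighting guarantees at least one execution.
        (Some(ConditionallyAtLeastOnce), false) => Common,
        (Some(ConditionallyAtLeastOnce), true) => ConditionallyAtLeastOnce,
        // Once common, always common.
        (Some(Common), _) => Common,
    }
}

fn main() {
    use NodeEvaluation::*;
    assert_eq!(observe(None, false), SurelyOnce);
    assert_eq!(observe(Some(SurelyOnce), true), Common);
    assert_eq!(observe(Some(ConditionallyAtLeastOnce), true), ConditionallyAtLeastOnce);
    assert_eq!(observe(Some(ConditionallyAtLeastOnce), false), Common);
}
```

The key asymmetry: two conditional sightings never make a node common, because neither occurrence is guaranteed to execute.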
1 change: 0 additions & 1 deletion datafusion/common/src/dfschema.rs
@@ -315,7 +315,6 @@ impl DFSchema {
None => self_unqualified_names.contains(field.name().as_str()),
};
if !duplicated_field {
// self.inner.fields.push(field.clone());
schema_builder.push(Arc::clone(field));
qualifiers.push(qualifier.cloned());
}
3 changes: 1 addition & 2 deletions datafusion/common/src/hash_utils.rs
@@ -102,8 +102,7 @@ fn hash_array_primitive<T>(
hashes_buffer: &mut [u64],
rehash: bool,
) where
T: ArrowPrimitiveType,
<T as arrow_array::ArrowPrimitiveType>::Native: HashValue,
T: ArrowPrimitiveType<Native: HashValue>,
{
assert_eq!(
hashes_buffer.len(),
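The new bound `T: ArrowPrimitiveType<Native: HashValue>` uses associated type bounds, stable since Rust 1.79, and is equivalent to the two-clause spelling it replaces. A standalone illustration with made-up trait and type names:

```rust
use std::fmt::Debug;

trait Container {
    type Item;
}

// Pre-1.79 spelling: bound the associated type in a where-clause.
fn old_style<T>(_: &T)
where
    T: Container,
    <T as Container>::Item: Debug,
{
}

// Associated type bounds, as adopted in the hunk above.
fn new_style<T: Container<Item: Debug>>(_: &T) {}

struct Holder;

impl Container for Holder {
    type Item = u32;
}

fn main() {
    old_style(&Holder);
    new_style(&Holder);
}
```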
24 changes: 4 additions & 20 deletions datafusion/common/src/utils/mod.rs
@@ -23,16 +23,14 @@ pub mod proxy;
pub mod string_utils;

use crate::error::{_internal_datafusion_err, _internal_err};
use crate::{arrow_datafusion_err, DataFusionError, Result, ScalarValue};
use arrow::array::{ArrayRef, PrimitiveArray};
use crate::{DataFusionError, Result, ScalarValue};
use arrow::array::ArrayRef;
use arrow::buffer::OffsetBuffer;
use arrow::compute::{partition, take_arrays, SortColumn, SortOptions};
use arrow::datatypes::{Field, SchemaRef, UInt32Type};
use arrow::record_batch::RecordBatch;
use arrow::compute::{partition, SortColumn, SortOptions};
use arrow::datatypes::{Field, SchemaRef};
use arrow_array::cast::AsArray;
use arrow_array::{
Array, FixedSizeListArray, LargeListArray, ListArray, OffsetSizeTrait,
RecordBatchOptions,
};
use arrow_schema::DataType;
use sqlparser::ast::Ident;
@@ -92,20 +90,6 @@ pub fn get_row_at_idx(columns: &[ArrayRef], idx: usize) -> Result<Vec<ScalarValu
.collect()
}

/// Construct a new RecordBatch from the rows of the `record_batch` at the `indices`.
pub fn get_record_batch_at_indices(
record_batch: &RecordBatch,
indices: &PrimitiveArray<UInt32Type>,
) -> Result<RecordBatch> {
let new_columns = take_arrays(record_batch.columns(), indices, None)?;
RecordBatch::try_new_with_options(
record_batch.schema(),
new_columns,
&RecordBatchOptions::new().with_row_count(Some(indices.len())),
)
.map_err(|e| arrow_datafusion_err!(e))
}

/// This function compares two tuples depending on the given sort options.
pub fn compare_rows(
x: &[ScalarValue],
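Callers that depended on the removed `get_record_batch_at_indices` can inline the same two arrow calls. A sketch mirroring the deleted helper, returning arrow's error type directly (the name `batch_at_indices` is hypothetical):

```rust
use arrow::array::{PrimitiveArray, RecordBatch, RecordBatchOptions};
use arrow::compute::take_arrays;
use arrow::datatypes::UInt32Type;
use arrow::error::ArrowError;

/// Gather the rows of `batch` selected by `indices` into a new batch,
/// preserving the row count even for zero-column batches.
fn batch_at_indices(
    batch: &RecordBatch,
    indices: &PrimitiveArray<UInt32Type>,
) -> Result<RecordBatch, ArrowError> {
    let columns = take_arrays(batch.columns(), indices, None)?;
    RecordBatch::try_new_with_options(
        batch.schema(),
        columns,
        &RecordBatchOptions::new().with_row_count(Some(indices.len())),
    )
}
```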
75 changes: 70 additions & 5 deletions datafusion/core/benches/sql_planner.rs
@@ -60,7 +60,9 @@ fn create_schema(column_prefix: &str, num_columns: usize) -> Schema {

fn create_table_provider(column_prefix: &str, num_columns: usize) -> Arc<MemTable> {
let schema = Arc::new(create_schema(column_prefix, num_columns));
MemTable::try_new(schema, vec![]).map(Arc::new).unwrap()
MemTable::try_new(schema, vec![vec![]])
.map(Arc::new)
.unwrap()
}

fn create_context() -> SessionContext {
@@ -158,6 +160,71 @@ fn criterion_benchmark(c: &mut Criterion) {
});
});

// Benchmarks for physical planning of joins
c.bench_function("physical_join_consider_sort", |b| {
b.iter(|| {
physical_plan(
&ctx,
"SELECT t1.a7, t2.b8 \
FROM t1, t2 WHERE a7 = b7 \
ORDER BY a7",
);
});
});

c.bench_function("physical_theta_join_consider_sort", |b| {
b.iter(|| {
physical_plan(
&ctx,
"SELECT t1.a7, t2.b8 \
FROM t1, t2 WHERE a7 < b7 \
ORDER BY a7",
);
});
});

c.bench_function("physical_many_self_joins", |b| {
b.iter(|| {
physical_plan(
&ctx,
"SELECT ta.a9, tb.a10, tc.a11, td.a12, te.a13, tf.a14 \
FROM t1 AS ta, t1 AS tb, t1 AS tc, t1 AS td, t1 AS te, t1 AS tf \
WHERE ta.a9 = tb.a10 AND tb.a10 = tc.a11 AND tc.a11 = td.a12 AND \
td.a12 = te.a13 AND te.a13 = tf.a14",
);
});
});

c.bench_function("physical_unnest_to_join", |b| {
b.iter(|| {
physical_plan(
&ctx,
"SELECT t1.a7 \
FROM t1 WHERE a7 = (SELECT b8 FROM t2)",
);
});
});

c.bench_function("physical_intersection", |b| {
b.iter(|| {
physical_plan(
&ctx,
"SELECT t1.a7 FROM t1 \
INTERSECT SELECT t2.b8 FROM t2",
);
});
});

// this query should be equivalent to the INTERSECT query above
c.bench_function("physical_join_distinct", |b| {
b.iter(|| {
logical_plan(
&ctx,
"SELECT DISTINCT t1.a7 \
FROM t1, t2 WHERE t1.a7 = t2.b8",
);
});
});

// --- TPC-H ---

let tpch_ctx = register_defs(SessionContext::new(), tpch_schemas());
@@ -203,10 +270,8 @@ fn criterion_benchmark(c: &mut Criterion) {

let tpcds_ctx = register_defs(SessionContext::new(), tpcds_schemas());

// 10, 35: Physical plan does not support logical expression Exists(<subquery>)
// 45: Physical plan does not support logical expression (<subquery>)
// 41: Optimizing disjunctions not supported
let ignored = [10, 35, 41, 45];
// 41: check_analyzed_plan: Correlated column is not allowed in predicate
let ignored = [41];

let raw_tpcds_sql_queries = (1..100)
.filter(|q| !ignored.contains(q))
4 changes: 0 additions & 4 deletions datafusion/core/src/catalog_common/mod.rs
@@ -36,10 +36,6 @@ pub use datafusion_sql::{ResolvedTableReference, TableReference};
use std::collections::BTreeSet;
use std::ops::ControlFlow;

/// See [`CatalogProviderList`]
#[deprecated(since = "35.0.0", note = "use [`CatalogProviderList`] instead")]
pub trait CatalogList: CatalogProviderList {}

/// Collects all tables and views referenced in the SQL statement. CTEs are collected separately.
/// This can be used to determine which tables need to be in the catalog for a query to be planned.
///
25 changes: 1 addition & 24 deletions datafusion/core/src/dataframe/mod.rs
@@ -373,32 +373,9 @@ impl DataFrame {
self.select(expr)
}

/// Expand each list element of a column to multiple rows.
#[deprecated(since = "37.0.0", note = "use unnest_columns instead")]
pub fn unnest_column(self, column: &str) -> Result<DataFrame> {
self.unnest_columns(&[column])
}

/// Expand each list element of a column to multiple rows, with
/// behavior controlled by [`UnnestOptions`].
///
/// Please see the documentation on [`UnnestOptions`] for more
/// details about the meaning of unnest.
#[deprecated(since = "37.0.0", note = "use unnest_columns_with_options instead")]
pub fn unnest_column_with_options(
self,
column: &str,
options: UnnestOptions,
) -> Result<DataFrame> {
self.unnest_columns_with_options(&[column], options)
}

/// Expand multiple list/struct columns into a set of rows and new columns.
///
/// See also:
///
/// 1. [`UnnestOptions`] documentation for the behavior of `unnest`
/// 2. [`Self::unnest_column_with_options`]
/// See also: [`UnnestOptions`] documentation for the behavior of `unnest`
///
/// # Example
/// ```
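Migrating off the removed single-column methods is mechanical, since both simply forwarded to their plural replacements. A hedged sketch, with the column name "tags" purely illustrative:

```rust
use datafusion::common::UnnestOptions;
use datafusion::dataframe::DataFrame;
use datafusion::error::Result;

/// was: df.unnest_column("tags")
fn migrate(df: DataFrame) -> Result<DataFrame> {
    df.unnest_columns(&["tags"])
}

/// was: df.unnest_column_with_options("tags", options)
fn migrate_with_options(df: DataFrame, options: UnnestOptions) -> Result<DataFrame> {
    df.unnest_columns_with_options(&["tags"], options)
}
```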
29 changes: 18 additions & 11 deletions datafusion/core/src/physical_planner.rs
@@ -29,13 +29,12 @@ use crate::error::{DataFusionError, Result};
use crate::execution::context::{ExecutionProps, SessionState};
use crate::logical_expr::utils::generate_sort_key;
use crate::logical_expr::{
Aggregate, EmptyRelation, Join, Projection, Sort, TableScan, Unnest, Window,
Aggregate, EmptyRelation, Join, Projection, Sort, TableScan, Unnest, Values, Window,
};
use crate::logical_expr::{
Expr, LogicalPlan, Partitioning as LogicalPartitioning, PlanType, Repartition,
UserDefinedLogicalNode,
};
use crate::logical_expr::{Limit, Values};
use crate::physical_expr::{create_physical_expr, create_physical_exprs};
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
use crate::physical_plan::analyze::AnalyzeExec;
@@ -78,8 +77,8 @@ use datafusion_expr::expr::{
use datafusion_expr::expr_rewriter::unnormalize_cols;
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
use datafusion_expr::{
DescribeTable, DmlStatement, Extension, Filter, JoinType, RecursiveQuery, SortExpr,
StringifiedPlan, WindowFrame, WindowFrameBound, WriteOp,
DescribeTable, DmlStatement, Extension, FetchType, Filter, JoinType, RecursiveQuery,
SkipType, SortExpr, StringifiedPlan, WindowFrame, WindowFrameBound, WriteOp,
};
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
use datafusion_physical_expr::expressions::Literal;
@@ -796,8 +795,20 @@ impl DefaultPhysicalPlanner {
}
LogicalPlan::Subquery(_) => todo!(),
LogicalPlan::SubqueryAlias(_) => children.one()?,
LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
LogicalPlan::Limit(limit) => {
let input = children.one()?;
let SkipType::Literal(skip) = limit.get_skip_type()? else {
return not_impl_err!(
"Unsupported OFFSET expression: {:?}",
limit.skip
);
};
let FetchType::Literal(fetch) = limit.get_fetch_type()? else {
return not_impl_err!(
"Unsupported LIMIT expression: {:?}",
limit.fetch
);
};

// GlobalLimitExec requires a single partition for input
let input = if input.output_partitioning().partition_count() == 1 {
@@ -806,13 +817,13 @@
// Apply a LocalLimitExec to each partition. The optimizer will also insert
// a CoalescePartitionsExec between the GlobalLimitExec and LocalLimitExec
if let Some(fetch) = fetch {
Arc::new(LocalLimitExec::new(input, *fetch + skip))
Arc::new(LocalLimitExec::new(input, fetch + skip))
} else {
input
}
};

Arc::new(GlobalLimitExec::new(input, *skip, *fetch))
Arc::new(GlobalLimitExec::new(input, skip, fetch))
}
LogicalPlan::Unnest(Unnest {
list_type_columns,
@@ -1116,10 +1127,6 @@ impl DefaultPhysicalPlanner {
join
}
}
LogicalPlan::CrossJoin(_) => {
let [left, right] = children.two()?;
Arc::new(CrossJoinExec::new(left, right))
}
LogicalPlan::RecursiveQuery(RecursiveQuery {
name, is_distinct, ..
}) => {
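The reworked `Limit` node stores `skip` and `fetch` as expressions, so physical planning first narrows them to literals through the new `SkipType` and `FetchType` accessors, as the hunk above shows. The same extraction as a standalone helper, assuming `datafusion-common` and `datafusion-expr` as dependencies (the function name is hypothetical):

```rust
use datafusion_common::{not_impl_err, Result};
use datafusion_expr::{FetchType, Limit, SkipType};

/// Extract literal OFFSET/LIMIT values, rejecting plans whose limit
/// expressions were not folded to constants during optimization.
fn literal_skip_fetch(limit: &Limit) -> Result<(usize, Option<usize>)> {
    let SkipType::Literal(skip) = limit.get_skip_type()? else {
        return not_impl_err!("Unsupported OFFSET expression: {:?}", limit.skip);
    };
    let FetchType::Literal(fetch) = limit.get_fetch_type()? else {
        return not_impl_err!("Unsupported LIMIT expression: {:?}", limit.fetch);
    };
    Ok((skip, fetch))
}
```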
