
Commit 472f3be

Merge remote-tracking branch 'apache/main' into alamb/external_parquet_index

alamb committed May 29, 2024
2 parents 230d785 + 2796e01
Showing 20 changed files with 666 additions and 190 deletions.
10 changes: 4 additions & 6 deletions benchmarks/bench.sh
@@ -69,7 +69,7 @@ all(default): Data/Run/Compare for all benchmarks
 tpch: TPCH inspired benchmark on Scale Factor (SF) 1 (~1GB), single parquet file per table
 tpch_mem: TPCH inspired benchmark on Scale Factor (SF) 1 (~1GB), query from memory
 tpch10: TPCH inspired benchmark on Scale Factor (SF) 10 (~10GB), single parquet file per table
-tpch10_mem: TPCH inspired benchmark on Scale Factor (SF) 10 (~10GB), query from memory
+tpch_mem10: TPCH inspired benchmark on Scale Factor (SF) 10 (~10GB), query from memory
 parquet: Benchmark of parquet reader's filtering speed
 sort: Benchmark of sorting speed
 clickbench_1: ClickBench queries against a single parquet file
@@ -243,9 +243,7 @@ main() {
echo "Done"
;;
compare)
BRANCH1=$1
BRANCH2=$2
compare_benchmarks
compare_benchmarks "$ARG2" "$ARG3"
;;
"")
usage
@@ -446,8 +444,8 @@ run_clickbench_extended() {

 compare_benchmarks() {
     BASE_RESULTS_DIR="${SCRIPT_DIR}/results"
-    BRANCH1="${ARG2}"
-    BRANCH2="${ARG3}"
+    BRANCH1="$1"
+    BRANCH2="$2"
     if [ -z "$BRANCH1" ] ; then
         echo "<branch1> not specified. Available branches:"
         ls -1 "${BASE_RESULTS_DIR}"
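The fix above addresses a mismatch: the `compare)` arm previously set `BRANCH1`/`BRANCH2` from `main()`'s own positional parameters (which appear to be unset at that point), while `compare_benchmarks` read the `ARG2`/`ARG3` globals. Passing the parsed arguments explicitly removes the hidden coupling. A minimal sketch of the pattern, with hypothetical branch names, not the full script:

```bash
#!/usr/bin/env bash
# Sketch of the argument-passing pattern adopted above: the function reads
# its own positional parameters instead of relying on global variables.

compare_benchmarks() {
    BRANCH1="$1"   # first argument to the function
    BRANCH2="$2"   # second argument to the function
    echo "comparing results for '${BRANCH1}' vs '${BRANCH2}'"
}

ARG2="main"               # hypothetical values parsed from the command line
ARG3="my-feature-branch"
compare_benchmarks "$ARG2" "$ARG3"
```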
7 changes: 6 additions & 1 deletion datafusion/core/src/dataframe/mod.rs
@@ -1035,7 +1035,9 @@ impl DataFrame {
     }
 
     /// Return a reference to the unoptimized [`LogicalPlan`] that comprises
-    /// this DataFrame. See [`Self::into_unoptimized_plan`] for more details.
+    /// this DataFrame.
+    ///
+    /// See [`Self::into_unoptimized_plan`] for more details.
     pub fn logical_plan(&self) -> &LogicalPlan {
         &self.plan
     }
@@ -1052,6 +1054,9 @@ impl DataFrame {
     /// snapshot of the [`SessionState`] attached to this [`DataFrame`] and
     /// consequently subsequent operations may take place against a different
     /// state (e.g. a different value of `now()`)
+    ///
+    /// See [`Self::into_parts`] to retrieve the owned [`LogicalPlan`] and
+    /// corresponding [`SessionState`].
     pub fn into_unoptimized_plan(self) -> LogicalPlan {
         self.plan
     }
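To make the borrow-versus-consume distinction in these doc comments concrete, here is a small standalone sketch (assuming the `datafusion` crate and a Tokio runtime; not part of this diff):

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    let df = ctx.sql("SELECT 1 AS a").await?;

    // `logical_plan()` borrows: `df` remains usable afterwards
    println!("{}", df.logical_plan().display_indent());

    // `into_unoptimized_plan()` consumes `df` and returns the owned,
    // unoptimized plan
    let plan = df.into_unoptimized_plan();
    println!("{}", plan.display_indent());
    Ok(())
}
```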
12 changes: 12 additions & 0 deletions datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -81,6 +81,15 @@ macro_rules! get_statistic {
         Some(DataType::Int16) => {
             Some(ScalarValue::Int16(Some((*s.$func()).try_into().unwrap())))
         }
+        Some(DataType::UInt8) => {
+            Some(ScalarValue::UInt8(Some((*s.$func()).try_into().unwrap())))
+        }
+        Some(DataType::UInt16) => {
+            Some(ScalarValue::UInt16(Some((*s.$func()).try_into().unwrap())))
+        }
+        Some(DataType::UInt32) => {
+            Some(ScalarValue::UInt32(Some((*s.$func()) as u32)))
+        }
         Some(DataType::Date32) => {
             Some(ScalarValue::Date32(Some(*s.$func())))
         }
@@ -100,6 +109,9 @@
                 *scale,
             ))
         }
+        Some(DataType::UInt64) => {
+            Some(ScalarValue::UInt64(Some((*s.$func()) as u64)))
+        }
         _ => Some(ScalarValue::Int64(Some(*s.$func()))),
     }
 }
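Note the two conversion styles in the new arms: `u8`/`u16` statistics use checked `try_into()`, while `u32`/`u64` use `as` casts. The latter is a deliberate bit-level reinterpretation, since Parquet stores unsigned 32/64-bit values in the signed `INT32`/`INT64` physical types. A standalone sketch of why that works (plain Rust, independent of the macro):

```rust
// Sketch: why `as u32` is the right conversion for a u32 statistic stored
// in parquet's signed INT32 physical type. Values above i32::MAX arrive as
// negative i32s, and the bit-level reinterpretation recovers them.
fn u32_stat_from_i32(stored: i32) -> u32 {
    stored as u32 // two's-complement reinterpretation, not a value cast
}

fn main() {
    // 3_000_000_000 does not fit in an i32, so the statistic holds its
    // bits as a negative number:
    let stored = 3_000_000_000u32 as i32;
    assert!(stored < 0);
    // Reinterpreting the bits gives back the original unsigned value:
    assert_eq!(u32_stat_from_i32(stored), 3_000_000_000);
}
```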
60 changes: 25 additions & 35 deletions datafusion/core/tests/parquet/arrow_statistics.rs
@@ -26,7 +26,8 @@ use arrow::datatypes::{Date32Type, Date64Type};
 use arrow_array::{
     make_array, Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array,
     Decimal128Array, FixedSizeBinaryArray, Float32Array, Float64Array, Int16Array,
-    Int32Array, Int64Array, Int8Array, RecordBatch, StringArray, UInt64Array,
+    Int32Array, Int64Array, Int8Array, RecordBatch, StringArray, UInt16Array,
+    UInt32Array, UInt64Array, UInt8Array,
 };
 use arrow_schema::{DataType, Field, Schema};
 use datafusion::datasource::physical_plan::parquet::{
@@ -703,8 +704,6 @@ async fn test_dates_64_diff_rg_sizes() {
     .run();
 }
 
-// BUG:
-// https://github.com/apache/datafusion/issues/10604
 #[tokio::test]
 async fn test_uint() {
     // This creates a parquet files of 4 columns named "u8", "u16", "u32", "u64"
@@ -719,48 +718,40 @@
         row_per_group: 4,
     };
 
     // u8
-    // BUG: expect UInt8Array but returns Int32Array
     Test {
         reader: reader.build().await,
-        expected_min: Arc::new(Int32Array::from(vec![0, 1, 4, 7, 251])), // shoudld be UInt8Array
-        expected_max: Arc::new(Int32Array::from(vec![3, 4, 6, 250, 254])), // shoudld be UInt8Array
+        expected_min: Arc::new(UInt8Array::from(vec![0, 1, 4, 7, 251])),
+        expected_max: Arc::new(UInt8Array::from(vec![3, 4, 6, 250, 254])),
         expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0, 0]),
         expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4, 4]),
         column_name: "u8",
     }
     .run();
 
     // u16
-    // BUG: expect UInt16Array but returns Int32Array
     Test {
         reader: reader.build().await,
-        expected_min: Arc::new(Int32Array::from(vec![0, 1, 4, 7, 251])), // shoudld be UInt16Array
-        expected_max: Arc::new(Int32Array::from(vec![3, 4, 6, 250, 254])), // shoudld be UInt16Array
+        expected_min: Arc::new(UInt16Array::from(vec![0, 1, 4, 7, 251])),
+        expected_max: Arc::new(UInt16Array::from(vec![3, 4, 6, 250, 254])),
         expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0, 0]),
         expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4, 4]),
         column_name: "u16",
     }
     .run();
 
     // u32
-    // BUG: expect UInt32Array but returns Int32Array
     Test {
         reader: reader.build().await,
-        expected_min: Arc::new(Int32Array::from(vec![0, 1, 4, 7, 251])), // shoudld be UInt32Array
-        expected_max: Arc::new(Int32Array::from(vec![3, 4, 6, 250, 254])), // shoudld be UInt32Array
+        expected_min: Arc::new(UInt32Array::from(vec![0, 1, 4, 7, 251])),
+        expected_max: Arc::new(UInt32Array::from(vec![3, 4, 6, 250, 254])),
         expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0, 0]),
         expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4, 4]),
         column_name: "u32",
     }
     .run();
 
     // u64
-    // BUG: expect UInt64rray but returns Int64Array
     Test {
         reader: reader.build().await,
-        expected_min: Arc::new(Int64Array::from(vec![0, 1, 4, 7, 251])), // shoudld be UInt64Array
-        expected_max: Arc::new(Int64Array::from(vec![3, 4, 6, 250, 254])), // shoudld be UInt64Array
+        expected_min: Arc::new(UInt64Array::from(vec![0, 1, 4, 7, 251])),
+        expected_max: Arc::new(UInt64Array::from(vec![3, 4, 6, 250, 254])),
         expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0, 0]),
         expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4, 4]),
         column_name: "u64",
@@ -788,8 +779,6 @@ async fn test_int32_range() {
     .run();
 }
 
-// BUG: not convert UInt32Array to Int32Array
-// https://github.com/apache/datafusion/issues/10604
 #[tokio::test]
 async fn test_uint32_range() {
     // This creates a parquet file of 1 column "u"
@@ -801,8 +790,8 @@

     Test {
         reader: reader.build().await,
-        expected_min: Arc::new(Int32Array::from(vec![0])), // should be UInt32Array
-        expected_max: Arc::new(Int32Array::from(vec![300000])), // should be UInt32Array
+        expected_min: Arc::new(UInt32Array::from(vec![0])),
+        expected_max: Arc::new(UInt32Array::from(vec![300000])),
         expected_null_counts: UInt64Array::from(vec![0]),
         expected_row_counts: UInt64Array::from(vec![4]),
         column_name: "u",
@@ -820,44 +809,45 @@ async fn test_numeric_limits_unsigned() {

     Test {
         reader: reader.build().await,
-        expected_min: Arc::new(Int8Array::from(vec![i8::MIN, -100])),
-        expected_max: Arc::new(Int8Array::from(vec![100, i8::MAX])),
+        expected_min: Arc::new(UInt8Array::from(vec![u8::MIN, 100])),
+        expected_max: Arc::new(UInt8Array::from(vec![100, u8::MAX])),
         expected_null_counts: UInt64Array::from(vec![0, 0]),
         expected_row_counts: UInt64Array::from(vec![5, 2]),
-        column_name: "i8",
+        column_name: "u8",
     }
     .run();
 
     Test {
         reader: reader.build().await,
-        expected_min: Arc::new(Int16Array::from(vec![i16::MIN, -100])),
-        expected_max: Arc::new(Int16Array::from(vec![100, i16::MAX])),
+        expected_min: Arc::new(UInt16Array::from(vec![u16::MIN, 100])),
+        expected_max: Arc::new(UInt16Array::from(vec![100, u16::MAX])),
         expected_null_counts: UInt64Array::from(vec![0, 0]),
         expected_row_counts: UInt64Array::from(vec![5, 2]),
-        column_name: "i16",
+        column_name: "u16",
     }
     .run();
 
     Test {
         reader: reader.build().await,
-        expected_min: Arc::new(Int32Array::from(vec![i32::MIN, -100])),
-        expected_max: Arc::new(Int32Array::from(vec![100, i32::MAX])),
+        expected_min: Arc::new(UInt32Array::from(vec![u32::MIN, 100])),
+        expected_max: Arc::new(UInt32Array::from(vec![100, u32::MAX])),
         expected_null_counts: UInt64Array::from(vec![0, 0]),
         expected_row_counts: UInt64Array::from(vec![5, 2]),
-        column_name: "i32",
+        column_name: "u32",
    }
    .run();
 
     Test {
         reader: reader.build().await,
-        expected_min: Arc::new(Int64Array::from(vec![i64::MIN, -100])),
-        expected_max: Arc::new(Int64Array::from(vec![100, i64::MAX])),
+        expected_min: Arc::new(UInt64Array::from(vec![u64::MIN, 100])),
+        expected_max: Arc::new(UInt64Array::from(vec![100, u64::MAX])),
         expected_null_counts: UInt64Array::from(vec![0, 0]),
         expected_row_counts: UInt64Array::from(vec![5, 2]),
-        column_name: "i64",
+        column_name: "u64",
     }
     .run();
 }
 
 #[tokio::test]
 async fn test_numeric_limits_signed() {
     // file has 7 rows, 2 row groups: one with 5 rows, one with 2 rows.
57 changes: 57 additions & 0 deletions datafusion/expr/src/expr.rs
@@ -62,6 +62,13 @@ use sqlparser::ast::NullTreatment;
 /// See [`ExprSchemable::get_type`] to access the [`DataType`] and nullability
 /// of an `Expr`.
 ///
+/// # Visiting and Rewriting `Expr`s
+///
+/// The `Expr` struct implements the [`TreeNode`] trait for walking and
+/// rewriting expressions. For example [`TreeNode::apply`] recursively visits an
+/// `Expr` and [`TreeNode::transform`] can be used to rewrite an expression. See
+/// the examples below and [`TreeNode`] for more information.
+///
 /// # Examples
 ///
 /// ## Column references and literals
@@ -156,6 +163,56 @@ use sqlparser::ast::NullTreatment;
 /// Expr::from(Column::from_qualified_name("t1.c2")),
 /// ]);
 /// ```
+///
+/// # Visiting and Rewriting `Expr`s
+///
+/// Here is an example that finds all literals in an `Expr` tree:
+/// ```
+/// # use std::collections::HashSet;
+/// use datafusion_common::ScalarValue;
+/// # use datafusion_expr::{col, Expr, lit};
+/// use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
+/// // Expression a = 5 AND b = 6
+/// let expr = col("a").eq(lit(5)) & col("b").eq(lit(6));
+/// // find all literals in a HashSet
+/// let mut scalars = HashSet::new();
+/// // apply recursively visits all nodes in the expression tree
+/// expr.apply(|e| {
+///    if let Expr::Literal(scalar) = e {
+///       scalars.insert(scalar);
+///    }
+///    // The return value controls whether to continue visiting the tree
+///    Ok(TreeNodeRecursion::Continue)
+/// }).unwrap();
+/// // All subtrees have been visited and literals found
+/// assert_eq!(scalars.len(), 2);
+/// assert!(scalars.contains(&ScalarValue::Int32(Some(5))));
+/// assert!(scalars.contains(&ScalarValue::Int32(Some(6))));
+/// ```
+///
+/// Rewrite an expression, replacing references to column "a" with the
+/// literal `42`:
+///
+/// ```
+/// # use datafusion_common::tree_node::{Transformed, TreeNode};
+/// # use datafusion_expr::{col, Expr, lit};
+/// // expression a = 5 AND b = 6
+/// let expr = col("a").eq(lit(5)).and(col("b").eq(lit(6)));
+/// // rewrite all references to column "a" to the literal 42
+/// let rewritten = expr.transform(|e| {
+///   if let Expr::Column(c) = &e {
+///     if &c.name == "a" {
+///       // return Transformed::yes to indicate the node was changed
+///       return Ok(Transformed::yes(lit(42)))
+///     }
+///   }
+///   // return Transformed::no to indicate the node was not changed
+///   Ok(Transformed::no(e))
+/// }).unwrap();
+/// // The expression has been rewritten
+/// assert!(rewritten.transformed);
+/// // to 42 = 5 AND b = 6
+/// assert_eq!(rewritten.data, lit(42).eq(lit(5)).and(col("b").eq(lit(6))));
+/// ```
 #[derive(Clone, PartialEq, Eq, Hash, Debug)]
 pub enum Expr {
     /// An expression with a specific name.
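The new doc examples use `TreeNodeRecursion::Continue` throughout; for completeness, here is a sketch (same crates as the examples above, not part of this diff) of short-circuiting a traversal with `TreeNodeRecursion::Stop`:

```rust
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::Result;
use datafusion_expr::{col, lit, Expr};

fn main() -> Result<()> {
    let expr = col("a").eq(lit(5)).and(col("b").eq(lit(6)));
    let mut first_column = None;
    expr.apply(|e| {
        if let Expr::Column(c) = e {
            first_column = Some(c.name.clone());
            // Stop ends the walk as soon as the first column is found
            return Ok(TreeNodeRecursion::Stop);
        }
        Ok(TreeNodeRecursion::Continue)
    })?;
    assert_eq!(first_column.as_deref(), Some("a"));
    Ok(())
}
```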
16 changes: 6 additions & 10 deletions datafusion/expr/src/logical_plan/builder.rs
@@ -82,16 +82,12 @@ pub const UNNAMED_TABLE: &str = "?table?";
 /// // SELECT last_name
 /// // FROM employees
 /// // WHERE salary < 1000
-/// let plan = table_scan(
-///              Some("employee"),
-///              &employee_schema(),
-///              None,
-///            )?
-///            // Keep only rows where salary < 1000
-///            .filter(col("salary").lt_eq(lit(1000)))?
-///            // only show "last_name" in the final results
-///            .project(vec![col("last_name")])?
-///            .build()?;
+/// let plan = table_scan(Some("employee"), &employee_schema(), None)?
+///    // Keep only rows where salary < 1000
+///    .filter(col("salary").lt(lit(1000)))?
+///    // only show "last_name" in the final results
+///    .project(vec![col("last_name")])?
+///    .build()?;
 ///
 /// # Ok(())
 /// # }
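The doc example relies on an `employee_schema()` helper defined elsewhere in the doc test. A plausible minimal version, shown here as a hypothetical sketch with just the columns the plan touches:

```rust
use arrow_schema::{DataType, Field, Schema};
use datafusion_common::Result;
use datafusion_expr::{col, lit, logical_plan::table_scan};

// Hypothetical helper matching what the doc example assumes: a schema for
// the `employee` table with the two columns the plan references.
fn employee_schema() -> Schema {
    Schema::new(vec![
        Field::new("last_name", DataType::Utf8, false),
        Field::new("salary", DataType::Int64, false),
    ])
}

fn main() -> Result<()> {
    // Same plan as the doc example: scan, filter on salary, project last_name
    let plan = table_scan(Some("employee"), &employee_schema(), None)?
        .filter(col("salary").lt(lit(1000)))?
        .project(vec![col("last_name")])?
        .build()?;
    println!("{}", plan.display_indent());
    Ok(())
}
```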
(Diffs for the remaining 14 changed files are not shown.)
