Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge remote-tracking branch 'apache/main' into alamb/new_date_part_t…
Browse files Browse the repository at this point in the history
…ests
alamb committed Dec 12, 2024
2 parents 5c8de1f + 98c483e commit 2406457
Showing 24 changed files with 1,022 additions and 240 deletions.
241 changes: 110 additions & 131 deletions datafusion-cli/Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion datafusion/common/Cargo.toml
Original file line number Diff line number Diff line change
@@ -43,7 +43,7 @@ force_hash_collisions = []

[dependencies]
ahash = { workspace = true }
apache-avro = { version = "0.16", default-features = false, features = [
apache-avro = { version = "0.17", default-features = false, features = [
"bzip",
"snappy",
"xz",
2 changes: 1 addition & 1 deletion datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -77,7 +77,7 @@ unicode_expressions = [
]

[dependencies]
apache-avro = { version = "0.16", optional = true }
apache-avro = { version = "0.17", optional = true }
arrow = { workspace = true }
arrow-array = { workspace = true }
arrow-ipc = { workspace = true }
Original file line number Diff line number Diff line change
@@ -138,7 +138,11 @@ impl<R: Read> AvroArrowArrayReader<'_, R> {
}
AvroSchema::Array(schema) => {
let sub_parent_field_name = format!("{}.element", parent_field_name);
Self::child_schema_lookup(&sub_parent_field_name, schema, schema_lookup)?;
Self::child_schema_lookup(
&sub_parent_field_name,
&schema.items,
schema_lookup,
)?;
}
_ => (),
}
14 changes: 11 additions & 3 deletions datafusion/core/src/datasource/avro_to_arrow/schema.rs
Original file line number Diff line number Diff line change
@@ -73,11 +73,15 @@ fn schema_to_field_with_props(
AvroSchema::Bytes => DataType::Binary,
AvroSchema::String => DataType::Utf8,
AvroSchema::Array(item_schema) => DataType::List(Arc::new(
schema_to_field_with_props(item_schema, Some("element"), false, None)?,
schema_to_field_with_props(&item_schema.items, Some("element"), false, None)?,
)),
AvroSchema::Map(value_schema) => {
let value_field =
schema_to_field_with_props(value_schema, Some("value"), false, None)?;
let value_field = schema_to_field_with_props(
&value_schema.types,
Some("value"),
false,
None,
)?;
DataType::Dictionary(
Box::new(DataType::Utf8),
Box::new(value_field.data_type().clone()),
@@ -144,14 +148,17 @@ fn schema_to_field_with_props(
AvroSchema::Decimal(DecimalSchema {
precision, scale, ..
}) => DataType::Decimal128(*precision as u8, *scale as i8),
AvroSchema::BigDecimal => DataType::LargeBinary,
AvroSchema::Uuid => DataType::FixedSizeBinary(16),
AvroSchema::Date => DataType::Date32,
AvroSchema::TimeMillis => DataType::Time32(TimeUnit::Millisecond),
AvroSchema::TimeMicros => DataType::Time64(TimeUnit::Microsecond),
AvroSchema::TimestampMillis => DataType::Timestamp(TimeUnit::Millisecond, None),
AvroSchema::TimestampMicros => DataType::Timestamp(TimeUnit::Microsecond, None),
AvroSchema::TimestampNanos => DataType::Timestamp(TimeUnit::Nanosecond, None),
AvroSchema::LocalTimestampMillis => todo!(),
AvroSchema::LocalTimestampMicros => todo!(),
AvroSchema::LocalTimestampNanos => todo!(),
AvroSchema::Duration => DataType::Duration(TimeUnit::Millisecond),
};

@@ -371,6 +378,7 @@ mod test {
aliases: Some(vec![alias("foofixed"), alias("barfixed")]),
size: 1,
doc: None,
default: None,
attributes: Default::default(),
});
let props = external_props(&fixed_schema);
Original file line number Diff line number Diff line change
@@ -371,6 +371,75 @@ pub fn accumulate<T, F>(
}
}

/// Accumulates with multiple accumulate(value) columns. (e.g. `corr(c1, c2)`)
///
/// This method assumes that for any input record index, if any of the value column
/// is null, or it's filtered out by `opt_filter`, then the record would be ignored.
/// (won't be accumulated by `value_fn`)
///
/// # Arguments
///
/// * `group_indices` - To which groups do the rows in `value_columns` belong
/// * `value_columns` - The input arrays to accumulate
/// * `opt_filter` - Optional filter array. If present, only rows where filter is `Some(true)` are included
/// * `value_fn` - Callback function for each valid row, with parameters:
/// * `group_idx`: The group index for the current row
/// * `batch_idx`: The index of the current row in the input arrays
/// * `columns`: Reference to all input arrays for accessing values
pub fn accumulate_multiple<T, F>(
group_indices: &[usize],
value_columns: &[&PrimitiveArray<T>],
opt_filter: Option<&BooleanArray>,
mut value_fn: F,
) where
T: ArrowPrimitiveType + Send,
F: FnMut(usize, usize, &[&PrimitiveArray<T>]) + Send,
{
// Calculate `valid_indices` to accumulate, non-valid indices are ignored.
// `valid_indices` is a bit mask corresponding to the `group_indices`. An index
// is considered valid if:
// 1. All columns are non-null at this index.
// 2. Not filtered out by `opt_filter`

// Take AND from all null buffers of `value_columns`.
let combined_nulls = value_columns
.iter()
.map(|arr| arr.logical_nulls())
.fold(None, |acc, nulls| {
NullBuffer::union(acc.as_ref(), nulls.as_ref())
});

// Take AND from previous combined nulls and `opt_filter`.
let valid_indices = match (combined_nulls, opt_filter) {
(None, None) => None,
(None, Some(filter)) => Some(filter.clone()),
(Some(nulls), None) => Some(BooleanArray::new(nulls.inner().clone(), None)),
(Some(nulls), Some(filter)) => {
let combined = nulls.inner() & filter.values();
Some(BooleanArray::new(combined, None))
}
};

for col in value_columns.iter() {
debug_assert_eq!(col.len(), group_indices.len());
}

match valid_indices {
None => {
for (batch_idx, &group_idx) in group_indices.iter().enumerate() {
value_fn(group_idx, batch_idx, value_columns);
}
}
Some(valid_indices) => {
for (batch_idx, &group_idx) in group_indices.iter().enumerate() {
if valid_indices.value(batch_idx) {
value_fn(group_idx, batch_idx, value_columns);
}
}
}
}
}

/// This function is called to update the accumulator state per row
/// when the value is not needed (e.g. COUNT)
///
@@ -528,7 +597,7 @@ fn initialize_builder(
mod test {
use super::*;

use arrow::array::UInt32Array;
use arrow::array::{Int32Array, UInt32Array};
use rand::{rngs::ThreadRng, Rng};
use std::collections::HashSet;

@@ -940,4 +1009,107 @@ mod test {
.collect()
}
}

#[test]
fn test_accumulate_multiple_no_nulls_no_filter() {
let group_indices = vec![0, 1, 0, 1];
let values1 = Int32Array::from(vec![1, 2, 3, 4]);
let values2 = Int32Array::from(vec![10, 20, 30, 40]);
let value_columns = [values1, values2];

let mut accumulated = vec![];
accumulate_multiple(
&group_indices,
&value_columns.iter().collect::<Vec<_>>(),
None,
|group_idx, batch_idx, columns| {
let values = columns.iter().map(|col| col.value(batch_idx)).collect();
accumulated.push((group_idx, values));
},
);

let expected = vec![
(0, vec![1, 10]),
(1, vec![2, 20]),
(0, vec![3, 30]),
(1, vec![4, 40]),
];
assert_eq!(accumulated, expected);
}

#[test]
fn test_accumulate_multiple_with_nulls() {
let group_indices = vec![0, 1, 0, 1];
let values1 = Int32Array::from(vec![Some(1), None, Some(3), Some(4)]);
let values2 = Int32Array::from(vec![Some(10), Some(20), None, Some(40)]);
let value_columns = [values1, values2];

let mut accumulated = vec![];
accumulate_multiple(
&group_indices,
&value_columns.iter().collect::<Vec<_>>(),
None,
|group_idx, batch_idx, columns| {
let values = columns.iter().map(|col| col.value(batch_idx)).collect();
accumulated.push((group_idx, values));
},
);

// Only rows where both columns are non-null should be accumulated
let expected = vec![(0, vec![1, 10]), (1, vec![4, 40])];
assert_eq!(accumulated, expected);
}

#[test]
fn test_accumulate_multiple_with_filter() {
let group_indices = vec![0, 1, 0, 1];
let values1 = Int32Array::from(vec![1, 2, 3, 4]);
let values2 = Int32Array::from(vec![10, 20, 30, 40]);
let value_columns = [values1, values2];

let filter = BooleanArray::from(vec![true, false, true, false]);

let mut accumulated = vec![];
accumulate_multiple(
&group_indices,
&value_columns.iter().collect::<Vec<_>>(),
Some(&filter),
|group_idx, batch_idx, columns| {
let values = columns.iter().map(|col| col.value(batch_idx)).collect();
accumulated.push((group_idx, values));
},
);

// Only rows where filter is true should be accumulated
let expected = vec![(0, vec![1, 10]), (0, vec![3, 30])];
assert_eq!(accumulated, expected);
}

#[test]
fn test_accumulate_multiple_with_nulls_and_filter() {
let group_indices = vec![0, 1, 0, 1];
let values1 = Int32Array::from(vec![Some(1), None, Some(3), Some(4)]);
let values2 = Int32Array::from(vec![Some(10), Some(20), None, Some(40)]);
let value_columns = [values1, values2];

let filter = BooleanArray::from(vec![true, true, true, false]);

let mut accumulated = vec![];
accumulate_multiple(
&group_indices,
&value_columns.iter().collect::<Vec<_>>(),
Some(&filter),
|group_idx, batch_idx, columns| {
let values = columns.iter().map(|col| col.value(batch_idx)).collect();
accumulated.push((group_idx, values));
},
);

// Only rows where both:
// 1. Filter is true
// 2. Both columns are non-null
// should be accumulated
let expected = [(0, vec![1, 10])];
assert_eq!(accumulated, expected);
}
}
Loading

0 comments on commit 2406457

Please sign in to comment.