Skip to content

Commit

Permalink
move accumulator and columnar value
Browse files Browse the repository at this point in the history
  • Loading branch information
jimexist committed Feb 8, 2022
1 parent f2615af commit 71f5c6e
Show file tree
Hide file tree
Showing 18 changed files with 1,108 additions and 974 deletions.
1 change: 1 addition & 0 deletions datafusion-expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,4 @@ path = "src/lib.rs"
datafusion-common = { path = "../datafusion-common", version = "6.0.0" }
arrow = { version = "8.0.0", features = ["prettyprint"] }
sqlparser = "0.13"
ahash = { version = "0.7", default-features = false }
44 changes: 44 additions & 0 deletions datafusion-expr/src/accumulator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::array::ArrayRef;
use datafusion_common::{Result, ScalarValue};
use std::fmt::Debug;

/// An accumulator is a stateful object that lives throughout the evaluation of
/// multiple rows and generically accumulates values.
///
/// An accumulator knows how to:
/// * update its state from inputs via `update_batch`
/// * convert its internal state to a vector of scalar values via `state`
/// * update its state from multiple accumulators' states via `merge_batch`
/// * compute the final value from its internal state via `evaluate`
///
/// The `Send + Sync + Debug` supertraits mean every implementor must be
/// shareable across threads and debug-printable.
pub trait Accumulator: Send + Sync + Debug {
/// Returns the state of the accumulator at the end of the accumulation.
///
/// In the case of an average on which we track `sum` and `n`, this function
/// should return a vector of two values, `sum` and `n`.
///
/// # Errors
///
/// Returns an error if the state cannot be produced; the conditions are
/// implementation-specific.
fn state(&self) -> Result<Vec<ScalarValue>>;

/// Updates the accumulator's state from a slice of input arrays.
///
/// # Errors
///
/// Returns an error if the state cannot be updated from `values`; the
/// conditions are implementation-specific.
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()>;

/// Updates the accumulator's state from a slice of arrays holding other
/// accumulators' states.
///
/// NOTE(review): presumably `states` follows the same layout that `state`
/// returns (one array per state value) — confirm against implementors.
///
/// # Errors
///
/// Returns an error if `states` cannot be merged; the conditions are
/// implementation-specific.
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()>;

/// Returns the accumulator's value based on its current state.
///
/// # Errors
///
/// Returns an error if the value cannot be computed; the conditions are
/// implementation-specific.
fn evaluate(&self) -> Result<ScalarValue>;
}
60 changes: 60 additions & 0 deletions datafusion-expr/src/columnar_value.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::array::ArrayRef;
use arrow::array::NullArray;
use arrow::datatypes::DataType;
use arrow::record_batch::RecordBatch;
use datafusion_common::ScalarValue;
use std::sync::Arc;

/// Represents the result from an expression.
///
/// A result is either a whole column of values ([`ColumnarValue::Array`]) or a
/// single value ([`ColumnarValue::Scalar`]).
///
/// `Debug` is derived so that values can be inspected in logs, test failures
/// and `unwrap`/`expect` diagnostics; both payload types are expected to
/// implement `Debug` themselves.
#[derive(Clone, Debug)]
pub enum ColumnarValue {
    /// Array of values
    Array(ArrayRef),
    /// A single value
    Scalar(ScalarValue),
}

impl ColumnarValue {
pub fn data_type(&self) -> DataType {
match self {
ColumnarValue::Array(array_value) => array_value.data_type().clone(),
ColumnarValue::Scalar(scalar_value) => scalar_value.get_datatype(),
}
}

/// Convert a columnar value into an ArrayRef
pub fn into_array(self, num_rows: usize) -> ArrayRef {
match self {
ColumnarValue::Array(array) => array,
ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
}
}
}

/// Null columnar values are implemented as a null array in order to pass the
/// batch's `num_rows` along.
///
/// This is a plain type alias, so the `From<&RecordBatch>` conversion below is
/// implemented for [`ColumnarValue`] itself.
pub type NullColumnarValue = ColumnarValue;

impl From<&RecordBatch> for NullColumnarValue {
    /// Builds a `ColumnarValue::Array` wrapping a [`NullArray`] whose length is
    /// the number of rows in `batch`.
    fn from(batch: &RecordBatch) -> Self {
        let nulls = NullArray::new(batch.num_rows());
        Self::Array(Arc::new(nulls))
    }
}
Loading

0 comments on commit 71f5c6e

Please sign in to comment.