Skip to content

Commit

Permalink
fix collect_set/collect_list incorrect spill. (#717)
Browse files Browse the repository at this point in the history
fix parsing parquet int96 to timestamp error.

fix compatibility of casting str to int.

fix incorrect CASE WHEN + ELSE NULL behavior (apache/datafusion#13885).

use number of physical cores as tokio parallelism, removing blaze.tokio.num.worker.threads.

sort multiple batches with interleaver in repartitioner.

use in-place radix sort.

use sync channel in ipc reading.

Co-authored-by: zhangli20 <[email protected]>
  • Loading branch information
richox and zhangli20 authored Dec 24, 2024
1 parent 64f4b5e commit 43f4e58
Show file tree
Hide file tree
Showing 19 changed files with 1,011 additions and 584 deletions.
442 changes: 271 additions & 171 deletions Cargo.lock

Large diffs are not rendered by default.

38 changes: 19 additions & 19 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,27 +66,27 @@ serde_json = { version = "1.0.96" }

[patch.crates-io]
# datafusion: branch=v42-blaze
datafusion = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "2bc42ea73"}
datafusion-common = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "2bc42ea73"}
datafusion-expr = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "2bc42ea73"}
datafusion-execution = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "2bc42ea73"}
datafusion-optimizer = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "2bc42ea73"}
datafusion-physical-expr = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "2bc42ea73"}
orc-rust = { git = "https://github.com/blaze-init/datafusion-orc.git", rev = "7833d7d"}
datafusion = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "e9e2128a7"}
datafusion-common = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "e9e2128a7"}
datafusion-expr = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "e9e2128a7"}
datafusion-execution = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "e9e2128a7"}
datafusion-optimizer = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "e9e2128a7"}
datafusion-physical-expr = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "e9e2128a7"}
orc-rust = { git = "https://github.com/blaze-init/datafusion-orc.git", rev = "0d798f8"}

# arrow: branch=v53-blaze
arrow = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"}
arrow-arith = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"}
arrow-array = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"}
arrow-buffer = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"}
arrow-cast = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"}
arrow-data = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"}
arrow-ord = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"}
arrow-row = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"}
arrow-schema = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"}
arrow-select = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"}
arrow-string = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"}
parquet = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"}
arrow = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "f34f7eb3c2"}
arrow-arith = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "f34f7eb3c2"}
arrow-array = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "f34f7eb3c2"}
arrow-buffer = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "f34f7eb3c2"}
arrow-cast = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "f34f7eb3c2"}
arrow-data = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "f34f7eb3c2"}
arrow-ord = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "f34f7eb3c2"}
arrow-row = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "f34f7eb3c2"}
arrow-schema = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "f34f7eb3c2"}
arrow-select = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "f34f7eb3c2"}
arrow-string = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "f34f7eb3c2"}
parquet = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "f34f7eb3c2"}

# serde_json: branch=v1.0.96-blaze
serde_json = { git = "https://github.com/blaze-init/json", branch = "v1.0.96-blaze" }
1 change: 0 additions & 1 deletion native-engine/blaze-jni-bridge/src/conf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ macro_rules! define_conf {

define_conf!(IntConf, BATCH_SIZE);
define_conf!(DoubleConf, MEMORY_FRACTION);
define_conf!(IntConf, TOKIO_NUM_WORKER_THREADS);
define_conf!(BooleanConf, SMJ_INEQUALITY_JOIN_ENABLE);
define_conf!(BooleanConf, CASE_CONVERT_FUNCTIONS_ENABLE);
define_conf!(BooleanConf, INPUT_BATCH_STATISTICS_ENABLE);
Expand Down
1 change: 1 addition & 0 deletions native-engine/blaze/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ once_cell = "1.20.2"
panic-message = "0.3.0"
paste = "1.0.15"
prost = "0.13.4"
raw-cpuid = "11.2.0"
tokio = "=1.42.0"

[target.'cfg(not(windows))'.dependencies]
Expand Down
27 changes: 21 additions & 6 deletions native-engine/blaze/src/rt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,9 @@ use arrow::{
record_batch::RecordBatch,
};
use blaze_jni_bridge::{
conf::{IntConf, TOKIO_NUM_WORKER_THREADS},
is_task_running,
jni_bridge::JavaClasses,
jni_call, jni_call_static, jni_convert_byte_array, jni_exception_check, jni_exception_occurred,
jni_new_global_ref, jni_new_object, jni_new_string,
is_task_running, jni_bridge::JavaClasses, jni_call, jni_call_static, jni_convert_byte_array,
jni_exception_check, jni_exception_occurred, jni_new_global_ref, jni_new_object,
jni_new_string,
};
use blaze_serde::protobuf::TaskDefinition;
use datafusion::{
Expand All @@ -49,6 +47,7 @@ use datafusion_ext_plans::{
use futures::{FutureExt, StreamExt};
use jni::objects::{GlobalRef, JObject};
use prost::Message;
use raw_cpuid::CpuId;
use tokio::{runtime::Runtime, task::JoinHandle};

use crate::{
Expand Down Expand Up @@ -95,13 +94,29 @@ impl NativeExecutionRuntime {
&ExecutionPlanMetricsSet::new(),
);

// determine number of tokio worker threads
// use the real number of available physical cores
let default_parallelism = std::thread::available_parallelism()
.map(|v| v.get())
.unwrap_or(1);
let has_htt = CpuId::new()
.get_feature_info()
.map(|info| info.has_htt())
.unwrap_or(false);
let mut num_worker_threads = if has_htt {
default_parallelism / 2
} else {
default_parallelism
};
num_worker_threads = num_worker_threads.max(1);

// create tokio runtime
// propagate classloader and task context to spawned children threads
let spark_task_context = jni_call_static!(JniBridge.getTaskContext() -> JObject)?;
let spark_task_context_global = jni_new_global_ref!(spark_task_context.as_obj())?;
let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
.thread_name(format!("blaze-native-stage-{stage_id}-part-{partition_id}"))
.worker_threads(TOKIO_NUM_WORKER_THREADS.value()? as usize)
.worker_threads(num_worker_threads)
.on_thread_start(move || {
let classloader = JavaClasses::get().classloader;
let _ = jni_call_static!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub struct RadixTournamentTree<T> {
#[allow(clippy::len_without_is_empty)]
impl<T: KeyForRadixTournamentTree> RadixTournamentTree<T> {
pub fn new(values: Vec<T>, num_keys: usize) -> Self {
let num_keys = num_keys + 1; // avoid overflow
let num_values = values.len();
let mut tree = unsafe {
// safety:
Expand Down
121 changes: 89 additions & 32 deletions native-engine/datafusion-ext-commons/src/algorithm/rdxsort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,46 +12,103 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::vec::IntoIter;
use crate::unchecked;

use radsort::Key;
/// Performs an in-place counting/radix sort of `array`, ordering elements by
/// the bucket index produced by `key`.
///
/// - `array`: the slice to be sorted in place.
/// - `counts`: per-bucket counters; must be zero-initialized and have one slot
///   for every possible key value (`key` must return values `< counts.len()`,
///   otherwise this panics on an out-of-bounds index). After the call it holds
///   the number of elements that fell into each bucket.
/// - `key`: extracts the bucket index from an element.
///
/// The sort is not stable: elements within the same bucket keep no particular
/// relative order.
pub fn radix_sort_by_key<T>(array: &mut [T], counts: &mut [usize], key: impl Fn(&T) -> usize) {
    /// Half-open range [cur, end) of the slice owned by one bucket;
    /// `cur` advances as elements are placed into their final region.
    #[derive(Default, Clone, Copy)]
    struct Part {
        cur: usize,
        end: usize,
    }

    let num_keys = counts.len();
    let mut parts = vec![Part::default(); num_keys];

    // histogram pass: count how many elements fall into each bucket
    array.iter().for_each(|item| counts[key(item)] += 1);

    // prefix-sum pass: assign each non-empty bucket its output region
    let mut beg = 0;
    for (idx, count) in counts.iter().enumerate() {
        if *count > 0 {
            parts[idx] = Part {
                cur: beg,
                end: beg + count,
            };
            beg += count;
        }
    }

    // permutation pass ("American flag" style): repeatedly sweep every
    // not-yet-exhausted bucket, swapping each element encountered into the
    // front of its target bucket's region. Every swap advances exactly one
    // bucket cursor, and cursors never retreat, so total work is O(n) swaps.
    // Once at most one bucket remains inexhausted, all of its remaining
    // elements are necessarily already in place.
    let mut inexhausted_part_indices: Vec<usize> = (0..num_keys).collect();
    while {
        inexhausted_part_indices.retain(|&i| parts[i].cur < parts[i].end);
        inexhausted_part_indices.len() > 1
    } {
        for &part_idx in inexhausted_part_indices.iter() {
            let cur_part = &parts[part_idx];
            let cur = cur_part.cur;
            let end = cur_part.end;
            for item_idx in cur..end {
                let target_part_idx = key(&array[item_idx]);
                let target_part = &mut parts[target_part_idx];
                // safe swap (a == b is a no-op); the bounds check is cheap
                // relative to the cache misses this pass incurs anyway
                array.swap(item_idx, target_part.cur);
                target_part.cur += 1;
            }
        }
    }
}

pub trait RadixSortIterExt: Iterator {
fn radix_sorted_unstable(self) -> IntoIter<Self::Item>
where
Self: Sized,
Self::Item: Key + Ord,
{
let mut vec: Vec<Self::Item> = self.collect();
radix_sort_unstable(&mut vec);
vec.into_iter()
#[cfg(test)]
mod test {
    use rand::Rng;

    use super::*;

    /// Sorts `array` with both the radix sort under test and the standard
    /// library sort, asserting that the two results agree.
    fn check_sorts_like_std(array: Vec<u16>) {
        let mut array1 = array.clone();
        radix_sort_by_key(&mut array1, &mut [0; 65536], |key| *key as usize);

        let mut array2 = array;
        array2.sort_unstable();

        assert_eq!(array1, array2);
    }

    #[test]
    fn fuzzytest_u16_small() {
        // every length from 0 to 999 exercises empty/single/boundary cases
        for n in 0..1000 {
            let mut array = vec![];
            for _ in 0..n {
                array.push(rand::thread_rng().gen::<u16>());
            }
            check_sorts_like_std(array);
        }
    }

    #[test]
    fn fuzzytest_u16_1m() {
        // one large input to exercise the multi-sweep permutation pass
        let mut array = vec![];
        for _ in 0..1000000 {
            array.push(rand::thread_rng().gen::<u16>());
        }
        check_sorts_like_std(array);
    }
}
Loading

0 comments on commit 43f4e58

Please sign in to comment.