Skip to content

Commit

Permalink
improve shuffle performance (#559)
Browse files Browse the repository at this point in the history
Co-authored-by: zhangli20 <[email protected]>
  • Loading branch information
richox and zhangli20 authored Sep 3, 2024
1 parent d036cca commit a931b42
Show file tree
Hide file tree
Showing 8 changed files with 256 additions and 270 deletions.
4 changes: 2 additions & 2 deletions native-engine/blaze-jni-bridge/src/conf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,13 @@ pub trait DoubleConf {

pub trait StringConf {
fn key(&self) -> &'static str;
fn value(&self) -> Result<&'static str> {
fn value(&self) -> Result<String> {
let key = jni_new_string!(self.key())?;
let value = jni_get_string!(
jni_call_static!(BlazeConf.stringConf(key.as_obj()) -> JObject)?
.as_obj()
.into()
)?;
Ok(Box::leak(value.into_boxed_str()))
Ok(value)
}
}
46 changes: 26 additions & 20 deletions native-engine/datafusion-ext-commons/src/streams/coalesce_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,26 +101,8 @@ impl CoalesceStream {
fn coalesce(&mut self) -> Result<RecordBatch> {
// better concat_batches() implementation that releases old batch columns asap.
let schema = self.input.schema();

// collect all columns
let mut all_cols = schema.fields().iter().map(|_| vec![]).collect::<Vec<_>>();
for batch in std::mem::take(&mut self.staging_batches) {
for i in 0..all_cols.len() {
all_cols[i].push(batch.column(i).clone());
}
}

// coalesce each column
let mut coalesced_cols = vec![];
for (cols, field) in all_cols.into_iter().zip(schema.fields()) {
let dt = field.data_type();
coalesced_cols.push(coalesce_arrays_unchecked(dt, &cols));
}
let coalesced_batch = RecordBatch::try_new_with_options(
schema,
coalesced_cols,
&RecordBatchOptions::new().with_row_count(Some(self.staging_rows)),
)?;
let coalesced_batch = coalesce_batches_unchecked(schema, &self.staging_batches);
self.staging_batches.clear();
self.staging_rows = 0;
self.staging_batches_mem_size = 0;
Ok(coalesced_batch)
Expand Down Expand Up @@ -177,6 +159,30 @@ impl Stream for CoalesceStream {
}
}

/// Concatenates `batches` column by column into a single record batch
/// carrying the given `schema`.
///
/// The schemas of the individual batches are not checked; callers must make
/// sure every batch in `batches` has the same schema as `schema`.
///
/// # Panics
///
/// Panics with "error coalescing record batch" if the final record batch
/// cannot be constructed from the coalesced columns.
pub fn coalesce_batches_unchecked(schema: SchemaRef, batches: &[RecordBatch]) -> RecordBatch {
    // The row count is passed explicitly so that it is still known when the
    // schema has no columns to derive it from.
    let total_rows = batches.iter().map(|batch| batch.num_rows()).sum::<usize>();

    // For each field, gather that column from every batch and merge the parts.
    let merged_columns = schema
        .fields()
        .iter()
        .enumerate()
        .map(|(col_idx, field)| {
            let parts = batches
                .iter()
                .map(|batch| batch.column(col_idx).clone())
                .collect::<Vec<_>>();
            coalesce_arrays_unchecked(field.data_type(), &parts)
        })
        .collect::<Vec<_>>();

    let options = RecordBatchOptions::new().with_row_count(Some(total_rows));
    RecordBatch::try_new_with_options(schema, merged_columns, &options)
        .expect("error coalescing record batch")
}

/// coalesce arrays without checking their data types, invokers must make
/// sure all arrays have the same data type
pub fn coalesce_arrays_unchecked(data_type: &DataType, arrays: &[ArrayRef]) -> ArrayRef {
Expand Down
Loading

0 comments on commit a931b42

Please sign in to comment.