From ce474f9e1c81d1fec2b2cdfe5844bc758c54c99a Mon Sep 17 00:00:00 2001 From: zhangli20 Date: Thu, 21 Nov 2024 12:53:17 +0800 Subject: [PATCH] remove bigdecimal dependency and use arrow's builtin decimal cast. implement better logging with stage and partition ids. other minor code refactoring. --- Cargo.lock | 16 - native-engine/blaze/src/exec.rs | 33 +- native-engine/blaze/src/logging.rs | 11 +- native-engine/blaze/src/rt.rs | 150 ++++--- .../datafusion-ext-commons/Cargo.toml | 1 - .../datafusion-ext-commons/src/cast.rs | 385 ++++++++---------- .../datafusion-ext-commons/src/coalesce.rs | 5 + native-engine/datafusion-ext-exprs/Cargo.toml | 1 - .../datafusion-ext-functions/Cargo.toml | 1 - .../src/agg/agg_hash_map.rs | 22 +- .../datafusion-ext-plans/src/agg_exec.rs | 23 +- .../src/broadcast_join_build_hash_map_exec.rs | 5 +- .../src/common/execution_context.rs | 84 ++-- .../src/ipc_reader_exec.rs | 89 +--- .../src/joins/join_hash_map.rs | 21 +- .../src/parquet_sink_exec.rs | 24 +- .../datafusion-ext-plans/src/project_exec.rs | 29 +- .../src/shuffle/buffered_data.rs | 21 +- .../datafusion-ext-plans/src/shuffle/mod.rs | 70 ++-- .../datafusion-ext-plans/src/sort_exec.rs | 52 +-- 20 files changed, 452 insertions(+), 591 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a56b355b9..535f8ba00 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -366,19 +366,6 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" -[[package]] -name = "bigdecimal" -version = "0.4.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f850665a0385e070b64c38d2354e6c104c8479c59868d1e48a0c13ee2c7a1c1" -dependencies = [ - "autocfg", - "libm", - "num-bigint", - "num-integer", - "num-traits", -] - [[package]] name = "bitflags" version = "1.3.2" @@ -907,7 +894,6 @@ dependencies = [ "arrow", "arrow-schema", "async-trait", - "bigdecimal", "bitvec", "blaze-jni-bridge", "byteorder", @@ -934,7 +920,6 @@ version = "0.1.0" dependencies = [ "arrow", "async-trait", - "bigdecimal", "blaze-jni-bridge", "datafusion", "datafusion-ext-commons", @@ -953,7 +938,6 @@ version = "0.1.0" dependencies = [ "arrow", "async-trait", - "bigdecimal", "blaze-jni-bridge", "datafusion", "datafusion-ext-commons", diff --git a/native-engine/blaze/src/exec.rs b/native-engine/blaze/src/exec.rs index 5c5acc11f..409de6d6d 100644 --- a/native-engine/blaze/src/exec.rs +++ b/native-engine/blaze/src/exec.rs @@ -19,7 +19,6 @@ use blaze_jni_bridge::{ jni_bridge::JavaClasses, *, }; -use blaze_serde::protobuf::TaskDefinition; use datafusion::{ common::Result, error::DataFusionError, @@ -27,17 +26,14 @@ use datafusion::{ disk_manager::DiskManagerConfig, runtime_env::{RuntimeConfig, RuntimeEnv}, }, - physical_plan::{displayable, ExecutionPlan}, prelude::{SessionConfig, SessionContext}, }; -use datafusion_ext_commons::df_execution_err; use datafusion_ext_plans::memmgr::MemManager; use jni::{ objects::{JClass, JObject}, JNIEnv, }; use once_cell::sync::OnceCell; -use prost::Message; use crate::{handle_unwinded_scope, logging::init_logging, rt::NativeExecutionRuntime}; @@ -81,38 +77,11 @@ pub extern "system" fn Java_org_apache_spark_sql_blaze_JniBridge_callNative( })?; let native_wrapper = jni_new_global_ref!(native_wrapper)?; - // decode plan - let raw_task_definition = jni_call!( - BlazeCallNativeWrapper(native_wrapper.as_obj()) - .getRawTaskDefinition() -> JObject)?; - let task_definition = TaskDefinition::decode( - jni_convert_byte_array!(raw_task_definition.as_obj())?.as_slice(), - ) - .or_else(|err| df_execution_err!("cannot decode execution plan: {err:?}"))?; - - let task_id = &task_definition.task_id.expect("task_id is empty"); - let plan = &task_definition.plan.expect("plan is empty"); - drop(raw_task_definition); - - // get execution plan - let execution_plan: Arc = plan - .try_into() - .or_else(|err| df_execution_err!("cannot create execution plan: {err:?}"))?; - let execution_plan_displayable = displayable(execution_plan.as_ref()) - .indent(true) - .to_string(); - log::info!("Creating native execution plan succeeded"); - log::info!(" task_id={task_id:?}"); - log::info!(" execution plan:\n{execution_plan_displayable}"); - - // execute to stream + // create execution runtime let runtime = Box::new(NativeExecutionRuntime::start( native_wrapper, - execution_plan, - task_id.partition_id as usize, SESSION.get().unwrap().task_ctx(), )?); - log::info!("Blaze native thread created"); // returns runtime raw pointer Ok::<_, DataFusionError>(Box::into_raw(runtime) as usize as i64) diff --git a/native-engine/blaze/src/logging.rs b/native-engine/blaze/src/logging.rs index f30f83646..b4f4e6b62 100644 --- a/native-engine/blaze/src/logging.rs +++ b/native-engine/blaze/src/logging.rs @@ -12,11 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::time::Instant; +use std::{cell::Cell, time::Instant}; use log::{Level, LevelFilter, Log, Metadata, Record}; use once_cell::sync::OnceCell; +thread_local! { + pub static THREAD_STAGE_ID: Cell = Cell::new(0); + pub static THREAD_PARTITION_ID: Cell = Cell::new(0); +} + const MAX_LEVEL: Level = Level::Info; pub fn init_logging() { @@ -43,8 +48,10 @@ impl Log for SimpleLogger { if self.enabled(record.metadata()) { let elapsed = Instant::now() - self.start_instant; let elapsed_sec = elapsed.as_secs_f64(); + let stage_id = THREAD_STAGE_ID.get(); + let partition_id = THREAD_PARTITION_ID.get(); eprintln!( - "(+{elapsed_sec:.3}s) [{}] Blaze - {}", + "(+{elapsed_sec:.3}s) [{}] (stage: {stage_id}, partition: {partition_id}) - {}", record.level(), record.args() ); diff --git a/native-engine/blaze/src/rt.rs b/native-engine/blaze/src/rt.rs index bf8811bd4..7a09765cf 100644 --- a/native-engine/blaze/src/rt.rs +++ b/native-engine/blaze/src/rt.rs @@ -24,50 +24,77 @@ use arrow::{ record_batch::RecordBatch, }; use blaze_jni_bridge::{ - is_task_running, jni_bridge::JavaClasses, jni_call, jni_call_static, jni_exception_check, - jni_exception_occurred, jni_new_global_ref, jni_new_object, jni_new_string, + is_task_running, jni_bridge::JavaClasses, jni_call, jni_call_static, jni_convert_byte_array, + jni_exception_check, jni_exception_occurred, jni_new_global_ref, jni_new_object, + jni_new_string, }; +use blaze_serde::protobuf::TaskDefinition; use datafusion::{ common::Result, error::DataFusionError, execution::context::TaskContext, - physical_plan::{metrics::ExecutionPlanMetricsSet, ExecutionPlan}, + physical_plan::{ + displayable, empty::EmptyExec, metrics::ExecutionPlanMetricsSet, ExecutionPlan, + }, }; -use datafusion_ext_commons::df_execution_err; +use datafusion_ext_commons::{df_execution_err, downcast_any}; use datafusion_ext_plans::{ - common::execution_context::ExecutionContext, parquet_sink_exec::ParquetSinkExec, + common::execution_context::{cancel_all_tasks, ExecutionContext}, + ipc_writer_exec::IpcWriterExec, + parquet_sink_exec::ParquetSinkExec, + shuffle_writer_exec::ShuffleWriterExec, }; use futures::{FutureExt, StreamExt}; use jni::objects::{GlobalRef, JObject}; +use prost::Message; use tokio::runtime::Runtime; -use crate::{handle_unwinded_scope, metrics::update_spark_metric_node}; +use crate::{ + handle_unwinded_scope, + logging::{THREAD_PARTITION_ID, THREAD_STAGE_ID}, + metrics::update_spark_metric_node, +}; pub struct NativeExecutionRuntime { exec_ctx: Arc, native_wrapper: GlobalRef, plan: Arc, batch_receiver: Receiver>>, - rt: Runtime, + tokio_runtime: Runtime, } impl NativeExecutionRuntime { - pub fn start( - native_wrapper: GlobalRef, - plan: Arc, - partition: usize, - context: Arc, - ) -> Result { + pub fn start(native_wrapper: GlobalRef, context: Arc) -> Result { + // decode plan + let native_wrapper_cloned = native_wrapper.clone(); + let raw_task_definition = jni_call!( + BlazeCallNativeWrapper(native_wrapper_cloned.as_obj()).getRawTaskDefinition() -> JObject + )?; + let raw_task_definition = jni_convert_byte_array!(raw_task_definition.as_obj())?; + let task_definition = TaskDefinition::decode(raw_task_definition.as_slice()) + .or_else(|err| df_execution_err!("cannot decode execution plan: {err:?}"))?; + + let task_id = &task_definition.task_id.expect("task_id is empty"); + let stage_id = task_id.stage_id as usize; + let partition_id = task_id.partition_id as usize; + let plan = &task_definition.plan.expect("plan is empty"); + drop(raw_task_definition); + + // get execution plan + let execution_plan: Arc = plan + .try_into() + .or_else(|err| df_execution_err!("cannot create execution plan: {err:?}"))?; + let exec_ctx = ExecutionContext::new( context.clone(), - partition, - plan.schema(), + partition_id, + execution_plan.schema(), &ExecutionPlanMetricsSet::new(), ); // init ffi schema let ffi_schema = FFI_ArrowSchema::try_from(exec_ctx.output_schema().as_ref())?; - jni_call!(BlazeCallNativeWrapper(native_wrapper.as_obj()) + jni_call!(BlazeCallNativeWrapper(native_wrapper_cloned.as_obj()) .importSchema(&ffi_schema as *const FFI_ArrowSchema as i64) -> () )?; @@ -75,7 +102,8 @@ impl NativeExecutionRuntime { // propagate classloader and task context to spawned children threads let spark_task_context = jni_call_static!(JniBridge.getTaskContext() -> JObject)?; let spark_task_context_global = jni_new_global_ref!(spark_task_context.as_obj())?; - let rt = tokio::runtime::Builder::new_multi_thread() + let tokio_runtime = tokio::runtime::Builder::new_multi_thread() + .thread_name(format!("blaze-native-stage-{stage_id}-part-{partition_id}")) .on_thread_start(move || { let classloader = JavaClasses::get().classloader; let _ = jni_call_static!( @@ -84,29 +112,41 @@ impl NativeExecutionRuntime { let _ = jni_call_static!( JniBridge.setTaskContext(spark_task_context_global.as_obj()) -> () ); + THREAD_STAGE_ID.set(stage_id); + THREAD_PARTITION_ID.set(partition_id); }) .build()?; - let (batch_sender, batch_receiver) = std::sync::mpsc::sync_channel(1); - let nrt = Self { - exec_ctx: exec_ctx.clone(), - native_wrapper: native_wrapper.clone(), - plan: plan.clone(), - rt, - batch_receiver, - }; + // execute plan to stream + let exec_ctx_cloned = exec_ctx.clone(); + let execution_plan_cloned = execution_plan.clone(); + let stream_future = tokio_runtime.spawn_blocking(move || { + let displayable = displayable(execution_plan_cloned.as_ref()) + .set_show_schema(true) + .indent(true) + .to_string(); + log::info!("start executing plan:\n{displayable}"); + + // execute plan to output stream + let mut stream = exec_ctx_cloned.execute(&execution_plan_cloned)?; + + // coalesce output stream if necessary + if downcast_any!(execution_plan_cloned, EmptyExec).is_err() + && downcast_any!(execution_plan_cloned, ParquetSinkExec).is_err() + && downcast_any!(execution_plan_cloned, IpcWriterExec).is_err() + && downcast_any!(execution_plan_cloned, ShuffleWriterExec).is_err() + { + stream = exec_ctx_cloned.coalesce_with_default_batch_size(stream); + } + Ok::<_, DataFusionError>(stream) + }); // spawn batch producer + let (batch_sender, batch_receiver) = std::sync::mpsc::sync_channel(1); let err_sender = batch_sender.clone(); let consume_stream = async move { - // execute and coalesce plan to output stream - let stream = exec_ctx.execute(&plan)?; - let mut stream = if plan.as_any().downcast_ref::().is_some() { - stream // cannot coalesce parquet sink output - } else { - exec_ctx.coalesce_with_default_batch_size(stream) - }; - + // produce batches + let mut stream = stream_future.await.expect("tokio error")?; while let Some(batch) = AssertUnwindSafe(stream.next()) .catch_unwind() .await @@ -125,46 +165,51 @@ impl NativeExecutionRuntime { batch_sender .send(Ok(None)) .or_else(|err| df_execution_err!("send batch error: {err}"))?; - log::info!("[partition={partition}] finished"); + log::info!("task finished"); Ok::<_, DataFusionError>(()) }; - nrt.rt.spawn(async move { + + let native_wrapper_cloned = native_wrapper.clone(); + tokio_runtime.spawn(async move { consume_stream.await.unwrap_or_else(|err| { handle_unwinded_scope(|| { let task_running = is_task_running(); if !task_running { - log::warn!( - "[partition={partition}] task completed before native execution done" - ); + log::warn!("task completed before native execution done"); return Ok(()); } let cause = if jni_exception_check!()? { - let err_text = format!( - "[partition={partition}] native execution panics with exception: {err}" - ); + let err_text = format!("native execution panics with exception: {err}"); err_sender.send(df_execution_err!("{err_text}"))?; log::error!("{err_text}"); Some(jni_exception_occurred!()?) } else { - let err_text = - format!("[partition={partition}] native execution panics: {err}"); + let err_text = format!("native execution panics: {err}"); err_sender.send(df_execution_err!("{err_text}"))?; log::error!("{err_text}"); None }; set_error( - &native_wrapper, - &format!("[partition={partition}] panics: {err}"), + &native_wrapper_cloned, + &format!("task panics: {err}"), cause.map(|e| e.as_obj()), )?; - log::info!("[partition={partition}] exited abnormally."); + log::info!("task exited abnormally."); Ok::<_, Box>(()) }) }); }); - Ok(nrt) + + let native_execution_runtime = Self { + exec_ctx: exec_ctx.clone(), + native_wrapper: native_wrapper.clone(), + plan: execution_plan.clone(), + tokio_runtime, + batch_receiver, + }; + Ok(native_execution_runtime) } pub fn next_batch(&self) -> bool { @@ -186,13 +231,12 @@ impl NativeExecutionRuntime { } }; - let partition = self.exec_ctx.partition_id(); match next_batch() { Ok(ret) => return ret, Err(err) => { let _ = set_error( &self.native_wrapper, - &format!("[partition={partition}] poll record batch error: {err}"), + &format!("poll record batch error: {err}"), None, ); return false; @@ -203,13 +247,13 @@ impl NativeExecutionRuntime { pub fn finalize(self) { let partition = self.exec_ctx.partition_id(); - log::info!("[partition={partition}] native execution finalizing"); + log::info!("(partition={partition}) native execution finalizing"); self.update_metrics().unwrap_or_default(); drop(self.plan); - self.exec_ctx.cancel_task(); // cancel all pending streams - self.rt.shutdown_background(); - log::info!("[partition={partition}] native execution finalized"); + cancel_all_tasks(&self.exec_ctx.task_ctx()); // cancel all pending streams + self.tokio_runtime.shutdown_background(); + log::info!("(partition={partition}) native execution finalized"); } fn update_metrics(&self) -> Result<()> { diff --git a/native-engine/datafusion-ext-commons/Cargo.toml b/native-engine/datafusion-ext-commons/Cargo.toml index 2724ed70a..e63c392ad 100644 --- a/native-engine/datafusion-ext-commons/Cargo.toml +++ b/native-engine/datafusion-ext-commons/Cargo.toml @@ -13,7 +13,6 @@ arrow-schema = { workspace = true } async-trait = "0.1.83" bitvec = "1.0.1" blaze-jni-bridge = { workspace = true } -bigdecimal = "0.4.6" byteorder = "1.5.0" bytes = "1.8.0" datafusion = { workspace = true } diff --git a/native-engine/datafusion-ext-commons/src/cast.rs b/native-engine/datafusion-ext-commons/src/cast.rs index 2a32d78ab..4ec747bb6 100644 --- a/native-engine/datafusion-ext-commons/src/cast.rs +++ b/native-engine/datafusion-ext-commons/src/cast.rs @@ -12,16 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{str::FromStr, sync::Arc}; +use std::sync::Arc; use arrow::{array::*, datatypes::*}; -use bigdecimal::{FromPrimitive, ToPrimitive}; -use datafusion::common::{ - cast::{as_float32_array, as_float64_array}, - Result, -}; -use num::{cast::AsPrimitive, Bounded, Integer, Signed}; -use paste::paste; +use datafusion::common::Result; use crate::df_execution_err; @@ -44,46 +38,48 @@ pub fn cast_impl( (_, &DataType::Null) => Arc::new(NullArray::new(array.len())), // float to int - (&DataType::Float32, &DataType::Int8) => Arc::new(cast_float_to_integer::<_, Int8Type>( - as_float32_array(array)?, - )), - (&DataType::Float32, &DataType::Int16) => Arc::new(cast_float_to_integer::<_, Int16Type>( - as_float32_array(array)?, - )), - (&DataType::Float32, &DataType::Int32) => Arc::new(cast_float_to_integer::<_, Int32Type>( - as_float32_array(array)?, - )), - (&DataType::Float32, &DataType::Int64) => Arc::new(cast_float_to_integer::<_, Int64Type>( - as_float32_array(array)?, - )), - (&DataType::Float64, &DataType::Int8) => Arc::new(cast_float_to_integer::<_, Int8Type>( - as_float64_array(array)?, - )), - (&DataType::Float64, &DataType::Int16) => Arc::new(cast_float_to_integer::<_, Int16Type>( - as_float64_array(array)?, - )), - (&DataType::Float64, &DataType::Int32) => Arc::new(cast_float_to_integer::<_, Int32Type>( - as_float64_array(array)?, - )), - (&DataType::Float64, &DataType::Int64) => Arc::new(cast_float_to_integer::<_, Int64Type>( - as_float64_array(array)?, - )), - - (&DataType::Utf8, &DataType::Int8) - | (&DataType::Utf8, &DataType::Int16) - | (&DataType::Utf8, &DataType::Int32) - | (&DataType::Utf8, &DataType::Int64) => { - // spark compatible string to integer cast - try_cast_string_array_to_integer(array, cast_type)? + // use unchecked casting, which is compatible with spark + (&DataType::Float32, &DataType::Int8) => { + let input = array.as_primitive::(); + let output: Int8Array = arrow::compute::unary(input, |v| v as i8); + Arc::new(output) + } + (&DataType::Float32, &DataType::Int16) => { + let input = array.as_primitive::(); + let output: Int16Array = arrow::compute::unary(input, |v| v as i16); + Arc::new(output) + } + (&DataType::Float32, &DataType::Int32) => { + let input = array.as_primitive::(); + let output: Int32Array = arrow::compute::unary(input, |v| v as i32); + Arc::new(output) + } + (&DataType::Float32, &DataType::Int64) => { + let input = array.as_primitive::(); + let output: Int64Array = arrow::compute::unary(input, |v| v as i64); + Arc::new(output) } - (&DataType::Utf8, &DataType::Decimal128(..)) => { - // spark compatible string to decimal cast - try_cast_string_array_to_decimal(array, cast_type)? + (&DataType::Float64, &DataType::Int8) => { + let input = array.as_primitive::(); + let output: Int8Array = arrow::compute::unary(input, |v| v as i8); + Arc::new(output) } - (&DataType::Decimal128(..), DataType::Utf8) => { - // spark compatible decimal to string cast - try_cast_decimal_array_to_string(array, cast_type)? + (&DataType::Float64, &DataType::Int16) => { + let input = array.as_primitive::(); + let output: Int16Array = arrow::compute::unary(input, |v| v as i16); + Arc::new(output) } + (&DataType::Float64, &DataType::Int32) => { + let input = array.as_primitive::(); + let output: Int32Array = arrow::compute::unary(input, |v| v as i32); + Arc::new(output) + } + (&DataType::Float64, &DataType::Int64) => { + let input = array.as_primitive::(); + let output: Int64Array = arrow::compute::unary(input, |v| v as i64); + Arc::new(output) + } + (&DataType::Timestamp(..), DataType::Float64) => { // timestamp to f64 = timestamp to i64 to f64, only used in agg.sum() arrow::compute::cast( @@ -93,7 +89,13 @@ pub fn cast_impl( } (&DataType::Boolean, DataType::Utf8) => { // spark compatible boolean to string cast - try_cast_boolean_array_to_string(array, cast_type)? + Arc::new( + array + .as_boolean() + .iter() + .map(|value| value.map(|value| if value { "true" } else { "false" })) + .collect::(), + ) } (&DataType::List(_), DataType::List(to_field)) => { let list = as_list_array(array); @@ -199,188 +201,23 @@ pub fn cast_impl( }) } -fn try_cast_string_array_to_integer(array: &dyn Array, cast_type: &DataType) -> Result { - macro_rules! cast { - ($target_type:ident) => {{ - type B = paste! {[<$target_type Builder>]}; - let array = array.as_any().downcast_ref::().unwrap(); - let mut builder = B::new(); - - for v in array.iter() { - match v { - Some(s) => builder.append_option(to_integer(s)), - None => builder.append_null(), - } - } - Arc::new(builder.finish()) - }}; - } - - Ok(match cast_type { - DataType::Int8 => cast!(Int8), - DataType::Int16 => cast!(Int16), - DataType::Int32 => cast!(Int32), - DataType::Int64 => cast!(Int64), - _ => arrow::compute::cast(array, cast_type)?, - }) -} - -fn try_cast_string_array_to_decimal(array: &dyn Array, cast_type: &DataType) -> Result { - if let &DataType::Decimal128(precision, scale) = cast_type { - let array = array.as_any().downcast_ref::().unwrap(); - let mut builder = Decimal128Builder::new(); - - for v in array.iter() { - match v { - Some(s) => match to_decimal(s, precision, scale) { - Some(v) => builder.append_value(v), - None => builder.append_null(), - }, - None => builder.append_null(), - } - } - return Ok(Arc::new( - builder - .finish() - .with_precision_and_scale(precision, scale)?, - )); - } - unreachable!("cast_type must be DataType::Decimal") -} - -fn try_cast_decimal_array_to_string(array: &dyn Array, cast_type: &DataType) -> Result { - if let &DataType::Utf8 = cast_type { - let array = array.as_any().downcast_ref::().unwrap(); - let mut builder = StringBuilder::new(); - for v in 0..array.len() { - if array.is_valid(v) { - builder.append_value(array.value_as_string(v)) - } else { - builder.append_null() - } - } - return Ok(Arc::new(builder.finish())); - } - unreachable!("cast_type must be DataType::Utf8") -} - -fn try_cast_boolean_array_to_string(array: &dyn Array, cast_type: &DataType) -> Result { - if let &DataType::Utf8 = cast_type { - let array = array.as_any().downcast_ref::().unwrap(); - return Ok(Arc::new( - array - .iter() - .map(|value| value.map(|value| if value { "true" } else { "false" })) - .collect::(), - )); - } - unreachable!("cast_type must be DataType::Utf8") -} - -fn cast_float_to_integer( - array: &PrimitiveArray, -) -> PrimitiveArray -where - F::Native: AsPrimitive, -{ - arrow::compute::unary(array, |v| v.as_()) -} - -// this implementation is original copied from spark UTF8String.scala -fn to_integer(input: &str) -> Option { - let bytes = input.as_bytes(); - - if bytes.is_empty() { - return None; - } - - let b = bytes[0]; - let negative = b == b'-'; - let mut offset = 0; - - if negative || b == b'+' { - offset += 1; - if bytes.len() == 1 { - return None; - } - } - - let separator = b'.'; - let radix = T::from_usize(10).unwrap(); - let stop_value = T::min_value() / radix; - let mut result = T::zero(); - - while offset < bytes.len() { - let b = bytes[offset]; - offset += 1; - if b == separator { - // We allow decimals and will return a truncated integral in that case. - // Therefore we won't throw an exception here (checking the fractional - // part happens below.) - break; - } - - let digit = if b.is_ascii_digit() { - b - b'0' - } else { - return None; - }; - - // We are going to process the new digit and accumulate the result. However, - // before doing this, if the result is already smaller than the - // stopValue(Long.MIN_VALUE / radix), then result * 10 will definitely - // be smaller than minValue, and we can stop. - if result < stop_value { - return None; - } - - result = result * radix - T::from_u8(digit).unwrap(); - // Since the previous result is less than or equal to stopValue(Long.MIN_VALUE / - // radix), we can just use `result > 0` to check overflow. If result - // overflows, we should stop. - if result > T::zero() { - return None; - } - } - - // This is the case when we've encountered a decimal separator. The fractional - // part will not change the number, but we will verify that the fractional part - // is well formed. - while offset < bytes.len() { - let current_byte = bytes[offset]; - if !current_byte.is_ascii_digit() { - return None; - } - offset += 1; - } - - if !negative { - result = -result; - if result < T::zero() { - return None; - } - } - Some(result) -} - -fn to_decimal(input: &str, precision: u8, scale: i8) -> Option { - let precision = precision as u64; - let scale = scale as i64; - bigdecimal::BigDecimal::from_str(input) - .ok() - .map(|decimal| decimal.with_prec(precision).with_scale(scale)) - .and_then(|decimal| { - let (bigint, _exp) = decimal.as_bigint_and_exponent(); - bigint.to_i128() - }) -} - #[cfg(test)] mod test { - use datafusion::common::cast::as_int32_array; + use datafusion::common::cast::{as_decimal128_array, as_float64_array, as_int32_array}; use crate::cast::*; + #[test] + fn test_boolean_to_string() { + let bool_array: ArrayRef = + Arc::new(BooleanArray::from_iter(vec![None, Some(true), Some(false)])); + let casted = cast(&bool_array, &DataType::Utf8).unwrap(); + assert_eq!( + as_string_array(&casted), + &StringArray::from_iter(vec![None, Some("true"), Some("false")]) + ); + } + #[test] fn test_float_to_int() { let f64_array: ArrayRef = Arc::new(Float64Array::from_iter(vec![ @@ -394,10 +231,8 @@ mod test { Some(f64::NAN), ])); let casted = cast(&f64_array, &DataType::Int32).unwrap(); - let i32_array = as_int32_array(&casted).unwrap(); - assert_eq!( - i32_array, + as_int32_array(&casted).unwrap(), &Int32Array::from_iter(vec![ None, Some(123), @@ -410,4 +245,102 @@ mod test { ]) ); } + + #[test] + fn test_int_to_float() { + let i32_array: ArrayRef = Arc::new(Int32Array::from_iter(vec![ + None, + Some(123), + Some(987), + Some(i32::MAX), + Some(i32::MIN), + ])); + let casted = cast(&i32_array, &DataType::Float64).unwrap(); + assert_eq!( + as_float64_array(&casted).unwrap(), + &Float64Array::from_iter(vec![ + None, + Some(123.0), + Some(987.0), + Some(i32::MAX as f64), + Some(i32::MIN as f64), + ]) + ); + } + + #[test] + fn test_int_to_decimal() { + let i32_array: ArrayRef = Arc::new(Int32Array::from_iter(vec![ + None, + Some(123), + Some(987), + Some(i32::MAX), + Some(i32::MIN), + ])); + let casted = cast(&i32_array, &DataType::Decimal128(38, 18)).unwrap(); + assert_eq!( + as_decimal128_array(&casted).unwrap(), + &Decimal128Array::from_iter(vec![ + None, + Some(123000000000000000000), + Some(987000000000000000000), + Some(i32::MAX as i128 * 1000000000000000000), + Some(i32::MIN as i128 * 1000000000000000000), + ]) + .with_precision_and_scale(38, 18) + .unwrap() + ); + } + + #[test] + fn test_string_to_decimal() { + let string_array: ArrayRef = Arc::new(StringArray::from_iter(vec![ + None, + Some("123.456"), + Some("987.654"), + Some("123456789012345.678901234567890"), + Some("-123456789012345.678901234567890"), + ])); + let casted = cast(&string_array, &DataType::Decimal128(38, 18)).unwrap(); + assert_eq!( + as_decimal128_array(&casted).unwrap(), + &Decimal128Array::from_iter(vec![ + None, + Some(123456000000000000000i128), + Some(987654000000000000000i128), + Some(123456789012345678901234567890000i128), + Some(-123456789012345678901234567890000i128), + ]) + .with_precision_and_scale(38, 18) + .unwrap() + ); + } + + #[test] + fn test_decimal_to_string() { + let decimal_array: ArrayRef = Arc::new( + Decimal128Array::from_iter(vec![ + None, + Some(123000000000000000000), + Some(987000000000000000000), + Some(987654321000000000000), + Some(i32::MAX as i128 * 1000000000000000000), + Some(i32::MIN as i128 * 1000000000000000000), + ]) + .with_precision_and_scale(38, 18) + .unwrap(), + ); + let casted = cast(&decimal_array, &DataType::Utf8).unwrap(); + assert_eq!( + casted.as_any().downcast_ref::().unwrap(), + &StringArray::from_iter(vec![ + None, + Some("123.000000000000000000"), + Some("987.000000000000000000"), + Some("987.654321000000000000"), + Some("2147483647.000000000000000000"), + Some("-2147483648.000000000000000000"), + ]) + ); + } } diff --git a/native-engine/datafusion-ext-commons/src/coalesce.rs b/native-engine/datafusion-ext-commons/src/coalesce.rs index b27086c54..257c46f00 100644 --- a/native-engine/datafusion-ext-commons/src/coalesce.rs +++ b/native-engine/datafusion-ext-commons/src/coalesce.rs @@ -26,6 +26,11 @@ use arrow_schema::{DataType, SchemaRef}; /// coalesce batches without checking there schemas, invokers must make /// sure all arrays have the same schema pub fn coalesce_batches_unchecked(schema: SchemaRef, batches: &[RecordBatch]) -> RecordBatch { + match batches.len() { + 0 => return RecordBatch::new_empty(schema), + 1 => return batches[0].clone(), + _ => {} + } let num_rows = batches.iter().map(|b| b.num_rows()).sum::(); let num_fields = schema.fields().len(); let mut coalesced_cols = vec![]; diff --git a/native-engine/datafusion-ext-exprs/Cargo.toml b/native-engine/datafusion-ext-exprs/Cargo.toml index 470daf6d9..83556c0ee 100644 --- a/native-engine/datafusion-ext-exprs/Cargo.toml +++ b/native-engine/datafusion-ext-exprs/Cargo.toml @@ -8,7 +8,6 @@ resolver = "1" arrow = { workspace = true } async-trait = "0.1.83" blaze-jni-bridge = { workspace = true } -bigdecimal = "0.4.6" datafusion = { workspace = true } datafusion-ext-commons = { workspace = true } itertools = "0.13.0" diff --git a/native-engine/datafusion-ext-functions/Cargo.toml b/native-engine/datafusion-ext-functions/Cargo.toml index 3c049730a..c2786d8c5 100644 --- a/native-engine/datafusion-ext-functions/Cargo.toml +++ b/native-engine/datafusion-ext-functions/Cargo.toml @@ -8,7 +8,6 @@ resolver = "1" arrow = { workspace = true } async-trait = "0.1.83" blaze-jni-bridge = { workspace = true } -bigdecimal = "0.4.6" datafusion = { workspace = true } datafusion-ext-commons = { workspace = true } itertools = "0.13.0" diff --git a/native-engine/datafusion-ext-plans/src/agg/agg_hash_map.rs b/native-engine/datafusion-ext-plans/src/agg/agg_hash_map.rs index 148bdb2be..6e526a0a8 100644 --- a/native-engine/datafusion-ext-plans/src/agg/agg_hash_map.rs +++ b/native-engine/datafusion-ext-plans/src/agg/agg_hash_map.rs @@ -75,19 +75,29 @@ impl Table { fn upsert_many(&mut self, keys: Vec) -> Vec { let mut hashes = unchecked!(keys.iter().map(agg_hash).collect::>()); + const PREFETCH_AHEAD: usize = 4; macro_rules! entries { ($i:expr) => {{ - (hashes[$i] % (1 << self.map_mod_bits)) as usize + (hashes[$i] % (1 << self.map_mod_bits)) }}; } + macro_rules! prefetch_at { + ($i:expr) => {{ + if $i < hashes.len() { + prefetch_write_data!(&self.map[entries!($i) as usize]); + } + }}; + } + + for i in 0..PREFETCH_AHEAD { + prefetch_at!(i); + } + for (i, key) in keys.into_iter().enumerate() { - const PREFETCH_AHEAD: usize = 4; - if i + PREFETCH_AHEAD < hashes.len() { - prefetch_write_data!(&self.map[entries!(i + PREFETCH_AHEAD)]); - } - hashes[i] = self.upsert_one_impl(key, hashes[i], entries!(i)); + prefetch_at!(i + PREFETCH_AHEAD); + hashes[i] = self.upsert_one_impl(key, hashes[i], entries!(i) as usize); } // safety: transmute to Vec diff --git a/native-engine/datafusion-ext-plans/src/agg_exec.rs b/native-engine/datafusion-ext-plans/src/agg_exec.rs index 664310188..6c11f78ef 100644 --- a/native-engine/datafusion-ext-plans/src/agg_exec.rs +++ b/native-engine/datafusion-ext-plans/src/agg_exec.rs @@ -174,23 +174,18 @@ fn execute_agg_with_grouping_hash( exec_ctx: Arc, agg_ctx: Arc, ) -> Result { - // create tables - let tables = Arc::new(AggTable::new(agg_ctx.clone(), exec_ctx.clone())); - MemManager::register_consumer(tables.clone(), true); - - // start processing input batches - let input = exec_ctx.execute_with_input_stats(&input)?; - let mut coalesced = exec_ctx.coalesce_with_default_batch_size(input); - Ok(exec_ctx .clone() .output_with_sender("Agg", |sender| async move { let elapsed_compute = exec_ctx.baseline_metrics().elapsed_compute().clone(); sender.exclude_time(&elapsed_compute); + // create tables + let tables = Arc::new(AggTable::new(agg_ctx.clone(), exec_ctx.clone())); + MemManager::register_consumer(tables.clone(), true); + log::info!( - "[partition={}] start hash aggregating, supports_partial_skipping={}, num_groupings={}, num_partial={}, num_partial_merge={}, num_final={}", - exec_ctx.partition_id(), + "start hash aggregating, supports_partial_skipping={}, num_groupings={}, num_partial={}, num_partial_merge={}, num_final={}", agg_ctx.supports_partial_skipping, agg_ctx.groupings.len(), agg_ctx.aggs.iter().filter(|agg| agg.mode.is_partial()).count(), @@ -200,8 +195,9 @@ fn execute_agg_with_grouping_hash( let _timer = elapsed_compute.timer(); let mut partial_skipping_triggered = false; + let mut input = exec_ctx.execute_with_input_stats(&input)?; while let Some(batch) = elapsed_compute - .exclude_timer_async(coalesced.next()) + .exclude_timer_async(input.next()) .await .transpose()? { @@ -277,10 +273,7 @@ fn execute_agg_no_grouping( )?; exec_ctx.baseline_metrics().record_output(1); sender.send(batch).await; - log::info!( - "[partition={}] aggregate exec (no grouping) outputting one record", - exec_ctx.partition_id(), - ); + log::info!("aggregate exec (no grouping) outputting one record"); Ok(()) })) } diff --git a/native-engine/datafusion-ext-plans/src/broadcast_join_build_hash_map_exec.rs b/native-engine/datafusion-ext-plans/src/broadcast_join_build_hash_map_exec.rs index 433a50ac8..79b34e99a 100644 --- a/native-engine/datafusion-ext-plans/src/broadcast_join_build_hash_map_exec.rs +++ b/native-engine/datafusion-ext-plans/src/broadcast_join_build_hash_map_exec.rs @@ -18,7 +18,7 @@ use std::{ sync::Arc, }; -use arrow::{array::RecordBatch, compute::concat_batches, datatypes::SchemaRef}; +use arrow::{array::RecordBatch, datatypes::SchemaRef}; use datafusion::{ common::Result, execution::{SendableRecordBatchStream, TaskContext}, @@ -29,6 +29,7 @@ use datafusion::{ PlanProperties, }, }; +use datafusion_ext_commons::coalesce::coalesce_batches_unchecked; use futures::StreamExt; use once_cell::sync::OnceCell; @@ -123,7 +124,7 @@ pub fn collect_hash_map( data_batches: Vec, keys: Vec>, ) -> Result { - let data_batch = concat_batches(&data_schema, data_batches.iter())?; + let data_batch = coalesce_batches_unchecked(data_schema, &data_batches); let hash_map = JoinHashMap::create_from_data_batch(data_batch, &keys)?; Ok(hash_map) } diff --git a/native-engine/datafusion-ext-plans/src/common/execution_context.rs b/native-engine/datafusion-ext-plans/src/common/execution_context.rs index e4dff33d1..a956ff778 100644 --- a/native-engine/datafusion-ext-plans/src/common/execution_context.rs +++ b/native-engine/datafusion-ext-plans/src/common/execution_context.rs @@ -35,7 +35,7 @@ use datafusion_ext_commons::{ suggested_output_batch_mem_size, }; use futures::StreamExt; -use futures_util::{FutureExt, TryStreamExt}; +use futures_util::FutureExt; use once_cell::sync::OnceCell; use parking_lot::Mutex; use tokio::sync::mpsc::Sender; @@ -125,8 +125,6 @@ impl ExecutionContext { if batch.num_rows() == 0 { continue; } - let elapsed_compute = baseline_metrics.elapsed_compute().clone(); - let _timer = elapsed_compute.timer(); staging_rows += batch.num_rows(); staging_batches_mem_size += batch.get_array_mem_size(); @@ -137,40 +135,29 @@ impl ExecutionContext { } else { (batch_size_limit / 2, mem_size_limit / 2) }; - - let should_flush = - staging_rows >= batch_size_limit || staging_batches_mem_size >= mem_size_limit; - if should_flush { - let coalesced = coalesce_batches_unchecked( - schema.clone(), - &std::mem::take(&mut staging_batches), - ); + if staging_rows >= batch_size_limit || staging_batches_mem_size >= mem_size_limit { + // only count time of coalescing + let elapsed_compute = baseline_metrics.elapsed_compute().clone(); + let coalesced = elapsed_compute.with_timer(|| { + coalesce_batches_unchecked(schema.clone(), &staging_batches) + }); staging_rows = 0; staging_batches_mem_size = 0; + staging_batches.clear(); sender.send(coalesced).await; } } if staging_rows > 0 { - let coalesced = coalesce_batches_unchecked( - schema.clone(), - &std::mem::take(&mut staging_batches), - ); + let elapsed_compute = baseline_metrics.elapsed_compute().clone(); + let coalesced = elapsed_compute + .with_timer(|| coalesce_batches_unchecked(schema.clone(), &staging_batches)); + staging_batches.clear(); sender.send(coalesced).await; } Ok(()) }) } - pub fn build_output_stream( - self: &Arc, - fut: impl Future> + Send + 'static, - ) -> SendableRecordBatchStream { - Box::pin(RecordBatchStreamAdapter::new( - self.output_schema(), - futures_util::stream::once(fut).try_flatten(), - )) - } - pub fn execute_with_input_stats( self: &Arc, input: &Arc, @@ -255,7 +242,6 @@ impl ExecutionContext { }); if let Err(err) = result { - let err_message = err.to_string(); err_sender .send(df_execution_err!("{err}")) .await @@ -263,20 +249,14 @@ impl ExecutionContext { // panic current spawn let task_running = is_task_running(); - if !task_running { - panic!("output_with_sender[{desc}] canceled due to task finished/killed"); - } else { - panic!("output_with_sender[{desc}] error: {err_message}"); + if task_running { + panic!("output_with_sender[{desc}] error: {}", err.to_string()); } } Ok(()) }); stream_builder.build() } - - pub fn cancel_task(self: &Arc) { - WrappedRecordBatchSender::cancel_task(self); - } } #[derive(Clone)] @@ -345,24 +325,6 @@ impl WrappedRecordBatchSender { self.exclude_time.get_or_init(|| exclude_time.clone()); } - pub fn cancel_task(exec_ctx: &Arc) { - let mut working_senders = working_senders().lock(); - *working_senders = std::mem::take(&mut *working_senders) - .into_iter() - .filter(|wrapped| match wrapped.upgrade() { - Some(wrapped) if Arc::ptr_eq(&wrapped.exec_ctx, exec_ctx) => { - wrapped - .sender - .try_send(df_execution_err!("task completed/cancelled")) - .unwrap_or_default(); - false - } - Some(_) => true, // do not modify senders from other tasks - None => false, // already released - }) - .collect(); - } - pub async fn send(&self, batch: RecordBatch) { let exclude_time = self.exclude_time.get().cloned(); let send_time = exclude_time.as_ref().map(|_| Instant::now()); @@ -379,3 +341,21 @@ impl WrappedRecordBatchSender { }); } } + +pub fn cancel_all_tasks(task_ctx: &Arc) { + let mut working_senders = working_senders().lock(); + *working_senders = std::mem::take(&mut *working_senders) + .into_iter() + .filter(|wrapped| match wrapped.upgrade() { + Some(wrapped) if Arc::ptr_eq(&wrapped.exec_ctx.task_ctx, task_ctx) => { + wrapped + .sender + .try_send(df_execution_err!("task completed/cancelled")) + .unwrap_or_default(); + false + } + Some(_) => true, // do not modify senders from other tasks + None => false, // already released + }) + .collect(); +} diff --git a/native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs b/native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs index 5d3e05428..caccb48e2 100644 --- a/native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs +++ b/native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs @@ -17,14 +17,11 @@ use std::{ fmt::{Debug, Formatter}, fs::File, io::{BufReader, Cursor, Read, Seek, SeekFrom}, - sync::{ - atomic::{AtomicUsize, Ordering::SeqCst}, - Arc, - }, + sync::Arc, }; use arrow::{ - array::{Array, ArrayRef, RecordBatch, RecordBatchOptions}, + array::{RecordBatch, RecordBatchOptions}, datatypes::SchemaRef, }; use async_trait::async_trait; @@ -43,13 +40,9 @@ use datafusion::{ PlanProperties, SendableRecordBatchStream, Statistics, }, }; -use datafusion_ext_commons::{ - array_size::ArraySize, batch_size, coalesce::coalesce_arrays_unchecked, df_execution_err, - suggested_output_batch_mem_size, -}; +use datafusion_ext_commons::{array_size::ArraySize, df_execution_err}; use jni::objects::{GlobalRef, JObject}; use once_cell::sync::OnceCell; -use parking_lot::Mutex; use crate::common::{execution_context::ExecutionContext, ipc_compression::IpcCompressionReader}; @@ -160,18 +153,14 @@ fn read_ipc( exec_ctx: Arc, ) -> Result { let size_counter = exec_ctx.register_counter_metric("size"); - let partition_id = exec_ctx.partition_id(); - - Ok(exec_ctx.clone().output_with_sender("IpcReader", move |sender| async move { - sender.exclude_time(exec_ctx.baseline_metrics().elapsed_compute()); - log::info!("[partition={partition_id}] start ipc reading"); + let elapsed_compute = exec_ctx.baseline_metrics().elapsed_compute().clone(); + let exec_ctx_cloned = exec_ctx.clone(); - let _timer = exec_ctx.baseline_metrics().elapsed_compute().timer(); - let batch_size = batch_size(); - let staging_cols: Arc>>> = Arc::new(Mutex::new(vec![])); - let staging_num_rows = AtomicUsize::new(0); - let staging_mem_size = AtomicUsize::new(0); + let output = exec_ctx.clone().output_with_sender("IpcReader", move |sender| async move { + sender.exclude_time(&elapsed_compute); + log::info!("start ipc reading"); + let _timer = elapsed_compute.timer(); loop { // get next block let blocks = blocks.clone(); @@ -200,59 +189,19 @@ fn read_ipc( }); while let Some((num_rows, cols)) = reader.as_mut().read_batch(&schema)? { - let (cur_staging_num_rows, cur_staging_mem_size) = { - let staging_cols_cloned = staging_cols.clone(); - let mut staging_cols = staging_cols_cloned.lock(); - let mut cols_mem_size = 0; - staging_cols.resize_with(cols.len(), || vec![]); - for (col_idx, col) in cols.into_iter().enumerate() { - cols_mem_size += col.get_array_mem_size(); - staging_cols[col_idx].push(col); - } - drop(staging_cols); - staging_num_rows.fetch_add(num_rows, SeqCst); - staging_mem_size.fetch_add(cols_mem_size, SeqCst); - (staging_num_rows.load(SeqCst), staging_mem_size.load(SeqCst)) - }; - - if cur_staging_num_rows >= batch_size - || cur_staging_mem_size >= suggested_output_batch_mem_size() - { - let coalesced_cols = std::mem::take(&mut *staging_cols.clone().lock()) - .into_iter() - .map(|cols| coalesce_arrays_unchecked(cols[0].data_type(), &cols)) - .collect::>(); - let batch = RecordBatch::try_new_with_options( - schema.clone(), - coalesced_cols, - &RecordBatchOptions::new().with_row_count(Some(cur_staging_num_rows)) - )?; - staging_num_rows.store(0, SeqCst); - staging_mem_size.store(0, SeqCst); - size_counter.add(batch.get_array_mem_size()); - exec_ctx.baseline_metrics().record_output(batch.num_rows()); - sender.send(batch).await; - } + let batch = RecordBatch::try_new_with_options( + schema.clone(), + cols, + &RecordBatchOptions::new().with_row_count(Some(num_rows)) + )?; + size_counter.add(batch.get_array_mem_size()); + exec_ctx_cloned.baseline_metrics().record_output(batch.num_rows()); + sender.send(batch).await; } } - - let cur_staging_num_rows = staging_num_rows.load(SeqCst); - if cur_staging_num_rows > 0 { - let coalesced_cols = std::mem::take(&mut *staging_cols.clone().lock()) - .into_iter() - .map(|cols| coalesce_arrays_unchecked(cols[0].data_type(), &cols)) - .collect::>(); - let batch = RecordBatch::try_new_with_options( - schema.clone(), - coalesced_cols, - &RecordBatchOptions::new().with_row_count(Some(cur_staging_num_rows)) - )?; - size_counter.add(batch.get_array_mem_size()); - exec_ctx.baseline_metrics().record_output(batch.num_rows()); - sender.send(batch).await; - } Ok(()) - })) + }); + Ok(exec_ctx.coalesce_with_default_batch_size(output)) } fn get_channel_reader(block: JObject) -> Result>> { diff --git a/native-engine/datafusion-ext-plans/src/joins/join_hash_map.rs b/native-engine/datafusion-ext-plans/src/joins/join_hash_map.rs index cf04b260a..a39bdc0c8 100644 --- a/native-engine/datafusion-ext-plans/src/joins/join_hash_map.rs +++ b/native-engine/datafusion-ext-plans/src/joins/join_hash_map.rs @@ -246,24 +246,27 @@ impl Table { } pub fn lookup_many(&self, hashes: Vec) -> Vec { - const PREFETCH_AHEAD: usize = 4; let mut hashes = unchecked!(hashes); + const PREFETCH_AHEAD: usize = 4; macro_rules! entries { [$i:expr] => (hashes[$i] % (1 << self.map_mod_bits)) } - if hashes.len() >= PREFETCH_AHEAD { - for i in 1..PREFETCH_AHEAD - 1 { - prefetch_read_data!(&self.map[entries![i] as usize]); - } + macro_rules! prefetch_at { + ($i:expr) => {{ + if $i < hashes.len() { + prefetch_read_data!(&self.map[entries!($i) as usize]); + } + }}; } - for i in 0..hashes.len() { - if i + PREFETCH_AHEAD < hashes.len() { - prefetch_read_data!(&self.map[entries![i + PREFETCH_AHEAD] as usize]); - } + for i in 0..PREFETCH_AHEAD { + prefetch_at!(i); + } + for i in 0..hashes.len() { + prefetch_at!(i + PREFETCH_AHEAD); let mut e = entries![i] as usize; loop { let hash_matched = self.map[e].hashes.simd_eq(Simd::splat(hashes[i])); diff --git a/native-engine/datafusion-ext-plans/src/parquet_sink_exec.rs b/native-engine/datafusion-ext-plans/src/parquet_sink_exec.rs index 77762a11a..414614f4f 100644 --- a/native-engine/datafusion-ext-plans/src/parquet_sink_exec.rs +++ b/native-engine/datafusion-ext-plans/src/parquet_sink_exec.rs @@ -209,7 +209,6 @@ fn execute_parquet_sink( mut input: SendableRecordBatchStream, exec_ctx: Arc, ) -> Result { - let partition_id = exec_ctx.partition_id(); let part_writer: Arc>> = Arc::default(); let bytes_written = exec_ctx.register_counter_metric("bytes_written"); @@ -218,20 +217,13 @@ fn execute_parquet_sink( .output_with_sender("ParquetSink", move |sender| async move { macro_rules! part_writer_init { ($batch:expr, $part_values:expr) => {{ - log::info!( - "[partition={partition_id}] starts writing partition: {:?}", - $part_values - ); + log::info!("starts writing partition: {:?}", $part_values); let parquet_sink_context_cloned = parquet_sink_context.clone(); *part_writer.lock() = Some({ // send identity batch, after that we can achieve a new output file sender.send(($batch.slice(0, 1))).await; tokio::task::spawn_blocking(move || { - PartWriter::try_new( - partition_id, - parquet_sink_context_cloned, - $part_values, - ) + PartWriter::try_new(parquet_sink_context_cloned, $part_values) }) .await .or_else(|e| df_execution_err!("closing parquet file error: {e}"))?? @@ -415,7 +407,6 @@ struct PartFileStat { } struct PartWriter { - partition_id: usize, path: String, parquet_sink_context: Arc, parquet_writer: ArrowWriter, @@ -426,21 +417,18 @@ struct PartWriter { impl PartWriter { fn try_new( - partition_id: usize, parquet_sink_context: Arc, part_values: &[ScalarValue], ) -> Result { if !part_values.is_empty() { - log::info!( - "[partition={partition_id}] starts outputting dynamic partition: {part_values:?}" - ); + log::info!("starts outputting dynamic partition: {part_values:?}"); } let part_file = jni_get_string!( jni_call_static!(BlazeNativeParquetSinkUtils.getTaskOutputPath() -> JObject)? .as_obj() .into() )?; - log::info!("[partition={partition_id}] starts writing parquet file: {part_file}"); + log::info!("starts writing parquet file: {part_file}"); let fs = parquet_sink_context.fs_provider.provide(&part_file)?; let bytes_written = Count::new(); @@ -453,7 +441,6 @@ impl PartWriter { Some(parquet_sink_context.props.clone()), )?; Ok(Self { - partition_id, path: part_file, parquet_sink_context, parquet_writer, @@ -473,7 +460,6 @@ impl PartWriter { } fn close(self) -> Result { - let partition_id = self.partition_id; let mut parquet_writer = self.parquet_writer; parquet_writer.flush()?; let rows_written = parquet_writer @@ -492,7 +478,7 @@ impl PartWriter { num_rows: rows_written, num_bytes: bytes_written, }; - log::info!("[partition={partition_id}] finished writing parquet file: {stat:?}"); + log::info!("finished writing parquet file: {stat:?}"); Ok(stat) } } diff --git a/native-engine/datafusion-ext-plans/src/project_exec.rs b/native-engine/datafusion-ext-plans/src/project_exec.rs index b38c52bed..bfd22eb75 100644 --- a/native-engine/datafusion-ext-plans/src/project_exec.rs +++ b/native-engine/datafusion-ext-plans/src/project_exec.rs @@ -28,7 +28,8 @@ use datafusion::{ PlanProperties, SendableRecordBatchStream, }, }; -use futures::{FutureExt, StreamExt}; +use datafusion_ext_commons::downcast_any; +use futures::StreamExt; use itertools::Itertools; use once_cell::sync::OnceCell; @@ -37,6 +38,7 @@ use crate::{ cached_exprs_evaluator::CachedExprsEvaluator, column_pruning::{prune_columns, ExecuteWithColumnPruning}, execution_context::ExecutionContext, + timer_helper::TimerHelper, }, filter_exec::FilterExec, }; @@ -136,20 +138,16 @@ impl ExecutionPlan for ProjectExec { let exec_ctx = ExecutionContext::new(context, partition, self.schema(), &self.metrics); let exprs: Vec = self.expr.iter().map(|(e, _name)| e.clone()).collect(); - let fut = if let Some(filter_exec) = self.input.as_any().downcast_ref::() { + let output = if let Ok(filter_exec) = downcast_any!(self.input, FilterExec) { execute_project_with_filtering( filter_exec.children()[0].clone(), exec_ctx.clone(), filter_exec.predicates().to_vec(), exprs, - ) - .boxed() + )? } else { - execute_project_with_filtering(self.input.clone(), exec_ctx.clone(), vec![], exprs) - .boxed() + execute_project_with_filtering(self.input.clone(), exec_ctx.clone(), vec![], exprs)? }; - - let output = exec_ctx.build_output_stream(fut); Ok(exec_ctx.coalesce_with_default_batch_size(output)) } @@ -180,7 +178,7 @@ impl ExecuteWithColumnPruning for ProjectExec { } } -async fn execute_project_with_filtering( +fn execute_project_with_filtering( input: Arc, exec_ctx: Arc, filters: Vec, @@ -206,10 +204,15 @@ async fn execute_project_with_filtering( Ok(exec_ctx .clone() .output_with_sender("Project", move |sender| async move { - sender.exclude_time(exec_ctx.baseline_metrics().elapsed_compute()); - - while let Some(batch) = input.next().await.transpose()? { - let _timer = exec_ctx.baseline_metrics().elapsed_compute().timer(); + let elapsed_compute = exec_ctx.baseline_metrics().elapsed_compute().clone(); + let _timer = elapsed_compute.timer(); + sender.exclude_time(&elapsed_compute); + + while let Some(batch) = elapsed_compute + .exclude_timer_async(input.next()) + .await + .transpose()? + { let output_batch = cached_expr_evaluator.filter_project(&batch)?; drop(batch); diff --git a/native-engine/datafusion-ext-plans/src/shuffle/buffered_data.rs b/native-engine/datafusion-ext-plans/src/shuffle/buffered_data.rs index 99f5ecab2..0f2efbe4b 100644 --- a/native-engine/datafusion-ext-plans/src/shuffle/buffered_data.rs +++ b/native-engine/datafusion-ext-plans/src/shuffle/buffered_data.rs @@ -83,11 +83,7 @@ impl BufferedData { // write buffered data to spill/target file, returns uncompressed size and // offsets to each partition pub fn write(self, mut w: W, partitioning: &Partitioning) -> Result> { - let partition_id = self.partition_id; - log::info!( - "[partition={partition_id}] draining all buffered data, total_mem={}", - self.mem_used() - ); + log::info!("draining all buffered data, total_mem={}", self.mem_used()); if self.num_rows == 0 { return Ok(vec![0; partitioning.partition_count() + 1]); @@ -117,7 +113,7 @@ impl BufferedData { } let compressed_size = offsets.last().cloned().unwrap_or_default(); - log::info!("[partition={partition_id}] all buffered data drained, compressed_size={compressed_size}"); + log::info!("all buffered data drained, compressed_size={compressed_size}"); Ok(offsets) } @@ -127,15 +123,13 @@ impl BufferedData { rss_partition_writer: GlobalRef, partitioning: &Partitioning, ) -> Result<()> { - let partition_id = self.partition_id; - log::info!( - "[partition={partition_id}] draining all buffered data to rss, total_mem={}", - self.mem_used() - ); - if self.num_rows == 0 { return Ok(()); } + log::info!( + "draining all buffered data to rss, total_mem={}", + self.mem_used() + ); let mut iter = self.into_sorted_batches(partitioning)?; let mut writer = IpcCompressionWriter::new(RssWriter::new(rss_partition_writer.clone(), 0)); @@ -154,8 +148,7 @@ impl BufferedData { writer.finish_current_buf()?; } jni_call!(BlazeRssPartitionWriterBase(rss_partition_writer.as_obj()).flush() -> ())?; - - log::info!("[partition={partition_id}] all buffered data drained to rss"); + log::info!("all buffered data drained to rss"); Ok(()) } diff --git a/native-engine/datafusion-ext-plans/src/shuffle/mod.rs b/native-engine/datafusion-ext-plans/src/shuffle/mod.rs index 81033c33b..f7ea80037 100644 --- a/native-engine/datafusion-ext-plans/src/shuffle/mod.rs +++ b/native-engine/datafusion-ext-plans/src/shuffle/mod.rs @@ -54,43 +54,43 @@ impl dyn ShuffleRepartitioner { let mut coalesced = exec_ctx.coalesce_with_default_batch_size(input); // process all input batches - Ok(exec_ctx.clone().output_with_sender("Shuffle", move |_| async move { - let batches_num_rows = AtomicUsize::default(); - let batches_mem_size = AtomicUsize::default(); - while let Some(batch) = coalesced.next().await.transpose()? { - let _timer = exec_ctx.baseline_metrics().elapsed_compute().timer(); - let batch_num_rows = batch.num_rows(); - let batch_mem_size = batch.get_array_mem_size(); - if batches_num_rows.load(SeqCst) == 0 { - log::info!( - "[partition={}] start shuffle writing, first batch num_rows={}, mem_size={}", - exec_ctx.partition_id(), - batch_num_rows, - ByteSize(batch_mem_size as u64), - ); + Ok(exec_ctx + .clone() + .output_with_sender("Shuffle", move |_| async move { + let batches_num_rows = AtomicUsize::default(); + let batches_mem_size = AtomicUsize::default(); + while let Some(batch) = coalesced.next().await.transpose()? { + let _timer = exec_ctx.baseline_metrics().elapsed_compute().timer(); + let batch_num_rows = batch.num_rows(); + let batch_mem_size = batch.get_array_mem_size(); + if batches_num_rows.load(SeqCst) == 0 { + log::info!( + "start shuffle writing, first batch num_rows={}, mem_size={}", + batch_num_rows, + ByteSize(batch_mem_size as u64), + ); + } + batches_num_rows.fetch_add(batch_num_rows, SeqCst); + batches_mem_size.fetch_add(batch_mem_size, SeqCst); + exec_ctx.baseline_metrics().record_output(batch.num_rows()); + self.insert_batch(batch) + .await + .map_err(|err| err.context("shuffle: executing insert_batch() error"))?; } - batches_num_rows.fetch_add(batch_num_rows, SeqCst); - batches_mem_size.fetch_add(batch_mem_size, SeqCst); - exec_ctx.baseline_metrics().record_output(batch.num_rows()); - self.insert_batch(batch) - .await - .map_err(|err| err.context("shuffle: executing insert_batch() error"))?; - } - data_size_counter.add(batches_mem_size.load(SeqCst)); + data_size_counter.add(batches_mem_size.load(SeqCst)); - let _timer = exec_ctx.baseline_metrics().elapsed_compute().timer(); - log::info!( - "[partition={}] finishing shuffle writing, num_rows={}, mem_size={}", - exec_ctx.partition_id(), - batches_num_rows.load(SeqCst), - ByteSize(batches_mem_size.load(SeqCst) as u64), - ); - self.shuffle_write() - .await - .map_err(|err| err.context("shuffle: executing shuffle_write() error"))?; - log::info!("[partition={}] finishing shuffle writing", exec_ctx.partition_id()); - Ok::<_, DataFusionError>(()) - })) + let _timer = exec_ctx.baseline_metrics().elapsed_compute().timer(); + log::info!( + "finishing shuffle writing, num_rows={}, mem_size={}", + batches_num_rows.load(SeqCst), + ByteSize(batches_mem_size.load(SeqCst) as u64), + ); + self.shuffle_write() + .await + .map_err(|err| err.context("shuffle: executing shuffle_write() error"))?; + log::info!("finishing shuffle writing"); + Ok::<_, DataFusionError>(()) + })) } } diff --git a/native-engine/datafusion-ext-plans/src/sort_exec.rs b/native-engine/datafusion-ext-plans/src/sort_exec.rs index 531fdba2d..08cff689f 100644 --- a/native-engine/datafusion-ext-plans/src/sort_exec.rs +++ b/native-engine/datafusion-ext-plans/src/sort_exec.rs @@ -458,38 +458,42 @@ impl ExecuteWithColumnPruning for SortExec { context: Arc, projection: &[usize], ) -> Result { - let prune_sort_keys_from_batch = Arc::new(PruneSortKeysFromBatch::try_new( - self.input.schema(), - projection, - &self.exprs, - )?); let exec_ctx = ExecutionContext::new(context, partition, self.schema(), &self.metrics); + let exprs = self.exprs.clone(); + let limit = self.fetch.unwrap_or(usize::MAX); + let input_schema = self.input.schema(); + let projection = projection.to_vec(); + let mut input = exec_ctx.execute_with_input_stats(&self.input)?; - let sorter = Arc::new(ExternalSorter { - exec_ctx: exec_ctx.clone(), - name: format!("ExternalSorter[partition={}]", partition), - mem_consumer_info: None, - prune_sort_keys_from_batch, - limit: self.fetch.unwrap_or(usize::MAX), - data: Default::default(), - spills: Default::default(), - num_total_rows: Default::default(), - mem_total_size: Default::default(), - }); - MemManager::register_consumer(sorter.clone(), true); - - let input = exec_ctx.execute_with_input_stats(&self.input)?; - let mut coalesced = exec_ctx.coalesce_with_default_batch_size(input); - - let elapsed_compute = exec_ctx.baseline_metrics().elapsed_compute().clone(); + let exec_ctx_cloned = exec_ctx.clone(); let output = exec_ctx .clone() - .output_with_sender("Sort", |sender| async move { + .output_with_sender("Sort", move |sender| async move { + let elapsed_compute = exec_ctx_cloned.baseline_metrics().elapsed_compute().clone(); let _timer = elapsed_compute.timer(); sender.exclude_time(&elapsed_compute); + let prune_sort_keys_from_batch = Arc::new(PruneSortKeysFromBatch::try_new( + input_schema, + &projection, + &exprs, + )?); + + let sorter = Arc::new(ExternalSorter { + exec_ctx: exec_ctx_cloned.clone(), + name: format!("ExternalSorter[partition={}]", partition), + mem_consumer_info: None, + prune_sort_keys_from_batch, + limit, + data: Default::default(), + spills: Default::default(), + num_total_rows: Default::default(), + mem_total_size: Default::default(), + }); + MemManager::register_consumer(sorter.clone(), true); + while let Some(batch) = elapsed_compute - .exclude_timer_async(coalesced.next()) + .exclude_timer_async(input.next()) .await .transpose()? {