Move AggregatedMetricsSet to metrics for further reuse (#1663)
yjshen authored Jan 24, 2022
1 parent 741df36 commit c63cfd4
Showing 3 changed files with 153 additions and 128 deletions.
149 changes: 149 additions & 0 deletions datafusion/src/physical_plan/metrics/aggregated.rs
@@ -0,0 +1,149 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Common metrics for complex operators composed of multiple steps.
use crate::physical_plan::metrics::{
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricsSet, Time,
};
use std::sync::Arc;
use std::time::Duration;

#[derive(Debug, Clone)]
/// Aggregates all metrics during a complex operation, which is composed of multiple steps
/// where each step reports its statistics separately.
/// Take sort as an example: when the dataset is larger than available memory, it reports
/// multiple in-memory sort metrics plus the final merge-sort metrics from
/// `SortPreservingMergeStream`. We therefore need to separate the final metrics, whose
/// output_rows count toward the operator's output, from the intermediate metrics, which
/// contribute elapsed_compute time and spill statistics but not output_rows.
pub struct AggregatedMetricsSet {
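/// Metrics from intermediate steps; they contribute compute time and spill statistics only.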
intermediate: Arc<std::sync::Mutex<Vec<ExecutionPlanMetricsSet>>>,
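/// Metrics from the final step; they additionally contribute output rows.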
final_: Arc<std::sync::Mutex<Vec<ExecutionPlanMetricsSet>>>,
}

impl AggregatedMetricsSet {
/// Create a new aggregated set
pub(crate) fn new() -> Self {
Self {
intermediate: Arc::new(std::sync::Mutex::new(vec![])),
final_: Arc::new(std::sync::Mutex::new(vec![])),
}
}

/// Create a new intermediate baseline
pub(crate) fn new_intermediate_baseline(&self, partition: usize) -> BaselineMetrics {
let ms = ExecutionPlanMetricsSet::new();
let result = BaselineMetrics::new(&ms, partition);
self.intermediate.lock().unwrap().push(ms);
result
}

/// Create a new final baseline
pub(crate) fn new_final_baseline(&self, partition: usize) -> BaselineMetrics {
let ms = ExecutionPlanMetricsSet::new();
let result = BaselineMetrics::new(&ms, partition);
self.final_.lock().unwrap().push(ms);
result
}

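/// Accumulate elapsed compute time from all steps' reports into the total time consumption.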
fn merge_compute_time(&self, dest: &Time) {
let time1 = self
.intermediate
.lock()
.unwrap()
.iter()
.map(|es| {
es.clone_inner()
.elapsed_compute()
.map_or(0u64, |v| v as u64)
})
.sum();
let time2 = self
.final_
.lock()
.unwrap()
.iter()
.map(|es| {
es.clone_inner()
.elapsed_compute()
.map_or(0u64, |v| v as u64)
})
.sum();
dest.add_duration(Duration::from_nanos(time1));
dest.add_duration(Duration::from_nanos(time2));
}

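/// Accumulate spill counts from all steps' reports into the total spill count.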
fn merge_spill_count(&self, dest: &Count) {
let count1 = self
.intermediate
.lock()
.unwrap()
.iter()
.map(|es| es.clone_inner().spill_count().map_or(0, |v| v))
.sum();
let count2 = self
.final_
.lock()
.unwrap()
.iter()
.map(|es| es.clone_inner().spill_count().map_or(0, |v| v))
.sum();
dest.add(count1);
dest.add(count2);
}

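/// Accumulate spilled bytes from all steps' reports into the total spilled bytes.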
fn merge_spilled_bytes(&self, dest: &Count) {
let count1 = self
.intermediate
.lock()
.unwrap()
.iter()
.map(|es| es.clone_inner().spilled_bytes().map_or(0, |v| v))
.sum();
let count2 = self
.final_
.lock()
.unwrap()
.iter()
.map(|es| es.clone_inner().spilled_bytes().map_or(0, |v| v))
.sum();
dest.add(count1);
dest.add(count2);
}

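/// Only the final step's metrics count toward the operator's output rows.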
fn merge_output_count(&self, dest: &Count) {
let count = self
.final_
.lock()
.unwrap()
.iter()
.map(|es| es.clone_inner().output_rows().map_or(0, |v| v))
.sum();
dest.add(count);
}

/// Aggregate all metrics into a single `MetricsSet`
pub(crate) fn aggregate_all(&self) -> MetricsSet {
let metrics = ExecutionPlanMetricsSet::new();
let baseline = BaselineMetrics::new(&metrics, 0);
self.merge_compute_time(baseline.elapsed_compute());
self.merge_spill_count(baseline.spill_count());
self.merge_spilled_bytes(baseline.spilled_bytes());
self.merge_output_count(baseline.output_rows());
metrics.clone_inner()
}
}
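
For orientation, the lifecycle mirrors how SortExec wires this type up later in this commit: each intermediate step registers its own baseline, the final step registers a final baseline, and the operator's metrics() merges everything via aggregate_all(). A minimal crate-internal sketch follows; do_partial_step and do_final_merge are hypothetical stand-ins for work that records into the given BaselineMetrics:

// Sketch only: `do_partial_step` and `do_final_merge` are hypothetical.
let all_metrics = AggregatedMetricsSet::new();

// One baseline per intermediate step (e.g. each in-memory sort run); it
// contributes compute time and spill statistics, but not output rows.
let intermediate = all_metrics.new_intermediate_baseline(/* partition */ 0);
do_partial_step(&intermediate);

// One baseline for the final step (e.g. the final merge); its
// output_rows become the operator's reported output count.
let final_baseline = all_metrics.new_final_baseline(0);
do_final_merge(&final_baseline);

// Collapse every registered set into a single MetricsSet, which is
// exactly what SortExec::metrics() now does via aggregate_all().
let merged = all_metrics.aggregate_all();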
2 changes: 2 additions & 0 deletions datafusion/src/physical_plan/metrics/mod.rs
@@ -17,6 +17,7 @@

//! Metrics for recording information about execution
mod aggregated;
mod baseline;
mod builder;
mod value;
@@ -30,6 +31,7 @@ use std::{
use hashbrown::HashMap;

// public exports
pub use aggregated::AggregatedMetricsSet;
pub use baseline::{BaselineMetrics, RecordOutput};
pub use builder::MetricBuilder;
pub use value::{Count, MetricValue, ScopedTimerGuard, Time, Timestamp};
130 changes: 2 additions & 128 deletions datafusion/src/physical_plan/sorts/sort.rs
@@ -26,9 +26,7 @@ use crate::execution::memory_manager::{
use crate::execution::runtime_env::RuntimeEnv;
use crate::physical_plan::common::{batch_byte_size, IPCWriter, SizedRecordBatchStream};
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::metrics::{
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricsSet, Time,
};
use crate::physical_plan::metrics::{AggregatedMetricsSet, BaselineMetrics, MetricsSet};
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeStream;
use crate::physical_plan::sorts::SortedStream;
use crate::physical_plan::stream::RecordBatchReceiverStream;
@@ -54,7 +52,6 @@ use std::fs::File;
use std::io::BufReader;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::{Receiver as TKReceiver, Sender as TKSender};
use tokio::task;

@@ -365,121 +362,6 @@ pub struct SortExec {
preserve_partitioning: bool,
}

#[derive(Debug, Clone)]
/// Aggregates all metrics during a complex operation, which is composed of multiple steps
/// where each step reports its statistics separately.
/// Take sort as an example: when the dataset is larger than available memory, it reports
/// multiple in-memory sort metrics plus the final merge-sort metrics from
/// `SortPreservingMergeStream`. We therefore need to separate the final metrics, whose
/// output_rows count toward the operator's output, from the intermediate metrics, which
/// contribute elapsed_compute time and spill statistics but not output_rows.
struct AggregatedMetricsSet {
intermediate: Arc<std::sync::Mutex<Vec<ExecutionPlanMetricsSet>>>,
final_: Arc<std::sync::Mutex<Vec<ExecutionPlanMetricsSet>>>,
}

impl AggregatedMetricsSet {
fn new() -> Self {
Self {
intermediate: Arc::new(std::sync::Mutex::new(vec![])),
final_: Arc::new(std::sync::Mutex::new(vec![])),
}
}

fn new_intermediate_baseline(&self, partition: usize) -> BaselineMetrics {
let ms = ExecutionPlanMetricsSet::new();
let result = BaselineMetrics::new(&ms, partition);
self.intermediate.lock().unwrap().push(ms);
result
}

fn new_final_baseline(&self, partition: usize) -> BaselineMetrics {
let ms = ExecutionPlanMetricsSet::new();
let result = BaselineMetrics::new(&ms, partition);
self.final_.lock().unwrap().push(ms);
result
}

/// We should accumulate all times from all steps' reports for the total time consumption.
fn merge_compute_time(&self, dest: &Time) {
let time1 = self
.intermediate
.lock()
.unwrap()
.iter()
.map(|es| {
es.clone_inner()
.elapsed_compute()
.map_or(0u64, |v| v as u64)
})
.sum();
let time2 = self
.final_
.lock()
.unwrap()
.iter()
.map(|es| {
es.clone_inner()
.elapsed_compute()
.map_or(0u64, |v| v as u64)
})
.sum();
dest.add_duration(Duration::from_nanos(time1));
dest.add_duration(Duration::from_nanos(time2));
}

/// We should accumulate all counts from all steps' reports for the total spill count.
fn merge_spill_count(&self, dest: &Count) {
let count1 = self
.intermediate
.lock()
.unwrap()
.iter()
.map(|es| es.clone_inner().spill_count().map_or(0, |v| v))
.sum();
let count2 = self
.final_
.lock()
.unwrap()
.iter()
.map(|es| es.clone_inner().spill_count().map_or(0, |v| v))
.sum();
dest.add(count1);
dest.add(count2);
}

/// We should accumulate all spilled bytes from all steps' reports for the total spilled bytes.
fn merge_spilled_bytes(&self, dest: &Count) {
let count1 = self
.intermediate
.lock()
.unwrap()
.iter()
.map(|es| es.clone_inner().spilled_bytes().map_or(0, |v| v))
.sum();
let count2 = self
.final_
.lock()
.unwrap()
.iter()
.map(|es| es.clone_inner().spilled_bytes().map_or(0, |v| v))
.sum();
dest.add(count1);
dest.add(count2);
}

/// We should only care about output from the final stage metrics.
fn merge_output_count(&self, dest: &Count) {
let count = self
.final_
.lock()
.unwrap()
.iter()
.map(|es| es.clone_inner().output_rows().map_or(0, |v| v))
.sum();
dest.add(count);
}
}

impl SortExec {
/// Create a new sort execution plan
pub fn try_new(
@@ -595,15 +477,7 @@ impl ExecutionPlan for SortExec {
}

fn metrics(&self) -> Option<MetricsSet> {
let metrics = ExecutionPlanMetricsSet::new();
let baseline = BaselineMetrics::new(&metrics, 0);
self.all_metrics
.merge_compute_time(baseline.elapsed_compute());
self.all_metrics.merge_spill_count(baseline.spill_count());
self.all_metrics
.merge_spilled_bytes(baseline.spilled_bytes());
self.all_metrics.merge_output_count(baseline.output_rows());
Some(metrics.clone_inner())
Some(self.all_metrics.aggregate_all())
}

fn fmt_as(
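
Since metrics() now returns the merged set, a caller can read the standard baseline values off it directly. A rough sketch, where sort_exec is a hypothetical, already-executed SortExec; the accessors are the same MetricsSet helpers the merge functions rely on:

// Hypothetical caller-side read of the aggregated metrics.
if let Some(metrics) = sort_exec.metrics() {
    println!("output rows:     {:?}", metrics.output_rows());
    println!("elapsed compute: {:?}", metrics.elapsed_compute());
    println!("spill count:     {:?}", metrics.spill_count());
    println!("spilled bytes:   {:?}", metrics.spilled_bytes());
}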
