From afaf21bebf52bca2762ad4aa66ef4f0fa5d65f8a Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Tue, 22 Oct 2024 14:36:59 +0200 Subject: [PATCH 1/5] trace-server: add load speed and total time --- .../turbopack-trace-server/src/reader/mod.rs | 27 ++++++++++++++----- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/turbopack/crates/turbopack-trace-server/src/reader/mod.rs b/turbopack/crates/turbopack-trace-server/src/reader/mod.rs index d19591ab12436..61c0f7941c762 100644 --- a/turbopack/crates/turbopack-trace-server/src/reader/mod.rs +++ b/turbopack/crates/turbopack-trace-server/src/reader/mod.rs @@ -9,7 +9,7 @@ use std::{ path::PathBuf, sync::Arc, thread::{self, JoinHandle}, - time::Duration, + time::{Duration, Instant}, }; use anyhow::Result; @@ -131,7 +131,10 @@ impl TraceReader { let mut format: Option> = None; let mut current_read = 0; - let mut initial_read = { file.seek(SeekFrom::End(0)).ok() }; + let mut initial_read = file + .seek(SeekFrom::End(0)) + .ok() + .map(|total| (total, Instant::now())); if file.seek(SeekFrom::Start(0)).is_err() { return false; } @@ -193,7 +196,7 @@ impl TraceReader { } let prev_read = current_read; current_read += bytes_read as u64; - if let Some(total) = &mut initial_read { + if let Some((total, start)) = &mut initial_read { let old_mbs = prev_read / (97 * 1024 * 1024); let new_mbs = current_read / (97 * 1024 * 1024); if old_mbs != new_mbs { @@ -207,7 +210,13 @@ impl TraceReader { let uncompressed = current_read / (1024 * 1024); let total = *total / (1024 * 1024); let stats = format.stats(); - print!("{}% read ({}/{} MB)", percentage, read, total); + print!( + "{}% read ({}/{} MB, {} MB/s)", + percentage, + read, + total, + read * 1000 / (start.elapsed().as_millis() + 1) as u64 + ); if uncompressed != read { print!(" ({} MB uncompressed)", uncompressed); } @@ -249,19 +258,23 @@ impl TraceReader { fn wait_for_more_data( &mut self, file: &mut TraceFile, - initial_read: &mut Option, + initial_read: &mut Option<(u64, Instant)>, format: Option<&dyn TraceFormat>, ) -> Option { let Ok(pos) = file.stream_position() else { return Some(true); }; - if let Some(total) = initial_read.take() { + if let Some((total, start)) = initial_read.take() { if let Some(format) = format { let stats = format.stats(); println!("{}", stats); } if total > MIN_INITIAL_REPORT_SIZE { - println!("Initial read completed ({} MB)", total / (1024 * 1024)); + println!( + "Initial read completed ({} MB, {}s)", + total / (1024 * 1024), + (start.elapsed().as_millis() / 100) as f32 / 10.0 + ); } } loop { From 865e07ab7996b01b7cad7861957ae730f80f5ca1 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Tue, 22 Oct 2024 14:38:09 +0200 Subject: [PATCH 2/5] trace-server: reuse buffer while loading --- .../src/reader/heaptrack.rs | 4 +- .../turbopack-trace-server/src/reader/mod.rs | 90 +++++++++++++++---- .../src/reader/nextjs.rs | 4 +- .../src/reader/turbopack.rs | 37 +++++++- 4 files changed, 113 insertions(+), 22 deletions(-) diff --git a/turbopack/crates/turbopack-trace-server/src/reader/heaptrack.rs b/turbopack/crates/turbopack-trace-server/src/reader/heaptrack.rs index cfeae1ea40670..9d732ebcff205 100644 --- a/turbopack/crates/turbopack-trace-server/src/reader/heaptrack.rs +++ b/turbopack/crates/turbopack-trace-server/src/reader/heaptrack.rs @@ -181,7 +181,9 @@ impl TraceFormat for HeaptrackFormat { ) } - fn read(&mut self, mut buffer: &[u8]) -> anyhow::Result { + type Reused = (); + + fn read(&mut self, mut buffer: &[u8], _reuse: &mut Self::Reused) -> anyhow::Result { let mut bytes_read = 0; let mut outdated_spans = HashSet::new(); let mut store = self.store.write(); diff --git a/turbopack/crates/turbopack-trace-server/src/reader/mod.rs b/turbopack/crates/turbopack-trace-server/src/reader/mod.rs index 61c0f7941c762..e6d13f7cc6a6d 100644 --- a/turbopack/crates/turbopack-trace-server/src/reader/mod.rs +++ b/turbopack/crates/turbopack-trace-server/src/reader/mod.rs @@ -3,6 +3,7 @@ mod nextjs; mod turbopack; use std::{ + any::Any, env, fs::File, io::{self, BufReader, Read, Seek, SeekFrom}, @@ -23,12 +24,55 @@ use crate::{ const MIN_INITIAL_REPORT_SIZE: u64 = 100 * 1024 * 1024; trait TraceFormat { - fn read(&mut self, buffer: &[u8]) -> Result; + type Reused: Default; + fn read(&mut self, buffer: &[u8], reuse: &mut Self::Reused) -> Result; fn stats(&self) -> String { String::new() } } +type ErasedReused = Box; + +struct ErasedTraceFormat(Box); + +trait ObjectSafeTraceFormat { + fn create_reused(&self) -> ErasedReused; + fn read(&mut self, buffer: &[u8], reuse: &mut ErasedReused) -> Result; + fn stats(&self) -> String; +} + +impl ObjectSafeTraceFormat for T +where + T::Reused: 'static, +{ + fn create_reused(&self) -> ErasedReused { + Box::new(T::Reused::default()) + } + + fn read(&mut self, buffer: &[u8], reuse: &mut ErasedReused) -> Result { + let reuse = reuse.downcast_mut().expect("Type of reuse is invalid"); + TraceFormat::read(self, buffer, reuse) + } + + fn stats(&self) -> String { + TraceFormat::stats(self) + } +} + +impl ObjectSafeTraceFormat for ErasedTraceFormat { + fn create_reused(&self) -> ErasedReused { + self.0.create_reused() + } + + fn read(&mut self, buffer: &[u8], reuse: &mut ErasedReused) -> Result { + self.0.read(buffer, reuse) + } + + fn stats(&self) -> String { + self.0.stats() + } +} + #[derive(Default)] enum TraceFile { Raw(File), @@ -128,7 +172,7 @@ impl TraceReader { store.reset(); } - let mut format: Option> = None; + let mut format: Option<(ErasedTraceFormat, ErasedReused)> = None; let mut current_read = 0; let mut initial_read = file @@ -154,9 +198,11 @@ impl TraceReader { match file.read(&mut chunk) { Ok(bytes_read) => { if bytes_read == 0 { - if let Some(value) = - self.wait_for_more_data(&mut file, &mut initial_read, format.as_deref()) - { + if let Some(value) = self.wait_for_more_data( + &mut file, + &mut initial_read, + format.as_ref().map(|(f, _)| f), + ) { return value; } } else { @@ -168,21 +214,29 @@ impl TraceReader { } buffer.extend_from_slice(&chunk[..bytes_read]); if format.is_none() && buffer.len() >= 8 { - if buffer.starts_with(b"TRACEv0") { + let erased_format = if buffer.starts_with(b"TRACEv0") { index = 7; - format = Some(Box::new(TurbopackFormat::new(self.store.clone()))); + ErasedTraceFormat(Box::new(TurbopackFormat::new( + self.store.clone(), + ))) } else if buffer.starts_with(b"[{\"name\"") { - format = Some(Box::new(NextJsFormat::new(self.store.clone()))); + ErasedTraceFormat(Box::new(NextJsFormat::new(self.store.clone()))) } else if buffer.starts_with(b"v ") { - format = Some(Box::new(HeaptrackFormat::new(self.store.clone()))) + ErasedTraceFormat(Box::new(HeaptrackFormat::new( + self.store.clone(), + ))) } else { // Fallback to the format without magic bytes // TODO Remove this after a while and show an error instead - format = Some(Box::new(TurbopackFormat::new(self.store.clone()))); - } + ErasedTraceFormat(Box::new(TurbopackFormat::new( + self.store.clone(), + ))) + }; + let reuse = erased_format.create_reused(); + format = Some((erased_format, reuse)); } - if let Some(format) = &mut format { - match format.read(&buffer[index..]) { + if let Some((format, reuse)) = &mut format { + match format.read(&buffer[index..], reuse) { Ok(bytes_read) => { index += bytes_read; } @@ -240,9 +294,11 @@ impl TraceReader { } Err(err) => { if err.kind() == io::ErrorKind::UnexpectedEof { - if let Some(value) = - self.wait_for_more_data(&mut file, &mut initial_read, format.as_deref()) - { + if let Some(value) = self.wait_for_more_data( + &mut file, + &mut initial_read, + format.as_ref().map(|(f, _)| f), + ) { return value; } } else { @@ -259,7 +315,7 @@ impl TraceReader { &mut self, file: &mut TraceFile, initial_read: &mut Option<(u64, Instant)>, - format: Option<&dyn TraceFormat>, + format: Option<&ErasedTraceFormat>, ) -> Option { let Ok(pos) = file.stream_position() else { return Some(true); diff --git a/turbopack/crates/turbopack-trace-server/src/reader/nextjs.rs b/turbopack/crates/turbopack-trace-server/src/reader/nextjs.rs index 35d039d2446da..6b0d7535fe358 100644 --- a/turbopack/crates/turbopack-trace-server/src/reader/nextjs.rs +++ b/turbopack/crates/turbopack-trace-server/src/reader/nextjs.rs @@ -27,7 +27,9 @@ impl NextJsFormat { } impl TraceFormat for NextJsFormat { - fn read(&mut self, mut buffer: &[u8]) -> anyhow::Result { + type Reused = (); + + fn read(&mut self, mut buffer: &[u8], _reuse: &mut Self::Reused) -> anyhow::Result { let mut bytes_read = 0; let mut outdated_spans = HashSet::new(); loop { diff --git a/turbopack/crates/turbopack-trace-server/src/reader/turbopack.rs b/turbopack/crates/turbopack-trace-server/src/reader/turbopack.rs index 4f64a1fa60cb5..8b10a9a887075 100644 --- a/turbopack/crates/turbopack-trace-server/src/reader/turbopack.rs +++ b/turbopack/crates/turbopack-trace-server/src/reader/turbopack.rs @@ -1,5 +1,7 @@ use std::{ collections::{hash_map::Entry, HashMap, HashSet}, + mem::transmute, + ops::{Deref, DerefMut}, sync::Arc, }; @@ -294,8 +296,15 @@ impl TurbopackFormat { } impl TraceFormat for TurbopackFormat { - fn read(&mut self, mut buffer: &[u8]) -> Result { - let mut rows = Vec::new(); + type Reused = Vec>; + + fn read(&mut self, mut buffer: &[u8], reuse: &mut Self::Reused) -> Result { + reuse.clear(); + let mut reuse = ClearOnDrop(reuse); + // Safety: The Vec is empty and is cleared on leaving this scope, so it's safe to cast the + // lifetime of data, since there is no data and data can't leave this function. + let rows = + unsafe { transmute::<&mut Vec>, &mut Vec>>(&mut *reuse) }; let mut bytes_read = 0; loop { match postcard::take_from_bytes(buffer) { @@ -314,7 +323,7 @@ impl TraceFormat for TurbopackFormat { } if !rows.is_empty() { let store = self.store.clone(); - let mut iter = rows.into_iter(); + let mut iter = rows.drain(..); { let mut store = store.write(); for row in iter.by_ref() { @@ -327,3 +336,25 @@ impl TraceFormat for TurbopackFormat { Ok(bytes_read) } } + +struct ClearOnDrop<'l, T>(&'l mut Vec); + +impl Drop for ClearOnDrop<'_, T> { + fn drop(&mut self) { + self.0.clear(); + } +} + +impl Deref for ClearOnDrop<'_, T> { + type Target = Vec; + + fn deref(&self) -> &Self::Target { + self.0 + } +} + +impl DerefMut for ClearOnDrop<'_, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + self.0 + } +} From 7800611f611d53bb8f77fd4bff2997ab8fdf4dd4 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Tue, 22 Oct 2024 17:51:43 +0200 Subject: [PATCH 3/5] trace-server: performance with parallelization --- .../src/self_time_tree.rs | 184 +++++++++++++----- .../src/span_graph_ref.rs | 119 +++++++---- .../turbopack-trace-server/src/span_ref.rs | 45 +++-- 3 files changed, 240 insertions(+), 108 deletions(-) diff --git a/turbopack/crates/turbopack-trace-server/src/self_time_tree.rs b/turbopack/crates/turbopack-trace-server/src/self_time_tree.rs index f795d5b52cdd3..548cb4208cebb 100644 --- a/turbopack/crates/turbopack-trace-server/src/self_time_tree.rs +++ b/turbopack/crates/turbopack-trace-server/src/self_time_tree.rs @@ -1,9 +1,11 @@ use std::{ cmp::{max, min}, - mem::{replace, take}, + mem::take, }; const SPLIT_COUNT: usize = 128; +/// Start balancing the tree when there are N times more items on one side. Must be at least 3. +const BALANCE_THRESHOLD: usize = 3; pub struct SelfTimeTree { entries: Vec>, @@ -23,6 +25,8 @@ struct SelfTimeChildren { split_point: u64, /// Entries >= split_point right: SelfTimeTree, + /// Number of entries in the SelfTimeTree::entries list that overlap the split point + spanning_entries: usize, } impl Default for SelfTimeTree { @@ -46,39 +50,58 @@ impl SelfTimeTree { pub fn insert(&mut self, start: u64, end: u64, item: T) { self.count += 1; - if let Some(children) = &mut self.children { - if end <= children.split_point { - children.left.insert(start, end, item); - } else if start >= children.split_point { - children.right.insert(start, end, item); + self.entries.push(SelfTimeEntry { start, end, item }); + self.check_for_split(); + } + + fn check_for_split(&mut self) { + if self.entries.len() >= SPLIT_COUNT { + let spanning_entries = if let Some(children) = &mut self.children { + children.spanning_entries } else { - self.entries.push(SelfTimeEntry { start, end, item }); - } - self.rebalance(); - } else { - self.entries.push(SelfTimeEntry { start, end, item }); - if self.entries.len() >= SPLIT_COUNT { + 0 + }; + if self.entries.len() - spanning_entries >= SPLIT_COUNT { self.split(); } } } fn split(&mut self) { - if self.entries.is_empty() { - return; + debug_assert!(!self.entries.is_empty()); + self.distribute_entries(); + self.rebalance(); + } + + fn distribute_entries(&mut self) { + if self.children.is_none() { + let start = self.entries.iter().min_by_key(|e| e.start).unwrap().start; + let end = self.entries.iter().max_by_key(|e| e.end).unwrap().end; + let middle = (start + end) / 2; + self.children = Some(Box::new(SelfTimeChildren { + left: SelfTimeTree::new(), + split_point: middle, + right: SelfTimeTree::new(), + spanning_entries: 0, + })); } - let entries = take(&mut self.entries); - let start = entries.iter().min_by_key(|e| e.start).unwrap().start; - let end = entries.iter().max_by_key(|e| e.end).unwrap().end; - let middle = (start + end) / 2; - self.children = Some(Box::new(SelfTimeChildren { - left: SelfTimeTree::new(), - split_point: middle, - right: SelfTimeTree::new(), - })); - self.count = 0; - for entry in entries { - self.insert(entry.start, entry.end, entry.item); + let Some(children) = &mut self.children else { + unreachable!(); + }; + let mut i = children.spanning_entries; + while i < self.entries.len() { + let SelfTimeEntry { start, end, .. } = self.entries[i]; + if end <= children.split_point { + let SelfTimeEntry { start, end, item } = self.entries.swap_remove(i); + children.left.insert(start, end, item); + } else if start >= children.split_point { + let SelfTimeEntry { start, end, item } = self.entries.swap_remove(i); + children.right.insert(start, end, item); + } else { + self.entries.swap(i, children.spanning_entries); + children.spanning_entries += 1; + i += 1; + } } } @@ -87,6 +110,7 @@ impl SelfTimeTree { left, split_point, right, + spanning_entries, }) = &mut self.children { let SelfTimeTree { @@ -99,55 +123,65 @@ impl SelfTimeTree { children: right_children, entries: right_entries, } = right; - if *left_count > *right_count * 3 + left_entries.len() { + if *left_count > *right_count * BALANCE_THRESHOLD + *spanning_entries { + // The left side has overweight + // We want to have a new tree that is: + // left' = left.left + // right' = (left.right, right) with self.split_point + // split_point' = left.split_point + // direct entries in self and left are put in self and are redistributed if let Some(box SelfTimeChildren { left: left_left, split_point: left_split_point, right: left_right, + spanning_entries: _, }) = left_children { - let left_entries = take(left_entries); *right = Self { - count: left_right.count + right.count + self.entries.len(), - entries: take(&mut self.entries), + count: left_right.count + right.count, + entries: Vec::new(), children: Some(Box::new(SelfTimeChildren { left: take(left_right), split_point: *split_point, right: take(right), + spanning_entries: 0, })), }; *split_point = *left_split_point; + self.entries.append(left_entries); *left = take(left_left); - let entries = replace(&mut self.entries, left_entries); - self.count = left.count + right.count + self.entries.len(); - for SelfTimeEntry { start, end, item } in entries { - self.insert(start, end, item); - } + *spanning_entries = 0; + self.distribute_entries(); } - } else if *right_count > *left_count * 3 + right_entries.len() { + } else if *right_count > *left_count * BALANCE_THRESHOLD + *spanning_entries { + // The right side has overweight + // We want to have a new tree that is: + // left' = (left, right.left) with self.split_point + // right' = right.right + // split_point' = right.split_point + // direct entries in self and right are put in self and are redistributed if let Some(box SelfTimeChildren { left: right_left, split_point: right_split_point, right: right_right, + spanning_entries: _, }) = right_children { - let right_entries = take(right_entries); *left = Self { - count: left.count + right_left.count + self.entries.len(), - entries: take(&mut self.entries), + count: left.count + right_left.count, + entries: Vec::new(), children: Some(Box::new(SelfTimeChildren { left: take(left), split_point: *split_point, right: take(right_left), + spanning_entries: 0, })), }; *split_point = *right_split_point; + self.entries.append(right_entries); *right = take(right_right); - let entries = replace(&mut self.entries, right_entries); - self.count = left.count + right.count + self.entries.len(); - for SelfTimeEntry { start, end, item } in entries { - self.insert(start, end, item); - } + *spanning_entries = 0; + self.check_for_split(); } } } @@ -202,10 +236,11 @@ mod tests { fn print_tree(tree: &SelfTimeTree, indent: usize) { if let Some(children) = &tree.children { println!( - "{}{} items (split at {}, {} total)", + "{}{} items (split at {}, {} overlapping, {} total)", " ".repeat(indent), tree.entries.len(), children.split_point, + children.spanning_entries, tree.count ); print_tree(&children.left, indent + 2); @@ -220,33 +255,80 @@ mod tests { } } + fn assert_balanced(tree: &SelfTimeTree) { + if let Some(children) = &tree.children { + let l = children.left.count; + let r = children.right.count; + let s = children.spanning_entries; + if (l > SPLIT_COUNT || r > SPLIT_COUNT) + && ((l > r * BALANCE_THRESHOLD + s) || (r > l * BALANCE_THRESHOLD + s)) + { + print_tree(tree, 0); + panic!("Tree is not balanced"); + } + assert_balanced(&children.left); + assert_balanced(&children.right); + } + } + #[test] fn test_simple() { let mut tree = SelfTimeTree::new(); - for i in 0..1000 { + let count = 10000; + for i in 0..count { tree.insert(i, i + 1, i); + assert_eq!(tree.count, (i + 1) as usize); + assert_balanced(&tree); + } + assert_eq!(tree.lookup_range_count(0, count), count); + print_tree(&tree, 0); + assert_balanced(&tree); + } + + #[test] + fn test_evenly() { + let mut tree = SelfTimeTree::new(); + let count = 10000; + for a in 0..10 { + for b in 0..10 { + for c in 0..10 { + for d in 0..10 { + let i = d * 1000 + c * 100 + b * 10 + a; + tree.insert(i, i + 1, i); + assert_balanced(&tree); + } + } + } } - assert_eq!(tree.lookup_range_count(0, 1000), 1000); + assert_eq!(tree.lookup_range_count(0, count), count); print_tree(&tree, 0); + assert_balanced(&tree); } #[test] fn test_overlapping() { let mut tree = SelfTimeTree::new(); - for i in 0..1000 { + let count = 10000; + for i in 0..count { tree.insert(i, i + 100, i); + assert_eq!(tree.count, (i + 1) as usize); + assert_balanced(&tree); } - assert_eq!(tree.lookup_range_count(0, 1100), 1000 * 100); + assert_eq!(tree.lookup_range_count(0, count + 100), count * 100); print_tree(&tree, 0); + assert_balanced(&tree); } #[test] fn test_overlapping_heavy() { let mut tree = SelfTimeTree::new(); - for i in 0..1000 { + let count = 10000; + for i in 0..count { tree.insert(i, i + 500, i); + assert_eq!(tree.count, (i + 1) as usize); } - assert_eq!(tree.lookup_range_count(0, 2000), 1000 * 500); + assert_eq!(tree.lookup_range_count(0, count + 500), count * 500); print_tree(&tree, 0); + assert_balanced(&tree); } } diff --git a/turbopack/crates/turbopack-trace-server/src/span_graph_ref.rs b/turbopack/crates/turbopack-trace-server/src/span_graph_ref.rs index 61aac539cb16c..4a514d7a2844d 100644 --- a/turbopack/crates/turbopack-trace-server/src/span_graph_ref.rs +++ b/turbopack/crates/turbopack-trace-server/src/span_graph_ref.rs @@ -4,6 +4,8 @@ use std::{ sync::{Arc, OnceLock}, }; +use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; + use crate::{ bottom_up::build_bottom_up_graph, span::{SpanGraph, SpanGraphEvent, SpanIndex}, @@ -65,52 +67,78 @@ impl<'a> SpanGraphRef<'a> { }) } - pub fn events(&self) -> impl DoubleEndedIterator> + '_ { + fn recursive_spans_par(&self) -> impl ParallelIterator> + '_ { self.graph - .events - .get_or_init(|| { - if self.count() == 1 { - let _ = self.first_span().graph(); - self.first_span().extra().graph.get().unwrap().clone() - } else { - let self_group = self.first_span().group_name(); - let mut map: FxIndexMap<&str, (Vec, Vec)> = - FxIndexMap::default(); - let mut queue = VecDeque::with_capacity(8); - for span in self.recursive_spans() { - for span in span.children() { - let name = span.group_name(); - if name != self_group { - let (list, recusive_list) = map.entry(name).or_default(); - list.push(span.index()); - queue.push_back(span); - while let Some(child) = queue.pop_front() { - for nested_child in child.children() { - let nested_name = nested_child.group_name(); - if name == nested_name { - recusive_list.push(nested_child.index()); - queue.push_back(nested_child); - } + .root_spans + .par_iter() + .chain(self.graph.recursive_spans.par_iter()) + .map(move |span| SpanRef { + span: &self.store.spans[span.get()], + store: self.store, + index: span.get(), + }) + } + + fn events_vec_ref(&self) -> &Vec { + self.graph.events.get_or_init(|| { + if self.count() == 1 { + let _ = self.first_span().graph(); + self.first_span().extra().graph.get().unwrap().clone() + } else { + let self_group = self.first_span().group_name(); + let mut map: FxIndexMap<&str, (Vec, Vec)> = + FxIndexMap::default(); + let mut queue = VecDeque::with_capacity(8); + for span in self.recursive_spans() { + for span in span.children() { + let name = span.group_name(); + if name != self_group { + let (list, recusive_list) = map.entry(name).or_default(); + list.push(span.index()); + queue.push_back(span); + while let Some(child) = queue.pop_front() { + for nested_child in child.children() { + let nested_name = nested_child.group_name(); + if name == nested_name { + recusive_list.push(nested_child.index()); + queue.push_back(nested_child); } } } } } - event_map_to_list(map) } - }) - .iter() - .map(|graph| match graph { - SpanGraphEvent::SelfTime { duration } => SpanGraphEventRef::SelfTime { - duration: *duration, + event_map_to_list(map) + } + }) + } + + pub fn events(&self) -> impl DoubleEndedIterator> + '_ { + self.events_vec_ref().iter().map(|graph| match graph { + SpanGraphEvent::SelfTime { duration } => SpanGraphEventRef::SelfTime { + duration: *duration, + }, + SpanGraphEvent::Child { child } => SpanGraphEventRef::Child { + graph: SpanGraphRef { + graph: child.clone(), + store: self.store, }, - SpanGraphEvent::Child { child } => SpanGraphEventRef::Child { - graph: SpanGraphRef { - graph: child.clone(), - store: self.store, - }, + }, + }) + } + + pub fn events_par(&self) -> impl ParallelIterator> + '_ { + self.events_vec_ref().par_iter().map(|graph| match graph { + SpanGraphEvent::SelfTime { duration } => SpanGraphEventRef::SelfTime { + duration: *duration, + }, + SpanGraphEvent::Child { child } => SpanGraphEventRef::Child { + graph: SpanGraphRef { + graph: child.clone(), + store: self.store, }, - }) + }, + }) } pub fn children(&self) -> impl DoubleEndedIterator> + '_ { @@ -120,6 +148,13 @@ impl<'a> SpanGraphRef<'a> { }) } + pub fn children_par(&self) -> impl ParallelIterator> + '_ { + self.events_par().filter_map(|event| match event { + SpanGraphEventRef::SelfTime { .. } => None, + SpanGraphEventRef::Child { graph: span } => Some(span), + }) + } + pub fn bottom_up(&self) -> impl Iterator> + '_ { self.graph .bottom_up @@ -251,19 +286,17 @@ impl<'a> SpanGraphRef<'a> { pub fn corrected_self_time(&self) -> u64 { *self.graph.corrected_self_time.get_or_init(|| { - self.recursive_spans() + self.recursive_spans_par() .map(|span| span.corrected_self_time()) - .reduce(|a, b| a + b) - .unwrap_or_default() + .sum::() }) } pub fn corrected_total_time(&self) -> u64 { *self.graph.corrected_total_time.get_or_init(|| { - self.children() + self.children_par() .map(|graph| graph.corrected_total_time()) - .reduce(|a, b| a + b) - .unwrap_or_default() + .sum::() + self.corrected_self_time() }) } diff --git a/turbopack/crates/turbopack-trace-server/src/span_ref.rs b/turbopack/crates/turbopack-trace-server/src/span_ref.rs index b0f3e71a70e81..87d0c4d3e88b1 100644 --- a/turbopack/crates/turbopack-trace-server/src/span_ref.rs +++ b/turbopack/crates/turbopack-trace-server/src/span_ref.rs @@ -5,6 +5,8 @@ use std::{ vec, }; +use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; + use crate::{ bottom_up::build_bottom_up_graph, span::{Span, SpanEvent, SpanExtra, SpanGraphEvent, SpanIndex, SpanNames, SpanTimeData}, @@ -193,6 +195,17 @@ impl<'a> SpanRef<'a> { }) } + pub fn children_par(&self) -> impl ParallelIterator> + 'a { + self.span.events.par_iter().filter_map(|event| match event { + SpanEvent::SelfTime { .. } => None, + SpanEvent::Child { index } => Some(SpanRef { + span: &self.store.spans[index.get()], + store: self.store, + index: index.get(), + }), + }) + } + pub fn total_time(&self) -> u64 { *self.time_data().total_time.get_or_init(|| { self.children() @@ -256,18 +269,23 @@ impl<'a> SpanRef<'a> { pub fn corrected_self_time(&self) -> u64 { let store = self.store; *self.time_data().corrected_self_time.get_or_init(|| { - let mut self_time = 0; - for event in self.span.events.iter() { - if let SpanEvent::SelfTime { start, end } = event { - let duration = end - start; - if duration == 0 { - continue; + let mut self_time = self + .span + .events + .par_iter() + .filter_map(|event| { + if let SpanEvent::SelfTime { start, end } = event { + let duration = end - start; + if duration != 0 { + store.set_max_self_time_lookup(*end); + let concurrent_time = + store.self_time_tree.lookup_range_count(*start, *end); + return Some(duration * duration / concurrent_time); + } } - store.set_max_self_time_lookup(*end); - let concurrent_time = store.self_time_tree.lookup_range_count(*start, *end); - self_time += duration * duration / concurrent_time; - } - } + None + }) + .sum(); if self.children().next().is_none() { self_time = max(self_time, 1); } @@ -277,10 +295,9 @@ impl<'a> SpanRef<'a> { pub fn corrected_total_time(&self) -> u64 { *self.time_data().corrected_total_time.get_or_init(|| { - self.children() + self.children_par() .map(|child| child.corrected_total_time()) - .reduce(|a, b| a + b) - .unwrap_or_default() + .sum::() + self.corrected_self_time() }) } From 463cbf04b1e4c525b6a037147898552203ab0ce3 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Tue, 22 Oct 2024 18:00:29 +0200 Subject: [PATCH 4/5] trace-server: allow to disable corrected time for performance reasons --- .../turbopack-trace-server/src/span_ref.rs | 6 +++-- .../turbopack-trace-server/src/store.rs | 25 +++++++++++++------ 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/turbopack/crates/turbopack-trace-server/src/span_ref.rs b/turbopack/crates/turbopack-trace-server/src/span_ref.rs index 87d0c4d3e88b1..e934e216ede91 100644 --- a/turbopack/crates/turbopack-trace-server/src/span_ref.rs +++ b/turbopack/crates/turbopack-trace-server/src/span_ref.rs @@ -278,8 +278,10 @@ impl<'a> SpanRef<'a> { let duration = end - start; if duration != 0 { store.set_max_self_time_lookup(*end); - let concurrent_time = - store.self_time_tree.lookup_range_count(*start, *end); + let concurrent_time = store + .self_time_tree + .as_ref() + .map_or(duration, |tree| tree.lookup_range_count(*start, *end)); return Some(duration * duration / concurrent_time); } } diff --git a/turbopack/crates/turbopack-trace-server/src/store.rs b/turbopack/crates/turbopack-trace-server/src/store.rs index 73997782e582f..67514aa3ad6b8 100644 --- a/turbopack/crates/turbopack-trace-server/src/store.rs +++ b/turbopack/crates/turbopack-trace-server/src/store.rs @@ -1,6 +1,7 @@ use std::{ cmp::{max, min}, collections::HashSet, + env, mem::replace, num::NonZeroUsize, sync::{atomic::AtomicU64, OnceLock}, @@ -18,7 +19,7 @@ const CUT_OFF_DEPTH: u32 = 150; pub struct Store { pub(crate) spans: Vec, - pub(crate) self_time_tree: SelfTimeTree, + pub(crate) self_time_tree: Option>, max_self_time_lookup_time: AtomicU64, } @@ -52,7 +53,10 @@ impl Store { pub fn new() -> Self { Self { spans: vec![new_root_span()], - self_time_tree: SelfTimeTree::new(), + self_time_tree: env::var("NO_CORRECTED_TIME") + .ok() + .is_none() + .then(SelfTimeTree::new), max_self_time_lookup_time: AtomicU64::new(0), } } @@ -60,12 +64,16 @@ impl Store { pub fn reset(&mut self) { self.spans.truncate(1); self.spans[0] = new_root_span(); - self.self_time_tree = SelfTimeTree::new(); + if let Some(tree) = self.self_time_tree.as_mut() { + *tree = SelfTimeTree::new(); + } *self.max_self_time_lookup_time.get_mut() = 0; } pub fn has_time_info(&self) -> bool { - self.self_time_tree.len() > 0 + self.self_time_tree + .as_ref() + .map_or(true, |tree| tree.len() > 0) } pub fn add_span( @@ -152,13 +160,14 @@ impl Store { span_index: SpanIndex, outdated_spans: &mut HashSet, ) { - if *self.max_self_time_lookup_time.get_mut() >= start { - self.self_time_tree - .for_each_in_range(start, end, |_, _, span| { + if let Some(tree) = self.self_time_tree.as_mut() { + if *self.max_self_time_lookup_time.get_mut() >= start { + tree.for_each_in_range(start, end, |_, _, span| { outdated_spans.insert(*span); }); + } + tree.insert(start, end, span_index); } - self.self_time_tree.insert(start, end, span_index); } pub fn add_self_time( From c6767d03f2ab5e85b8b3a9e36bd5cc3f8d9794eb Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Tue, 22 Oct 2024 18:18:45 +0200 Subject: [PATCH 5/5] trace-server: parallize graph creation --- .../turbopack-trace-server/src/span_ref.rs | 47 ++++++++++++++----- 1 file changed, 35 insertions(+), 12 deletions(-) diff --git a/turbopack/crates/turbopack-trace-server/src/span_ref.rs b/turbopack/crates/turbopack-trace-server/src/span_ref.rs index e934e216ede91..c4051592a1756 100644 --- a/turbopack/crates/turbopack-trace-server/src/span_ref.rs +++ b/turbopack/crates/turbopack-trace-server/src/span_ref.rs @@ -317,23 +317,46 @@ impl<'a> SpanRef<'a> { self.extra() .graph .get_or_init(|| { - let mut map: FxIndexMap<&str, (Vec, Vec)> = - FxIndexMap::default(); - let mut queue = VecDeque::with_capacity(8); - for child in self.children() { - let name = child.group_name(); - let (list, recursive_list) = map.entry(name).or_default(); - list.push(child.index()); - queue.push_back(child); - while let Some(child) = queue.pop_front() { - for nested_child in child.children() { + struct Entry<'a> { + span: SpanRef<'a>, + recursive: Vec, + } + let entries = self + .children_par() + .map(|span| { + let name = span.group_name(); + let mut recursive = Vec::new(); + let mut queue = VecDeque::with_capacity(0); + for nested_child in span.children() { let nested_name = nested_child.group_name(); if name == nested_name { - recursive_list.push(nested_child.index()); + recursive.push(nested_child.index()); queue.push_back(nested_child); } } - } + while let Some(child) = queue.pop_front() { + for nested_child in child.children() { + let nested_name = nested_child.group_name(); + if name == nested_name { + recursive.push(nested_child.index()); + queue.push_back(nested_child); + } + } + } + Entry { span, recursive } + }) + .collect_vec_list(); + let mut map: FxIndexMap<&str, (Vec, Vec)> = + FxIndexMap::default(); + for Entry { + span, + mut recursive, + } in entries.into_iter().flatten() + { + let name = span.group_name(); + let (list, recursive_list) = map.entry(name).or_default(); + list.push(span.index()); + recursive_list.append(&mut recursive); } event_map_to_list(map) })