Skip to content

Commit

Permalink
[Turbopack] improve performance of the trace server (#71661)
Browse files Browse the repository at this point in the history
### What?

* add load speed and total time
* reuse buffer while loading
* performance with parallelisation
* allow to disable corrected time for performance reasons
* parallelize graph creation
  • Loading branch information
sokra authored Nov 8, 2024
1 parent 302e0fe commit 4c346f5
Show file tree
Hide file tree
Showing 8 changed files with 427 additions and 157 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,9 @@ impl TraceFormat for HeaptrackFormat {
)
}

fn read(&mut self, mut buffer: &[u8]) -> anyhow::Result<usize> {
type Reused = ();

fn read(&mut self, mut buffer: &[u8], _reuse: &mut Self::Reused) -> anyhow::Result<usize> {
let mut bytes_read = 0;
let mut outdated_spans = HashSet::new();
let mut store = self.store.write();
Expand Down
117 changes: 93 additions & 24 deletions turbopack/crates/turbopack-trace-server/src/reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ mod nextjs;
mod turbopack;

use std::{
any::Any,
env,
fs::File,
io::{self, BufReader, Read, Seek, SeekFrom},
path::PathBuf,
sync::Arc,
thread::{self, JoinHandle},
time::Duration,
time::{Duration, Instant},
};

use anyhow::Result;
Expand All @@ -23,12 +24,55 @@ use crate::{
const MIN_INITIAL_REPORT_SIZE: u64 = 100 * 1024 * 1024;

trait TraceFormat {
fn read(&mut self, buffer: &[u8]) -> Result<usize>;
type Reused: Default;
fn read(&mut self, buffer: &[u8], reuse: &mut Self::Reused) -> Result<usize>;
fn stats(&self) -> String {
String::new()
}
}

type ErasedReused = Box<dyn Any>;

struct ErasedTraceFormat(Box<dyn ObjectSafeTraceFormat>);

trait ObjectSafeTraceFormat {
fn create_reused(&self) -> ErasedReused;
fn read(&mut self, buffer: &[u8], reuse: &mut ErasedReused) -> Result<usize>;
fn stats(&self) -> String;
}

impl<T: TraceFormat> ObjectSafeTraceFormat for T
where
T::Reused: 'static,
{
fn create_reused(&self) -> ErasedReused {
Box::new(T::Reused::default())
}

fn read(&mut self, buffer: &[u8], reuse: &mut ErasedReused) -> Result<usize> {
let reuse = reuse.downcast_mut().expect("Type of reuse is invalid");
TraceFormat::read(self, buffer, reuse)
}

fn stats(&self) -> String {
TraceFormat::stats(self)
}
}

impl ObjectSafeTraceFormat for ErasedTraceFormat {
fn create_reused(&self) -> ErasedReused {
self.0.create_reused()
}

fn read(&mut self, buffer: &[u8], reuse: &mut ErasedReused) -> Result<usize> {
self.0.read(buffer, reuse)
}

fn stats(&self) -> String {
self.0.stats()
}
}

#[derive(Default)]
enum TraceFile {
Raw(File),
Expand Down Expand Up @@ -128,10 +172,13 @@ impl TraceReader {
store.reset();
}

let mut format: Option<Box<dyn TraceFormat>> = None;
let mut format: Option<(ErasedTraceFormat, ErasedReused)> = None;

let mut current_read = 0;
let mut initial_read = { file.seek(SeekFrom::End(0)).ok() };
let mut initial_read = file
.seek(SeekFrom::End(0))
.ok()
.map(|total| (total, Instant::now()));
if file.seek(SeekFrom::Start(0)).is_err() {
return false;
}
Expand All @@ -151,9 +198,11 @@ impl TraceReader {
match file.read(&mut chunk) {
Ok(bytes_read) => {
if bytes_read == 0 {
if let Some(value) =
self.wait_for_more_data(&mut file, &mut initial_read, format.as_deref())
{
if let Some(value) = self.wait_for_more_data(
&mut file,
&mut initial_read,
format.as_ref().map(|(f, _)| f),
) {
return value;
}
} else {
Expand All @@ -165,21 +214,29 @@ impl TraceReader {
}
buffer.extend_from_slice(&chunk[..bytes_read]);
if format.is_none() && buffer.len() >= 8 {
if buffer.starts_with(b"TRACEv0") {
let erased_format = if buffer.starts_with(b"TRACEv0") {
index = 7;
format = Some(Box::new(TurbopackFormat::new(self.store.clone())));
ErasedTraceFormat(Box::new(TurbopackFormat::new(
self.store.clone(),
)))
} else if buffer.starts_with(b"[{\"name\"") {
format = Some(Box::new(NextJsFormat::new(self.store.clone())));
ErasedTraceFormat(Box::new(NextJsFormat::new(self.store.clone())))
} else if buffer.starts_with(b"v ") {
format = Some(Box::new(HeaptrackFormat::new(self.store.clone())))
ErasedTraceFormat(Box::new(HeaptrackFormat::new(
self.store.clone(),
)))
} else {
// Fallback to the format without magic bytes
// TODO Remove this after a while and show an error instead
format = Some(Box::new(TurbopackFormat::new(self.store.clone())));
}
ErasedTraceFormat(Box::new(TurbopackFormat::new(
self.store.clone(),
)))
};
let reuse = erased_format.create_reused();
format = Some((erased_format, reuse));
}
if let Some(format) = &mut format {
match format.read(&buffer[index..]) {
if let Some((format, reuse)) = &mut format {
match format.read(&buffer[index..], reuse) {
Ok(bytes_read) => {
index += bytes_read;
}
Expand All @@ -193,7 +250,7 @@ impl TraceReader {
}
let prev_read = current_read;
current_read += bytes_read as u64;
if let Some(total) = &mut initial_read {
if let Some((total, start)) = &mut initial_read {
let old_mbs = prev_read / (97 * 1024 * 1024);
let new_mbs = current_read / (97 * 1024 * 1024);
if old_mbs != new_mbs {
Expand All @@ -207,7 +264,13 @@ impl TraceReader {
let uncompressed = current_read / (1024 * 1024);
let total = *total / (1024 * 1024);
let stats = format.stats();
print!("{}% read ({}/{} MB)", percentage, read, total);
print!(
"{}% read ({}/{} MB, {} MB/s)",
percentage,
read,
total,
read * 1000 / (start.elapsed().as_millis() + 1) as u64
);
if uncompressed != read {
print!(" ({} MB uncompressed)", uncompressed);
}
Expand All @@ -231,9 +294,11 @@ impl TraceReader {
}
Err(err) => {
if err.kind() == io::ErrorKind::UnexpectedEof {
if let Some(value) =
self.wait_for_more_data(&mut file, &mut initial_read, format.as_deref())
{
if let Some(value) = self.wait_for_more_data(
&mut file,
&mut initial_read,
format.as_ref().map(|(f, _)| f),
) {
return value;
}
} else {
Expand All @@ -249,19 +314,23 @@ impl TraceReader {
fn wait_for_more_data(
&mut self,
file: &mut TraceFile,
initial_read: &mut Option<u64>,
format: Option<&dyn TraceFormat>,
initial_read: &mut Option<(u64, Instant)>,
format: Option<&ErasedTraceFormat>,
) -> Option<bool> {
let Ok(pos) = file.stream_position() else {
return Some(true);
};
if let Some(total) = initial_read.take() {
if let Some((total, start)) = initial_read.take() {
if let Some(format) = format {
let stats = format.stats();
println!("{}", stats);
}
if total > MIN_INITIAL_REPORT_SIZE {
println!("Initial read completed ({} MB)", total / (1024 * 1024));
println!(
"Initial read completed ({} MB, {}s)",
total / (1024 * 1024),
(start.elapsed().as_millis() / 100) as f32 / 10.0
);
}
}
loop {
Expand Down
4 changes: 3 additions & 1 deletion turbopack/crates/turbopack-trace-server/src/reader/nextjs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ impl NextJsFormat {
}

impl TraceFormat for NextJsFormat {
fn read(&mut self, mut buffer: &[u8]) -> anyhow::Result<usize> {
type Reused = ();

fn read(&mut self, mut buffer: &[u8], _reuse: &mut Self::Reused) -> anyhow::Result<usize> {
let mut bytes_read = 0;
let mut outdated_spans = HashSet::new();
loop {
Expand Down
37 changes: 34 additions & 3 deletions turbopack/crates/turbopack-trace-server/src/reader/turbopack.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
mem::transmute,
ops::{Deref, DerefMut},
sync::Arc,
};

Expand Down Expand Up @@ -294,8 +296,15 @@ impl TurbopackFormat {
}

impl TraceFormat for TurbopackFormat {
fn read(&mut self, mut buffer: &[u8]) -> Result<usize> {
let mut rows = Vec::new();
type Reused = Vec<TraceRow<'static>>;

fn read(&mut self, mut buffer: &[u8], reuse: &mut Self::Reused) -> Result<usize> {
reuse.clear();
let mut reuse = ClearOnDrop(reuse);
// Safety: The Vec is empty and is cleared on leaving this scope, so it's safe to cast the
// lifetime of data, since there is no data and data can't leave this function.
let rows =
unsafe { transmute::<&mut Vec<TraceRow<'_>>, &mut Vec<TraceRow<'_>>>(&mut *reuse) };
let mut bytes_read = 0;
loop {
match postcard::take_from_bytes(buffer) {
Expand All @@ -314,7 +323,7 @@ impl TraceFormat for TurbopackFormat {
}
if !rows.is_empty() {
let store = self.store.clone();
let mut iter = rows.into_iter();
let mut iter = rows.drain(..);
{
let mut store = store.write();
for row in iter.by_ref() {
Expand All @@ -327,3 +336,25 @@ impl TraceFormat for TurbopackFormat {
Ok(bytes_read)
}
}

struct ClearOnDrop<'l, T>(&'l mut Vec<T>);

impl<T> Drop for ClearOnDrop<'_, T> {
fn drop(&mut self) {
self.0.clear();
}
}

impl<T> Deref for ClearOnDrop<'_, T> {
type Target = Vec<T>;

fn deref(&self) -> &Self::Target {
self.0
}
}

impl<T> DerefMut for ClearOnDrop<'_, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.0
}
}
Loading

0 comments on commit 4c346f5

Please sign in to comment.