Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Turbopack] improve performance of the trace server #71661

Merged
merged 5 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,9 @@ impl TraceFormat for HeaptrackFormat {
)
}

fn read(&mut self, mut buffer: &[u8]) -> anyhow::Result<usize> {
type Reused = ();

fn read(&mut self, mut buffer: &[u8], _reuse: &mut Self::Reused) -> anyhow::Result<usize> {
let mut bytes_read = 0;
let mut outdated_spans = HashSet::new();
let mut store = self.store.write();
Expand Down
117 changes: 93 additions & 24 deletions turbopack/crates/turbopack-trace-server/src/reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ mod nextjs;
mod turbopack;

use std::{
any::Any,
env,
fs::File,
io::{self, BufReader, Read, Seek, SeekFrom},
path::PathBuf,
sync::Arc,
thread::{self, JoinHandle},
time::Duration,
time::{Duration, Instant},
};

use anyhow::Result;
Expand All @@ -23,12 +24,55 @@ use crate::{
const MIN_INITIAL_REPORT_SIZE: u64 = 100 * 1024 * 1024;

trait TraceFormat {
fn read(&mut self, buffer: &[u8]) -> Result<usize>;
type Reused: Default;
fn read(&mut self, buffer: &[u8], reuse: &mut Self::Reused) -> Result<usize>;
fn stats(&self) -> String {
String::new()
}
}

type ErasedReused = Box<dyn Any>;

struct ErasedTraceFormat(Box<dyn ObjectSafeTraceFormat>);

trait ObjectSafeTraceFormat {
fn create_reused(&self) -> ErasedReused;
fn read(&mut self, buffer: &[u8], reuse: &mut ErasedReused) -> Result<usize>;
fn stats(&self) -> String;
}

impl<T: TraceFormat> ObjectSafeTraceFormat for T
where
T::Reused: 'static,
{
fn create_reused(&self) -> ErasedReused {
Box::new(T::Reused::default())
}

fn read(&mut self, buffer: &[u8], reuse: &mut ErasedReused) -> Result<usize> {
let reuse = reuse.downcast_mut().expect("Type of reuse is invalid");
TraceFormat::read(self, buffer, reuse)
}

fn stats(&self) -> String {
TraceFormat::stats(self)
}
}

impl ObjectSafeTraceFormat for ErasedTraceFormat {
fn create_reused(&self) -> ErasedReused {
self.0.create_reused()
}

fn read(&mut self, buffer: &[u8], reuse: &mut ErasedReused) -> Result<usize> {
self.0.read(buffer, reuse)
}

fn stats(&self) -> String {
self.0.stats()
}
}

#[derive(Default)]
enum TraceFile {
Raw(File),
Expand Down Expand Up @@ -128,10 +172,13 @@ impl TraceReader {
store.reset();
}

let mut format: Option<Box<dyn TraceFormat>> = None;
let mut format: Option<(ErasedTraceFormat, ErasedReused)> = None;

let mut current_read = 0;
let mut initial_read = { file.seek(SeekFrom::End(0)).ok() };
let mut initial_read = file
.seek(SeekFrom::End(0))
.ok()
.map(|total| (total, Instant::now()));
if file.seek(SeekFrom::Start(0)).is_err() {
return false;
}
Expand All @@ -151,9 +198,11 @@ impl TraceReader {
match file.read(&mut chunk) {
Ok(bytes_read) => {
if bytes_read == 0 {
if let Some(value) =
self.wait_for_more_data(&mut file, &mut initial_read, format.as_deref())
{
if let Some(value) = self.wait_for_more_data(
&mut file,
&mut initial_read,
format.as_ref().map(|(f, _)| f),
) {
return value;
}
} else {
Expand All @@ -165,21 +214,29 @@ impl TraceReader {
}
buffer.extend_from_slice(&chunk[..bytes_read]);
if format.is_none() && buffer.len() >= 8 {
if buffer.starts_with(b"TRACEv0") {
let erased_format = if buffer.starts_with(b"TRACEv0") {
index = 7;
format = Some(Box::new(TurbopackFormat::new(self.store.clone())));
ErasedTraceFormat(Box::new(TurbopackFormat::new(
self.store.clone(),
)))
} else if buffer.starts_with(b"[{\"name\"") {
format = Some(Box::new(NextJsFormat::new(self.store.clone())));
ErasedTraceFormat(Box::new(NextJsFormat::new(self.store.clone())))
} else if buffer.starts_with(b"v ") {
format = Some(Box::new(HeaptrackFormat::new(self.store.clone())))
ErasedTraceFormat(Box::new(HeaptrackFormat::new(
self.store.clone(),
)))
} else {
// Fallback to the format without magic bytes
// TODO Remove this after a while and show an error instead
format = Some(Box::new(TurbopackFormat::new(self.store.clone())));
}
ErasedTraceFormat(Box::new(TurbopackFormat::new(
self.store.clone(),
)))
};
let reuse = erased_format.create_reused();
format = Some((erased_format, reuse));
}
if let Some(format) = &mut format {
match format.read(&buffer[index..]) {
if let Some((format, reuse)) = &mut format {
match format.read(&buffer[index..], reuse) {
Ok(bytes_read) => {
index += bytes_read;
}
Expand All @@ -193,7 +250,7 @@ impl TraceReader {
}
let prev_read = current_read;
current_read += bytes_read as u64;
if let Some(total) = &mut initial_read {
if let Some((total, start)) = &mut initial_read {
let old_mbs = prev_read / (97 * 1024 * 1024);
let new_mbs = current_read / (97 * 1024 * 1024);
if old_mbs != new_mbs {
Expand All @@ -207,7 +264,13 @@ impl TraceReader {
let uncompressed = current_read / (1024 * 1024);
let total = *total / (1024 * 1024);
let stats = format.stats();
print!("{}% read ({}/{} MB)", percentage, read, total);
print!(
"{}% read ({}/{} MB, {} MB/s)",
percentage,
read,
total,
read * 1000 / (start.elapsed().as_millis() + 1) as u64
);
if uncompressed != read {
print!(" ({} MB uncompressed)", uncompressed);
}
Expand All @@ -231,9 +294,11 @@ impl TraceReader {
}
Err(err) => {
if err.kind() == io::ErrorKind::UnexpectedEof {
if let Some(value) =
self.wait_for_more_data(&mut file, &mut initial_read, format.as_deref())
{
if let Some(value) = self.wait_for_more_data(
&mut file,
&mut initial_read,
format.as_ref().map(|(f, _)| f),
) {
return value;
}
} else {
Expand All @@ -249,19 +314,23 @@ impl TraceReader {
fn wait_for_more_data(
&mut self,
file: &mut TraceFile,
initial_read: &mut Option<u64>,
format: Option<&dyn TraceFormat>,
initial_read: &mut Option<(u64, Instant)>,
format: Option<&ErasedTraceFormat>,
) -> Option<bool> {
let Ok(pos) = file.stream_position() else {
return Some(true);
};
if let Some(total) = initial_read.take() {
if let Some((total, start)) = initial_read.take() {
if let Some(format) = format {
let stats = format.stats();
println!("{}", stats);
}
if total > MIN_INITIAL_REPORT_SIZE {
println!("Initial read completed ({} MB)", total / (1024 * 1024));
println!(
"Initial read completed ({} MB, {}s)",
total / (1024 * 1024),
(start.elapsed().as_millis() / 100) as f32 / 10.0
);
Comment on lines +329 to +333
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://doc.rust-lang.org/std/fmt/#precision

Suggested change
println!(
"Initial read completed ({} MB, {}s)",
total / (1024 * 1024),
(start.elapsed().as_millis() / 100) as f32 / 10.0
);
println!(
"Initial read completed ({} MB, {:.1}s)",
total / (1024 * 1024),
start.elapsed().as_millis() as f32 / 1000.0
);

}
}
loop {
Expand Down
4 changes: 3 additions & 1 deletion turbopack/crates/turbopack-trace-server/src/reader/nextjs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ impl NextJsFormat {
}

impl TraceFormat for NextJsFormat {
fn read(&mut self, mut buffer: &[u8]) -> anyhow::Result<usize> {
type Reused = ();

fn read(&mut self, mut buffer: &[u8], _reuse: &mut Self::Reused) -> anyhow::Result<usize> {
let mut bytes_read = 0;
let mut outdated_spans = HashSet::new();
loop {
Expand Down
37 changes: 34 additions & 3 deletions turbopack/crates/turbopack-trace-server/src/reader/turbopack.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
mem::transmute,
ops::{Deref, DerefMut},
sync::Arc,
};

Expand Down Expand Up @@ -294,8 +296,15 @@ impl TurbopackFormat {
}

impl TraceFormat for TurbopackFormat {
fn read(&mut self, mut buffer: &[u8]) -> Result<usize> {
let mut rows = Vec::new();
type Reused = Vec<TraceRow<'static>>;

fn read(&mut self, mut buffer: &[u8], reuse: &mut Self::Reused) -> Result<usize> {
reuse.clear();
let mut reuse = ClearOnDrop(reuse);
// Safety: The Vec is empty and is cleared on leaving this scope, so it's safe to cast the
// lifetime of data, since there is no data and data can't leave this function.
let rows =
unsafe { transmute::<&mut Vec<TraceRow<'_>>, &mut Vec<TraceRow<'_>>>(&mut *reuse) };
let mut bytes_read = 0;
loop {
match postcard::take_from_bytes(buffer) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This whole "write rows to a vec" thing makes sense if one of the following is true:

  1. You desperately need to reduce the time you hold the store write lock open and can't wait for de-serialization to happen.
  2. You don't want partial writes to the store to happen in the case of a deserialization failure.

Number 1 seems like it shouldn't matter because deserialization should be fast. Number 2 might matter, but we can probably assume that deserialization doesn't fail and panic if it does?

If one of those two isn't true, you could call self.process inside the deserialization loop, and remove the need for a vec entirely.

Expand All @@ -314,7 +323,7 @@ impl TraceFormat for TurbopackFormat {
}
if !rows.is_empty() {
let store = self.store.clone();
let mut iter = rows.into_iter();
let mut iter = rows.drain(..);
{
let mut store = store.write();
for row in iter.by_ref() {
Expand All @@ -327,3 +336,25 @@ impl TraceFormat for TurbopackFormat {
Ok(bytes_read)
}
}

struct ClearOnDrop<'l, T>(&'l mut Vec<T>);

impl<T> Drop for ClearOnDrop<'_, T> {
fn drop(&mut self) {
self.0.clear();
}
}

impl<T> Deref for ClearOnDrop<'_, T> {
type Target = Vec<T>;

fn deref(&self) -> &Self::Target {
self.0
}
}

impl<T> DerefMut for ClearOnDrop<'_, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.0
}
}
Loading
Loading