Skip to content

Commit

Permalink
Aggregate samples in collector and move symbolizer
Browse files Browse the repository at this point in the history
Before this commit, we were generating profiles, such as flamegraphs
based on a collection of profiles. While there's nothing inherently
wrong about this representation, we were never aggregating the stacks,
which would result in duplicated stack (a sample without a count)
entries. This would be fed directly to the profile renderers and could
potentially cause behaviours that aren't well defined. For example, only
one of the samples (sometimes the first one, in other implementations
the last one) would be used.

Additionally, the collector has no business doing symbolization, so this is
now done elsewhere.

Test Plan
=========

Collected a couple of profiles, they all look good.
  • Loading branch information
javierhonduco committed Jun 19, 2024
1 parent 7139dc6 commit be8bbdb
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 100 deletions.
15 changes: 15 additions & 0 deletions build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ extern crate bindgen;
use std::env;
use std::path::PathBuf;

use bindgen::callbacks::{DeriveInfo, ParseCallbacks};
use glob::glob;
use libbpf_cargo::SkeletonBuilder;
use std::path::Path;
Expand All @@ -15,6 +16,19 @@ const TRACERS_BPF_HEADER: &str = "./src/bpf/tracers.h";
const TRACERS_BPF_SOURCE: &str = "./src/bpf/tracers.bpf.c";
const TRACERS_SKELETON: &str = "./src/bpf/tracers_skel.rs";

/// Bindgen parse callbacks used to attach extra derives to selected generated types.
#[derive(Debug)]
struct CustomParseCallbacks;

impl ParseCallbacks for CustomParseCallbacks {
    /// Returns the additional traits bindgen should derive for the type described by
    /// `derive_info`.
    ///
    /// Only `native_stack_t` receives extra derives (`Hash`, `Eq` and `PartialEq`);
    /// every other type gets an empty list.
    // NOTE(review): presumably these derives let the generated stack type be part of a
    // hashable key when aggregating samples -- confirm against the profiler types.
    fn add_derives(&self, derive_info: &DeriveInfo) -> Vec<String> {
        // Guard clause: anything that is not the native stack type gets no extra derives.
        if derive_info.name != "native_stack_t" {
            return Vec::new();
        }

        ["Hash", "Eq", "PartialEq"]
            .iter()
            .map(|derive| derive.to_string())
            .collect()
    }
}

fn main() {
// Inform cargo of when to rebuild
for path in glob("src/bpf/*[hc]").unwrap().flatten() {
Expand All @@ -24,6 +38,7 @@ fn main() {
// Main native profiler.
let bindings = bindgen::Builder::default()
.derive_default(true)
.parse_callbacks(Box::new(CustomParseCallbacks))
.header(PROFILER_BPF_HEADER)
.generate()
.expect("Unable to generate bindings");
Expand Down
35 changes: 27 additions & 8 deletions src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@ use std::sync::{Arc, Mutex};
use tracing::{debug, span, Level};

use crate::object::ExecutableId;
use crate::profile::symbolize_profile;
use crate::profiler::ObjectFileInfo;
use crate::profiler::ProcessInfo;
use crate::profiler::RawAggregatedProfile;
use crate::profiler::SymbolizedAggregatedProfile;
use crate::profiler::{ObjectFileInfo, RawAggregatedSample};

pub struct Collector {
profiles: Vec<RawAggregatedProfile>,
Expand Down Expand Up @@ -53,14 +51,35 @@ impl Collector {
}
}

pub fn finish(&self) -> Vec<SymbolizedAggregatedProfile> {
pub fn finish(
&self,
) -> (
RawAggregatedProfile,
&HashMap<i32, ProcessInfo>,
&HashMap<ExecutableId, ObjectFileInfo>,
) {
let _span: span::EnteredSpan = span!(Level::DEBUG, "symbolize_profiles").entered();

debug!("Collector::finish {}", self.profiles.len());
let mut r = Vec::new();
let mut samples_count = HashMap::new();
for profile in &self.profiles {
r.push(symbolize_profile(profile, &self.procs, &self.objs));
for sample in profile {
let sample_without_count = RawAggregatedSample {
count: 0,
..*sample
};
*samples_count.entry(sample_without_count).or_insert(0) += sample.count
}
}
r

debug!("found {} unique samples", samples_count.len());
let profile = samples_count
.iter()
.map(|(sample, count)| RawAggregatedSample {
count: *count,
..*sample
})
.collect();

(profile, &self.procs, &self.objs)
}
}
9 changes: 6 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ use tracing_subscriber::FmtSubscriber;

use lightswitch::collector::Collector;
use lightswitch::object::ObjectFile;
use lightswitch::profile::fold_profiles;
use lightswitch::profile::fold_profile;
use lightswitch::profile::symbolize_profile;
use lightswitch::profiler::Profiler;
use lightswitch::unwind_info::in_memory_unwind_info;
use lightswitch::unwind_info::remove_redundant;
Expand Down Expand Up @@ -213,11 +214,13 @@ fn main() -> Result<(), Box<dyn Error>> {
p.profile_pids(args.pids);
p.run(collector.clone());

let profiles = collector.lock().unwrap().finish();
let collector = collector.lock().unwrap();
let (raw_profile, procs, objs) = collector.finish();
let symbolized_profile = symbolize_profile(&raw_profile, procs, objs);

match args.profile_format {
ProfileFormat::FlameGraph => {
let folded = fold_profiles(profiles);
let folded = fold_profile(symbolized_profile);
let mut options: flamegraph::Options<'_> = flamegraph::Options::default();
let data = folded.as_bytes();
let f = File::create(args.profile_name).unwrap();
Expand Down
174 changes: 86 additions & 88 deletions src/profile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,110 +15,108 @@ use crate::profiler::SymbolizedAggregatedProfile;
use crate::profiler::SymbolizedAggregatedSample;
use crate::usym::symbolize_native_stack_blaze;

/// Converts a collection of symbolized aggregated profiles to their folded representation that most flamegraph renderers use.
/// Converts a symbolized aggregated profile to the folded representation that most flamegraph renderers use.
/// Folded stacks look like this:
///
/// > base_frame;other_frame;top_frame 100
/// > another_base_frame;other_frame;top_frame 300
///
/// The frame names are separated by semicolons and the count is at the end separated with a space. We insert some synthetic
/// frames to quickly identify the thread and process names and other pieces of metadata.
pub fn fold_profiles(profiles: Vec<SymbolizedAggregatedProfile>) -> String {
pub fn fold_profile(profile: SymbolizedAggregatedProfile) -> String {
let mut folded = String::new();

for profile in profiles {
for sample in profile {
let ustack = sample
.ustack
.clone()
.into_iter()
.rev()
.map(|e| e.to_string())
.collect::<Vec<String>>();
let ustack = ustack.join(";");
let kstack = sample
.kstack
.clone()
.into_iter()
.rev()
.map(|e| format!("kernel: {}", e))
.collect::<Vec<String>>();
let kstack = kstack.join(";");
let count: String = sample.count.to_string();

// Getting the meatadata for the stack. This will be abstracted in the future in a common module.
let (process_name, thread_name) = match procfs::process::Process::new(sample.pid) {
// We successfully looked up the PID in procfs (we don't yet
// know if it's a PID/PGID/main thread or a TID/non-main thread)
Ok(p) => match p.stat() {
// Successfully got the pid/tid stat info
Ok(stat) => {
// Differentiate between PID/PGID/main thread or TID/non-main thread
if stat.pid == stat.pgrp {
// NOTE:
// This is the main thread for the PID/PGID
// If stat.pid() == stat.pgrp() for this process,
// this is a stack for the main thread
// of the pid, and stat.comm is the name of the
// process binary file, so use:
// process_name = stat.comm, and thread_name = "main_thread"
(stat.comm, "main_thread".to_string())
} else {
// NOTE:
// This is a non-main thread (TID) of a PID, so we
// have to look up the actual PID/PGID to get the
// process binary name
// As in, stat.comm is the name of the thread, and
// you have to look up the process binary name, so
// use:
// process_name = <derive from stat.pgrp>, and thread_name = stat.comm
//
let process_name = match procfs::process::Process::new(stat.pgrp) {
// We successfully looked up the PID/PGID of the TID in procfs
Ok(p) => match p.stat() {
// We successfully looked up the PID binary name from stat
Ok(stat2) => stat2.comm,
// We were unable to get the PID's binary name from stat
Err(_) => "<could not fetch process name>".to_string(),
},
// We failed to look up the PID/PGID of the TID in procfs
for sample in profile {
let ustack = sample
.ustack
.clone()
.into_iter()
.rev()
.map(|e| e.to_string())
.collect::<Vec<String>>();
let ustack = ustack.join(";");
let kstack = sample
.kstack
.clone()
.into_iter()
.rev()
.map(|e| format!("kernel: {}", e))
.collect::<Vec<String>>();
let kstack = kstack.join(";");
let count: String = sample.count.to_string();

// Getting the metadata for the stack. This will be abstracted in the future in a common module.
let (process_name, thread_name) = match procfs::process::Process::new(sample.pid) {
// We successfully looked up the PID in procfs (we don't yet
// know if it's a PID/PGID/main thread or a TID/non-main thread)
Ok(p) => match p.stat() {
// Successfully got the pid/tid stat info
Ok(stat) => {
// Differentiate between PID/PGID/main thread or TID/non-main thread
if stat.pid == stat.pgrp {
// NOTE:
// This is the main thread for the PID/PGID
// If stat.pid() == stat.pgrp() for this process,
// this is a stack for the main thread
// of the pid, and stat.comm is the name of the
// process binary file, so use:
// process_name = stat.comm, and thread_name = "main_thread"
(stat.comm, "main_thread".to_string())
} else {
// NOTE:
// This is a non-main thread (TID) of a PID, so we
// have to look up the actual PID/PGID to get the
// process binary name
// As in, stat.comm is the name of the thread, and
// you have to look up the process binary name, so
// use:
// process_name = <derive from stat.pgrp>, and thread_name = stat.comm
//
let process_name = match procfs::process::Process::new(stat.pgrp) {
// We successfully looked up the PID/PGID of the TID in procfs
Ok(p) => match p.stat() {
// We successfully looked up the PID binary name from stat
Ok(stat2) => stat2.comm,
// We were unable to get the PID's binary name from stat
Err(_) => "<could not fetch process name>".to_string(),
};
(process_name, stat.comm)
}
},
// We failed to look up the PID/PGID of the TID in procfs
Err(_) => "<could not fetch process name>".to_string(),
};
(process_name, stat.comm)
}
// Was unable to lookup the PID binary or thread name from stat
Err(_) => (
"<could not fetch process name>".to_string(),
"<could not fetch thread name>".to_string(),
),
},
// Completely failed to look up the PID/TID in procfs
}
// Was unable to lookup the PID binary or thread name from stat
Err(_) => (
"<could not fetch process name>".to_string(),
"<could not fetch thread name>".to_string(),
),
};
},
// Completely failed to look up the PID/TID in procfs
Err(_) => (
"<could not fetch process name>".to_string(),
"<could not fetch thread name>".to_string(),
),
};

writeln!(
folded,
"{};{}{}{} {}",
process_name,
thread_name,
if ustack.trim().is_empty() {
"".to_string()
} else {
format!(";{}", ustack)
},
if kstack.trim().is_empty() {
"".to_string()
} else {
format!(";{}", kstack)
},
count
)
.unwrap();
}
writeln!(
folded,
"{};{}{}{} {}",
process_name,
thread_name,
if ustack.trim().is_empty() {
"".to_string()
} else {
format!(";{}", ustack)
},
if kstack.trim().is_empty() {
"".to_string()
} else {
format!(";{}", kstack)
},
count
)
.unwrap();
}

folded
Expand Down
2 changes: 1 addition & 1 deletion src/profiler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ const MAX_CHUNKS: usize = MAX_UNWIND_TABLE_CHUNKS as usize;
// TODO: should make this configurable via a command line argument in future
const PERF_BUFFER_BYTES: usize = 512 * 1024;

#[derive(Debug)]
#[derive(Debug, Hash, Eq, PartialEq)]
pub struct RawAggregatedSample {
pub pid: i32,
pub tid: i32,
Expand Down

0 comments on commit be8bbdb

Please sign in to comment.