Skip to content

Commit

Permalink
mooore
Browse files Browse the repository at this point in the history
  • Loading branch information
javierhonduco committed Jul 23, 2024
1 parent 221e361 commit 8b79792
Show file tree
Hide file tree
Showing 11 changed files with 356 additions and 463 deletions.
712 changes: 289 additions & 423 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ edition = "2021"

[workspace]
members = [
"proto",
"lightswitch-proto",
]

[dependencies]
Expand Down Expand Up @@ -34,7 +34,7 @@ primal = "0.3.3"
nix = { version = "0.29.0", features = ["user"] }
prost = "0.12"
reqwest = { version = "0.12", features = ["blocking"] }
proto = { path = "./proto"}
lightswitch-proto = { path = "./lightswitch-proto"}

[dev-dependencies]
assert_cmd = { version = "2.0.14" }
Expand Down
3 changes: 2 additions & 1 deletion flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
elfutils' = (pkgs.elfutils.override { enableDebuginfod = false; }).overrideAttrs (attrs: {
configureFlags = attrs.configureFlags ++ [ "--without-zstd" ];
});
openssl' = (pkgs.openssl.override { static = true; });
buildInputs = with pkgs; [
llvmPackages_16.clang
llvmPackages_16.libcxx
Expand All @@ -37,7 +38,7 @@
glibc
glibc.static
protobuf
openssl
openssl'
];
nativeBuildInputs = with pkgs; [
pkg-config
Expand Down
4 changes: 2 additions & 2 deletions proto/Cargo.toml → lightswitch-proto/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "proto"
name = "lightswitch-proto"
version = "0.1.0"
edition = "2021"

Expand All @@ -8,4 +8,4 @@ prost = "0.12"
anyhow = "1.0.86"

[build-dependencies]
prost-build = { version = "*" }
prost-build = "0.13.1"
File renamed without changes.
File renamed without changes.
29 changes: 19 additions & 10 deletions proto/src/profile.rs → lightswitch-proto/src/profile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ pub mod pprof {
include!(concat!(env!("OUT_DIR"), "/perftools.profiles.rs"));
}

use anyhow::{anyhow, Result};
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::time::{Duration, SystemTime};

use anyhow::{anyhow, Result};

pub struct PprofBuilder {
duration: Duration,
freq_in_hz: i64,

known_mappings: HashMap<u64, u64>,
Expand All @@ -33,8 +36,9 @@ pub enum LabelStringOrNumber {
}

impl PprofBuilder {
pub fn with_frequency(freq_in_hz: i64) -> Self {
pub fn new(duration: Duration, freq_in_hz: i64) -> Self {
Self {
duration,
freq_in_hz,

known_mappings: HashMap::new(),
Expand Down Expand Up @@ -292,17 +296,22 @@ impl PprofBuilder {
// which should not be interpreted like this by other profiles.
let comments = vec![self.get_or_insert_string("lightswitch")];

let duration_since_epoch = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap();
let timestamp_nanos = duration_since_epoch.as_nanos(); // u128

pprof::Profile {
sample_type: vec![sample_type, period_type.clone()],
sample_type: vec![sample_type, period_type],
sample: self.samples,
mapping: self.mappings, // from the source `mapping\[0\] will be the main binary.` not sure if that makes sense to me...
location: self.locations,
function: self.functions,
string_table: self.string_table,
drop_frames: 0,
keep_frames: 0,
time_nanos: 0,
duration_nanos: 0,
time_nanos: timestamp_nanos as i64,
duration_nanos: self.duration.as_nanos() as i64,
period_type: Some(period_type),
period: 1_000_000_000 / self.freq_in_hz,
comment: comments,
Expand Down Expand Up @@ -335,7 +344,7 @@ mod tests {

#[test]
fn test_string_table() {
let mut pprof = PprofBuilder::with_freqency(27);
let mut pprof = PprofBuilder::new(Duration::from_secs(5), 27);
assert_eq!(pprof.get_or_insert_string("hi"), 1);
assert_eq!(pprof.get_or_insert_string("salut"), 2);
assert_eq!(pprof.string_table, vec!["", "hi", "salut"]);
Expand All @@ -348,7 +357,7 @@ mod tests {

#[test]
fn test_mappings() {
let mut pprof = PprofBuilder::with_freqency(27);
let mut pprof = PprofBuilder::new(Duration::from_secs(5), 27);
assert_eq!(
pprof.add_mapping(0, 0x100, 0x200, 0x0, "file.so", "sha256-abc"),
1
Expand All @@ -366,7 +375,7 @@ mod tests {

#[test]
fn test_locations() {
let mut pprof = PprofBuilder::with_freqency(27);
let mut pprof = PprofBuilder::new(Duration::from_secs(5), 27);
let _ = pprof.add_line("hahahaha-first-line");
let (line, function_id) = pprof.add_line("test-line");

Expand All @@ -393,7 +402,7 @@ mod tests {

#[test]
fn test_sample() {
let mut pprof = PprofBuilder::with_freqency(27);
let mut pprof = PprofBuilder::new(Duration::from_secs(5), 27);
let labels = vec![
pprof.new_label("key", LabelStringOrNumber::String("value".into())),
pprof.new_label("key", LabelStringOrNumber::Number(123, "pid".into())),
Expand Down Expand Up @@ -429,7 +438,7 @@ mod tests {

let mut rng = rand::thread_rng();

let mut pprof = PprofBuilder::with_freqency(27);
let mut pprof = PprofBuilder::new(Duration::from_secs(5), 27);
// Let's say we have this profile
let raw_samples = vec![
(vec![123], 200),
Expand Down
File renamed without changes.
40 changes: 25 additions & 15 deletions src/collector.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use prost::Message;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tracing::{debug, span, Level};

use crate::object::ExecutableId;
Expand Down Expand Up @@ -39,6 +40,7 @@ impl NullCollector {
}
}

/// Discards the profile, useful for testing.
impl Collector for NullCollector {
fn collect(
&mut self,
Expand All @@ -61,33 +63,44 @@ impl Collector for NullCollector {

#[derive(Default)]
pub struct StreamingCollector {
pprof_ingest_url: String,
timeout: Duration,
procs: HashMap<i32, ProcessInfo>,
objs: HashMap<ExecutableId, ObjectFileInfo>,
}

impl StreamingCollector {
pub fn new() -> Self {
Self::default()
pub fn new(pprof_ingest_url: &str) -> Self {
Self {
pprof_ingest_url: pprof_ingest_url.into(),
timeout: Duration::from_secs(30),
..Default::default()
}
}
}

/// POSTs the pprof formatted profiles to the given url.
impl Collector for StreamingCollector {
fn collect(
&mut self,
profile: RawAggregatedProfile,
procs: &HashMap<i32, ProcessInfo>,
objs: &HashMap<ExecutableId, ObjectFileInfo>,
) {
let symbolized_profile = symbolize_profile(&profile, procs, objs);
let pprof = to_proto(symbolized_profile, procs, objs);
let pprof_profile = pprof.profile();
let _span = span!(Level::DEBUG, "StreamingCollector.finish").entered();

let client = reqwest::blocking::Client::new();
let resp = client
.post("http://localhost:4567/pprof/new")
.body(pprof_profile.encode_to_vec())
let symbolized_profile = symbolize_profile(&profile, procs, objs);
let pprof_builder = to_proto(symbolized_profile, procs, objs);
let pprof = pprof_builder.profile();

let client_builder = reqwest::blocking::Client::builder().timeout(self.timeout);
let client = client_builder.build().unwrap();
let response = client
.post(self.pprof_ingest_url.clone())
.body(pprof.encode_to_vec())
.send();
tracing::info!("http request: {:?}", resp);

tracing::debug!("http response: {:?}", response);
}

fn finish(
Expand All @@ -97,7 +110,6 @@ impl Collector for StreamingCollector {
&HashMap<i32, ProcessInfo>,
&HashMap<ExecutableId, ObjectFileInfo>,
) {
// no op, maybe change return type?
(RawAggregatedProfile::new(), &self.procs, &self.objs)
}
}
Expand All @@ -115,9 +127,7 @@ impl AggregatorCollector {
}
}

/// This collector products a symbolized profile when finish is called. It will append the latests
/// processes and objects generating quite a bit of memory bloat. This is however acceptable if
/// profiling for short amounts of time.
/// Aggregagates the samples in memory, which might be acceptable when profiling for short amounts of time.
impl Collector for AggregatorCollector {
fn collect(
&mut self,
Expand Down Expand Up @@ -153,7 +163,7 @@ impl Collector for AggregatorCollector {
&HashMap<i32, ProcessInfo>,
&HashMap<ExecutableId, ObjectFileInfo>,
) {
let _span: span::EnteredSpan = span!(Level::DEBUG, "symbolize_profiles").entered();
let _span = span!(Level::DEBUG, "AggregatorCollector.finish").entered();

let mut samples_count = HashMap::new();
for profile in &self.profiles {
Expand Down
13 changes: 10 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use lightswitch::unwind_info::remove_unnecesary_markers;
use lightswitch::unwind_info::UnwindInfoBuilder;

const SAMPLE_FREQ_RANGE: RangeInclusive<usize> = 1..=1009;
const PPROF_INGEST_URL: &str = "http://localhost:4567/pprof/new";

fn parse_duration(arg: &str) -> Result<Duration, std::num::ParseIntError> {
let seconds = arg.parse()?;
Expand Down Expand Up @@ -91,6 +92,7 @@ enum ProfileFormat {
#[default]
FlameGraph,
Pprof,
// None //?
}

#[derive(PartialEq, clap::ValueEnum, Debug, Clone, Default)]
Expand Down Expand Up @@ -225,7 +227,9 @@ fn main() -> Result<(), Box<dyn Error>> {
ProfileSender::LocalDisk => {
Box::new(AggregatorCollector::new()) as Box<dyn Collector + Send>
}
ProfileSender::Remote => Box::new(StreamingCollector::new()) as Box<dyn Collector + Send>,
ProfileSender::Remote => {
Box::new(StreamingCollector::new(PPROF_INGEST_URL)) as Box<dyn Collector + Send>
}
}));

let mut p: Profiler<'_> = Profiler::new(
Expand All @@ -241,8 +245,11 @@ fn main() -> Result<(), Box<dyn Error>> {
let (raw_profile, procs, objs) = collector.finish();

// If we need to send the profile to the backend there's nothing else to do.
if args.sender == ProfileSender::Remote {
return Ok(());
match args.sender {
ProfileSender::Remote | ProfileSender::None => {
return Ok(());
}
_ => {}
}

// Otherwise let's symbolize the profile and write it to disk.
Expand Down
14 changes: 7 additions & 7 deletions src/profile.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use proto::profile::LabelStringOrNumber;
use proto::profile::PprofBuilder;
use lightswitch_proto::profile::LabelStringOrNumber;
use lightswitch_proto::profile::PprofBuilder;
use std::collections::HashMap;
use std::fmt::Write;
use std::path::PathBuf;
use std::time::Duration;
use tracing::{debug, error, span, Level};

use crate::bpf::profiler_bindings::native_stack_t;
Expand All @@ -17,12 +18,13 @@ use crate::profiler::SymbolizedAggregatedProfile;
use crate::profiler::SymbolizedAggregatedSample;
use crate::usym::symbolize_native_stack_blaze;

/// Converts a given symbolized profile to Google's pprof.
pub fn to_proto(
profile: SymbolizedAggregatedProfile,
procs: &HashMap<i32, ProcessInfo>,
objs: &HashMap<ExecutableId, ObjectFileInfo>,
) -> PprofBuilder {
let mut pprof = PprofBuilder::with_frequency(27);
let mut pprof = PprofBuilder::new(Duration::from_secs(5), 27);

for sample in profile {
let pid = sample.pid;
Expand All @@ -42,8 +44,7 @@ pub fn to_proto(
);

let (line, _) = pprof.add_line(&frame.name);
let location =
pprof.add_location(frame.virtual_address, mapping_id, vec![line.clone()]);
let location = pprof.add_location(frame.virtual_address, mapping_id, vec![line]);
location_ids.push(location);
}

Expand Down Expand Up @@ -87,8 +88,7 @@ pub fn to_proto(
}

let (line, _) = pprof.add_line(&frame.name);
let location =
pprof.add_location(normalized_addr, mapping_id, vec![line.clone()]);
let location = pprof.add_location(normalized_addr, mapping_id, vec![line]);
location_ids.push(location);
}
None => {
Expand Down

0 comments on commit 8b79792

Please sign in to comment.