diff --git a/src/collector.rs b/src/collector.rs index 4a232fb..412bcc1 100644 --- a/src/collector.rs +++ b/src/collector.rs @@ -127,7 +127,7 @@ impl AggregatorCollector { } } -/// Aggregagates the samples in memory, which might be acceptable when profiling for short amounts of time. +/// Aggregates the samples in memory, which might be acceptable when profiling for short amounts of time. impl Collector for AggregatorCollector { fn collect( &mut self, diff --git a/src/main.rs b/src/main.rs index ca408f3..e44e06e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,4 @@ +use core::str; use std::error::Error; use std::fs::File; use std::io::IsTerminal; @@ -22,7 +23,7 @@ use tracing_subscriber::FmtSubscriber; use lightswitch::object::ObjectFile; use lightswitch::profile::symbolize_profile; use lightswitch::profile::{fold_profile, to_pprof}; -use lightswitch::profiler::Profiler; +use lightswitch::profiler::{Profiler, ProfilerConfig}; use lightswitch::unwind_info::in_memory_unwind_info; use lightswitch::unwind_info::remove_redundant; use lightswitch::unwind_info::remove_unnecesary_markers; @@ -62,6 +63,27 @@ fn sample_freq_in_range(s: &str) -> Result<u16, String> { Ok(sample_freq as u16) } +// Clap value_parser() in the form of: Fn(&str) -> Result<usize, String> +// Convert a &str into a usize, if possible, and return the result if it's a +// power of 2, otherwise Error +fn value_is_power_of_two(s: &str) -> Result<usize, String> { + let value: usize = s + .parse() + .map_err(|_| format!("`{s}' isn't a valid usize"))?; + // Now we have a value, test whether it's a power of 2 + if is_power_of_two(value) { + Ok(value) + } else { + Err(format!("{} is not a power of 2", value)) + } +} + +fn is_power_of_two(v: usize) -> bool { + // NOTE: Neither 0 nor 1 are a power of 2 (ignoring 2^0 for this use case), + // so rule them out + (v != 0) && (v != 1) && ((v & (v - 1)) == 0) +} + /// Given a non-prime unsigned int, return the prime number that precedes it /// as well as the prime that succeeds it fn primes_before_after(non_prime: usize) -> Result<(usize, usize), String> { @@ -153,6 +175,51 @@ struct Cli { /// Where to write the profile. #[arg(long, default_value_t, value_enum)] sender: ProfileSender, + // Buffer Sizes with defaults + #[arg(long, default_value_t = 512 * 1024, value_name = "PERF_BUFFER_BYTES", + help="Size of each profiler perf buffer, in bytes (must be a power of 2)", + value_parser = value_is_power_of_two)] + perf_buffer_bytes: usize, + // Print out info on eBPF map sizes + #[arg(long, help = "Print eBPF map sizes after creation")] + mapsize_info: bool, + // eBPF map stacks + #[arg( + long, + default_value_t = 100000, + help = "max number of individual \ + stacks to capture before aggregation" + )] + mapsize_stacks: u32, + // eBPF map aggregated_stacks + #[arg( + long, + default_value_t = 10000, + help = "Derived from constant MAX_AGGREGATED_STACKS_ENTRIES - max \ + number of unique stacks after aggregation" + )] + mapsize_aggregated_stacks: u32, + // eBPF map unwind_info_chunks + #[arg( + long, + default_value_t = 5000, + help = "max number of chunks allowed inside a shard" + )] + mapsize_unwind_info_chunks: u32, + // eBPF map unwind_tables + #[arg( + long, + default_value_t = 65, + help = "Derived from constant MAX_UNWIND_INFO_SHARDS" + )] + mapsize_unwind_tables: u32, + // eBPF map rate_limits + #[arg( + long, + default_value_t = 5000, + help = "Derived from constant MAX_PROCESSES" + )] + mapsize_rate_limits: u32, } /// Exit the main thread if any thread panics.
We prefer this behaviour because pretty much every @@ -228,6 +295,21 @@ fn main() -> Result<(), Box> { } })); + let profiler_config = ProfilerConfig { + // NOTE the difference in this arg name from the actual config name + libbpf_debug: args.libbpf_logs, + bpf_logging: args.bpf_logging, + duration: args.duration, + sample_freq: args.sample_freq, + perf_buffer_bytes: args.perf_buffer_bytes, + mapsize_info: args.mapsize_info, + mapsize_stacks: args.mapsize_stacks, + mapsize_aggregated_stacks: args.mapsize_aggregated_stacks, + mapsize_unwind_info_chunks: args.mapsize_unwind_info_chunks, + mapsize_unwind_tables: args.mapsize_unwind_tables, + mapsize_rate_limits: args.mapsize_rate_limits, + }; + let (stop_signal_sender, stop_signal_receive) = bounded(1); ctrlc::set_handler(move || { @@ -236,13 +318,7 @@ fn main() -> Result<(), Box> { }) .expect("Error setting Ctrl-C handler"); - let mut p: Profiler<'_> = Profiler::new( - args.libbpf_logs, - args.bpf_logging, - args.duration, - args.sample_freq, - stop_signal_receive, - ); + let mut p: Profiler<'_> = Profiler::new(profiler_config, stop_signal_receive); p.profile_pids(args.pids); p.run(collector.clone()); @@ -305,7 +381,9 @@ mod tests { use super::*; use assert_cmd::Command; use clap::Parser; - use rstest::rstest; + use rand::distributions::{Distribution, Uniform}; + use rstest::{fixture, rstest}; + use std::collections::HashSet; #[test] fn verify_cli() { @@ -322,7 +400,7 @@ mod tests { let actual = String::from_utf8(cmd.unwrap().stdout).unwrap(); insta::assert_yaml_snapshot!(actual, @r###" --- - "Usage: lightswitch [OPTIONS]\n\nOptions:\n --pids \n Specific PIDs to profile\n\n --tids \n Specific TIDs to profile (these can be outside the PIDs selected above)\n\n --show-unwind-info \n Show unwind info for given binary\n\n --show-info \n Show build ID for given binary\n\n -D, --duration \n How long this agent will run in seconds\n \n [default: 18446744073709551615]\n\n --libbpf-logs\n Enable libbpf logs. This includes the BPF verifier output\n\n --bpf-logging\n Enable BPF programs logging\n\n --logging \n Set lightswitch's logging level\n \n [default: info]\n [possible values: trace, debug, info, warn, error]\n\n --sample-freq \n Per-CPU Sampling Frequency in Hz\n \n [default: 19]\n\n --profile-format \n Output file for Flame Graph in SVG format\n \n [default: flame-graph]\n [possible values: none, flame-graph, pprof]\n\n --profile-name \n Name for the generated profile\n\n --sender \n Where to write the profile\n \n [default: local-disk]\n\n Possible values:\n - none: Discard the profile. Used for kernel tests\n - local-disk\n - remote\n\n -h, --help\n Print help (see a summary with '-h')\n" + "Usage: lightswitch [OPTIONS]\n\nOptions:\n --pids \n Specific PIDs to profile\n\n --tids \n Specific TIDs to profile (these can be outside the PIDs selected above)\n\n --show-unwind-info \n Show unwind info for given binary\n\n --show-info \n Show build ID for given binary\n\n -D, --duration \n How long this agent will run in seconds\n \n [default: 18446744073709551615]\n\n --libbpf-logs\n Enable libbpf logs. 
This includes the BPF verifier output\n\n --bpf-logging\n Enable BPF programs logging\n\n --logging \n Set lightswitch's logging level\n \n [default: info]\n [possible values: trace, debug, info, warn, error]\n\n --sample-freq \n Per-CPU Sampling Frequency in Hz\n \n [default: 19]\n\n --profile-format \n Output file for Flame Graph in SVG format\n \n [default: flame-graph]\n [possible values: none, flame-graph, pprof]\n\n --profile-name \n Name for the generated profile\n\n --sender \n Where to write the profile\n \n [default: local-disk]\n\n Possible values:\n - none: Discard the profile. Used for kernel tests\n - local-disk\n - remote\n\n --perf-buffer-bytes \n Size of each profiler perf buffer, in bytes (must be a power of 2)\n \n [default: 524288]\n\n --mapsize-info\n Print eBPF map sizes after creation\n\n --mapsize-stacks \n max number of individual stacks to capture before aggregation\n \n [default: 100000]\n\n --mapsize-aggregated-stacks \n Derived from constant MAX_AGGREGATED_STACKS_ENTRIES - max number of unique stacks after aggregation\n \n [default: 10000]\n\n --mapsize-unwind-info-chunks \n max number of chunks allowed inside a shard\n \n [default: 5000]\n\n --mapsize-unwind-tables \n Derived from constant MAX_UNWIND_INFO_SHARDS\n \n [default: 65]\n\n --mapsize-rate-limits \n Derived from constant MAX_PROCESSES\n \n [default: 5000]\n\n -h, --help\n Print help (see a summary with '-h')\n" "###); } @@ -394,4 +472,94 @@ mod tests { } } } + + // Powers of 2 in usize range + #[fixture] + fn power_of_two_usize() -> Vec<usize> { + let mut test_usizes = vec![]; + for shift in 0..63 { + let val: usize = 2 << shift; + test_usizes.push(val); + } + test_usizes + } + + // Powers of 2 represented as Strings + #[fixture] + fn power_of_two_strings(power_of_two_usize: Vec<usize>) -> Vec<String> { + let mut test_uint_strings = vec![]; + for val in power_of_two_usize { + let val_str = val.to_string(); + test_uint_strings.push(val_str); + } + test_uint_strings + } + + // This fixture produces 500,000 random results from the range of usize + // integers that are NOT powers of 2 + #[fixture] + fn all_but_power_of_two_usize(power_of_two_usize: Vec<usize>) -> Vec<usize> { + let mut test_usize_set: HashSet<usize> = HashSet::new(); + let mut test_usize_not_p2: Vec<usize> = vec![]; + // usizes that ARE powers of two, for later exclusion + for val in power_of_two_usize { + test_usize_set.insert(val); + } + // Now, for a random sampling of 500000 integers in the range of usize, + // excluding any that are known to be powers of 2 + let between = Uniform::from(0..=usize::MAX); + let mut rng = rand::thread_rng(); + for _ in 0..500000 { + let usize_int: usize = between.sample(&mut rng); + if test_usize_set.contains(&usize_int) { + // We know this is a power of 2, already tested separately, skip + continue; + } + test_usize_not_p2.push(usize_int); + } + test_usize_not_p2 + } + + // all_but_power_of_two_usize, but as Strings + #[fixture] + fn all_but_power_of_two_strings(all_but_power_of_two_usize: Vec<usize>) -> Vec<String> { + let mut test_uint_strings: Vec<String> = vec![]; + for val in all_but_power_of_two_usize { + let val_str = val.to_string(); + test_uint_strings.push(val_str); + } + test_uint_strings + } + + // Testing is_power_of_two predicate used by perf_buffer_bytes + // value_parser() + #[rstest] + fn test_should_be_powers_of_two(power_of_two_usize: Vec<usize>) { + for val in power_of_two_usize { + assert!(is_power_of_two(val)) + } + } + + #[rstest] + fn test_should_not_be_powers_of_two(all_but_power_of_two_usize: Vec<usize>) { + for val in all_but_power_of_two_usize { +
assert!(!is_power_of_two(val)) + } + } + + // Testing the value_parser() implementation for perf_buffer_bytes + #[rstest] + fn args_should_be_powers_of_two(power_of_two_strings: Vec<String>) { + for val_string in power_of_two_strings { + assert!(value_is_power_of_two(val_string.as_str()).is_ok()) + } + } + + #[rstest] + fn args_should_not_be_powers_of_two(all_but_power_of_two_strings: Vec<String>) { + for non_p2_string in all_but_power_of_two_strings { + let result = value_is_power_of_two(non_p2_string.as_str()); + assert!(result.is_err()); + } + } } diff --git a/src/profiler.rs b/src/profiler.rs index d20c2d1..cd7e618 100644 --- a/src/profiler.rs +++ b/src/profiler.rs @@ -166,6 +166,8 @@ pub struct Profiler<'bpf> { duration: Duration, // Per-CPU Sampling Frequency of this profile in Hz sample_freq: u16, + // Size of each perf buffer, in bytes + perf_buffer_bytes: usize, session_duration: Duration, } @@ -174,10 +176,6 @@ const MAX_SHARDS: u64 = MAX_UNWIND_INFO_SHARDS as u64; const SHARD_CAPACITY: usize = MAX_UNWIND_TABLE_SIZE as usize; const MAX_CHUNKS: usize = MAX_UNWIND_TABLE_CHUNKS as usize; -// Make each perf buffer 512 KB -// TODO: should make this configurable via a command line argument in future -const PERF_BUFFER_BYTES: usize = 512 * 1024; - #[derive(Debug, Hash, Eq, PartialEq)] pub struct RawAggregatedSample { pub pid: i32, @@ -288,37 +286,135 @@ impl fmt::Display for SymbolizedAggregatedSample { pub type RawAggregatedProfile = Vec<RawAggregatedSample>; pub type SymbolizedAggregatedProfile = Vec<SymbolizedAggregatedSample>; +pub struct ProfilerConfig { + pub libbpf_debug: bool, + pub bpf_logging: bool, + pub duration: Duration, + pub sample_freq: u16, + pub perf_buffer_bytes: usize, + pub mapsize_info: bool, + pub mapsize_stacks: u32, + pub mapsize_aggregated_stacks: u32, + pub mapsize_unwind_info_chunks: u32, + pub mapsize_unwind_tables: u32, + pub mapsize_rate_limits: u32, +} + +// Note that we normally pass in the defaults from Clap, and we don't want +// to be in the business of keeping the default values defined in Clap in sync +// with the defaults defined here. So these are some defaults that will +// almost always be overridden.
+impl Default for ProfilerConfig { + fn default() -> Self { + Self { + libbpf_debug: false, + bpf_logging: false, + duration: Duration::MAX, + sample_freq: 19, + perf_buffer_bytes: 512 * 1024, + mapsize_info: false, + mapsize_stacks: 100000, + mapsize_aggregated_stacks: 10000, + mapsize_unwind_info_chunks: 5000, + mapsize_unwind_tables: 65, + mapsize_rate_limits: 5000, + } + } +} + impl Default for Profiler<'_> { fn default() -> Self { let (_stop_signal_send, stop_signal_receive) = bounded(1); - Self::new(false, false, Duration::MAX, 19, stop_signal_receive) + Self::new(ProfilerConfig::default(), stop_signal_receive) } } impl Profiler<'_> { - pub fn new( - libbpf_debug: bool, - bpf_logging: bool, - duration: Duration, - sample_freq: u16, - stop_signal_receive: Receiver<()>, - ) -> Self { + pub fn new(profiler_config: ProfilerConfig, stop_signal_receive: Receiver<()>) -> Self { + let duration = profiler_config.duration; + let sample_freq = profiler_config.sample_freq; + let perf_buffer_bytes = profiler_config.perf_buffer_bytes; let mut skel_builder: ProfilerSkelBuilder = ProfilerSkelBuilder::default(); - skel_builder.obj_builder.debug(libbpf_debug); + skel_builder.obj_builder.debug(profiler_config.libbpf_debug); let mut open_skel = skel_builder.open().expect("open skel"); + // mapsize modifications can only be made before the maps are actually loaded + // Initialize map sizes with defaults or modifications + open_skel + .maps_mut() + .stacks() + .set_max_entries(profiler_config.mapsize_stacks) + .expect("Unable to set stacks map max_entries"); + open_skel + .maps_mut() + .aggregated_stacks() + .set_max_entries(profiler_config.mapsize_aggregated_stacks) + .expect("Unable to set aggregated_stacks map max_entries"); + open_skel + .maps_mut() + .unwind_info_chunks() + .set_max_entries(profiler_config.mapsize_unwind_info_chunks) + .expect("Unable to set unwind_info_chunks map max_entries"); + open_skel + .maps_mut() + .unwind_tables() + .set_max_entries(profiler_config.mapsize_unwind_tables) + .expect("Unable to set unwind_tables map max_entries"); + open_skel + .maps_mut() + .rate_limits() + .set_max_entries(profiler_config.mapsize_rate_limits) + .expect("Unable to set rate_limits map max_entries"); open_skel .rodata_mut() .lightswitch_config .verbose_logging - .write(bpf_logging); + .write(profiler_config.bpf_logging); let bpf = open_skel.load().expect("load skel"); info!("native unwinder BPF program loaded"); let native_unwinder_maps = bpf.maps(); let exec_mappings_fd = native_unwinder_maps.exec_mappings().as_fd(); + // If mapsize_info requested, pull the max_entries from each map of + // interest and print out + if profiler_config.mapsize_info { + info!("eBPF ACTUAL map size Configuration:"); + info!( + "stacks: {}", + bpf.maps().stacks().info().unwrap().info.max_entries + ); + info!( + "aggregated_stacks: {}", + bpf.maps() + .aggregated_stacks() + .info() + .unwrap() + .info + .max_entries + ); + info!( + "unwind_info_chunks: {}", + bpf.maps() + .unwind_info_chunks() + .info() + .unwrap() + .info + .max_entries + ); + info!( + "unwind_tables: {}", + bpf.maps().unwind_tables().info().unwrap().info.max_entries + ); + info!( + "rate_limits: {}", + bpf.maps().rate_limits().info().unwrap().info.max_entries + ); + } + let mut tracers_builder = TracersSkelBuilder::default(); - tracers_builder.obj_builder.debug(libbpf_debug); + tracers_builder + .obj_builder + .debug(profiler_config.libbpf_debug); let open_tracers = tracers_builder.open().expect("open skel"); open_tracers .maps() @@ -365,6 +461,7 
@@ impl Profiler<'_> { profile_receive, duration, sample_freq, + perf_buffer_bytes, session_duration: Duration::from_secs(5), } } @@ -398,23 +495,25 @@ impl Profiler<'_> { // New process events. let chan_send = self.new_proc_chan_send.clone(); let perf_buffer = PerfBufferBuilder::new(self.bpf.maps().events()) - .pages(PERF_BUFFER_BYTES / page_size::get()) + .pages(self.perf_buffer_bytes / page_size::get()) .sample_cb(move |_cpu: i32, data: &[u8]| { Self::handle_event(&chan_send, data); }) .lost_cb(Self::handle_lost_events) .build() + // TODO: Instead of unwrap, consume and emit any error, with + // .expect() perhaps? .unwrap(); let _poll_thread = thread::spawn(move || loop { perf_buffer.poll(Duration::from_millis(100)).expect("poll"); }); - // Trace events are received here, such memory unmaps. + // Trace events are received here, such as memory unmaps. let tracers_send = self.tracers_chan_send.clone(); let tracers_events_perf_buffer = PerfBufferBuilder::new(self.tracers.maps().tracer_events()) - .pages(PERF_BUFFER_BYTES / page_size::get()) + .pages(self.perf_buffer_bytes / page_size::get()) .sample_cb(move |_cpu: i32, data: &[u8]| { let mut event = tracer_event_t::default(); plain::copy_from_bytes(&mut event, data).expect("serde tracers event"); @@ -426,6 +525,8 @@ impl Profiler<'_> { warn!("lost {} events from the tracers", lost_count); }) .build() + // TODO: Instead of unwrap, consume and emit any error, with + // .expect() perhaps? .unwrap(); let _tracers_poll_thread = thread::spawn(move || loop { diff --git a/tests/integration_test.rs b/tests/integration_test.rs index 0f3d2b4..24bac4b 100644 --- a/tests/integration_test.rs +++ b/tests/integration_test.rs @@ -8,8 +8,8 @@ use crossbeam_channel::bounded; use lightswitch::collector::{AggregatorCollector, Collector}; use lightswitch::profile::symbolize_profile; -use lightswitch::profiler::Profiler; use lightswitch::profiler::SymbolizedAggregatedProfile; +use lightswitch::profiler::{Profiler, ProfilerConfig}; /// Find the `nix` binary either in the $PATH or in the below hardcoded location. fn nix_bin() -> String { @@ -99,14 +99,16 @@ fn test_integration() { let collector = Arc::new(Mutex::new( Box::new(AggregatorCollector::new()) as Box<dyn Collector> )); + + let profiler_config = ProfilerConfig { + libbpf_debug: bpf_test_debug, + bpf_logging: bpf_test_debug, + duration: Duration::from_secs(5), + sample_freq: 999, + ..Default::default() + }; let (_stop_signal_send, stop_signal_receive) = bounded(1); - let mut p = Profiler::new( - bpf_test_debug, - bpf_test_debug, - Duration::from_secs(5), - 999, - stop_signal_receive, - ); + let mut p = Profiler::new(profiler_config, stop_signal_receive); p.profile_pids(vec![cpp_proc.pid()]); p.run(collector.clone()); let collector = collector.lock().unwrap();
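For reviewers, a minimal sketch (not part of the patch) of how an embedding caller might use the new `ProfilerConfig` API after this change. The struct fields, the `Default` impl, and the `Profiler::new(config, stop_signal_receive)` signature come from the diff above; the function name `spawn_short_profile` and the duration, frequency, buffer size, map size, and PID values are illustrative placeholders only.

```rust
use std::time::Duration;

use crossbeam_channel::bounded;
use lightswitch::profiler::{Profiler, ProfilerConfig};

fn spawn_short_profile() {
    // Override only the knobs of interest; the remaining fields fall back to
    // the Default impl added in src/profiler.rs (which mirrors the Clap defaults).
    let config = ProfilerConfig {
        duration: Duration::from_secs(30), // illustrative value
        sample_freq: 99,                   // illustrative value
        perf_buffer_bytes: 1024 * 1024,    // must be a power of 2
        mapsize_stacks: 200_000,           // bump the stacks map
        ..Default::default()
    };

    // Profiler::new now takes the config struct plus the stop-signal receiver.
    let (_stop_sender, stop_receiver) = bounded(1);
    let mut profiler: Profiler<'_> = Profiler::new(config, stop_receiver);
    profiler.profile_pids(vec![1234]); // hypothetical PID
    // profiler.run(collector) would follow, as in src/main.rs.
}
```

From the command line, roughly the same configuration would be `lightswitch --duration 30 --sample-freq 99 --perf-buffer-bytes 1048576 --mapsize-stacks 200000`, with `--perf-buffer-bytes` rejected by the `value_is_power_of_two` parser unless the value is a power of two.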