Skip to content

Commit

Permalink
Merge pull request #265 from benfred/subprocess_fix
Browse files Browse the repository at this point in the history
Fix subprocess sampling
  • Loading branch information
benfred authored Jun 15, 2020
2 parents d605b16 + 7133409 commit e21c2ce
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 77 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ tempfile = "3.0.3"
proc-maps = "0.1.6"
memmap = "0.7.0"
cpp_demangle = "0.3.0"
serde = "1.0"
serde = {version="1.0", features=["rc"]}
serde_derive = "1.0"
serde_json = "1.0"
rand = "0.7"
Expand Down
2 changes: 1 addition & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ arg_enum!{
}
}


#[derive(Debug, Clone, Eq, PartialEq)]
pub enum LockingStrategy {
NonBlocking,
#[allow(dead_code)]
AlreadyLocked,
Lock
}
Expand Down
24 changes: 9 additions & 15 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ fn record_samples(pid: remoteprocess::Pid, config: &Config) -> Result<(), Error>
use indicatif::ProgressBar;
let progress = match (config.hide_progress, &config.duration) {
(true, _) => ProgressBar::hidden(),
(false, RecordDuration::Seconds(_)) => ProgressBar::new(max_samples.unwrap()),
(false, RecordDuration::Seconds(samples)) => ProgressBar::new(*samples),
(false, RecordDuration::Unlimited) => {
let progress = ProgressBar::new_spinner();

Expand Down Expand Up @@ -256,23 +256,17 @@ fn record_samples(pid: remoteprocess::Pid, config: &Config) -> Result<(), Error>
module: None, short_filename: None, line: 0, locals: None});
}

if let Some(process_info) = sample.process_info.as_ref() {
// walk process tree up to root, displaying pid + cmdline in output
let processes = process_info.lock().unwrap();
let mut pid = trace.pid;
loop {
let p = processes.get(&pid);
let process = p.as_ref().unwrap();
trace.frames.push(Frame{name: format!("process {}:\"{}\"", pid, process.cmdline),
filename: String::from(""),
module: None, short_filename: None, line: 0, locals: None});

match process.ppid {
Some(ppid) => { pid = ppid; },
None => { break; }
if let Some(process_info) = trace.process_info.as_ref().map(|x| x) {
trace.frames.push(process_info.to_frame());
let mut parent = process_info.parent.as_ref();
while parent.is_some() {
if let Some(process_info) = parent {
trace.frames.push(process_info.to_frame());
parent = process_info.parent.as_ref();
}
}
}

output.increment(&trace)?;
}

Expand Down
3 changes: 2 additions & 1 deletion src/python_spy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,8 @@ impl PythonSpy {
{
if self.config.native {
if let Some(native) = self.native.as_mut() {
let os_thread = remoteprocess::Thread::new(os_thread_id.unwrap())?;
let thread_id = os_thread_id.ok_or_else(|| format_err!("failed to get os threadid"))?;
let os_thread = remoteprocess::Thread::new(thread_id)?;
trace.frames = native.merge_native_thread(&trace.frames, &os_thread)?
}
}
Expand Down
115 changes: 58 additions & 57 deletions src/sampler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use remoteprocess::Pid;
use crate::timer::Timer;
use crate::python_spy::PythonSpy;
use crate::config::Config;
use crate::stack_trace::StackTrace;
use crate::stack_trace::{StackTrace, ProcessInfo};
use crate::version::Version;

pub struct Sampler {
Expand All @@ -22,7 +22,6 @@ pub struct Sampler {
pub struct Sample {
pub traces: Vec<StackTrace>,
pub sampling_errors: Option<Vec<(Pid, Error)>>,
pub process_info: Option<Arc<Mutex<HashMap<Pid, ProcessInfo>>>>,
pub late: Option<Duration>
}

Expand Down Expand Up @@ -71,7 +70,7 @@ impl Sampler {
};

let late = sleep.err();
if tx.send(Sample{traces: traces, sampling_errors, process_info: None, late}).is_err() {
if tx.send(Sample{traces: traces, sampling_errors, late}).is_err() {
break;
}
}
Expand All @@ -85,19 +84,17 @@ impl Sampler {
/// process or child processes
fn new_subprocess_sampler(pid: Pid, config: &Config) -> Result<Sampler, Error> {
// Initialize a PythonSpy object per child, and build up the process tree
let mut processes = HashMap::new();
let mut spies = HashMap::new();
processes.insert(pid, ProcessInfo::new(pid, None)?);
spies.insert(pid, PythonSpyThread::new(pid, &config));

spies.insert(pid, PythonSpyThread::new(pid, None, &config)?);
let process = remoteprocess::Process::new(pid)?;
for (childpid, parentpid) in process.child_processes()? {
spies.insert(childpid, PythonSpyThread::new(childpid, &config));
// If we can't create the child process, don't worry about it
// can happen with zombie child processes etc
match ProcessInfo::new(childpid, Some(parentpid)) {
Ok(process) => { processes.insert(childpid, process); }
match PythonSpyThread::new(childpid, Some(parentpid), &config) {
Ok(spy) => { spies.insert(childpid, spy); },
Err(e) => { warn!("Failed to open process {}: {}", childpid, e); }
};
}
}

// wait for all the various python spy objects to initialize, and if none
Expand All @@ -108,20 +105,21 @@ impl Sampler {

// Create a new thread to periodically monitor for new child processes, and update
// the procesess map
let processes = Arc::new(Mutex::new(processes));
let monitor_processes = processes.clone();
let spies = Arc::new(Mutex::new(spies));
let monitor_spies = spies.clone();
let monitor_config = config.clone();
std::thread::spawn(move || {
while process.exe().is_ok() {
match monitor_processes.lock() {
Ok(mut processes) => {
match monitor_spies.lock() {
Ok(mut spies) => {
for (childpid, parentpid) in process.child_processes().expect("failed to get subprocesses") {
if processes.contains_key(&childpid) {
if spies.contains_key(&childpid) {
continue;
}
match ProcessInfo::new(childpid, Some(parentpid)) {
Ok(process) => { processes.insert(childpid, process); }
Err(e) => { warn!("Failed to open process {}: {}", childpid, e); }
};
match PythonSpyThread::new(childpid, Some(parentpid), &monitor_config) {
Ok(spy) => { spies.insert(childpid, spy); }
Err(e) => { warn!("Failed to create spy for {}: {}", childpid, e); }
}
}
},
Err(e) => { error!("Failed to acquire lock: {}", e); }
Expand All @@ -130,14 +128,17 @@ impl Sampler {
}
});

let mut process_info = HashMap::new();

// Create a new thread to generate samples
let config = config.clone();
let (tx, rx): (Sender<Sample>, Receiver<Sample>) = mpsc::channel();
std::thread::spawn(move || {
for sleep in Timer::new(config.sampling_rate as f64) {
let mut traces = Vec::new();
let mut sampling_errors = None;
let mut current = match processes.lock() {

let mut spies = match spies.lock() {
Ok(current) => current,
Err(e) => {
error!("Failed to get process tree: {}", e);
Expand All @@ -146,40 +147,42 @@ impl Sampler {
};

// Notify all the initialized spies to generate a trace
for process_info in current.values_mut() {
let pid = process_info.pid;
let spy = spies.entry(pid).or_insert_with(|| PythonSpyThread::new(pid, &config));
for spy in spies.values_mut() {
if spy.initialized() {
spy.notify();
}
}

// collect the traces from each python spy if possible
for process_info in current.values_mut() {
if let Some(spy) = spies.get_mut(&process_info.pid) {
match spy.collect() {
Some(Ok(mut t)) => traces.append(&mut t),
Some(Err(e)) => {
let errors = sampling_errors.get_or_insert_with(|| Vec::new());
errors.push((process_info.pid, e));
},
None => {}
}
for spy in spies.values_mut() {
match spy.collect() {
Some(Ok(mut t)) => { traces.append(&mut t) },
Some(Err(e)) => {
let errors = sampling_errors.get_or_insert_with(|| Vec::new());
errors.push((spy.process.pid, e));
},
None => {}
}
}

// Annotate each trace with the process info
for trace in traces.iter_mut() {
let pid = trace.pid;
// Annotate each trace with the process info for the curren
let process = process_info.entry(pid).or_insert_with(|| {
get_process_info(pid, &spies).map(|p| Arc::new(*p))
});
trace.process_info = process.clone();
}

// Send the collected info back
let process_info = Some(processes.clone());
let late = sleep.err();
if tx.send(Sample{traces, sampling_errors, late, process_info}).is_err() {
if tx.send(Sample{traces, sampling_errors, late}).is_err() {
break;
}

// remove dead processes from the map, and check after removal
// if we have any python processes left
current.retain(|_, x| x.process.exe().is_ok());
spies.retain(|pid, _| current.contains_key(pid));
if spies.values().all(|x| !x.running) {
// If all of our spies have stopped, we're done
if spies.len() == 0 || spies.values().all(|x| !x.running) {
break;
}
}
Expand All @@ -195,36 +198,27 @@ impl Iterator for Sampler {
}
}

pub struct ProcessInfo {
pub pid: Pid,
pub ppid: Option<Pid>,
pub cmdline: String,
pub process: remoteprocess::Process
}

impl ProcessInfo {
fn new(pid: Pid, ppid: Option<Pid>) -> Result<ProcessInfo, Error> {
let process = remoteprocess::Process::new(pid)?;
let cmdline = process.cmdline().map(|x| x.join(" ")).unwrap_or("".to_owned());
Ok(ProcessInfo{pid, ppid, cmdline, process})
}
}

struct PythonSpyThread {
initialized_rx: Receiver<Result<Version, Error>>,
notify_tx: Sender<()>,
sample_rx: Receiver<Result<Vec<StackTrace>, Error>>,
initialized: Option<Result<Version, Error>>,
pub running: bool,
notified: bool,
pub process: remoteprocess::Process,
pub parent: Option<Pid>,
pub command_line: String
}

impl PythonSpyThread {
fn new(pid: Pid, config: &Config) -> PythonSpyThread {
fn new(pid: Pid, parent: Option<Pid>, config: &Config) -> Result<PythonSpyThread, Error> {
let (initialized_tx, initialized_rx): (Sender<Result<Version, Error>>, Receiver<Result<Version, Error>>) = mpsc::channel();
let (notify_tx, notify_rx): (Sender<()>, Receiver<()>) = mpsc::channel();
let (sample_tx, sample_rx): (Sender<Result<Vec<StackTrace>, Error>>, Receiver<Result<Vec<StackTrace>, Error>>) = mpsc::channel();
let config = config.clone();
let process = remoteprocess::Process::new(pid)?;
let command_line = process.cmdline().map(|x| x.join(" ")).unwrap_or("".to_owned());

thread::spawn(move || {
// We need to create this object inside the thread here since PythonSpy objects don't
// have the Send trait implemented on linux
Expand Down Expand Up @@ -255,7 +249,7 @@ impl PythonSpyThread {
}
}
});
PythonSpyThread{initialized_rx, notify_tx, sample_rx, initialized: None, running: false, notified: false}
Ok(PythonSpyThread{initialized_rx, notify_tx, sample_rx, process, command_line, parent, initialized: None, running: false, notified: false})
}

fn wait_initialized(&mut self) -> bool {
Expand Down Expand Up @@ -312,4 +306,11 @@ impl PythonSpyThread {
}
}
}
}

fn get_process_info(pid: Pid, spies: &HashMap<Pid, PythonSpyThread>) -> Option<Box<ProcessInfo>> {
spies.get(&pid).map(|spy| {
let parent = spy.parent.and_then(|parentpid| get_process_info(parentpid, spies));
Box::new(ProcessInfo{pid, parent, command_line: spy.command_line.clone()})
})
}
22 changes: 20 additions & 2 deletions src/stack_trace.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std;
use std::sync::Arc;

use failure::{Error, ResultExt};

Expand All @@ -23,7 +24,9 @@ pub struct StackTrace {
/// Whether or not the thread held the GIL
pub owns_gil: bool,
/// The frames
pub frames: Vec<Frame>
pub frames: Vec<Frame>,
/// process commandline / parent process info
pub process_info: Option<Arc<ProcessInfo>>
}

/// Information about a single function call in a stack trace
Expand Down Expand Up @@ -51,6 +54,13 @@ pub struct LocalVariable {
pub repr: Option<String>,
}

#[derive(Debug, Clone, Serialize)]
pub struct ProcessInfo {
pub pid: Pid,
pub command_line: String,
pub parent: Option<Box<ProcessInfo>>
}

/// Given an InterpreterState, this function returns a vector of stack traces for each thread
pub fn get_stack_traces<I>(interpreter: &I, process: &Process) -> Result<Vec<StackTrace>, Error>
where I: InterpreterState {
Expand Down Expand Up @@ -108,7 +118,7 @@ pub fn get_stack_trace<T>(thread: &T, process: &Process, copy_locals: bool) -> R
frame_ptr = frame.back();
}

Ok(StackTrace{pid: process.pid, frames, thread_id: thread.thread_id(), thread_name: None, owns_gil: false, active: true, os_thread_id: None})
Ok(StackTrace{pid: process.pid, frames, thread_id: thread.thread_id(), thread_name: None, owns_gil: false, active: true, os_thread_id: None, process_info: None})
}

impl StackTrace {
Expand Down Expand Up @@ -186,6 +196,14 @@ fn get_locals<C: CodeObject, F: FrameObject, P: ProcessMemory>(code: &C, framept
Ok(ret)
}

impl ProcessInfo {
pub fn to_frame(&self) -> Frame {
Frame{name: format!("process {}:\"{}\"", self.pid, self.command_line),
filename: String::from(""),
module: None, short_filename: None, line: 0, locals: None}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit e21c2ce

Please sign in to comment.