Skip to content

Commit

Permalink
save local variables from Responder::response_task() for debugging (o…
Browse files Browse the repository at this point in the history
…penzfs#279)

We've seen an unexplained hang where it seems like
`Responder::response_task()` is not sending responses that it should.
To diagnose this, we would like to be able to look at its local
variables if we hit this hang again.  Specifically, the
`rx: mpsc::UnboundedReceiver<ResponseMessage>` and
`output: BufWriter<unix::OwnedWriteHalf>`.

This commit adds a new debugging mechanism, `DebugPointerSet` which is
based on `lazy_static_ptr!`.  It allows us to save several pointers (of
the same type) in a way that will let us find them from the debugger.

This additional mechanism is needed because (unlike existing users of
`lazy_static_ptr!`) there are multiple objects that we need to track
(the number not known at compile time, since it depends on the number of
active connections), and we want to hold references to the objects (rx,
output) without holding any locks.

`response_task()` uses `DebugPointerSet` to save its `rx` and `output`
in a way that we can find them in the debugger if it hangs or crashes.

Example use:
```
(gdb) p zettaobject::server::Responder::response_task::RESPOND_RECEIVERS_PTR.p.value.set.ptr.pointer.data.data.value
$5 = HashSet(size=1) = {
  util::lazy_static_ptr::DebugPointer<(tokio::sync::mpsc::unbounded::UnboundedReceiver<zettaobject::server::ResponseMessage>, tokio::io::util::buf_writer::BufWriter<tokio::net::unix::split_owned::OwnedWriteHalf>)> (
    0x7fc3247b6850
  )}

(gdb) p *(0x7fc3247b6850 as *const (tokio::sync::mpsc::unbounded::UnboundedReceiver<zettaobject::server::ResponseMessage>,tokio::io::util::buf_writer::BufWriter<tokio::net::unix::split_owned::OwnedWriteHalf>))
...
                  head: core::ptr::non_null::NonNull<tokio::sync::mpsc::block::Block<zettaobject::server::ResponseMessage>> {
                    pointer: 0x7fc011140bb0
                  },
...
                          fd: 15
...
    buf: Vec(size=1176) = {2, 0, 0, 0, 16, 0, 0, 0, ...
```
  • Loading branch information
ahrens authored Mar 8, 2022
1 parent ab49257 commit 6d9cce8
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 11 deletions.
1 change: 1 addition & 0 deletions cmd/zfs_object_agent/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cmd/zfs_object_agent/util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ bytes = "1.0"
chrono = "0.4.19"
config = "0.11"
dashmap = "5.1.0"
derivative = "2.2.0"
enum-map = { version = "1.1.1", features = ["serde"] }
lazy_static = "1.4.0"
log = "0.4"
Expand Down
104 changes: 104 additions & 0 deletions cmd/zfs_object_agent/util/src/lazy_static_ptr.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
use derivative::Derivative;
pub use lazy_static::lazy_static;
pub use paste::paste;
use std::ops::Deref;
use std::ops::DerefMut;
pub use std::sync::atomic::{AtomicPtr, Ordering};
use std::{
collections::HashSet,
sync::{Arc, Mutex},
};

/// This macro is similar to `lazy_static!`, but for each static created, it creates an
/// additional static variable with the `_PTR` suffix, which is an `AtomicPtr` to the contents of
Expand Down Expand Up @@ -61,3 +68,100 @@ macro_rules! lazy_static_ptr {
// empty trailing tokens
() => ()
}

#[derive(Derivative)]
#[derivative(Hash(bound = ""))]
#[derivative(PartialEq(bound = ""))]
#[derivative(Eq(bound = ""))]
/// This wrapping struct exists so that we can mark the raw pointer as Send+Sync (i.e. safe to
/// use from different threads).
struct DebugPointer<T>(*const T);
unsafe impl<T> Send for DebugPointer<T> {}
unsafe impl<T> Sync for DebugPointer<T> {}

impl<T> DebugPointer<T> {
fn new(guard: &DebugPointerGuard<T>) -> Self {
Self(&*guard.value)
}
}

/// A RAII guard that deref's to the stored value. When dropped, its pointer will be removed
/// from the DebugPointerSet that it was created from.
pub struct DebugPointerGuard<T> {
value: Box<T>,
set: DebugPointerSet<T>,
}

impl<T> Deref for DebugPointerGuard<T> {
type Target = T;

fn deref(&self) -> &Self::Target {
&*self.value
}
}

impl<T> DerefMut for DebugPointerGuard<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut *self.value
}
}

impl<T> Drop for DebugPointerGuard<T> {
fn drop(&mut self) {
let removed = self
.set
.set
.lock()
.unwrap()
.remove(&DebugPointer::new(self));
assert!(removed);
}
}

#[derive(Derivative)]
#[derivative(Clone(bound = ""))]
#[derivative(Default(bound = ""))]
/// This is a set of structs, which we will store pointers to so that we can find them in the
/// debugger. Note that the stored type T is unconstrained (e.g. it need not be Hash), because
/// we are storing (but not dereferencing) a pointer to it. Typical use is combined with
/// `lazy_static_ptr!`:
/// ```
/// fn func(thing: Thing) {
/// lazy_static_ptr! {
/// static ref THINGS: DebugPointerSet<Thing> = Default::default();
/// }
/// let mut thing = THINGS.insert(thing);
/// // use `thing` as usual, it deref's to the passed in Thing
/// thing.method();
/// }
/// ```
/// Note that `lazy_static_ptr!` doesn't work well inside `async`
/// functions/methods/closures, because it's hard to name the variable in the
/// debugger (it has {braces} in its name). The workaround is to either create
/// the lazy_static_ptr! at the file level (not inside a function), or to
/// desugar the `async fn` to a regular `fn` that returns a `Future`.
pub struct DebugPointerSet<T> {
set: Arc<Mutex<HashSet<DebugPointer<T>>>>,
}

impl<T> DebugPointerSet<T> {
pub fn new() -> Self {
Default::default()
}

/// Insert a new object to the DebugPointerSet. The debugger can be used to find a pointer
/// to the object. The object is moved into the returned DebugPointerGuard, which holds the
/// object in a Box, so that its location in memory doesn't change. The DebugPointerGuard
/// can be dereferenced to the contained object. Note that the pointer tracks the location of
/// the DebugPointerGuard's contents, even if the object is moved out of the Guard with
/// `mem::replace()` or `mem::take()`.
pub fn insert(&self, value: T) -> DebugPointerGuard<T> {
let guard = DebugPointerGuard {
value: Box::new(value),
set: self.clone(),
};
let inserted = self.set.lock().unwrap().insert(DebugPointer::new(&guard));
assert!(inserted);
guard
}
}
50 changes: 39 additions & 11 deletions cmd/zfs_object_agent/zettaobject/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ use tokio::net::unix::OwnedWriteHalf;
use tokio::net::{UnixListener, UnixStream};
use tokio::sync::mpsc;
use util::get_tunable;
use util::lazy_static_ptr;
use util::lazy_static_ptr::DebugPointerSet;
use util::maybe_die_with;
use util::message::*;
use util::super_trace;
Expand Down Expand Up @@ -416,20 +418,46 @@ impl Responder {
}
}

async fn response_task(
fn response_task(
output: OwnedWriteHalf,
mut rx: mpsc::UnboundedReceiver<ResponseMessage>,
) {
let mut output = BufWriter::with_capacity(1024 * 1024, output);
while let Some(message) = rx.recv().await {
Self::write_response(&mut output, message).await;
rx: mpsc::UnboundedReceiver<ResponseMessage>,
) -> impl Future<Output = ()> {
let output = BufWriter::with_capacity(1024 * 1024, output);

// It would improve readability if we used a struct rather than a tuple for the state we
// are saving in the DebugPointerSet. However, the debugger can't cast to a struct type
// defined here, because the fully-qualified type name would contain {braces}. The
// alternative would be to declare the struct at the top level, but having the internal
// details of this method spread to the surrounding state seems worse than the tuple.
//
// Similarly, declaring RESPOND_RECEIVERS inside an async closure or function would cause
// its fully-qualified symbol name to contain `{{closure}}`, so we couldn't name it in
// the debugger.
lazy_static_ptr! {
static ref RESPOND_RECEIVERS:
DebugPointerSet<(
mpsc::UnboundedReceiver<ResponseMessage>,
BufWriter<OwnedWriteHalf>
)> = Default::default();
}

// drain the channel before flushing
while let Some(Some(message)) = rx.recv().now_or_never() {
Self::write_response(&mut output, message).await;
}
// Save our rx and output in the global debug state, so that we can find them from the
// debugger.
let mut state = RESPOND_RECEIVERS.insert((rx, output));

async move {
// destructure the tuple back into the rx/output
let (rx, output) = &mut *state;
while let Some(message) = rx.recv().await {
Self::write_response(output, message).await;

output.flush().await.unwrap();
// drain the channel before flushing
while let Some(Some(message)) = rx.recv().now_or_never() {
Self::write_response(output, message).await;
}

output.flush().await.unwrap();
}
}
}
}
Expand Down

0 comments on commit 6d9cce8

Please sign in to comment.