Skip to content

Commit

Permalink
LLMP Timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
domenukk authored and addisoncrump committed Jan 24, 2023
1 parent 3800c95 commit 8a4336a
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 9 deletions.
78 changes: 72 additions & 6 deletions libafl/src/bolts/llmp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1930,16 +1930,17 @@ where
/// The broker walks all pages and looks for changes, then broadcasts them on
/// its own shared page, once.
#[inline]
pub fn once<F>(&mut self, on_new_msg: &mut F) -> Result<(), Error>
pub fn once<F>(&mut self, on_new_msg: &mut F) -> Result<bool, Error>
where
F: FnMut(ClientId, Tag, Flags, &[u8]) -> Result<LlmpMsgHookResult, Error>,
{
let mut new_messages = false;
for i in 0..self.llmp_clients.len() {
unsafe {
self.handle_new_msgs(i as u32, on_new_msg)?;
new_messages |= self.handle_new_msgs(i as u32, on_new_msg)?;
}
}
Ok(())
Ok(new_messages)
}

/// Internal function, returns true when shuttdown is requested by a `SIGINT` signal
Expand All @@ -1958,9 +1959,66 @@ where
false
}

/// Loops infinitely, forwarding and handling all incoming messages from clients.
/// Never returns.
/// Will call `on_timeout` roughly after `timeout`
/// Panics on error.
/// 5 millis of sleep can't hurt to keep busywait not at 100%
#[cfg(feature = "std")]
pub fn loop_with_timeouts<F>(
&mut self,
on_new_msg_or_timeout: &mut F,
timeout: Duration,
sleep_time: Option<Duration>,
) where
F: FnMut(Option<(ClientId, Tag, Flags, &[u8])>) -> Result<LlmpMsgHookResult, Error>,
{
use super::current_milliseconds;

#[cfg(unix)]
if let Err(_e) = unsafe { setup_signal_handler(&mut LLMP_SIGHANDLER_STATE) } {
// We can live without a proper ctrl+c signal handler. Print and ignore.
#[cfg(feature = "std")]
println!("Failed to setup signal handlers: {_e}");
}

let timeout = timeout.as_millis() as u64;
let mut end_time = current_milliseconds() + timeout;

while !self.is_shutting_down() {
if current_milliseconds() > end_time {
on_new_msg_or_timeout(None).expect("An error occured in broker timeout. Exiting.");
end_time = current_milliseconds() + timeout;
}

if self
.once(&mut |client_id, tag, flags, buf| {
on_new_msg_or_timeout(Some((client_id, tag, flags, buf)))
})
.expect("An error occurred when brokering. Exiting.")
{
end_time = current_milliseconds() + timeout;
}

#[cfg(feature = "std")]
if let Some(time) = sleep_time {
thread::sleep(time);
}

#[cfg(not(feature = "std"))]
if let Some(time) = sleep_time {
panic!("Cannot sleep on no_std platform (requested {:?})", time);
}
}
self.llmp_out
.send_buf(LLMP_TAG_EXITING, &[])
.expect("Error when shutting down broker: Could not send LLMP_TAG_EXITING msg.");
}

/// Loops infinitely, forwarding and handling all incoming messages from clients.
/// Never returns. Panics on error.
/// 5 millis of sleep can't hurt to keep busywait not at 100%
/// If you need to run code even if no update got sent, use [`Self::loop_with_timeout`].
pub fn loop_forever<F>(&mut self, on_new_msg: &mut F, sleep_time: Option<Duration>)
where
F: FnMut(ClientId, Tag, Flags, &[u8]) -> Result<LlmpMsgHookResult, Error>,
Expand Down Expand Up @@ -2338,13 +2396,19 @@ where
Ok(ret)
}

/// broker broadcast to its own page for all others to read */
/// Broker broadcast to its own page for all others to read
/// Returns `true` if new messages were broker-ed
#[inline]
#[allow(clippy::cast_ptr_alignment)]
unsafe fn handle_new_msgs<F>(&mut self, client_id: u32, on_new_msg: &mut F) -> Result<(), Error>
unsafe fn handle_new_msgs<F>(
&mut self,
client_id: u32,
on_new_msg: &mut F,
) -> Result<bool, Error>
where
F: FnMut(ClientId, Tag, Flags, &[u8]) -> Result<LlmpMsgHookResult, Error>,
{
let mut new_mesages = false;
let mut next_id = self.llmp_clients.len() as u32;

// TODO: We could memcpy a range of pending messages, instead of one by one.
Expand All @@ -2354,11 +2418,13 @@ where
match client.recv()? {
None => {
// We're done handling this client
return Ok(());
return Ok(new_mesages);
}
Some(msg) => msg,
}
};
// We got a new message
new_mesages = true;

match (*msg).tag {
// first, handle the special, llmp-internal messages
Expand Down
50 changes: 47 additions & 3 deletions libafl/src/events/llmp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use crate::{
};
use crate::{
bolts::{
llmp::{self, Flags, LlmpClient, LlmpClientDescription, Tag},
llmp::{self, LlmpClient, LlmpClientDescription, Tag},
shmem::ShMemProvider,
},
events::{
Expand Down Expand Up @@ -123,12 +123,13 @@ where
}

/// Run forever in the broker
#[cfg(not(features = "std"))]
pub fn broker_loop(&mut self) -> Result<(), Error> {
let monitor = &mut self.monitor;
#[cfg(feature = "llmp_compression")]
let compressor = &self.compressor;
self.llmp.loop_forever(
&mut |client_id: u32, tag: Tag, _flags: Flags, msg: &[u8]| {
&mut |client_id, tag, _flags, msg| {
if tag == LLMP_TAG_EVENT_TO_BOTH {
#[cfg(not(feature = "llmp_compression"))]
let event_bytes = msg;
Expand Down Expand Up @@ -156,6 +157,49 @@ where
Ok(())
}

/// Run forever in the broker
#[cfg(features = "std")]
pub fn broker_loop(&mut self) -> Result<(), Error> {
let monitor = &mut self.monitor;
#[cfg(feature = "llmp_compression")]
let compressor = &self.compressor;
self.llmp.loop_with_timeouts(
&mut |msg_or_timeout| {
if let Some((client_id, tag, _flags, msg)) = msg_or_timeout {
if tag == LLMP_TAG_EVENT_TO_BOTH {
#[cfg(not(feature = "llmp_compression"))]
let event_bytes = msg;
#[cfg(feature = "llmp_compression")]
let compressed;
#[cfg(feature = "llmp_compression")]
let event_bytes = if _flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED {
compressed = compressor.decompress(msg)?;
&compressed
} else {
msg
};
let event: Event<I> = postcard::from_bytes(event_bytes)?;
match Self::handle_in_broker(monitor, client_id, &event)? {
BrokerEventResult::Forward => {
Ok(llmp::LlmpMsgHookResult::ForwardToClients)
}
BrokerEventResult::Handled => Ok(llmp::LlmpMsgHookResult::Handled),
}
} else {
Ok(llmp::LlmpMsgHookResult::ForwardToClients)
}
} else {
monitor.display("Timeout".into(), 0);
Ok(llmp::LlmpMsgHookResult::Handled)
}
},
Duration::from_millis(1000),
Some(Duration::from_millis(5)),
);

Ok(())
}

/// Handle arriving events in the broker
#[allow(clippy::unnecessary_wraps)]
fn handle_in_broker(
Expand Down Expand Up @@ -453,7 +497,7 @@ where
event: Event<<Self::State as UsesInput>::Input>,
) -> Result<(), Error> {
let serialized = postcard::to_allocvec(&event)?;
let flags: Flags = LLMP_FLAG_INITIALIZED;
let flags = LLMP_FLAG_INITIALIZED;

match self.compressor.compress(&serialized)? {
Some(comp_buf) => {
Expand Down

0 comments on commit 8a4336a

Please sign in to comment.