Skip to content

Commit

Permalink
Don't add custom broadcasts to TurnUndead messages
Browse files Browse the repository at this point in the history
  • Loading branch information
caio committed Jun 4, 2024
1 parent 4e88976 commit 8f85710
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 1 deletion.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Changelog

## UNRELEASED

- Bugfix: foca could incorrectly attach custom broadcasts to
messages supposed to be lightweight leading to strange decode
errors in the logs and slowing down cluster self-healing, but
no further impact on functionality
See: https://github.com/caio/foca/issues/35

## v0.17.1 - 2024-04-25

- Bugfix: when restarting members, there was a chance foca would
Expand Down
81 changes: 80 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1505,7 +1505,9 @@ where
let add_custom_broadcast = buf.has_remaining_mut()
// Every message but Announce includes custom broadcasts by
// default
&& !matches!(header.message, Message::Announce)
// NEEDSWORK: this + piggyback logic should be put in the type so
// it doesn't go out of sync
&& !matches!(header.message, Message::Announce|Message::TurnUndead)
// Unless the broadcast handler says no
&& self.broadcast_handler.should_add_broadcast_data(&dst);

Expand Down Expand Up @@ -4337,4 +4339,81 @@ mod tests {
assert_eq!(Ok(()), foca.handle_data(&msg, &mut runtime));
assert_eq!(2, foca.num_members());
}

struct DumbHandler;

struct GrowOnly(u8);

impl Invalidates for GrowOnly {
fn invalidates(&self, other: &Self) -> bool {
self.0 > other.0
}
}

impl BroadcastHandler<ID> for DumbHandler {
type Key = GrowOnly;

type Error = &'static str;

fn receive_item(
&mut self,
mut data: &[u8],
_sender: Option<&ID>,
) -> core::result::Result<Option<Self::Key>, Self::Error> {
if data.has_remaining() {
// bad: will never stop emitting
// good enough for testing tho
Ok(Some(GrowOnly(data.get_u8())))
} else {
Ok(None)
}
}
}

#[test]
fn lightweight_messages() {
let mut foca =
Foca::with_custom_broadcast(ID::new(1), config(), rng(), codec(), DumbHandler);

let mut runtime = AccumulatingRuntime::new();

// Add some members so there is a backlog of updates
assert_eq!(
Ok(()),
foca.apply_many(
[Member::alive(ID::new(2)), Member::alive(ID::new(3)),].into_iter(),
&mut runtime
)
);
assert!(foca.updates_backlog() > 0);

// And some custom broadcasts
assert_eq!(Ok(true), foca.add_broadcast(b"0hello"));
assert_eq!(Ok(true), foca.add_broadcast(b"1world"));
assert!(foca.custom_broadcast_backlog() > 0);

let dst = ID::new(4);
let mut codec = codec();
for msg in [Message::Announce, Message::TurnUndead] {
assert_eq!(Ok(()), foca.send_message(dst, msg.clone(), &mut runtime));
let mut payload = runtime.take_data(dst).expect("must contain message to dst");

let header = codec.decode_header(&mut payload).expect("payload is valid");
assert_eq!(header.message, msg);
assert_eq!(header.dst, dst);
// Lightweight messages should not include anything after
// the header (no updates, no custom broadcasts)
assert!(payload.is_empty(), "message {msg:?} contains trailing data");
}

// Whereas a normal message should
assert_eq!(
Ok(()),
foca.send_message(dst, Message::Ping(0), &mut runtime)
);
let mut payload = runtime.take_data(dst).expect("must contain message to dst");

let _header = codec.decode_header(&mut payload).expect("payload is valid");
assert!(payload.has_remaining());
}
}

0 comments on commit 8f85710

Please sign in to comment.