Skip to content

Commit

Permalink
Introduces Config::periodic_announce_to_down_members
Browse files Browse the repository at this point in the history
New feature to (try to) recover from network partitions
  • Loading branch information
caio committed Mar 19, 2024
1 parent 4c4b684 commit 25defef
Show file tree
Hide file tree
Showing 5 changed files with 175 additions and 27 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,15 @@ or problems.
handler implementations now only need to emit an identifier (Key)
for each value being broadcast instead of managing the allocation
See the `BroadcastHandler` documentation and examples for details
- `Config::periodic_announce_to_down_members`: Foca periodically
tries to join with members it considers down, as an attempt to
recover from a network partition. This setting is **enabled** by
default for `Config::new_wan` and `Config::new_lan`
- There's now `Notification::Rename` that signals whenever an
identity with a conflicting `Addr` in the cluster gets replaced
by a newer one
- There's no need to manage the list of members externally anymore:
foca does it all for you and `Foca::iter_members` only lists
Foca does it all for you and `Foca::iter_members` only lists
the unique (by `Identity::Addr`), freshest identities
- `examples/foca_insecure_udp_agent.rs` now comes with a fully working
custom broadcast example
Expand Down
28 changes: 28 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,21 @@ pub struct Config {
/// disabled if you're aiming at pure SWIM behavior.
pub periodic_announce: Option<PeriodicParams>,

/// How often should foca send an announce message to members it currently
/// considers [`crate::State::Down`]
///
/// This setting instructs foca to try and talk to members that are down
/// so that it can (eventually) recover from network partitions without
/// additional hand-holding.
///
/// It's particularly useful when used in tandem with identities that
/// can auto-rejoin (`crate::Identity::renew`) and with
/// `Self::notify_down_members` enabled.
///
/// This feature is an extension to the SWIM protocol and should be left
/// disabled if you're aiming at pure SWIM behavior.
pub periodic_announce_to_down_members: Option<PeriodicParams>,

/// How often should foca send cluster updates to peers
///
/// By default, SWIM disseminates cluster updates during the direct and
Expand Down Expand Up @@ -197,6 +212,7 @@ impl Config {
notify_down_members: false,

periodic_announce: None,
periodic_announce_to_down_members: None,
periodic_gossip: None,
}
}
Expand Down Expand Up @@ -239,6 +255,12 @@ impl Config {
frequency: Duration::from_secs(30),
num_members: NonZeroUsize::new(1).unwrap(),
}),

periodic_announce_to_down_members: Some(PeriodicParams {
frequency: Duration::from_secs(65),
num_members: NonZeroUsize::new(2).unwrap(),
}),

periodic_gossip: Some(PeriodicParams {
frequency: Duration::from_millis(200),
num_members: NonZeroUsize::new(3).unwrap(),
Expand Down Expand Up @@ -275,6 +297,12 @@ impl Config {
frequency: Duration::from_secs(60),
num_members: NonZeroUsize::new(2).unwrap(),
}),

periodic_announce_to_down_members: Some(PeriodicParams {
frequency: Duration::from_secs(125),
num_members: NonZeroUsize::new(3).unwrap(),
}),

periodic_gossip: Some(PeriodicParams {
frequency: Duration::from_millis(500),
num_members: NonZeroUsize::new(4).unwrap(),
Expand Down
117 changes: 103 additions & 14 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,18 @@ where
)
}

/// Sends a `Message::Announce` to members selected by
/// `choose_down_members`, asking for up to `num_members` of them.
///
/// Returns the first error produced by `send_message`, if any; members
/// still waiting in the scratch buffer at that point are not contacted.
fn announce_to_down(&mut self, num_members: usize, mut runtime: impl Runtime<T>) -> Result<()> {
    // The scratch buffer is shared, so discard whatever a previous
    // operation may have left in it before filling it again.
    self.member_buf.clear();
    self.members
        .choose_down_members(num_members, &mut self.member_buf, &mut self.rng);

    // Take the selected members out one at a time (from the back of the
    // buffer) so the buffer is empty again once every message went out.
    while let Some(down_member) = self.member_buf.pop() {
        let target = down_member.into_identity();
        self.send_message(target, Message::Announce, &mut runtime)?;
    }

    Ok(())
}

// Pick `num_members` random active members and send `msg` to them
fn choose_and_send(
&mut self,
Expand Down Expand Up @@ -765,6 +777,19 @@ where
}
Ok(())
}
Timer::PeriodicAnnounceDown(token) => {
if token == self.timer_token && self.connection_state == ConnectionState::Connected
{
if let Some(ref params) = self.config.periodic_announce_to_down_members {
runtime.submit_after(
Timer::PeriodicAnnounceDown(self.timer_token),
params.frequency,
);
self.announce_to_down(params.num_members.get(), runtime)?;
}
}
Ok(())
}
}
}

Expand Down Expand Up @@ -793,15 +818,19 @@ where
/// normal operations, changing the configuration parameters is a
/// nicer alternative to recreating the Foca instance.
///
/// Presently, attempting to change [`Config::probe_period`] or
/// [`Config::probe_rtt`] results in [`Error::InvalidConfig`]; For
/// such cases it's recommended to recreate your Foca instance. When
/// an error occurs, every configuration parameter remains
/// Changing [`Config::probe_period`], [`Config::probe_rtt`] or
/// trying to _enable_ any `periodic_` setting results in
/// [`Error::InvalidConfig`]; For such cases it's recommended to
/// recreate your Foca instance.
///
/// When an error occurs, every configuration parameter remains
/// unchanged.
pub fn set_config(&mut self, config: Config) -> Result<()> {
if self.config.probe_period != config.probe_period
|| self.config.probe_rtt != config.probe_rtt
|| (self.config.periodic_announce.is_none() && config.periodic_announce.is_some())
|| (self.config.periodic_announce_to_down_members.is_none()
&& config.periodic_announce_to_down_members.is_some())
|| (self.config.periodic_gossip.is_none() && config.periodic_gossip.is_some())
{
Err(Error::InvalidConfig)
Expand Down Expand Up @@ -1327,6 +1356,13 @@ where
runtime.submit_after(Timer::PeriodicAnnounce(self.timer_token), params.frequency);
}

if let Some(ref params) = self.config.periodic_announce_to_down_members {
runtime.submit_after(
Timer::PeriodicAnnounceDown(self.timer_token),
params.frequency,
);
}

if let Some(ref params) = self.config.periodic_gossip {
runtime.submit_after(Timer::PeriodicGossip(self.timer_token), params.frequency);
}
Expand Down Expand Up @@ -3721,14 +3757,18 @@ mod tests {
}

// There are multiple "do this thing periodically" settings. This
// helps test those. Takes:
// - something that knows which configuration to set
// - something that knows which event should be sent
// - the message that should be sent
fn check_periodic_behaviour<F, G>(config_setter: F, mut event_maker: G, message: Message<ID>)
// helps test those.
// It creates a Foca instance (ID=1) with 2 active members (IDs 2 and 3)
// and 2 down members (IDs 4 and 5), then allows the caller to
// verify the runtime afterwards
fn check_periodic_behaviour<F, G, V>(config_setter: F, mut event_maker: G, validator: V)
where
// something that knows which configuration to set
F: Fn(&mut Config, config::PeriodicParams),
// something that knows which event should be sent
G: FnMut(TimerToken) -> Timer<ID>,
// something to inspect the runtime for expected events
V: Fn(AccumulatingRuntime<ID>),
{
let frequency = Duration::from_millis(500);
let num_members = NonZeroUsize::new(2).unwrap();
Expand All @@ -3746,7 +3786,14 @@ mod tests {

// When it becomes active (i.e.: has at least one active member)
assert_eq!(Ok(()), foca.apply(Member::alive(ID::new(2)), &mut runtime));
assert_eq!(Ok(()), foca.apply(Member::alive(ID::new(3)), &mut runtime));
assert_eq!(
Ok(()),
foca.apply(Member::suspect(ID::new(3)), &mut runtime)
);
assert_eq!(Ok(()), foca.apply(Member::down(ID::new(4)), &mut runtime));
assert_eq!(Ok(()), foca.apply(Member::down(ID::new(5)), &mut runtime));

assert_eq!(2, foca.num_members());

// Should schedule the given event
expect_scheduling!(runtime, event_maker(foca.timer_token()), frequency);
Expand All @@ -3761,11 +3808,10 @@ mod tests {
// It should've scheduled the event again
expect_scheduling!(runtime, event_maker(foca.timer_token()), frequency);

validator(runtime);
// And sent the message to `num_members` random members
// (since num_members=2 and this instance only knows about two, we know
// which should've been picked)
expect_message!(runtime, ID::new(2), message);
expect_message!(runtime, ID::new(3), message);
}

#[test]
Expand All @@ -3775,7 +3821,10 @@ mod tests {
c.periodic_gossip = Some(p);
},
|t: TimerToken| -> Timer<ID> { Timer::PeriodicGossip(t) },
Message::Gossip,
|mut runtime| {
expect_message!(runtime, ID::new(2), Message::<ID>::Gossip);
expect_message!(runtime, ID::new(3), Message::<ID>::Gossip);
},
);
}

Expand All @@ -3786,7 +3835,24 @@ mod tests {
c.periodic_announce = Some(p);
},
|t: TimerToken| -> Timer<ID> { Timer::PeriodicAnnounce(t) },
Message::Announce,
|mut runtime| {
expect_message!(runtime, ID::new(2), Message::<ID>::Announce);
expect_message!(runtime, ID::new(3), Message::<ID>::Announce);
},
);
}

#[test]
fn periodic_announce_to_down_members_behaviour() {
    // Turns on the setting under test with the parameters chosen by the
    // shared helper.
    let enable_setting = |c: &mut Config, p: config::PeriodicParams| {
        c.periodic_announce_to_down_members = Some(p);
    };

    // The timer event that drives this periodic behaviour.
    let make_timer = |t: TimerToken| -> Timer<ID> { Timer::PeriodicAnnounceDown(t) };

    // The announce messages are expected to go to the members with
    // IDs 4 and 5.
    let expect_announce_to_down = |mut runtime: AccumulatingRuntime<ID>| {
        expect_message!(runtime, ID::new(4), Message::<ID>::Announce);
        expect_message!(runtime, ID::new(5), Message::<ID>::Announce);
    };

    check_periodic_behaviour(enable_setting, make_timer, expect_announce_to_down);
}

Expand All @@ -3813,6 +3879,29 @@ mod tests {
assert_eq!(Ok(()), foca.set_config(config()));
}

#[test]
fn periodic_announce_to_down_members_cannot_be_enabled_at_runtime() {
    // The baseline test configuration leaves the setting disabled
    let disabled = config();
    assert!(disabled.periodic_announce_to_down_members.is_none());

    // Same configuration, but with periodic announce to down members
    // switched on
    let mut enabled = disabled.clone();
    enabled.periodic_announce_to_down_members = Some(config::PeriodicParams {
        frequency: Duration::from_secs(5),
        num_members: NonZeroUsize::new(1).unwrap(),
    });

    // An instance that started with the setting disabled must reject an
    // attempt to enable it at runtime
    let mut foca = Foca::new(ID::new(1), disabled, rng(), codec());
    assert_eq!(Err(Error::InvalidConfig), foca.set_config(enabled.clone()));

    // Whereas an instance that started with the setting enabled is
    // allowed to turn it off
    let mut foca = Foca::new(ID::new(1), enabled, rng(), codec());
    assert_eq!(Ok(()), foca.set_config(config()));
}

#[test]
fn periodic_gossip_cannot_be_enabled_at_runtime() {
let mut c = config();
Expand Down
45 changes: 33 additions & 12 deletions src/member.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,29 +209,21 @@ where
}
}

/// XXX This used to be a `next_members()` which would make use of the
/// already shuffled state and then simply advance the cursor
/// to trigger the next shuffle-after-round-robin that `next()`
/// does. However I'm not sure it was a good idea: the point
/// of what `next()` does is giving some sort of determinism giving
/// a high chance that every member will be *pinged* periodically
/// and using the same logic for other "pick random member"
/// mechanisms might break the math.
pub(crate) fn choose_active_members<F>(
fn choose_members<F>(
&self,
wanted: usize,
output: &mut Vec<Member<T>>,
mut rng: impl Rng,
picker: F,
) where
F: Fn(&T) -> bool,
F: Fn(&Member<T>) -> bool,
{
// Basic reservoir sampling
let mut num_chosen = 0;
let mut num_seen = 0;

for member in self.iter_active() {
if !picker(member.id()) {
for member in &self.inner {
if !picker(member) {
continue;
}

Expand All @@ -248,6 +240,29 @@ where
}
}

/// Randomly selects up to `wanted` members that are not active (i.e.
/// `Member::is_active` returns `false`), handing the selection to the
/// caller through `output`.
///
/// This is a thin wrapper over `choose_members` that only supplies the
/// filtering predicate.
pub(crate) fn choose_down_members(
    &self,
    wanted: usize,
    output: &mut Vec<Member<T>>,
    rng: impl Rng,
) {
    let is_inactive = |candidate: &Member<T>| !candidate.is_active();
    self.choose_members(wanted, output, rng, is_inactive);
}

/// Randomly selects up to `wanted` active members whose identity is
/// accepted by `picker`, handing the selection to the caller through
/// `output`.
///
/// `picker` is only consulted for members that are active; inactive
/// members are rejected before it gets called.
pub(crate) fn choose_active_members<F>(
    &self,
    wanted: usize,
    output: &mut Vec<Member<T>>,
    rng: impl Rng,
    picker: F,
) where
    F: Fn(&T) -> bool,
{
    let is_eligible = |candidate: &Member<T>| {
        if !candidate.is_active() {
            return false;
        }
        picker(candidate.id())
    };

    self.choose_members(wanted, output, rng, is_eligible);
}

pub(crate) fn remove_if_down(&mut self, id: &T) -> Option<Member<T>> {
let position = self
.inner
Expand Down Expand Up @@ -763,6 +778,12 @@ mod tests {
member_id.0.parse::<usize>().expect("number") > 4
});
assert_eq!(vec![Member::suspect(Id("5"))], out);

out.clear();
members.choose_down_members(3, &mut out, &mut rng);
assert_eq!(2, out.len());
assert!(out.iter().any(|m| m.id == Id("7")));
assert!(out.iter().any(|m| m.id == Id("6")));
}

#[test]
Expand Down
6 changes: 6 additions & 0 deletions src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,11 @@ pub enum Timer<T> {
/// specified by [`crate::Config::periodic_announce`]
PeriodicAnnounce(TimerToken),

/// Sends a [`crate::Message::Announce`] to randomly chosen members
/// that are considered [`crate::State::Down`] as specified by
/// [`crate::Config::periodic_announce_to_down_members`]
PeriodicAnnounceDown(TimerToken),

/// Sends a [`crate::Message::Gossip`] to randomly chosen members as
/// specified by [`crate::Config::periodic_gossip`]
PeriodicGossip(TimerToken),
Expand All @@ -192,6 +197,7 @@ impl<T> Timer<T> {
Timer::PeriodicAnnounce(_) => 3,
Timer::PeriodicGossip(_) => 4,
Timer::RemoveDown(_) => 5,
Timer::PeriodicAnnounceDown(_) => 6,
}
}
}
Expand Down

0 comments on commit 25defef

Please sign in to comment.