From 65895fd5f0be843a5a531886b118d54590bb1113 Mon Sep 17 00:00:00 2001 From: Adrien Guillo Date: Mon, 2 Oct 2023 18:31:04 -0400 Subject: [PATCH] Emit `ClusterChange::Remove` only if node generation IDs match (#3899) --- quickwit/quickwit-cluster/src/change.rs | 120 ++++++++++++++--------- quickwit/quickwit-cluster/src/cluster.rs | 4 +- quickwit/quickwit-cluster/src/lib.rs | 2 + 3 files changed, 78 insertions(+), 48 deletions(-) diff --git a/quickwit/quickwit-cluster/src/change.rs b/quickwit/quickwit-cluster/src/change.rs index f1dba458071..0d3c9b3fcd3 100644 --- a/quickwit/quickwit-cluster/src/change.rs +++ b/quickwit/quickwit-cluster/src/change.rs @@ -17,6 +17,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +use std::collections::btree_map::Entry; use std::collections::BTreeMap; use chitchat::{ChitchatId, NodeState}; @@ -25,7 +26,7 @@ use quickwit_common::tower::{make_channel, warmup_channel}; use tracing::{info, warn}; use crate::member::NodeStateExt; -use crate::ClusterNode; +use crate::{ClusterNode, NodeId}; #[derive(Debug, Clone)] pub enum ClusterChange { @@ -39,7 +40,7 @@ pub enum ClusterChange { pub(crate) async fn compute_cluster_change_events( cluster_id: &str, self_chitchat_id: &ChitchatId, - previous_nodes: &mut BTreeMap, + previous_nodes: &mut BTreeMap, previous_node_states: &BTreeMap, new_node_states: &BTreeMap, ) -> Vec { @@ -96,17 +97,8 @@ async fn compute_cluster_change_events_on_added( self_chitchat_id: &ChitchatId, new_chitchat_id: &ChitchatId, new_node_state: &NodeState, - previous_nodes: &mut BTreeMap, + previous_nodes: &mut BTreeMap, ) -> Option { - let is_self_node = self_chitchat_id == new_chitchat_id; - if !is_self_node { - info!( - cluster_id=%cluster_id, - node_id=%new_chitchat_id.node_id, - "Node `{}` has joined the cluster.", - new_chitchat_id.node_id - ); - } let grpc_advertise_addr = match new_node_state.grpc_advertise_addr() { Ok(addr) => addr, Err(error) => { @@ -120,6 +112,7 @@ async fn compute_cluster_change_events_on_added( } }; let channel = make_channel(grpc_advertise_addr).await; + let is_self_node = self_chitchat_id == new_chitchat_id; let new_node = match ClusterNode::try_new( new_chitchat_id.clone(), new_node_state, @@ -137,8 +130,26 @@ async fn compute_cluster_change_events_on_added( return None; } }; - previous_nodes.insert(new_chitchat_id.clone(), new_node.clone()); + let new_node_id = new_node.chitchat_id().node_id.clone(); + let previous_node_opt = previous_nodes.insert(new_node_id, new_node.clone()); + if !is_self_node { + if previous_node_opt.is_some() { + info!( + cluster_id=%cluster_id, + node_id=%new_chitchat_id.node_id, + "Node `{}` has rejoined the cluster.", + new_chitchat_id.node_id + ); + } else { + info!( + cluster_id=%cluster_id, + node_id=%new_chitchat_id.node_id, + "Node `{}` has joined the cluster.", + new_chitchat_id.node_id + ); + } + } if new_node.is_ready() { warmup_channel(new_node.channel()).await; @@ -160,9 +171,9 @@ async fn compute_cluster_change_events_on_updated( self_chitchat_id: &ChitchatId, updated_chitchat_id: &ChitchatId, updated_node_state: &NodeState, - previous_nodes: &mut BTreeMap, + previous_nodes: &mut BTreeMap, ) -> Option { - let previous_node = previous_nodes.get(updated_chitchat_id)?.clone(); + let previous_node = previous_nodes.get(&updated_chitchat_id.node_id)?.clone(); let previous_channel = previous_node.channel(); let is_self_node = self_chitchat_id == updated_chitchat_id; let updated_node = match ClusterNode::try_new( @@ -182,7 +193,7 @@ async fn compute_cluster_change_events_on_updated( return None; } }; - previous_nodes.insert(updated_chitchat_id.clone(), updated_node.clone()); + previous_nodes.insert(updated_chitchat_id.node_id.clone(), updated_node.clone()); if !previous_node.is_ready() && updated_node.is_ready() { warmup_channel(updated_node.channel()).await; @@ -217,23 +228,30 @@ fn compute_cluster_change_events_on_removed( cluster_id: &str, self_chitchat_id: &ChitchatId, removed_chitchat_id: &ChitchatId, - previous_nodes: &mut BTreeMap, + previous_nodes: &mut BTreeMap, ) -> Option { - if self_chitchat_id != removed_chitchat_id { - info!( - cluster_id=%cluster_id, - node_id=%removed_chitchat_id.node_id, - "Node `{}` has left the cluster.", - removed_chitchat_id.node_id - ); - } - let previous_node = previous_nodes.remove(removed_chitchat_id)?; + let removed_node_id = removed_chitchat_id.node_id.clone(); + + if let Entry::Occupied(previous_node_entry) = previous_nodes.entry(removed_node_id) { + let previous_node_ref = previous_node_entry.get(); + + if previous_node_ref.chitchat_id().generation_id == removed_chitchat_id.generation_id { + if self_chitchat_id != removed_chitchat_id { + info!( + cluster_id=%cluster_id, + node_id=%removed_chitchat_id.node_id, + "Node `{}` has left the cluster.", + removed_chitchat_id.node_id + ); + } + let previous_node = previous_node_entry.remove(); - if previous_node.is_ready() { - Some(ClusterChange::Remove(previous_node)) - } else { - None - } + if previous_node.is_ready() { + return Some(ClusterChange::Remove(previous_node)); + } + } + }; + None } #[cfg(test)] @@ -361,7 +379,7 @@ mod tests { .await; assert!(event.is_none()); - let node = previous_nodes.get(&new_chitchat_id).unwrap(); + let node = previous_nodes.get(&new_chitchat_id.node_id).unwrap(); assert_eq!(node.chitchat_id(), &new_chitchat_id); assert_eq!(node.grpc_advertise_addr(), grpc_advertise_addr); @@ -396,7 +414,7 @@ mod tests { assert_eq!(node.grpc_advertise_addr(), grpc_advertise_addr); assert!(!node.is_self_node()); assert!(node.is_ready()); - assert_eq!(previous_nodes.get(&new_chitchat_id).unwrap(), &node); + assert_eq!(previous_nodes.get(&new_chitchat_id.node_id).unwrap(), &node); } { // Self node joined the cluster and is ready. @@ -425,7 +443,7 @@ mod tests { assert_eq!(node.grpc_advertise_addr(), grpc_advertise_addr); assert!(node.is_self_node()); assert!(node.is_ready()); - assert_eq!(previous_nodes.get(&new_chitchat_id).unwrap(), &node); + assert_eq!(previous_nodes.get(&new_chitchat_id.node_id).unwrap(), &node); } } @@ -453,7 +471,7 @@ mod tests { ) .unwrap(); let mut previous_nodes = - BTreeMap::from_iter([(updated_chitchat_id.clone(), previous_node)]); + BTreeMap::from_iter([(updated_chitchat_id.node_id.clone(), previous_node)]); let updated_node_state = NodeStateBuilder::default() .with_grpc_advertise_addr(grpc_advertise_addr) @@ -476,7 +494,10 @@ mod tests { assert_eq!(node.grpc_advertise_addr(), grpc_advertise_addr); assert!(node.is_ready()); assert!(!node.is_self_node()); - assert_eq!(previous_nodes.get(&updated_chitchat_id).unwrap(), &node); + assert_eq!( + previous_nodes.get(&updated_chitchat_id.node_id).unwrap(), + &node + ); } { // Node changed. @@ -497,7 +518,7 @@ mod tests { ) .unwrap(); let mut previous_nodes = - BTreeMap::from_iter([(updated_chitchat_id.clone(), previous_node)]); + BTreeMap::from_iter([(updated_chitchat_id.node_id.clone(), previous_node)]); let updated_node_state = NodeStateBuilder::default() .with_grpc_advertise_addr(grpc_advertise_addr) @@ -520,7 +541,10 @@ mod tests { assert_eq!(node.grpc_advertise_addr(), grpc_advertise_addr); assert!(!node.is_self_node()); assert!(node.is_ready()); - assert_eq!(previous_nodes.get(&updated_chitchat_id).unwrap(), &node); + assert_eq!( + previous_nodes.get(&updated_chitchat_id.node_id).unwrap(), + &node + ); } { // Node is no longer ready. @@ -541,7 +565,7 @@ mod tests { ) .unwrap(); let mut previous_nodes = - BTreeMap::from_iter([(updated_chitchat_id.clone(), previous_node)]); + BTreeMap::from_iter([(updated_chitchat_id.node_id.clone(), previous_node)]); let updated_node_state = NodeStateBuilder::default() .with_grpc_advertise_addr(grpc_advertise_addr) @@ -564,7 +588,10 @@ mod tests { assert_eq!(node.grpc_advertise_addr(), grpc_advertise_addr); assert!(!node.is_self_node()); assert!(!node.is_ready()); - assert_eq!(previous_nodes.get(&updated_chitchat_id).unwrap(), &node); + assert_eq!( + previous_nodes.get(&updated_chitchat_id.node_id).unwrap(), + &node + ); } } @@ -606,7 +633,7 @@ mod tests { ) .unwrap(); let mut previous_nodes = - BTreeMap::from_iter([(removed_chitchat_id.clone(), previous_node)]); + BTreeMap::from_iter([(removed_chitchat_id.node_id.clone(), previous_node)]); let event_opt = compute_cluster_change_events_on_removed( &cluster_id, @@ -615,7 +642,7 @@ mod tests { &mut previous_nodes, ); assert!(event_opt.is_none()); - assert!(!previous_nodes.contains_key(&removed_chitchat_id)); + assert!(!previous_nodes.contains_key(&removed_chitchat_id.node_id)); } { // Node left the cluster in ready state. @@ -630,7 +657,8 @@ mod tests { let node = ClusterNode::try_new(removed_chitchat_id.clone(), &new_node_state, channel, false) .unwrap(); - let mut previous_nodes = BTreeMap::from_iter([(removed_chitchat_id.clone(), node)]); + let mut previous_nodes = + BTreeMap::from_iter([(removed_chitchat_id.node_id.clone(), node)]); let event = compute_cluster_change_events_on_removed( &cluster_id, @@ -647,7 +675,7 @@ mod tests { assert_eq!(node.grpc_advertise_addr(), grpc_advertise_addr); assert!(!node.is_self_node()); assert!(node.is_ready()); - assert!(!previous_nodes.contains_key(&removed_chitchat_id)); + assert!(!previous_nodes.contains_key(&removed_chitchat_id.node_id)); } } @@ -683,7 +711,7 @@ mod tests { ) .unwrap(); let mut previous_nodes = - BTreeMap::from_iter([(self_chitchat_id.clone(), previous_node)]); + BTreeMap::from_iter([(self_chitchat_id.node_id.clone(), previous_node)]); let previous_node_states = BTreeMap::from_iter([(self_chitchat_id.clone(), previous_node_state)]); @@ -747,7 +775,7 @@ mod tests { ) .unwrap(); let mut previous_nodes = - BTreeMap::from_iter([(self_chitchat_id.clone(), previous_node)]); + BTreeMap::from_iter([(self_chitchat_id.node_id.clone(), previous_node)]); let previous_node_states = BTreeMap::from_iter([(self_chitchat_id.clone(), previous_node_state)]); diff --git a/quickwit/quickwit-cluster/src/cluster.rs b/quickwit/quickwit-cluster/src/cluster.rs index eec47ba020a..d892cb4a4c7 100644 --- a/quickwit/quickwit-cluster/src/cluster.rs +++ b/quickwit/quickwit-cluster/src/cluster.rs @@ -45,7 +45,7 @@ use crate::member::{ GRPC_ADVERTISE_ADDR_KEY, INDEXING_TASK_PREFIX, READINESS_KEY, READINESS_VALUE_NOT_READY, READINESS_VALUE_READY, }; -use crate::ClusterNode; +use crate::{ClusterNode, NodeId}; const GOSSIP_INTERVAL: Duration = if cfg!(any(test, feature = "testsuite")) { Duration::from_millis(25) @@ -422,7 +422,7 @@ struct InnerCluster { cluster_id: String, self_chitchat_id: ChitchatId, chitchat_handle: ChitchatHandle, - live_nodes: BTreeMap, + live_nodes: BTreeMap, change_stream_subscribers: Vec>, ready_members_rx: watch::Receiver>, } diff --git a/quickwit/quickwit-cluster/src/lib.rs b/quickwit/quickwit-cluster/src/lib.rs index de960947acf..24b6caddf58 100644 --- a/quickwit/quickwit-cluster/src/lib.rs +++ b/quickwit/quickwit-cluster/src/lib.rs @@ -41,6 +41,8 @@ pub use crate::cluster::{Cluster, ClusterSnapshot, NodeIdSchema}; pub use crate::member::ClusterMember; pub use crate::node::ClusterNode; +pub type NodeId = String; + #[derive(Debug, Clone, Copy, Eq, PartialEq)] pub struct GenerationId(u64);