diff --git a/quickwit/quickwit-cluster/src/change.rs b/quickwit/quickwit-cluster/src/change.rs
index f1dba458071..0d3c9b3fcd3 100644
--- a/quickwit/quickwit-cluster/src/change.rs
+++ b/quickwit/quickwit-cluster/src/change.rs
@@ -17,6 +17,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see .
+use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use chitchat::{ChitchatId, NodeState};
@@ -25,7 +26,7 @@ use quickwit_common::tower::{make_channel, warmup_channel};
use tracing::{info, warn};
use crate::member::NodeStateExt;
-use crate::ClusterNode;
+use crate::{ClusterNode, NodeId};
#[derive(Debug, Clone)]
pub enum ClusterChange {
@@ -39,7 +40,7 @@ pub enum ClusterChange {
pub(crate) async fn compute_cluster_change_events(
cluster_id: &str,
self_chitchat_id: &ChitchatId,
- previous_nodes: &mut BTreeMap,
+ previous_nodes: &mut BTreeMap,
previous_node_states: &BTreeMap,
new_node_states: &BTreeMap,
) -> Vec {
@@ -96,17 +97,8 @@ async fn compute_cluster_change_events_on_added(
self_chitchat_id: &ChitchatId,
new_chitchat_id: &ChitchatId,
new_node_state: &NodeState,
- previous_nodes: &mut BTreeMap,
+ previous_nodes: &mut BTreeMap,
) -> Option {
- let is_self_node = self_chitchat_id == new_chitchat_id;
- if !is_self_node {
- info!(
- cluster_id=%cluster_id,
- node_id=%new_chitchat_id.node_id,
- "Node `{}` has joined the cluster.",
- new_chitchat_id.node_id
- );
- }
let grpc_advertise_addr = match new_node_state.grpc_advertise_addr() {
Ok(addr) => addr,
Err(error) => {
@@ -120,6 +112,7 @@ async fn compute_cluster_change_events_on_added(
}
};
let channel = make_channel(grpc_advertise_addr).await;
+ let is_self_node = self_chitchat_id == new_chitchat_id;
let new_node = match ClusterNode::try_new(
new_chitchat_id.clone(),
new_node_state,
@@ -137,8 +130,26 @@ async fn compute_cluster_change_events_on_added(
return None;
}
};
- previous_nodes.insert(new_chitchat_id.clone(), new_node.clone());
+ let new_node_id = new_node.chitchat_id().node_id.clone();
+ let previous_node_opt = previous_nodes.insert(new_node_id, new_node.clone());
+ if !is_self_node {
+ if previous_node_opt.is_some() {
+ info!(
+ cluster_id=%cluster_id,
+ node_id=%new_chitchat_id.node_id,
+ "Node `{}` has rejoined the cluster.",
+ new_chitchat_id.node_id
+ );
+ } else {
+ info!(
+ cluster_id=%cluster_id,
+ node_id=%new_chitchat_id.node_id,
+ "Node `{}` has joined the cluster.",
+ new_chitchat_id.node_id
+ );
+ }
+ }
if new_node.is_ready() {
warmup_channel(new_node.channel()).await;
@@ -160,9 +171,9 @@ async fn compute_cluster_change_events_on_updated(
self_chitchat_id: &ChitchatId,
updated_chitchat_id: &ChitchatId,
updated_node_state: &NodeState,
- previous_nodes: &mut BTreeMap,
+ previous_nodes: &mut BTreeMap,
) -> Option {
- let previous_node = previous_nodes.get(updated_chitchat_id)?.clone();
+ let previous_node = previous_nodes.get(&updated_chitchat_id.node_id)?.clone();
let previous_channel = previous_node.channel();
let is_self_node = self_chitchat_id == updated_chitchat_id;
let updated_node = match ClusterNode::try_new(
@@ -182,7 +193,7 @@ async fn compute_cluster_change_events_on_updated(
return None;
}
};
- previous_nodes.insert(updated_chitchat_id.clone(), updated_node.clone());
+ previous_nodes.insert(updated_chitchat_id.node_id.clone(), updated_node.clone());
if !previous_node.is_ready() && updated_node.is_ready() {
warmup_channel(updated_node.channel()).await;
@@ -217,23 +228,30 @@ fn compute_cluster_change_events_on_removed(
cluster_id: &str,
self_chitchat_id: &ChitchatId,
removed_chitchat_id: &ChitchatId,
- previous_nodes: &mut BTreeMap,
+ previous_nodes: &mut BTreeMap,
) -> Option {
- if self_chitchat_id != removed_chitchat_id {
- info!(
- cluster_id=%cluster_id,
- node_id=%removed_chitchat_id.node_id,
- "Node `{}` has left the cluster.",
- removed_chitchat_id.node_id
- );
- }
- let previous_node = previous_nodes.remove(removed_chitchat_id)?;
+ let removed_node_id = removed_chitchat_id.node_id.clone();
+
+ if let Entry::Occupied(previous_node_entry) = previous_nodes.entry(removed_node_id) {
+ let previous_node_ref = previous_node_entry.get();
+
+ if previous_node_ref.chitchat_id().generation_id == removed_chitchat_id.generation_id {
+ if self_chitchat_id != removed_chitchat_id {
+ info!(
+ cluster_id=%cluster_id,
+ node_id=%removed_chitchat_id.node_id,
+ "Node `{}` has left the cluster.",
+ removed_chitchat_id.node_id
+ );
+ }
+ let previous_node = previous_node_entry.remove();
- if previous_node.is_ready() {
- Some(ClusterChange::Remove(previous_node))
- } else {
- None
- }
+ if previous_node.is_ready() {
+ return Some(ClusterChange::Remove(previous_node));
+ }
+ }
+ };
+ None
}
#[cfg(test)]
@@ -361,7 +379,7 @@ mod tests {
.await;
assert!(event.is_none());
- let node = previous_nodes.get(&new_chitchat_id).unwrap();
+ let node = previous_nodes.get(&new_chitchat_id.node_id).unwrap();
assert_eq!(node.chitchat_id(), &new_chitchat_id);
assert_eq!(node.grpc_advertise_addr(), grpc_advertise_addr);
@@ -396,7 +414,7 @@ mod tests {
assert_eq!(node.grpc_advertise_addr(), grpc_advertise_addr);
assert!(!node.is_self_node());
assert!(node.is_ready());
- assert_eq!(previous_nodes.get(&new_chitchat_id).unwrap(), &node);
+ assert_eq!(previous_nodes.get(&new_chitchat_id.node_id).unwrap(), &node);
}
{
// Self node joined the cluster and is ready.
@@ -425,7 +443,7 @@ mod tests {
assert_eq!(node.grpc_advertise_addr(), grpc_advertise_addr);
assert!(node.is_self_node());
assert!(node.is_ready());
- assert_eq!(previous_nodes.get(&new_chitchat_id).unwrap(), &node);
+ assert_eq!(previous_nodes.get(&new_chitchat_id.node_id).unwrap(), &node);
}
}
@@ -453,7 +471,7 @@ mod tests {
)
.unwrap();
let mut previous_nodes =
- BTreeMap::from_iter([(updated_chitchat_id.clone(), previous_node)]);
+ BTreeMap::from_iter([(updated_chitchat_id.node_id.clone(), previous_node)]);
let updated_node_state = NodeStateBuilder::default()
.with_grpc_advertise_addr(grpc_advertise_addr)
@@ -476,7 +494,10 @@ mod tests {
assert_eq!(node.grpc_advertise_addr(), grpc_advertise_addr);
assert!(node.is_ready());
assert!(!node.is_self_node());
- assert_eq!(previous_nodes.get(&updated_chitchat_id).unwrap(), &node);
+ assert_eq!(
+ previous_nodes.get(&updated_chitchat_id.node_id).unwrap(),
+ &node
+ );
}
{
// Node changed.
@@ -497,7 +518,7 @@ mod tests {
)
.unwrap();
let mut previous_nodes =
- BTreeMap::from_iter([(updated_chitchat_id.clone(), previous_node)]);
+ BTreeMap::from_iter([(updated_chitchat_id.node_id.clone(), previous_node)]);
let updated_node_state = NodeStateBuilder::default()
.with_grpc_advertise_addr(grpc_advertise_addr)
@@ -520,7 +541,10 @@ mod tests {
assert_eq!(node.grpc_advertise_addr(), grpc_advertise_addr);
assert!(!node.is_self_node());
assert!(node.is_ready());
- assert_eq!(previous_nodes.get(&updated_chitchat_id).unwrap(), &node);
+ assert_eq!(
+ previous_nodes.get(&updated_chitchat_id.node_id).unwrap(),
+ &node
+ );
}
{
// Node is no longer ready.
@@ -541,7 +565,7 @@ mod tests {
)
.unwrap();
let mut previous_nodes =
- BTreeMap::from_iter([(updated_chitchat_id.clone(), previous_node)]);
+ BTreeMap::from_iter([(updated_chitchat_id.node_id.clone(), previous_node)]);
let updated_node_state = NodeStateBuilder::default()
.with_grpc_advertise_addr(grpc_advertise_addr)
@@ -564,7 +588,10 @@ mod tests {
assert_eq!(node.grpc_advertise_addr(), grpc_advertise_addr);
assert!(!node.is_self_node());
assert!(!node.is_ready());
- assert_eq!(previous_nodes.get(&updated_chitchat_id).unwrap(), &node);
+ assert_eq!(
+ previous_nodes.get(&updated_chitchat_id.node_id).unwrap(),
+ &node
+ );
}
}
@@ -606,7 +633,7 @@ mod tests {
)
.unwrap();
let mut previous_nodes =
- BTreeMap::from_iter([(removed_chitchat_id.clone(), previous_node)]);
+ BTreeMap::from_iter([(removed_chitchat_id.node_id.clone(), previous_node)]);
let event_opt = compute_cluster_change_events_on_removed(
&cluster_id,
@@ -615,7 +642,7 @@ mod tests {
&mut previous_nodes,
);
assert!(event_opt.is_none());
- assert!(!previous_nodes.contains_key(&removed_chitchat_id));
+ assert!(!previous_nodes.contains_key(&removed_chitchat_id.node_id));
}
{
// Node left the cluster in ready state.
@@ -630,7 +657,8 @@ mod tests {
let node =
ClusterNode::try_new(removed_chitchat_id.clone(), &new_node_state, channel, false)
.unwrap();
- let mut previous_nodes = BTreeMap::from_iter([(removed_chitchat_id.clone(), node)]);
+ let mut previous_nodes =
+ BTreeMap::from_iter([(removed_chitchat_id.node_id.clone(), node)]);
let event = compute_cluster_change_events_on_removed(
&cluster_id,
@@ -647,7 +675,7 @@ mod tests {
assert_eq!(node.grpc_advertise_addr(), grpc_advertise_addr);
assert!(!node.is_self_node());
assert!(node.is_ready());
- assert!(!previous_nodes.contains_key(&removed_chitchat_id));
+ assert!(!previous_nodes.contains_key(&removed_chitchat_id.node_id));
}
}
@@ -683,7 +711,7 @@ mod tests {
)
.unwrap();
let mut previous_nodes =
- BTreeMap::from_iter([(self_chitchat_id.clone(), previous_node)]);
+ BTreeMap::from_iter([(self_chitchat_id.node_id.clone(), previous_node)]);
let previous_node_states =
BTreeMap::from_iter([(self_chitchat_id.clone(), previous_node_state)]);
@@ -747,7 +775,7 @@ mod tests {
)
.unwrap();
let mut previous_nodes =
- BTreeMap::from_iter([(self_chitchat_id.clone(), previous_node)]);
+ BTreeMap::from_iter([(self_chitchat_id.node_id.clone(), previous_node)]);
let previous_node_states =
BTreeMap::from_iter([(self_chitchat_id.clone(), previous_node_state)]);
diff --git a/quickwit/quickwit-cluster/src/cluster.rs b/quickwit/quickwit-cluster/src/cluster.rs
index eec47ba020a..d892cb4a4c7 100644
--- a/quickwit/quickwit-cluster/src/cluster.rs
+++ b/quickwit/quickwit-cluster/src/cluster.rs
@@ -45,7 +45,7 @@ use crate::member::{
GRPC_ADVERTISE_ADDR_KEY, INDEXING_TASK_PREFIX, READINESS_KEY, READINESS_VALUE_NOT_READY,
READINESS_VALUE_READY,
};
-use crate::ClusterNode;
+use crate::{ClusterNode, NodeId};
const GOSSIP_INTERVAL: Duration = if cfg!(any(test, feature = "testsuite")) {
Duration::from_millis(25)
@@ -422,7 +422,7 @@ struct InnerCluster {
cluster_id: String,
self_chitchat_id: ChitchatId,
chitchat_handle: ChitchatHandle,
- live_nodes: BTreeMap,
+ live_nodes: BTreeMap,
change_stream_subscribers: Vec>,
ready_members_rx: watch::Receiver>,
}
diff --git a/quickwit/quickwit-cluster/src/lib.rs b/quickwit/quickwit-cluster/src/lib.rs
index de960947acf..24b6caddf58 100644
--- a/quickwit/quickwit-cluster/src/lib.rs
+++ b/quickwit/quickwit-cluster/src/lib.rs
@@ -41,6 +41,8 @@ pub use crate::cluster::{Cluster, ClusterSnapshot, NodeIdSchema};
pub use crate::member::ClusterMember;
pub use crate::node::ClusterNode;
+pub type NodeId = String;
+
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct GenerationId(u64);