diff --git a/quickwit/quickwit-indexing/src/source/ingest/mod.rs b/quickwit/quickwit-indexing/src/source/ingest/mod.rs
index 5b4d0497c2e..bb669b6e81e 100644
--- a/quickwit/quickwit-indexing/src/source/ingest/mod.rs
+++ b/quickwit/quickwit-indexing/src/source/ingest/mod.rs
@@ -96,6 +96,7 @@ impl ClientId {
 #[derive(Debug, Eq, PartialEq)]
 struct AssignedShard {
     leader_id: NodeId,
+    follower_id_opt: Option<NodeId>,
     partition_id: PartitionId,
     current_position_inclusive: Position,
 }
@@ -180,18 +181,14 @@ impl IngestSource {
     }
 
     async fn truncate(&self, truncation_point: &[(ShardId, Position)]) {
-        let mut per_leader_truncate_subrequests: HashMap<&NodeId, Vec<TruncateSubrequest>> =
+        let mut per_ingester_truncate_subrequests: HashMap<&NodeId, Vec<TruncateSubrequest>> =
             HashMap::new();
 
         for (shard_id, truncate_position) in truncation_point {
             if matches!(truncate_position, Position::Beginning) {
                 continue;
             }
-            let Some(leader_id) = self
-                .assigned_shards
-                .get(shard_id)
-                .map(|shard| &shard.leader_id)
-            else {
+            let Some(shard) = self.assigned_shards.get(shard_id) else {
                 warn!(
                     "failed to truncate shard: shard `{}` is no longer assigned",
                     shard_id
@@ -208,21 +205,27 @@ impl IngestSource {
                 shard_id: *shard_id,
                 to_position_inclusive,
             };
-            per_leader_truncate_subrequests
-                .entry(leader_id)
+            if let Some(follower_id) = &shard.follower_id_opt {
+                per_ingester_truncate_subrequests
+                    .entry(follower_id)
+                    .or_default()
+                    .push(truncate_subrequest.clone());
+            }
+            per_ingester_truncate_subrequests
+                .entry(&shard.leader_id)
                 .or_default()
                 .push(truncate_subrequest);
         }
-        for (leader_id, truncate_subrequests) in per_leader_truncate_subrequests {
-            let Some(mut ingester) = self.ingester_pool.get(leader_id) else {
+        for (ingester_id, truncate_subrequests) in per_ingester_truncate_subrequests {
+            let Some(mut ingester) = self.ingester_pool.get(ingester_id) else {
                 warn!(
                     "failed to truncate shard: ingester `{}` is unavailable",
-                    leader_id
+                    ingester_id
                 );
                 continue;
             };
             let truncate_request = TruncateRequest {
-                leader_id: leader_id.clone().into(),
+                ingester_id: ingester_id.clone().into(),
                 subrequests: truncate_subrequests,
             };
             let truncate_future = async move {
@@ -349,9 +352,7 @@ impl Source for IngestSource {
 
         for acquired_shard in acquire_shards_subresponse.acquired_shards {
             let leader_id: NodeId = acquired_shard.leader_id.into();
-            let follower_id: Option<NodeId> = acquired_shard
-                .follower_id
-                .map(|follower_id| follower_id.into());
+            let follower_id_opt: Option<NodeId> = acquired_shard.follower_id.map(Into::into);
             let index_uid: IndexUid = acquired_shard.index_uid.into();
             let source_id: SourceId = acquired_shard.source_id;
             let shard_id = acquired_shard.shard_id;
@@ -364,7 +365,7 @@
             if let Err(error) = ctx
                 .protect_future(self.fetch_stream.subscribe(
                     leader_id.clone(),
-                    follower_id.clone(),
+                    follower_id_opt.clone(),
                     index_uid,
                     source_id,
                     shard_id,
@@ -380,6 +381,7 @@
 
             let assigned_shard = AssignedShard {
                 leader_id,
+                follower_id_opt,
                 partition_id,
                 current_position_inclusive,
             };
@@ -511,7 +513,7 @@ mod tests {
             .expect_truncate()
             .once()
             .returning(|request| {
-                assert_eq!(request.leader_id, "test-ingester-0");
+                assert_eq!(request.ingester_id, "test-ingester-0");
                 assert_eq!(request.subrequests.len(), 1);
 
                 let subrequest = &request.subrequests[0];
@@ -583,6 +585,7 @@ mod tests {
         let assigned_shard = source.assigned_shards.get(&1).unwrap();
         let expected_assigned_shard = AssignedShard {
             leader_id: "test-ingester-0".into(),
+            follower_id_opt: None,
             partition_id: 1u64.into(),
             current_position_inclusive: Position::from(11u64),
         };
@@ -631,6 +634,7 @@ mod tests {
             1,
             AssignedShard {
                 leader_id: "test-ingester-0".into(),
+                follower_id_opt: None,
                 partition_id: 1u64.into(),
                 current_position_inclusive: Position::from(11u64),
             },
@@ -639,6 +643,7 @@ mod tests {
             2,
             AssignedShard {
                 leader_id: "test-ingester-1".into(),
+                follower_id_opt: None,
                 partition_id: 2u64.into(),
                 current_position_inclusive: Position::from(22u64),
             },
@@ -721,8 +726,8 @@ mod tests {
             .expect_truncate()
             .once()
             .returning(|request| {
-                assert_eq!(request.leader_id, "test-ingester-0");
-                assert_eq!(request.subrequests.len(), 2);
+                assert_eq!(request.ingester_id, "test-ingester-0");
+                assert_eq!(request.subrequests.len(), 3);
 
                 let subrequest_0 = &request.subrequests[0];
                 assert_eq!(subrequest_0.shard_id, 1);
@@ -732,6 +737,10 @@
                 assert_eq!(subrequest_1.shard_id, 2);
                 assert_eq!(subrequest_1.to_position_inclusive, 22);
 
+                let subrequest_2 = &request.subrequests[2];
+                assert_eq!(subrequest_2.shard_id, 3);
+                assert_eq!(subrequest_2.to_position_inclusive, 33);
+
                 Ok(TruncateResponse {})
             });
         let ingester_0: IngesterServiceClient = ingester_mock_0.into();
@@ -742,18 +751,39 @@ mod tests {
             .expect_truncate()
             .once()
             .returning(|request| {
-                assert_eq!(request.leader_id, "test-ingester-1");
-                assert_eq!(request.subrequests.len(), 1);
+                assert_eq!(request.ingester_id, "test-ingester-1");
+                assert_eq!(request.subrequests.len(), 2);
 
                 let subrequest_0 = &request.subrequests[0];
-                assert_eq!(subrequest_0.shard_id, 3);
-                assert_eq!(subrequest_0.to_position_inclusive, 33);
+                assert_eq!(subrequest_0.shard_id, 1);
+                assert_eq!(subrequest_0.to_position_inclusive, 11);
+
+                let subrequest_1 = &request.subrequests[1];
+                assert_eq!(subrequest_1.shard_id, 2);
+                assert_eq!(subrequest_1.to_position_inclusive, 22);
 
                 Ok(TruncateResponse {})
             });
         let ingester_1: IngesterServiceClient = ingester_mock_1.into();
         ingester_pool.insert("test-ingester-1".into(), ingester_1.clone());
 
+        let mut ingester_mock_3 = IngesterServiceClient::mock();
+        ingester_mock_3
+            .expect_truncate()
+            .once()
+            .returning(|request| {
+                assert_eq!(request.ingester_id, "test-ingester-3");
+                assert_eq!(request.subrequests.len(), 1);
+
+                let subrequest_0 = &request.subrequests[0];
+                assert_eq!(subrequest_0.shard_id, 4);
+                assert_eq!(subrequest_0.to_position_inclusive, 44);
+
+                Ok(TruncateResponse {})
+            });
+        let ingester_3: IngesterServiceClient = ingester_mock_3.into();
+        ingester_pool.insert("test-ingester-3".into(), ingester_3.clone());
+
         let runtime_args = Arc::new(SourceRuntimeArgs {
             pipeline_id,
             source_config,
@@ -778,6 +808,7 @@ mod tests {
             1,
             AssignedShard {
                 leader_id: "test-ingester-0".into(),
+                follower_id_opt: None,
                 partition_id: 1u64.into(),
                 current_position_inclusive: Position::from(11u64),
             },
@@ -786,6 +817,7 @@ mod tests {
             2,
             AssignedShard {
                 leader_id: "test-ingester-0".into(),
+                follower_id_opt: Some("test-ingester-1".into()),
                 partition_id: 2u64.into(),
                 current_position_inclusive: Position::from(22u64),
             },
@@ -794,6 +826,7 @@ mod tests {
             3,
             AssignedShard {
                 leader_id: "test-ingester-1".into(),
+                follower_id_opt: Some("test-ingester-0".into()),
                 partition_id: 3u64.into(),
                 current_position_inclusive: Position::from(33u64),
             },
@@ -802,6 +835,7 @@ mod tests {
             4,
             AssignedShard {
                 leader_id: "test-ingester-2".into(),
+                follower_id_opt: Some("test-ingester-3".into()),
                 partition_id: 4u64.into(),
                 current_position_inclusive: Position::from(44u64),
             },
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/gc.rs b/quickwit/quickwit-ingest/src/ingest_v2/gc.rs
index fdda0fa72ef..0a8f437d912 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/gc.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/gc.rs
@@ -202,7 +202,8 @@ mod tests {
             mrecordlog.clone(),
             state.clone(),
         );
-        tokio::time::sleep(REMOVAL_GRACE_PERIOD * 2).await;
+        // Wait for the removal task to run.
+        tokio::time::sleep(Duration::from_millis(100)).await;
 
         let state_guard = state.read().await;
         assert!(state_guard.primary_shards.is_empty());
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
index 6a779a39779..cf208677af0 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
@@ -39,7 +39,6 @@ use quickwit_proto::ingest::ingester::{
     OpenReplicationStreamResponse, PersistFailure, PersistFailureKind, PersistRequest,
     PersistResponse, PersistSuccess, PingRequest, PingResponse, ReplicateRequest,
     ReplicateSubrequest, SynReplicationMessage, TruncateRequest, TruncateResponse,
-    TruncateSubrequest,
 };
 use quickwit_proto::ingest::{CommitTypeV2, IngestV2Error, IngestV2Result, ShardState};
 use quickwit_proto::metastore::{
@@ -213,7 +212,7 @@ impl Ingester {
             state.primary_shards.insert(queue_id, primary_shard);
         } else {
             let replica_shard = ReplicaShard {
-                leader_id: success.leader_id.into(),
+                _leader_id: success.leader_id.into(),
                 shard_state: ShardState::Closed,
                 publish_position_inclusive,
                 replica_position_inclusive: current_position,
@@ -531,7 +530,6 @@ impl IngesterService for Ingester {
         let replication_task_handle = ReplicationTask::spawn(
             leader_id,
             follower_id,
-            self.metastore.clone(),
             self.mrecordlog.clone(),
             self.state.clone(),
             syn_replication_stream,
@@ -586,18 +584,16 @@ impl IngesterService for Ingester {
         &mut self,
         truncate_request: TruncateRequest,
     ) -> IngestV2Result<TruncateResponse> {
-        if truncate_request.leader_id != self.self_node_id {
+        if truncate_request.ingester_id != self.self_node_id {
             return Err(IngestV2Error::Internal(format!(
                 "routing error: expected ingester `{}`, got `{}`",
-                truncate_request.leader_id, self.self_node_id
+                self.self_node_id, truncate_request.ingester_id,
             )));
         }
-        let mut gc_candidates: Vec<QueueId> = Vec::new();
+        let mut queues_to_remove: Vec<QueueId> = Vec::new();
 
         let mut mrecordlog_guard = self.mrecordlog.write().await;
         let mut state_guard = self.state.write().await;
 
-        let mut truncate_subrequests: HashMap<NodeId, Vec<TruncateSubrequest>> = HashMap::new();
-
         for subrequest in truncate_request.subrequests {
             let queue_id = subrequest.queue_id();
@@ -610,43 +606,34 @@ impl IngesterService for Ingester {
                     })?;
                 primary_shard.set_publish_position_inclusive(subrequest.to_position_inclusive);
 
-                if primary_shard.is_gc_candidate() {
-                    gc_candidates.push(queue_id.clone());
+                if primary_shard.is_removable() {
+                    queues_to_remove.push(queue_id.clone());
                 }
+                continue;
             }
-            if let Some(replica_shard) = state_guard.replica_shards.get(&queue_id) {
-                truncate_subrequests
-                    .entry(replica_shard.leader_id.clone())
-                    .or_default()
-                    .push(subrequest);
-            }
-        }
-        let mut truncate_futures = FuturesUnordered::new();
+            if let Some(replica_shard) = state_guard.replica_shards.get_mut(&queue_id) {
+                mrecordlog_guard
+                    .truncate(&queue_id, subrequest.to_position_inclusive)
+                    .await
+                    .map_err(|error| {
+                        IngestV2Error::Internal(format!("failed to truncate queue: {error:?}"))
+                    })?;
+                replica_shard.set_publish_position_inclusive(subrequest.to_position_inclusive);
 
-        for (follower_id, subrequests) in truncate_subrequests {
-            let leader_id = self.self_node_id.clone().into();
-            let truncate_request = TruncateRequest {
-                leader_id,
-                subrequests,
-            };
-            let replication_client = state_guard
-                .replication_clients
-                .get(&follower_id)
-                .expect("The replication client should be initialized.")
-                .clone();
-            truncate_futures
-                .push(async move { replication_client.truncate(truncate_request).await });
+                if replica_shard.is_removable() {
+                    queues_to_remove.push(queue_id.clone());
+                }
+            }
         }
-        // Drop the write lock AFTER pushing the replicate request into the replication client
-        // channel to ensure that sequential writes in mrecordlog turn into sequential replicate
-        // requests in the same order.
         drop(state_guard);
 
-        while let Some(truncate_result) = truncate_futures.next().await {
-            // TODO: Handle errors.
-            truncate_result?;
-        }
-        // TODO: Update publish positions of truncated shards and then delete them when
+        remove_shards_after(
+            queues_to_remove,
+            REMOVAL_GRACE_PERIOD,
+            self.metastore.clone(),
+            self.mrecordlog.clone(),
+            self.state.clone(),
+        );
         let truncate_response = TruncateResponse {};
         Ok(truncate_response)
     }
@@ -667,6 +654,7 @@ mod tests {
 
     use quickwit_proto::ingest::ingester::{
         IngesterServiceGrpcServer, IngesterServiceGrpcServerAdapter, PersistSubrequest,
+        TruncateSubrequest,
     };
     use quickwit_proto::ingest::DocBatchV2;
     use quickwit_proto::metastore::{
@@ -848,7 +836,7 @@ mod tests {
 
         drop(state_guard);
 
-        // Wait for the GC task to run.
+        // Wait for the removal task to run.
         tokio::time::sleep(Duration::from_millis(100)).await;
 
         let state_guard = ingester.state.read().await;
@@ -1352,7 +1340,22 @@ mod tests {
     async fn test_ingester_truncate() {
         let tempdir = tempfile::tempdir().unwrap();
         let self_node_id: NodeId = "test-ingester-0".into();
-        let metastore = Arc::new(MockIngestMetastore::default());
+        let mut mock_metastore = MockIngestMetastore::default();
+        mock_metastore
+            .expect_delete_shards()
+            .once()
+            .returning(|request| {
+                assert_eq!(request.subrequests.len(), 1);
+
+                let subrequest = &request.subrequests[0];
+                assert_eq!(subrequest.index_uid, "test-index:0");
+                assert_eq!(subrequest.source_id, "test-source");
+                assert_eq!(subrequest.shard_ids, [2]);
+
+                let response = DeleteShardsResponse {};
+                Ok(response)
+            });
+        let metastore = Arc::new(mock_metastore);
         let ingester_pool = IngesterPool::default();
         let wal_dir_path = tempdir.path();
         let replication_factor = 1;
@@ -1366,20 +1369,26 @@ mod tests {
             .await
             .unwrap();
 
-        let queue_id_00 = queue_id("test-index:0", "test-source", 0);
         let queue_id_01 = queue_id("test-index:0", "test-source", 1);
+        let queue_id_02 = queue_id("test-index:0", "test-source", 2);
 
-        let mut ingester_state = ingester.state.write().await;
+        let mut state_guard = ingester.state.write().await;
         ingester
-            .init_primary_shard(&mut ingester_state, &queue_id_00, &self_node_id, None)
+            .init_primary_shard(&mut state_guard, &queue_id_01, &self_node_id, None)
            .await
             .unwrap();
         ingester
-            .init_primary_shard(&mut ingester_state, &queue_id_01, &self_node_id, None)
+            .init_primary_shard(&mut state_guard, &queue_id_02, &self_node_id, None)
             .await
             .unwrap();
 
-        drop(ingester_state);
+        state_guard
+            .primary_shards
+            .get_mut(&queue_id_02)
+            .unwrap()
+            .shard_state = ShardState::Closed;
+
+        drop(state_guard);
 
         let mut mrecordlog_guard = ingester.mrecordlog.write().await;
 
@@ -1389,7 +1398,7 @@
         ]
         .into_iter();
         mrecordlog_guard
-            .append_records(&queue_id_00, None, records)
+            .append_records(&queue_id_01, None, records)
             .await
             .unwrap();
@@ -1399,59 +1408,71 @@
         ]
        .into_iter();
         mrecordlog_guard
-            .append_records(&queue_id_00, None, records)
+            .append_records(&queue_id_02, None, records)
             .await
             .unwrap();
 
         drop(mrecordlog_guard);
 
         let truncate_request = TruncateRequest {
-            leader_id: self_node_id.to_string(),
+            ingester_id: self_node_id.to_string(),
             subrequests: vec![
                 TruncateSubrequest {
                     index_uid: "test-index:0".to_string(),
                     source_id: "test-source".to_string(),
-                    shard_id: 0,
+                    shard_id: 1,
                     to_position_inclusive: 0,
                 },
                 TruncateSubrequest {
                     index_uid: "test-index:0".to_string(),
                     source_id: "test-source".to_string(),
-                    shard_id: 1,
+                    shard_id: 2,
                     to_position_inclusive: 1,
                 },
                 TruncateSubrequest {
                     index_uid: "test-index:1337".to_string(),
                     source_id: "test-source".to_string(),
-                    shard_id: 0,
+                    shard_id: 1,
                    to_position_inclusive: 1337,
                 },
             ],
         };
         ingester.truncate(truncate_request).await.unwrap();
 
-        let ingester_state = ingester.state.read().await;
-        ingester_state
+        let state_guard = ingester.state.read().await;
+        state_guard
             .primary_shards
-            .get(&queue_id_00)
+            .get(&queue_id_01)
             .unwrap()
             .assert_publish_position(0);
-        ingester_state
+        state_guard
             .primary_shards
-            .get(&queue_id_01)
+            .get(&queue_id_02)
             .unwrap()
             .assert_publish_position(1);
+        drop(state_guard);
 
         let mrecordlog_guard = ingester.mrecordlog.read().await;
         let (position, record) = mrecordlog_guard
-            .range(&queue_id_00, 0..)
+            .range(&queue_id_01, 0..)
             .unwrap()
             .next()
             .unwrap();
         assert_eq!(position, 1);
         assert_eq!(&*record, b"test-doc-001");
 
-        let record_opt = mrecordlog_guard.range(&queue_id_01, 0..).unwrap().next();
+        let record_opt = mrecordlog_guard.range(&queue_id_02, 0..).unwrap().next();
         assert!(record_opt.is_none());
+        drop(mrecordlog_guard);
+
+        // Wait for the removal task to run.
+        tokio::time::sleep(Duration::from_millis(100)).await;
+
+        let state_guard = ingester.state.read().await;
+        assert_eq!(state_guard.primary_shards.len(), 1);
+        assert!(!state_guard.primary_shards.contains_key(&queue_id_02));
+
+        let mrecordlog_guard = ingester.mrecordlog.read().await;
+        assert!(!mrecordlog_guard.queue_exists(&queue_id_02));
     }
 }
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/models.rs b/quickwit/quickwit-ingest/src/ingest_v2/models.rs
index 0ef85c19958..46583bfdca2 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/models.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/models.rs
@@ -131,7 +131,7 @@ pub(super) struct PrimaryShard {
 }
 
 impl PrimaryShard {
-    pub fn is_gc_candidate(&self) -> bool {
+    pub fn is_removable(&self) -> bool {
         self.shard_state.is_closed()
             && self.publish_position_inclusive >= self.primary_position_inclusive
     }
@@ -178,7 +178,7 @@ impl PrimaryShard {
 /// Records the state of a replica shard managed by a follower. See [`PrimaryShard`] for more
 /// details about the fields.
 pub(super) struct ReplicaShard {
-    pub leader_id: NodeId,
+    pub _leader_id: NodeId,
     pub shard_state: ShardState,
     pub publish_position_inclusive: Position,
     pub replica_position_inclusive: Position,
@@ -187,7 +187,7 @@ pub(super) struct ReplicaShard {
 }
 
 impl ReplicaShard {
-    pub fn is_gc_candidate(&self) -> bool {
+    pub fn is_removable(&self) -> bool {
         self.shard_state.is_closed()
             && self.publish_position_inclusive >= self.replica_position_inclusive
     }
@@ -267,7 +267,7 @@ impl ReplicaShard {
         let (shard_status_tx, shard_status_rx) = watch::channel(shard_status);
 
         Self {
-            leader_id: leader_id.into(),
+            _leader_id: leader_id.into(),
            shard_state,
             publish_position_inclusive,
             replica_position_inclusive,
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs
index 68c5cfe64ad..235550d557b 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs
@@ -25,16 +25,13 @@ use mrecordlog::MultiRecordLog;
 use quickwit_common::ServiceStream;
 use quickwit_proto::ingest::ingester::{
     ack_replication_message, syn_replication_message, AckReplicationMessage, ReplicateRequest,
-    ReplicateResponse, ReplicateSuccess, SynReplicationMessage, TruncateRequest, TruncateResponse,
+    ReplicateResponse, ReplicateSuccess, SynReplicationMessage,
 };
 use quickwit_proto::ingest::{CommitTypeV2, IngestV2Error, IngestV2Result, ShardState};
 use quickwit_proto::types::NodeId;
-use quickwit_proto::QueueId;
 use tokio::sync::{mpsc, oneshot, watch, RwLock};
 use tokio::task::JoinHandle;
 
-use super::gc::{remove_shards_after, REMOVAL_GRACE_PERIOD};
-use super::ingest_metastore::IngestMetastore;
 use super::ingester::{commit_doc, IngesterState};
 use super::models::{Position, ReplicaShard, ShardStatus};
 use crate::metrics::INGEST_METRICS;
@@ -44,27 +41,17 @@ use crate::metrics::INGEST_METRICS;
 #[derive(Debug)]
 pub(super) enum ReplicationRequest {
     Replicate(ReplicateRequest),
-    Truncate(TruncateRequest),
 }
 
 #[derive(Debug)]
 pub(super) enum ReplicationResponse {
     Replicate(ReplicateResponse),
-    Truncate(TruncateResponse),
 }
 
 impl ReplicationResponse {
     pub fn into_replicate_response(self) -> Option<ReplicateResponse> {
         match self {
             ReplicationResponse::Replicate(replicate_response) => Some(replicate_response),
-            ReplicationResponse::Truncate(_) => None,
-        }
-    }
-
-    pub fn into_truncate_response(self) -> Option<TruncateResponse> {
-        match self {
-            ReplicationResponse::Replicate(_) => None,
-            ReplicationResponse::Truncate(truncate_response) => Some(truncate_response),
         }
     }
 }
@@ -97,25 +84,6 @@ impl ReplicationClient {
             .expect("TODO");
         Ok(replicate_response)
     }
-
-    /// Replicates a truncate request from a leader to its follower and waits for its response.
-    pub async fn truncate(
-        &self,
-        truncate_request: TruncateRequest,
-    ) -> IngestV2Result<TruncateResponse> {
-        let replication_request = ReplicationRequest::Truncate(truncate_request);
-        let (replication_response_tx, replication_response_rx) = oneshot::channel();
-        self.oneshot_replication_request_tx
-            .clone()
-            .send((replication_request, replication_response_tx))
-            .expect("TODO");
-        let truncate_response = replication_response_rx
-            .await
-            .expect("TODO")
-            .into_truncate_response()
-            .expect("TODO");
-        Ok(truncate_response)
-    }
 }
 
 /// Processes [`ReplicateRequest`] requests sent by a leader. It queues requests and pipes them into
@@ -164,9 +132,6 @@ impl ReplicationClientTask {
                 ReplicationRequest::Replicate(replication_request) => {
                     SynReplicationMessage::new_replicate_request(replication_request)
                 }
-                ReplicationRequest::Truncate(truncate_request) => {
-                    SynReplicationMessage::new_truncate_request(truncate_request)
-                }
             };
             self.syn_replication_stream_tx
                 .send(syn_replication_message)
@@ -195,7 +160,6 @@ pub(super) struct ReplicationTaskHandle {
 pub(super) struct ReplicationTask {
     leader_id: NodeId,
     follower_id: NodeId,
-    metastore: Arc<dyn IngestMetastore>,
     mrecordlog: Arc<RwLock<MultiRecordLog>>,
     state: Arc<RwLock<IngesterState>>,
     syn_replication_stream: ServiceStream<SynReplicationMessage>,
@@ -206,7 +170,6 @@ impl ReplicationTask {
     pub fn spawn(
         leader_id: NodeId,
         follower_id: NodeId,
-        metastore: Arc<dyn IngestMetastore>,
         mrecordlog: Arc<RwLock<MultiRecordLog>>,
         state: Arc<RwLock<IngesterState>>,
         syn_replication_stream: ServiceStream<SynReplicationMessage>,
@@ -215,7 +178,6 @@ impl ReplicationTask {
         let mut replication_task = Self {
             leader_id,
             follower_id,
-            metastore,
             mrecordlog,
             state,
             syn_replication_stream,
@@ -265,7 +227,7 @@ impl ReplicationTask {
                     let (shard_status_tx, shard_status_rx) =
                         watch::channel(ShardStatus::default());
                     ReplicaShard {
-                        leader_id: replicate_request.leader_id.clone().into(),
+                        _leader_id: replicate_request.leader_id.clone().into(),
                         shard_state: ShardState::Open,
                         publish_position_inclusive: Position::default(),
                         replica_position_inclusive: Position::default(),
@@ -359,43 +321,6 @@ impl ReplicationTask {
         Ok(replicate_response)
     }
 
-    async fn truncate(
-        &mut self,
-        truncate_request: TruncateRequest,
-    ) -> IngestV2Result<TruncateResponse> {
-        let mut queues_to_remove: Vec<QueueId> = Vec::new();
-        let mut mrecordlog_guard = self.mrecordlog.write().await;
-        let mut state_guard = self.state.write().await;
-
-        for truncate_subrequest in truncate_request.subrequests {
-            let queue_id = truncate_subrequest.queue_id();
-
-            if let Some(replica_shard) = state_guard.replica_shards.get_mut(&queue_id) {
-                mrecordlog_guard
-                    .truncate(&queue_id, truncate_subrequest.to_position_inclusive)
-                    .await
-                    .map_err(|error| {
-                        IngestV2Error::Internal(format!("Failed to truncate: {error:?}"))
-                    })?;
-                replica_shard
-                    .set_publish_position_inclusive(truncate_subrequest.to_position_inclusive);
-
-                if replica_shard.is_gc_candidate() {
-                    queues_to_remove.push(queue_id);
-                }
-            }
-        }
-        remove_shards_after(
-            queues_to_remove,
-            REMOVAL_GRACE_PERIOD,
-            self.metastore.clone(),
-            self.mrecordlog.clone(),
-            self.state.clone(),
-        );
-        let truncate_response = TruncateResponse {};
-        Ok(truncate_response)
-    }
-
     async fn run(&mut self) -> IngestV2Result<()> {
         while let Some(syn_replication_message) = self.syn_replication_stream.next().await {
             let ack_replication_message = match syn_replication_message.message {
@@ -403,10 +328,6 @@
                     .replicate(replicate_request)
                     .await
                     .map(AckReplicationMessage::new_replicate_response),
-                Some(syn_replication_message::Message::TruncateRequest(truncate_request)) => self
-                    .truncate(truncate_request)
-                    .await
-                    .map(AckReplicationMessage::new_truncate_response),
                 _ => panic!("TODO"),
             };
             if self
@@ -427,9 +348,6 @@ fn into_replication_response(outer_message: AckReplicationMessage) -> Option<ReplicationResponse> {
             Some(ReplicationResponse::Replicate(replicate_response))
         }
-        Some(ack_replication_message::Message::TruncateResponse(truncate_response)) => {
-            Some(ReplicationResponse::Truncate(truncate_response))
-        }
         _ => None,
     }
 }
@@ -445,7 +363,6 @@ mod tests {
     use quickwit_proto::types::queue_id;
 
     use super::*;
-    use crate::ingest_v2::ingest_metastore::MockIngestMetastore;
     use crate::ingest_v2::test_utils::{MultiRecordLogTestExt, ReplicaShardTestExt};
 
 #[tokio::test]
@@ -550,7 +467,6 @@ mod tests {
         let leader_id: NodeId = "test-leader".into();
         let follower_id: NodeId = "test-follower".into();
         let tempdir = tempfile::tempdir().unwrap();
-        let metastore = Arc::new(MockIngestMetastore::default());
         let mrecordlog = Arc::new(RwLock::new(
             MultiRecordLog::open(tempdir.path()).await.unwrap(),
         ));
@@ -565,7 +481,6 @@ mod tests {
         let _replication_task_handle = ReplicationTask::spawn(
             leader_id,
             follower_id,
-            metastore,
             mrecordlog.clone(),
             state.clone(),
             syn_replication_stream,
diff --git a/quickwit/quickwit-proto/protos/quickwit/ingester.proto b/quickwit/quickwit-proto/protos/quickwit/ingester.proto
index 3ea4c9cf923..836f0672a7e 100644
--- a/quickwit/quickwit-proto/protos/quickwit/ingester.proto
+++ b/quickwit/quickwit-proto/protos/quickwit/ingester.proto
@@ -85,7 +85,6 @@ message SynReplicationMessage {
   oneof message {
     OpenReplicationStreamRequest open_request = 1;
     ReplicateRequest replicate_request = 2;
-    TruncateRequest truncate_request = 3;
   }
 }
 
@@ -93,7 +92,6 @@ message AckReplicationMessage {
   oneof message {
     OpenReplicationStreamResponse open_response = 1;
     ReplicateResponse replicate_response = 3;
-    TruncateResponse truncate_response = 4;
   }
 }
 
@@ -142,7 +140,7 @@ message ReplicateFailure {
 }
 
 message TruncateRequest {
-  string leader_id = 1;
+  string ingester_id = 1;
   repeated TruncateSubrequest subrequests = 2;
 }
 
diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs
index 804431bf47c..ef42722534e 100644
--- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs
+++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs
@@ -65,7 +65,7 @@ pub struct PersistFailure {
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct SynReplicationMessage {
-    #[prost(oneof = "syn_replication_message::Message", tags = "1, 2, 3")]
+    #[prost(oneof = "syn_replication_message::Message", tags = "1, 2")]
     pub message: ::core::option::Option<syn_replication_message::Message>,
 }
 /// Nested message and enum types in `SynReplicationMessage`.
@@ -79,15 +79,13 @@ pub mod syn_replication_message {
         OpenRequest(super::OpenReplicationStreamRequest),
         #[prost(message, tag = "2")]
         ReplicateRequest(super::ReplicateRequest),
-        #[prost(message, tag = "3")]
-        TruncateRequest(super::TruncateRequest),
     }
 }
 #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct AckReplicationMessage {
-    #[prost(oneof = "ack_replication_message::Message", tags = "1, 3, 4")]
+    #[prost(oneof = "ack_replication_message::Message", tags = "1, 3")]
     pub message: ::core::option::Option<ack_replication_message::Message>,
 }
 /// Nested message and enum types in `AckReplicationMessage`.
@@ -101,8 +99,6 @@ pub mod ack_replication_message {
         OpenResponse(super::OpenReplicationStreamResponse),
         #[prost(message, tag = "3")]
         ReplicateResponse(super::ReplicateResponse),
-        #[prost(message, tag = "4")]
-        TruncateResponse(super::TruncateResponse),
     }
 }
 #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
@@ -188,7 +184,7 @@ pub struct ReplicateFailure {
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct TruncateRequest {
     #[prost(string, tag = "1")]
-    pub leader_id: ::prost::alloc::string::String,
+    pub ingester_id: ::prost::alloc::string::String,
     #[prost(message, repeated, tag = "2")]
     pub subrequests: ::prost::alloc::vec::Vec<TruncateSubrequest>,
 }
diff --git a/quickwit/quickwit-proto/src/ingest/ingester.rs b/quickwit/quickwit-proto/src/ingest/ingester.rs
index fde27fa38f2..3b4deb176d8 100644
--- a/quickwit/quickwit-proto/src/ingest/ingester.rs
+++ b/quickwit/quickwit-proto/src/ingest/ingester.rs
@@ -99,14 +99,6 @@ impl SynReplicationMessage {
             )),
         }
     }
-
-    pub fn new_truncate_request(truncate_request: TruncateRequest) -> Self {
-        Self {
-            message: Some(syn_replication_message::Message::TruncateRequest(
-                truncate_request,
-            )),
-        }
-    }
 }
 
 impl AckReplicationMessage {
@@ -134,14 +126,6 @@ impl AckReplicationMessage {
             )),
         }
     }
-
-    pub fn new_truncate_response(truncate_response: TruncateResponse) -> Self {
-        Self {
-            message: Some(ack_replication_message::Message::TruncateResponse(
-                truncate_response,
-            )),
-        }
-    }
 }
 
 impl ReplicateSubrequest {