From eef4a72178f360a599b79c876dfd187630f19bb2 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Wed, 12 Jul 2023 10:35:53 +0900 Subject: [PATCH] Refactor service pool in control plane service (#3622) Replace ServiceClientPool with Pool in control plane services. --- quickwit/Cargo.lock | 1 + quickwit/quickwit-control-plane/Cargo.toml | 1 + .../src/indexing_plan.rs | 74 +++++---- quickwit/quickwit-control-plane/src/lib.rs | 23 ++- .../quickwit-control-plane/src/scheduler.rs | 149 ++++++++++-------- quickwit/quickwit-serve/src/lib.rs | 124 +++++++++++---- 6 files changed, 234 insertions(+), 138 deletions(-) diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 02ca64566c8..7610f5ea733 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -4569,6 +4569,7 @@ dependencies = [ "async-trait", "chitchat", "dyn-clone", + "futures", "http", "hyper", "itertools", diff --git a/quickwit/quickwit-control-plane/Cargo.toml b/quickwit/quickwit-control-plane/Cargo.toml index 97b20ffc4e0..63908a8f5fc 100644 --- a/quickwit/quickwit-control-plane/Cargo.toml +++ b/quickwit/quickwit-control-plane/Cargo.toml @@ -40,6 +40,7 @@ quickwit-metastore = { workspace = true } quickwit-proto = { workspace = true } [dev-dependencies] +futures = { workspace = true } mockall = { workspace = true } proptest = { workspace = true } rand = { workspace = true } diff --git a/quickwit/quickwit-control-plane/src/indexing_plan.rs b/quickwit/quickwit-control-plane/src/indexing_plan.rs index c9b23b4d675..47371085857 100644 --- a/quickwit/quickwit-control-plane/src/indexing_plan.rs +++ b/quickwit/quickwit-control-plane/src/indexing_plan.rs @@ -22,13 +22,14 @@ use std::collections::HashMap; use std::hash::Hash; use itertools::Itertools; -use quickwit_cluster::ClusterMember; use quickwit_common::rendezvous_hasher::sort_by_rendez_vous_hash; use quickwit_config::{SourceConfig, CLI_INGEST_SOURCE_ID, INGEST_API_SOURCE_ID}; use quickwit_proto::indexing_api::IndexingTask; use quickwit_proto::IndexUid; use serde::Serialize; +use crate::IndexerNodeInfo; + /// A [`PhysicalIndexingPlan`] defines the list of indexing tasks /// each indexer, identified by its node ID, should run. /// TODO(fmassot): a metastore version number will be attached to the plan @@ -137,7 +138,7 @@ impl PhysicalIndexingPlan { /// tasks. We can potentially use this info to assign an indexing task to a node running the same /// task. pub(crate) fn build_physical_indexing_plan( - indexers: &[ClusterMember], + indexers: &[(String, IndexerNodeInfo)], source_configs: &HashMap, mut indexing_tasks: Vec, ) -> PhysicalIndexingPlan { @@ -147,7 +148,7 @@ pub(crate) fn build_physical_indexing_plan( }); let mut node_ids = indexers .iter() - .map(|indexer| indexer.node_id.to_string()) + .map(|indexer| indexer.0.clone()) .collect_vec(); // Build the plan. @@ -271,7 +272,7 @@ impl From for IndexSourceId { /// - Ignore disabled sources, `CLI_INGEST_SOURCE_ID` and files sources (Quickwit is not aware of /// the files locations and thus are ignored). pub(crate) fn build_indexing_plan( - indexers: &[ClusterMember], + indexers: &[(String, IndexerNodeInfo)], source_configs: &HashMap, ) -> Vec { let mut indexing_tasks: Vec = Vec::new(); @@ -311,19 +312,19 @@ pub(crate) fn build_indexing_plan( #[cfg(test)] mod tests { - use std::collections::{HashMap, HashSet}; + use std::collections::HashMap; use std::net::SocketAddr; use std::num::NonZeroUsize; use itertools::Itertools; use proptest::prelude::*; - use quickwit_cluster::ClusterMember; use quickwit_common::rand::append_random_suffix; use quickwit_config::service::QuickwitService; use quickwit_config::{ FileSourceParams, KafkaSourceParams, SourceConfig, SourceInputFormat, SourceParams, CLI_INGEST_SOURCE_ID, INGEST_API_SOURCE_ID, }; + use quickwit_indexing::indexing_client::create_indexing_service_client; use quickwit_proto::indexing_api::IndexingTask; use quickwit_proto::IndexUid; use rand::seq::SliceRandom; @@ -331,6 +332,7 @@ mod tests { use super::{build_physical_indexing_plan, IndexSourceId}; use crate::indexing_plan::build_indexing_plan; + use crate::IndexerNodeInfo; fn kafka_source_params_for_test() -> SourceParams { SourceParams::Kafka(KafkaSourceParams { @@ -343,22 +345,21 @@ mod tests { }) } - fn cluster_members_for_test( + async fn cluster_members_for_test( num_members: usize, - quickwit_service: QuickwitService, - ) -> Vec { + _quickwit_service: QuickwitService, + ) -> Vec<(String, IndexerNodeInfo)> { let mut members = Vec::new(); for idx in 0..num_members { let addr: SocketAddr = ([127, 0, 0, 1], 10).into(); - members.push(ClusterMember::new( + let client = create_indexing_service_client(addr).await.unwrap(); + members.push(( (1 + idx).to_string(), - 0.into(), - true, - HashSet::from_iter([quickwit_service].into_iter()), - addr, - addr, - Vec::new(), - )) + IndexerNodeInfo { + client, + indexing_tasks: Vec::new(), + }, + )); } members } @@ -378,9 +379,9 @@ mod tests { .sum() } - #[test] - fn test_build_indexing_plan_one_source() { - let indexers = cluster_members_for_test(4, QuickwitService::Indexer); + #[tokio::test] + async fn test_build_indexing_plan_one_source() { + let indexers = cluster_members_for_test(4, QuickwitService::Indexer).await; let mut source_configs_map = HashMap::new(); let index_source_id = IndexSourceId { index_uid: "one-source-index:11111111111111111111111111" @@ -415,9 +416,9 @@ mod tests { } } - #[test] - fn test_build_indexing_plan_with_ingest_api_source() { - let indexers = cluster_members_for_test(4, QuickwitService::Indexer); + #[tokio::test] + async fn test_build_indexing_plan_with_ingest_api_source() { + let indexers = cluster_members_for_test(4, QuickwitService::Indexer).await; let mut source_configs_map = HashMap::new(); let index_source_id = IndexSourceId { index_uid: "ingest-api-index:11111111111111111111111111" @@ -452,9 +453,9 @@ mod tests { } } - #[test] - fn test_build_indexing_plan_with_sources_to_ignore() { - let indexers = cluster_members_for_test(4, QuickwitService::Indexer); + #[tokio::test] + async fn test_build_indexing_plan_with_sources_to_ignore() { + let indexers = cluster_members_for_test(4, QuickwitService::Indexer).await; let mut source_configs_map = HashMap::new(); let file_index_source_id = IndexSourceId { index_uid: "one-source-index:11111111111111111111111111" @@ -515,8 +516,8 @@ mod tests { assert_eq!(indexing_tasks.len(), 0); } - #[test] - fn test_build_physical_indexing_plan_simple() { + #[tokio::test] + async fn test_build_physical_indexing_plan_simple() { quickwit_common::setup_logging_for_tests(); // Rdv hashing for (index 1, source) returns [node 2, node 1]. let index_1 = "1"; @@ -579,17 +580,17 @@ mod tests { }); } - let indexers = cluster_members_for_test(2, QuickwitService::Indexer); + let indexers = cluster_members_for_test(2, QuickwitService::Indexer).await; let physical_plan = build_physical_indexing_plan(&indexers, &source_configs_map, indexing_tasks.clone()); assert_eq!(physical_plan.indexing_tasks_per_node_id.len(), 2); let indexer_1_tasks = physical_plan .indexing_tasks_per_node_id - .get(&indexers[0].node_id) + .get(&indexers[0].0) .unwrap(); let indexer_2_tasks = physical_plan .indexing_tasks_per_node_id - .get(&indexers[1].node_id) + .get(&indexers[1].0) .unwrap(); // (index 1, source) tasks are first placed on indexer 2 by rdv hashing. // Thus task 0 => indexer 2, task 1 => indexer 1, task 2 => indexer 2, task 3 => indexer 1. @@ -611,8 +612,8 @@ mod tests { assert_eq!(indexer_2_tasks, &expected_indexer_2_tasks); } - #[test] - fn test_build_physical_indexing_plan_with_not_enough_indexers() { + #[tokio::test] + async fn test_build_physical_indexing_plan_with_not_enough_indexers() { quickwit_common::setup_logging_for_tests(); let index_1 = "test-indexing-plan-1"; let source_1 = "source-1"; @@ -644,7 +645,7 @@ mod tests { }, ]; - let indexers = cluster_members_for_test(1, QuickwitService::Indexer); + let indexers = cluster_members_for_test(1, QuickwitService::Indexer).await; // This case should never happens but we just check that the plan building is resilient // enough, it will ignore the tasks that cannot be allocated. let physical_plan = @@ -655,7 +656,10 @@ mod tests { proptest! { #[test] fn test_building_indexing_tasks_and_physical_plan(num_indexers in 1usize..50usize, index_id_sources in proptest::collection::vec(gen_kafka_source(), 1..20)) { - let mut indexers = cluster_members_for_test(num_indexers, QuickwitService::Indexer); + // proptest doesn't work with async + let mut indexers = tokio::runtime::Runtime::new().unwrap().block_on( + cluster_members_for_test(num_indexers, QuickwitService::Indexer) + ); let source_configs: HashMap = index_id_sources .into_iter() .map(|(index_uid, source_config)| { diff --git a/quickwit/quickwit-control-plane/src/lib.rs b/quickwit/quickwit-control-plane/src/lib.rs index 51f704ec455..6dc9d6c05a2 100644 --- a/quickwit/quickwit-control-plane/src/lib.rs +++ b/quickwit/quickwit-control-plane/src/lib.rs @@ -27,16 +27,26 @@ use std::sync::Arc; use async_trait::async_trait; pub use control_plane_service::*; use quickwit_actors::{AskError, Mailbox, Universe}; -use quickwit_cluster::Cluster; use quickwit_common::pubsub::EventSubscriber; +use quickwit_common::tower::Pool; use quickwit_config::SourceParams; -use quickwit_grpc_clients::service_client_pool::ServiceClientPool; +use quickwit_indexing::indexing_client::IndexingServiceClient; use quickwit_metastore::{Metastore, MetastoreEvent}; +use quickwit_proto::indexing_api::IndexingTask; use scheduler::IndexingScheduler; use tracing::error; pub type Result = std::result::Result; +/// Indexer-node specific information stored in the pool of available indexer nodes +#[derive(Debug, Clone)] +pub struct IndexerNodeInfo { + pub client: IndexingServiceClient, + pub indexing_tasks: Vec, +} + +pub type IndexerPool = Pool; + #[derive(Debug, Clone, thiserror::Error)] pub enum ControlPlaneError { #[error("An internal error occurred: {0}.")] @@ -81,14 +91,13 @@ impl From> for ControlPlaneError { /// Starts the Control Plane. pub async fn start_control_plane_service( + cluster_id: String, + self_node_id: String, universe: &Universe, - cluster: Cluster, + indexer_pool: IndexerPool, metastore: Arc, ) -> anyhow::Result> { - let ready_members_watcher = cluster.ready_members_watcher().await; - let indexing_service_client_pool = - ServiceClientPool::create_and_update_members(ready_members_watcher).await?; - let scheduler = IndexingScheduler::new(cluster, metastore, indexing_service_client_pool); + let scheduler = IndexingScheduler::new(cluster_id, self_node_id, metastore, indexer_pool); let (scheduler_mailbox, _) = universe.spawn_builder().spawn(scheduler); Ok(scheduler_mailbox) } diff --git a/quickwit/quickwit-control-plane/src/scheduler.rs b/quickwit/quickwit-control-plane/src/scheduler.rs index 14476c35736..765ec487187 100644 --- a/quickwit/quickwit-control-plane/src/scheduler.rs +++ b/quickwit/quickwit-control-plane/src/scheduler.rs @@ -27,11 +27,7 @@ use anyhow::Context; use async_trait::async_trait; use itertools::Itertools; use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler}; -use quickwit_cluster::{Cluster, ClusterMember}; -use quickwit_config::service::QuickwitService; use quickwit_config::SourceConfig; -use quickwit_grpc_clients::service_client_pool::ServiceClientPool; -use quickwit_indexing::indexing_client::IndexingServiceClient; use quickwit_metastore::Metastore; use quickwit_proto::indexing_api::{ApplyIndexingPlanRequest, IndexingTask}; use serde::Serialize; @@ -40,7 +36,7 @@ use tracing::{debug, error, info, warn}; use crate::indexing_plan::{ build_indexing_plan, build_physical_indexing_plan, IndexSourceId, PhysicalIndexingPlan, }; -use crate::{NotifyIndexChangeRequest, NotifyIndexChangeResponse}; +use crate::{IndexerNodeInfo, IndexerPool, NotifyIndexChangeRequest, NotifyIndexChangeResponse}; /// Interval between two controls (or checks) of the desired plan VS running plan. const CONTROL_PLAN_LOOP_INTERVAL: Duration = if cfg!(any(test, feature = "testsuite")) { @@ -109,17 +105,18 @@ pub struct IndexingSchedulerState { /// plase will wait at least [`MIN_DURATION_BETWEEN_SCHEDULING`] before comparing the desired /// plan with the running plan. pub struct IndexingScheduler { - cluster: Cluster, + cluster_id: String, + self_node_id: String, metastore: Arc, - indexing_client_pool: ServiceClientPool, + indexing_client_pool: IndexerPool, state: IndexingSchedulerState, } impl fmt::Debug for IndexingScheduler { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("IndexingScheduler") - .field("cluster_id", &self.cluster.cluster_id()) - .field("node_id", &self.cluster.self_node_id()) + .field("cluster_id", &self.cluster_id) + .field("node_id", &self.self_node_id) .field("metastore_uri", &self.metastore.uri()) .field( "last_applied_plan_ts", @@ -151,12 +148,14 @@ impl Actor for IndexingScheduler { impl IndexingScheduler { pub fn new( - cluster: Cluster, + cluster_id: String, + self_node_id: String, metastore: Arc, - indexing_client_pool: ServiceClientPool, + indexing_client_pool: IndexerPool, ) -> Self { Self { - cluster, + cluster_id, + self_node_id, metastore, indexing_client_pool, state: IndexingSchedulerState::default(), @@ -164,7 +163,7 @@ impl IndexingScheduler { } async fn schedule_indexing_plan_if_needed(&mut self) -> anyhow::Result<()> { - let indexers: Vec = self.get_indexers_from_cluster_state().await; + let mut indexers = self.get_indexers_from_indexer_pool().await; if indexers.is_empty() { warn!("No indexer available, cannot schedule an indexing plan."); return Ok(()); @@ -184,7 +183,7 @@ impl IndexingScheduler { return Ok(()); } } - self.apply_physical_indexing_plan(&indexers, new_physical_plan) + self.apply_physical_indexing_plan(&mut indexers, new_physical_plan) .await; self.state.num_schedule_indexing_plan += 1; Ok(()) @@ -236,15 +235,10 @@ impl IndexingScheduler { } } - let indexers = self.get_indexers_from_cluster_state().await; + let mut indexers = self.get_indexers_from_indexer_pool().await; let running_indexing_tasks_by_node_id: HashMap> = indexers .iter() - .map(|cluster_member| { - ( - cluster_member.node_id.clone(), - cluster_member.indexing_tasks.clone(), - ) - }) + .map(|indexer| (indexer.0.clone(), indexer.1.indexing_tasks.clone())) .collect(); let indexing_plans_diff = get_indexing_plans_diff( @@ -257,48 +251,36 @@ impl IndexingScheduler { } else if !indexing_plans_diff.has_same_tasks() { // Some nodes may have not received their tasks, apply it again. info!(plans_diff=?indexing_plans_diff, "Running tasks and last applied tasks differ: reapply last plan."); - self.apply_physical_indexing_plan(&indexers, last_applied_plan.clone()) + self.apply_physical_indexing_plan(&mut indexers, last_applied_plan.clone()) .await; } Ok(()) } - async fn get_indexers_from_cluster_state(&self) -> Vec { - self.cluster - .ready_members() - .await - .into_iter() - .filter(|member| member.enabled_services.contains(&QuickwitService::Indexer)) - .collect() + async fn get_indexers_from_indexer_pool(&self) -> Vec<(String, IndexerNodeInfo)> { + self.indexing_client_pool.all().await } async fn apply_physical_indexing_plan( &mut self, - indexers: &[ClusterMember], + indexers: &mut [(String, IndexerNodeInfo)], new_physical_plan: PhysicalIndexingPlan, ) { debug!("Apply physical indexing plan: {:?}", new_physical_plan); for (node_id, indexing_tasks) in new_physical_plan.indexing_tasks_per_node() { let indexer = indexers - .iter() - .find(|indexer| &indexer.node_id == node_id) + .iter_mut() + .find(|indexer| &indexer.0 == node_id) .expect("This should never happen as the plan was built from these indexers."); - match self.indexing_client_pool.get(indexer.grpc_advertise_addr) { - Some(mut indexing_client) => { - if let Err(error) = indexing_client - .apply_indexing_plan(ApplyIndexingPlanRequest { - indexing_tasks: indexing_tasks.clone(), - }) - .await - { - error!(indexer_node_id=%indexer.node_id, err=?error, "Error occurred when appling indexing plan to indexer."); - } - } - None => { - error!(indexer_node_id=%indexer.node_id, - "Indexing service client not found in pool for indexer, it should never happened, skip indexing plan.", - ); - } + if let Err(error) = indexer + .1 + .client + .apply_indexing_plan(ApplyIndexingPlanRequest { + indexing_tasks: indexing_tasks.clone(), + }) + .await + { + error!(indexer_node_id=%indexer.0, err=?error, "Error occurred when appling indexing plan to indexer."); } } self.state.num_applied_physical_indexing_plan += 1; @@ -533,12 +515,13 @@ mod tests { use std::time::Duration; use chitchat::transport::ChannelTransport; - use quickwit_actors::{ActorHandle, Inbox, Universe}; - use quickwit_cluster::{create_cluster_for_test, grpc_addr_from_listen_addr_for_test, Cluster}; + use futures::{Stream, StreamExt}; + use quickwit_actors::{ActorHandle, Inbox, Mailbox, Universe}; + use quickwit_cluster::{create_cluster_for_test, Cluster, ClusterChange}; use quickwit_common::test_utils::wait_until_predicate; + use quickwit_common::tower::{Change, Pool}; use quickwit_config::service::QuickwitService; use quickwit_config::{KafkaSourceParams, SourceConfig, SourceInputFormat, SourceParams}; - use quickwit_grpc_clients::service_client_pool::ServiceClientPool; use quickwit_indexing::indexing_client::IndexingServiceClient; use quickwit_indexing::IndexingService; use quickwit_metastore::{IndexMetadata, MockMetastore}; @@ -549,6 +532,7 @@ mod tests { use crate::scheduler::{ get_indexing_plans_diff, MIN_DURATION_BETWEEN_SCHEDULING, REFRESH_PLAN_LOOP_INTERVAL, }; + use crate::IndexerNodeInfo; fn index_metadata_for_test( index_id: &str, @@ -580,6 +564,37 @@ mod tests { index_metadata } + pub fn test_indexer_change_stream( + cluster_change_stream: impl Stream + Send + 'static, + indexing_clients: HashMap>, + ) -> impl Stream> + Send + 'static { + cluster_change_stream.filter_map(move |cluster_change| { + let indexing_clients = indexing_clients.clone(); + Box::pin(async move { + match cluster_change { + ClusterChange::Add(node) + if node.enabled_services().contains(&QuickwitService::Indexer) => + { + let node_id = node.node_id().to_string(); + let grpc_addr = node.grpc_advertise_addr(); + let indexing_tasks = node.indexing_tasks().to_vec(); + let client_mailbox = indexing_clients.get(&node_id).unwrap().clone(); + let client = IndexingServiceClient::from_service(client_mailbox, grpc_addr); + Some(Change::Insert( + node_id, + IndexerNodeInfo { + client, + indexing_tasks, + }, + )) + } + ClusterChange::Remove(node) => Some(Change::Remove(node.node_id().to_string())), + _ => None, + } + }) + }) + } + async fn start_scheduler( cluster: Cluster, indexers: &[&Cluster], @@ -597,19 +612,23 @@ mod tests { .expect_list_indexes_metadatas() .returning(move || Ok(vec![index_metadata_2.clone(), index_metadata_1.clone()])); let mut indexer_inboxes = Vec::new(); - let mut indexing_clients = Vec::new(); + let indexing_client_pool = Pool::default(); + let change_stream = cluster.ready_nodes_change_stream().await; + let mut indexing_clients = HashMap::new(); for indexer in indexers { let (indexing_service_mailbox, indexing_service_inbox) = universe.create_test_mailbox(); - let client_grpc_addr = - grpc_addr_from_listen_addr_for_test(indexer.gossip_listen_addr()); - let indexing_client = - IndexingServiceClient::from_service(indexing_service_mailbox, client_grpc_addr); - indexing_clients.push(indexing_client); + indexing_clients.insert(indexer.self_node_id().to_string(), indexing_service_mailbox); indexer_inboxes.push(indexing_service_inbox); } - let indexing_client_pool = ServiceClientPool::for_clients_list(indexing_clients); - let indexing_scheduler = - IndexingScheduler::new(cluster, Arc::new(metastore), indexing_client_pool); + let indexer_change_stream = test_indexer_change_stream(change_stream, indexing_clients); + indexing_client_pool.listen_for_changes(indexer_change_stream); + + let indexing_scheduler = IndexingScheduler::new( + cluster.cluster_id().to_string(), + cluster.self_node_id().to_string(), + Arc::new(metastore), + indexing_client_pool, + ); let (_, scheduler_handler) = universe.spawn_builder().spawn(indexing_scheduler); (indexer_inboxes, scheduler_handler) } @@ -698,29 +717,23 @@ mod tests { .unwrap(); let universe = Universe::with_accelerated_time(); let (indexing_service_inboxes, scheduler_handler) = - start_scheduler(cluster.clone(), &[&cluster.clone()], &universe).await; - let indexing_service_inbox = indexing_service_inboxes[0].clone(); + start_scheduler(cluster.clone(), &[], &universe).await; + assert_eq!(indexing_service_inboxes.len(), 0); // No indexer. universe.sleep(CONTROL_PLAN_LOOP_INTERVAL).await; let scheduler_state = scheduler_handler.process_pending_and_observe().await; - let indexing_service_inbox_messages = - indexing_service_inbox.drain_for_test_typed::(); assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 0); assert_eq!(scheduler_state.num_schedule_indexing_plan, 0); assert!(scheduler_state.last_applied_physical_plan.is_none()); - assert_eq!(indexing_service_inbox_messages.len(), 0); // Wait REFRESH_PLAN_LOOP_INTERVAL * 2, as there is no indexer, we should observe no // scheduling. universe.sleep(REFRESH_PLAN_LOOP_INTERVAL * 2).await; let scheduler_state = scheduler_handler.process_pending_and_observe().await; - let indexing_service_inbox_messages = - indexing_service_inbox.drain_for_test_typed::(); assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 0); assert_eq!(scheduler_state.num_schedule_indexing_plan, 0); assert!(scheduler_state.last_applied_physical_plan.is_none()); - assert_eq!(indexing_service_inbox_messages.len(), 0); universe.assert_quit().await; } diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 30c151620c4..9e7b3cf396a 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -62,9 +62,13 @@ use quickwit_common::tower::{ }; use quickwit_config::service::QuickwitService; use quickwit_config::{QuickwitConfig, SearcherConfig}; -use quickwit_control_plane::{start_control_plane_service, ControlPlaneServiceClient}; +use quickwit_control_plane::scheduler::IndexingScheduler; +use quickwit_control_plane::{ + start_control_plane_service, ControlPlaneServiceClient, IndexerNodeInfo, IndexerPool, +}; use quickwit_core::{IndexService, IndexServiceError}; use quickwit_indexing::actors::IndexingService; +use quickwit_indexing::indexing_client::IndexingServiceClient; use quickwit_indexing::start_indexing_service; use quickwit_ingest::{ start_ingest_api_service, GetMemoryCapacity, IngestRequest, IngestServiceClient, MemoryCapacity, @@ -203,33 +207,6 @@ pub async fn serve_quickwit( storage_resolver.clone(), )); - // Instantiate the control plane service if enabled. - // If not and metastore service is enabled, we need to instantiate the control plane client - // so the metastore can notify the control plane. - let control_plane_service: Option = if config - .enabled_services - .contains(&QuickwitService::ControlPlane) - { - let control_plane_mailbox = - start_control_plane_service(&universe, cluster.clone(), metastore.clone()).await?; - Some(ControlPlaneServiceClient::from_mailbox( - control_plane_mailbox, - )) - } else if config - .enabled_services - .contains(&QuickwitService::Metastore) - { - let balance_channel = - balance_channel_for_service(&cluster, QuickwitService::ControlPlane).await; - Some(ControlPlaneServiceClient::from_channel(balance_channel)) - } else { - None - }; - let control_plane_subscription_handle = - control_plane_service.as_ref().map(|scheduler_service| { - event_broker.subscribe::(scheduler_service.clone()) - }); - let (ingest_service, indexing_service) = if config .enabled_services .contains(&QuickwitService::Indexer) @@ -289,6 +266,41 @@ pub async fn serve_quickwit( (ingest_service, None) }; + // Instantiate the control plane service if enabled. + // If not and metastore service is enabled, we need to instantiate the control plane client + // so the metastore can notify the control plane. + let control_plane_service: Option = if config + .enabled_services + .contains(&QuickwitService::ControlPlane) + { + let cluster_change_stream = cluster.ready_nodes_change_stream().await; + let control_plane_mailbox = setup_control_plane_service( + &universe, + cluster.cluster_id().to_string(), + cluster.self_node_id().to_string(), + cluster_change_stream, + indexing_service.clone(), + metastore.clone(), + ) + .await?; + Some(ControlPlaneServiceClient::from_mailbox( + control_plane_mailbox, + )) + } else if config + .enabled_services + .contains(&QuickwitService::Metastore) + { + let balance_channel = + balance_channel_for_service(&cluster, QuickwitService::ControlPlane).await; + Some(ControlPlaneServiceClient::from_channel(balance_channel)) + } else { + None + }; + let control_plane_subscription_handle = + control_plane_service.as_ref().map(|scheduler_service| { + event_broker.subscribe::(scheduler_service.clone()) + }); + let searcher_config = config.searcher_config.clone(); let cluster_change_stream = cluster.ready_nodes_change_stream().await; @@ -510,6 +522,62 @@ async fn setup_searcher( Ok((search_job_placer, search_service)) } +async fn setup_control_plane_service( + universe: &Universe, + cluster_id: String, + self_node_id: String, + cluster_change_stream: impl Stream + Send + 'static, + indexing_service: Option>, + metastore: Arc, +) -> anyhow::Result> { + let indexer_pool = IndexerPool::default(); + let indexing_scheduler = start_control_plane_service( + cluster_id, + self_node_id, + universe, + indexer_pool.clone(), + metastore, + ) + .await?; + let indexer_change_stream = cluster_change_stream.filter_map(move |cluster_change| { + let indexing_service_clone = indexing_service.clone(); + Box::pin(async move { + match cluster_change { + ClusterChange::Add(node) + if node.enabled_services().contains(&QuickwitService::Indexer) => + { + let node_id = node.node_id().to_string(); + let grpc_addr = node.grpc_advertise_addr(); + let indexing_tasks = node.indexing_tasks().to_vec(); + + if node.is_self_node() { + if let Some(indexing_service_clone) = indexing_service_clone { + let client = + IndexingServiceClient::from_service(indexing_service_clone, grpc_addr); + Some(Change::Insert(node_id, IndexerNodeInfo { client, indexing_tasks })) + } else { + // That means that cluster thinks we are supposed to have an indexer, but we actually don't. + None + } + } else { + let grpc_client = + quickwit_proto::indexing_api::indexing_service_client::IndexingServiceClient::new( + node.channel(), + ); + let client = + IndexingServiceClient::from_grpc_client(grpc_client, grpc_addr); + Some(Change::Insert(node_id, IndexerNodeInfo { client, indexing_tasks })) + } + } + ClusterChange::Remove(node) => Some(Change::Remove(node.node_id().to_string())), + _ => None, + } + }) + }); + indexer_pool.listen_for_changes(indexer_change_stream); + Ok(indexing_scheduler) +} + fn require( val_opt: Option, ) -> impl Filter + Clone {