Skip to content

Commit

Permalink
Refactor service pool in control plane service (#3622)
Browse files Browse the repository at this point in the history
Replace ServiceClientPool with Pool in control plane services.
  • Loading branch information
imotov authored Jul 12, 2023
1 parent 36cba7e commit eef4a72
Show file tree
Hide file tree
Showing 6 changed files with 234 additions and 138 deletions.
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions quickwit/quickwit-control-plane/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ quickwit-metastore = { workspace = true }
quickwit-proto = { workspace = true }

[dev-dependencies]
futures = { workspace = true }
mockall = { workspace = true }
proptest = { workspace = true }
rand = { workspace = true }
Expand Down
74 changes: 39 additions & 35 deletions quickwit/quickwit-control-plane/src/indexing_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ use std::collections::HashMap;
use std::hash::Hash;

use itertools::Itertools;
use quickwit_cluster::ClusterMember;
use quickwit_common::rendezvous_hasher::sort_by_rendez_vous_hash;
use quickwit_config::{SourceConfig, CLI_INGEST_SOURCE_ID, INGEST_API_SOURCE_ID};
use quickwit_proto::indexing_api::IndexingTask;
use quickwit_proto::IndexUid;
use serde::Serialize;

use crate::IndexerNodeInfo;

/// A [`PhysicalIndexingPlan`] defines the list of indexing tasks
/// each indexer, identified by its node ID, should run.
/// TODO(fmassot): a metastore version number will be attached to the plan
Expand Down Expand Up @@ -137,7 +138,7 @@ impl PhysicalIndexingPlan {
/// tasks. We can potentially use this info to assign an indexing task to a node running the same
/// task.
pub(crate) fn build_physical_indexing_plan(
indexers: &[ClusterMember],
indexers: &[(String, IndexerNodeInfo)],
source_configs: &HashMap<IndexSourceId, SourceConfig>,
mut indexing_tasks: Vec<IndexingTask>,
) -> PhysicalIndexingPlan {
Expand All @@ -147,7 +148,7 @@ pub(crate) fn build_physical_indexing_plan(
});
let mut node_ids = indexers
.iter()
.map(|indexer| indexer.node_id.to_string())
.map(|indexer| indexer.0.clone())
.collect_vec();

// Build the plan.
Expand Down Expand Up @@ -271,7 +272,7 @@ impl From<IndexingTask> for IndexSourceId {
/// - Ignore disabled sources, `CLI_INGEST_SOURCE_ID` and files sources (Quickwit is not aware of
/// the files locations and thus are ignored).
pub(crate) fn build_indexing_plan(
indexers: &[ClusterMember],
indexers: &[(String, IndexerNodeInfo)],
source_configs: &HashMap<IndexSourceId, SourceConfig>,
) -> Vec<IndexingTask> {
let mut indexing_tasks: Vec<IndexingTask> = Vec::new();
Expand Down Expand Up @@ -311,26 +312,27 @@ pub(crate) fn build_indexing_plan(

#[cfg(test)]
mod tests {
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::num::NonZeroUsize;

use itertools::Itertools;
use proptest::prelude::*;
use quickwit_cluster::ClusterMember;
use quickwit_common::rand::append_random_suffix;
use quickwit_config::service::QuickwitService;
use quickwit_config::{
FileSourceParams, KafkaSourceParams, SourceConfig, SourceInputFormat, SourceParams,
CLI_INGEST_SOURCE_ID, INGEST_API_SOURCE_ID,
};
use quickwit_indexing::indexing_client::create_indexing_service_client;
use quickwit_proto::indexing_api::IndexingTask;
use quickwit_proto::IndexUid;
use rand::seq::SliceRandom;
use serde_json::json;

use super::{build_physical_indexing_plan, IndexSourceId};
use crate::indexing_plan::build_indexing_plan;
use crate::IndexerNodeInfo;

fn kafka_source_params_for_test() -> SourceParams {
SourceParams::Kafka(KafkaSourceParams {
Expand All @@ -343,22 +345,21 @@ mod tests {
})
}

fn cluster_members_for_test(
async fn cluster_members_for_test(
num_members: usize,
quickwit_service: QuickwitService,
) -> Vec<ClusterMember> {
_quickwit_service: QuickwitService,
) -> Vec<(String, IndexerNodeInfo)> {
let mut members = Vec::new();
for idx in 0..num_members {
let addr: SocketAddr = ([127, 0, 0, 1], 10).into();
members.push(ClusterMember::new(
let client = create_indexing_service_client(addr).await.unwrap();
members.push((
(1 + idx).to_string(),
0.into(),
true,
HashSet::from_iter([quickwit_service].into_iter()),
addr,
addr,
Vec::new(),
))
IndexerNodeInfo {
client,
indexing_tasks: Vec::new(),
},
));
}
members
}
Expand All @@ -378,9 +379,9 @@ mod tests {
.sum()
}

#[test]
fn test_build_indexing_plan_one_source() {
let indexers = cluster_members_for_test(4, QuickwitService::Indexer);
#[tokio::test]
async fn test_build_indexing_plan_one_source() {
let indexers = cluster_members_for_test(4, QuickwitService::Indexer).await;
let mut source_configs_map = HashMap::new();
let index_source_id = IndexSourceId {
index_uid: "one-source-index:11111111111111111111111111"
Expand Down Expand Up @@ -415,9 +416,9 @@ mod tests {
}
}

#[test]
fn test_build_indexing_plan_with_ingest_api_source() {
let indexers = cluster_members_for_test(4, QuickwitService::Indexer);
#[tokio::test]
async fn test_build_indexing_plan_with_ingest_api_source() {
let indexers = cluster_members_for_test(4, QuickwitService::Indexer).await;
let mut source_configs_map = HashMap::new();
let index_source_id = IndexSourceId {
index_uid: "ingest-api-index:11111111111111111111111111"
Expand Down Expand Up @@ -452,9 +453,9 @@ mod tests {
}
}

#[test]
fn test_build_indexing_plan_with_sources_to_ignore() {
let indexers = cluster_members_for_test(4, QuickwitService::Indexer);
#[tokio::test]
async fn test_build_indexing_plan_with_sources_to_ignore() {
let indexers = cluster_members_for_test(4, QuickwitService::Indexer).await;
let mut source_configs_map = HashMap::new();
let file_index_source_id = IndexSourceId {
index_uid: "one-source-index:11111111111111111111111111"
Expand Down Expand Up @@ -515,8 +516,8 @@ mod tests {
assert_eq!(indexing_tasks.len(), 0);
}

#[test]
fn test_build_physical_indexing_plan_simple() {
#[tokio::test]
async fn test_build_physical_indexing_plan_simple() {
quickwit_common::setup_logging_for_tests();
// Rdv hashing for (index 1, source) returns [node 2, node 1].
let index_1 = "1";
Expand Down Expand Up @@ -579,17 +580,17 @@ mod tests {
});
}

let indexers = cluster_members_for_test(2, QuickwitService::Indexer);
let indexers = cluster_members_for_test(2, QuickwitService::Indexer).await;
let physical_plan =
build_physical_indexing_plan(&indexers, &source_configs_map, indexing_tasks.clone());
assert_eq!(physical_plan.indexing_tasks_per_node_id.len(), 2);
let indexer_1_tasks = physical_plan
.indexing_tasks_per_node_id
.get(&indexers[0].node_id)
.get(&indexers[0].0)
.unwrap();
let indexer_2_tasks = physical_plan
.indexing_tasks_per_node_id
.get(&indexers[1].node_id)
.get(&indexers[1].0)
.unwrap();
// (index 1, source) tasks are first placed on indexer 2 by rdv hashing.
// Thus task 0 => indexer 2, task 1 => indexer 1, task 2 => indexer 2, task 3 => indexer 1.
Expand All @@ -611,8 +612,8 @@ mod tests {
assert_eq!(indexer_2_tasks, &expected_indexer_2_tasks);
}

#[test]
fn test_build_physical_indexing_plan_with_not_enough_indexers() {
#[tokio::test]
async fn test_build_physical_indexing_plan_with_not_enough_indexers() {
quickwit_common::setup_logging_for_tests();
let index_1 = "test-indexing-plan-1";
let source_1 = "source-1";
Expand Down Expand Up @@ -644,7 +645,7 @@ mod tests {
},
];

let indexers = cluster_members_for_test(1, QuickwitService::Indexer);
let indexers = cluster_members_for_test(1, QuickwitService::Indexer).await;
// This case should never happen, but we check that the plan building is resilient
// enough: it will simply ignore the tasks that cannot be allocated.
let physical_plan =
Expand All @@ -655,7 +656,10 @@ mod tests {
proptest! {
#[test]
fn test_building_indexing_tasks_and_physical_plan(num_indexers in 1usize..50usize, index_id_sources in proptest::collection::vec(gen_kafka_source(), 1..20)) {
let mut indexers = cluster_members_for_test(num_indexers, QuickwitService::Indexer);
// proptest doesn't work with async
let mut indexers = tokio::runtime::Runtime::new().unwrap().block_on(
cluster_members_for_test(num_indexers, QuickwitService::Indexer)
);
let source_configs: HashMap<IndexSourceId, SourceConfig> = index_id_sources
.into_iter()
.map(|(index_uid, source_config)| {
Expand Down
23 changes: 16 additions & 7 deletions quickwit/quickwit-control-plane/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,26 @@ use std::sync::Arc;
use async_trait::async_trait;
pub use control_plane_service::*;
use quickwit_actors::{AskError, Mailbox, Universe};
use quickwit_cluster::Cluster;
use quickwit_common::pubsub::EventSubscriber;
use quickwit_common::tower::Pool;
use quickwit_config::SourceParams;
use quickwit_grpc_clients::service_client_pool::ServiceClientPool;
use quickwit_indexing::indexing_client::IndexingServiceClient;
use quickwit_metastore::{Metastore, MetastoreEvent};
use quickwit_proto::indexing_api::IndexingTask;
use scheduler::IndexingScheduler;
use tracing::error;

pub type Result<T> = std::result::Result<T, ControlPlaneError>;

/// Indexer-node specific information stored in the pool of available indexer nodes
#[derive(Debug, Clone)]
pub struct IndexerNodeInfo {
/// gRPC client used by the control plane to talk to this indexer node.
pub client: IndexingServiceClient,
/// Indexing tasks associated with this node.
/// NOTE(review): presumably the tasks currently assigned to / reported by the node
/// (tests initialize it to `Vec::new()`) — confirm against the scheduler.
pub indexing_tasks: Vec<IndexingTask>,
}

/// Pool of available indexer nodes, keyed by node ID (see `indexers[i].0` usage in
/// `build_physical_indexing_plan`).
pub type IndexerPool = Pool<String, IndexerNodeInfo>;

#[derive(Debug, Clone, thiserror::Error)]
pub enum ControlPlaneError {
#[error("An internal error occurred: {0}.")]
Expand Down Expand Up @@ -81,14 +91,13 @@ impl From<AskError<ControlPlaneError>> for ControlPlaneError {

/// Starts the Control Plane.
pub async fn start_control_plane_service(
cluster_id: String,
self_node_id: String,
universe: &Universe,
cluster: Cluster,
indexer_pool: IndexerPool,
metastore: Arc<dyn Metastore>,
) -> anyhow::Result<Mailbox<IndexingScheduler>> {
let ready_members_watcher = cluster.ready_members_watcher().await;
let indexing_service_client_pool =
ServiceClientPool::create_and_update_members(ready_members_watcher).await?;
let scheduler = IndexingScheduler::new(cluster, metastore, indexing_service_client_pool);
let scheduler = IndexingScheduler::new(cluster_id, self_node_id, metastore, indexer_pool);
let (scheduler_mailbox, _) = universe.spawn_builder().spawn(scheduler);
Ok(scheduler_mailbox)
}
Expand Down
Loading

0 comments on commit eef4a72

Please sign in to comment.