Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor service pool in control plane service #3622

Merged
merged 1 commit into from
Jul 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions quickwit/quickwit-control-plane/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ quickwit-metastore = { workspace = true }
quickwit-proto = { workspace = true }

[dev-dependencies]
futures = { workspace = true }
mockall = { workspace = true }
proptest = { workspace = true }
rand = { workspace = true }
Expand Down
74 changes: 39 additions & 35 deletions quickwit/quickwit-control-plane/src/indexing_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ use std::collections::HashMap;
use std::hash::Hash;

use itertools::Itertools;
use quickwit_cluster::ClusterMember;
use quickwit_common::rendezvous_hasher::sort_by_rendez_vous_hash;
use quickwit_config::{SourceConfig, CLI_INGEST_SOURCE_ID, INGEST_API_SOURCE_ID};
use quickwit_proto::indexing_api::IndexingTask;
use quickwit_proto::IndexUid;
use serde::Serialize;

use crate::IndexerNodeInfo;

/// A [`PhysicalIndexingPlan`] defines the list of indexing tasks
/// each indexer, identified by its node ID, should run.
/// TODO(fmassot): a metastore version number will be attached to the plan
Expand Down Expand Up @@ -137,7 +138,7 @@ impl PhysicalIndexingPlan {
/// tasks. We can potentially use this info to assign an indexing task to a node running the same
/// task.
pub(crate) fn build_physical_indexing_plan(
indexers: &[ClusterMember],
indexers: &[(String, IndexerNodeInfo)],
source_configs: &HashMap<IndexSourceId, SourceConfig>,
mut indexing_tasks: Vec<IndexingTask>,
) -> PhysicalIndexingPlan {
Expand All @@ -147,7 +148,7 @@ pub(crate) fn build_physical_indexing_plan(
});
let mut node_ids = indexers
.iter()
.map(|indexer| indexer.node_id.to_string())
.map(|indexer| indexer.0.clone())
.collect_vec();

// Build the plan.
Expand Down Expand Up @@ -271,7 +272,7 @@ impl From<IndexingTask> for IndexSourceId {
/// - Ignore disabled sources, `CLI_INGEST_SOURCE_ID` and files sources (Quickwit is not aware of
/// the files locations and thus are ignored).
pub(crate) fn build_indexing_plan(
indexers: &[ClusterMember],
indexers: &[(String, IndexerNodeInfo)],
source_configs: &HashMap<IndexSourceId, SourceConfig>,
) -> Vec<IndexingTask> {
let mut indexing_tasks: Vec<IndexingTask> = Vec::new();
Expand Down Expand Up @@ -311,26 +312,27 @@ pub(crate) fn build_indexing_plan(

#[cfg(test)]
mod tests {
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::num::NonZeroUsize;

use itertools::Itertools;
use proptest::prelude::*;
use quickwit_cluster::ClusterMember;
use quickwit_common::rand::append_random_suffix;
use quickwit_config::service::QuickwitService;
use quickwit_config::{
FileSourceParams, KafkaSourceParams, SourceConfig, SourceInputFormat, SourceParams,
CLI_INGEST_SOURCE_ID, INGEST_API_SOURCE_ID,
};
use quickwit_indexing::indexing_client::create_indexing_service_client;
use quickwit_proto::indexing_api::IndexingTask;
use quickwit_proto::IndexUid;
use rand::seq::SliceRandom;
use serde_json::json;

use super::{build_physical_indexing_plan, IndexSourceId};
use crate::indexing_plan::build_indexing_plan;
use crate::IndexerNodeInfo;

fn kafka_source_params_for_test() -> SourceParams {
SourceParams::Kafka(KafkaSourceParams {
Expand All @@ -343,22 +345,21 @@ mod tests {
})
}

fn cluster_members_for_test(
async fn cluster_members_for_test(
num_members: usize,
quickwit_service: QuickwitService,
) -> Vec<ClusterMember> {
_quickwit_service: QuickwitService,
) -> Vec<(String, IndexerNodeInfo)> {
let mut members = Vec::new();
for idx in 0..num_members {
let addr: SocketAddr = ([127, 0, 0, 1], 10).into();
members.push(ClusterMember::new(
let client = create_indexing_service_client(addr).await.unwrap();
members.push((
(1 + idx).to_string(),
0.into(),
true,
HashSet::from_iter([quickwit_service].into_iter()),
addr,
addr,
Vec::new(),
))
IndexerNodeInfo {
client,
indexing_tasks: Vec::new(),
},
));
}
members
}
Expand All @@ -378,9 +379,9 @@ mod tests {
.sum()
}

#[test]
fn test_build_indexing_plan_one_source() {
let indexers = cluster_members_for_test(4, QuickwitService::Indexer);
#[tokio::test]
async fn test_build_indexing_plan_one_source() {
let indexers = cluster_members_for_test(4, QuickwitService::Indexer).await;
let mut source_configs_map = HashMap::new();
let index_source_id = IndexSourceId {
index_uid: "one-source-index:11111111111111111111111111"
Expand Down Expand Up @@ -415,9 +416,9 @@ mod tests {
}
}

#[test]
fn test_build_indexing_plan_with_ingest_api_source() {
let indexers = cluster_members_for_test(4, QuickwitService::Indexer);
#[tokio::test]
async fn test_build_indexing_plan_with_ingest_api_source() {
let indexers = cluster_members_for_test(4, QuickwitService::Indexer).await;
let mut source_configs_map = HashMap::new();
let index_source_id = IndexSourceId {
index_uid: "ingest-api-index:11111111111111111111111111"
Expand Down Expand Up @@ -452,9 +453,9 @@ mod tests {
}
}

#[test]
fn test_build_indexing_plan_with_sources_to_ignore() {
let indexers = cluster_members_for_test(4, QuickwitService::Indexer);
#[tokio::test]
async fn test_build_indexing_plan_with_sources_to_ignore() {
let indexers = cluster_members_for_test(4, QuickwitService::Indexer).await;
let mut source_configs_map = HashMap::new();
let file_index_source_id = IndexSourceId {
index_uid: "one-source-index:11111111111111111111111111"
Expand Down Expand Up @@ -515,8 +516,8 @@ mod tests {
assert_eq!(indexing_tasks.len(), 0);
}

#[test]
fn test_build_physical_indexing_plan_simple() {
#[tokio::test]
async fn test_build_physical_indexing_plan_simple() {
quickwit_common::setup_logging_for_tests();
// Rdv hashing for (index 1, source) returns [node 2, node 1].
let index_1 = "1";
Expand Down Expand Up @@ -579,17 +580,17 @@ mod tests {
});
}

let indexers = cluster_members_for_test(2, QuickwitService::Indexer);
let indexers = cluster_members_for_test(2, QuickwitService::Indexer).await;
let physical_plan =
build_physical_indexing_plan(&indexers, &source_configs_map, indexing_tasks.clone());
assert_eq!(physical_plan.indexing_tasks_per_node_id.len(), 2);
let indexer_1_tasks = physical_plan
.indexing_tasks_per_node_id
.get(&indexers[0].node_id)
.get(&indexers[0].0)
.unwrap();
let indexer_2_tasks = physical_plan
.indexing_tasks_per_node_id
.get(&indexers[1].node_id)
.get(&indexers[1].0)
.unwrap();
// (index 1, source) tasks are first placed on indexer 2 by rdv hashing.
// Thus task 0 => indexer 2, task 1 => indexer 1, task 2 => indexer 2, task 3 => indexer 1.
Expand All @@ -611,8 +612,8 @@ mod tests {
assert_eq!(indexer_2_tasks, &expected_indexer_2_tasks);
}

#[test]
fn test_build_physical_indexing_plan_with_not_enough_indexers() {
#[tokio::test]
async fn test_build_physical_indexing_plan_with_not_enough_indexers() {
quickwit_common::setup_logging_for_tests();
let index_1 = "test-indexing-plan-1";
let source_1 = "source-1";
Expand Down Expand Up @@ -644,7 +645,7 @@ mod tests {
},
];

let indexers = cluster_members_for_test(1, QuickwitService::Indexer);
let indexers = cluster_members_for_test(1, QuickwitService::Indexer).await;
// This case should never happens but we just check that the plan building is resilient
// enough, it will ignore the tasks that cannot be allocated.
let physical_plan =
Expand All @@ -655,7 +656,10 @@ mod tests {
proptest! {
#[test]
fn test_building_indexing_tasks_and_physical_plan(num_indexers in 1usize..50usize, index_id_sources in proptest::collection::vec(gen_kafka_source(), 1..20)) {
let mut indexers = cluster_members_for_test(num_indexers, QuickwitService::Indexer);
// proptest doesn't work with async
let mut indexers = tokio::runtime::Runtime::new().unwrap().block_on(
cluster_members_for_test(num_indexers, QuickwitService::Indexer)
);
let source_configs: HashMap<IndexSourceId, SourceConfig> = index_id_sources
.into_iter()
.map(|(index_uid, source_config)| {
Expand Down
23 changes: 16 additions & 7 deletions quickwit/quickwit-control-plane/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,26 @@ use std::sync::Arc;
use async_trait::async_trait;
pub use control_plane_service::*;
use quickwit_actors::{AskError, Mailbox, Universe};
use quickwit_cluster::Cluster;
use quickwit_common::pubsub::EventSubscriber;
use quickwit_common::tower::Pool;
use quickwit_config::SourceParams;
use quickwit_grpc_clients::service_client_pool::ServiceClientPool;
use quickwit_indexing::indexing_client::IndexingServiceClient;
use quickwit_metastore::{Metastore, MetastoreEvent};
use quickwit_proto::indexing_api::IndexingTask;
use scheduler::IndexingScheduler;
use tracing::error;

pub type Result<T> = std::result::Result<T, ControlPlaneError>;

/// Indexer-node specific information stored in the pool of available indexer nodes
#[derive(Debug, Clone)]
pub struct IndexerNodeInfo {
pub client: IndexingServiceClient,
pub indexing_tasks: Vec<IndexingTask>,
}

pub type IndexerPool = Pool<String, IndexerNodeInfo>;

#[derive(Debug, Clone, thiserror::Error)]
pub enum ControlPlaneError {
#[error("An internal error occurred: {0}.")]
Expand Down Expand Up @@ -81,14 +91,13 @@ impl From<AskError<ControlPlaneError>> for ControlPlaneError {

/// Starts the Control Plane.
pub async fn start_control_plane_service(
cluster_id: String,
self_node_id: String,
universe: &Universe,
cluster: Cluster,
indexer_pool: IndexerPool,
metastore: Arc<dyn Metastore>,
) -> anyhow::Result<Mailbox<IndexingScheduler>> {
let ready_members_watcher = cluster.ready_members_watcher().await;
let indexing_service_client_pool =
ServiceClientPool::create_and_update_members(ready_members_watcher).await?;
let scheduler = IndexingScheduler::new(cluster, metastore, indexing_service_client_pool);
let scheduler = IndexingScheduler::new(cluster_id, self_node_id, metastore, indexer_pool);
let (scheduler_mailbox, _) = universe.spawn_builder().spawn(scheduler);
Ok(scheduler_mailbox)
}
Expand Down
Loading