Skip to content

Commit

Permalink
Separate IndexingScheduler from Control Plane
Browse files Browse the repository at this point in the history
Introduces a distinct Control Plane component, which is now separate
from the Indexing Scheduler.

See #3443
See #3622
  • Loading branch information
imotov committed Jul 13, 2023
1 parent eef4a72 commit 015733a
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 5 deletions.
71 changes: 71 additions & 0 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright (C) 2023 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at [email protected].
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use anyhow::Context;
use async_trait::async_trait;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox};
use tracing::debug;

use crate::scheduler::IndexingScheduler;
use crate::{NotifyIndexChangeRequest, NotifyIndexChangeResponse};

#[derive(Debug)]
pub struct ControlPlane {
index_scheduler_mailbox: Mailbox<IndexingScheduler>,
}

#[async_trait]
impl Actor for ControlPlane {
type ObservableState = ();

fn observable_state(&self) -> Self::ObservableState {}

fn name(&self) -> String {
"ControlPlane".to_string()
}
}

impl ControlPlane {
pub fn new(index_scheduler_mailbox: Mailbox<IndexingScheduler>) -> Self {
Self {
index_scheduler_mailbox,
}
}
}

#[async_trait]
impl Handler<NotifyIndexChangeRequest> for ControlPlane {
type Reply = crate::Result<NotifyIndexChangeResponse>;

async fn handle(
&mut self,
request: NotifyIndexChangeRequest,
_: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
debug!("Index change notification: schedule indexing plan.");
self.index_scheduler_mailbox
.send_message(request)
.await
.context("Error sending index change notification to index scheduler.")?;
Ok(Ok(NotifyIndexChangeResponse {}))
}
}

#[cfg(test)]
mod tests {}
3 changes: 2 additions & 1 deletion quickwit/quickwit-control-plane/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

pub mod control_plane;
#[path = "codegen/control_plane_service.rs"]
mod control_plane_service;
pub mod indexing_plan;
Expand Down Expand Up @@ -90,7 +91,7 @@ impl From<AskError<ControlPlaneError>> for ControlPlaneError {
}

/// Starts the Control Plane.
pub async fn start_control_plane_service(
pub async fn start_indexing_scheduler(
cluster_id: String,
self_node_id: String,
universe: &Universe,
Expand Down
31 changes: 27 additions & 4 deletions quickwit/quickwit-serve/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@ use quickwit_common::tower::{
};
use quickwit_config::service::QuickwitService;
use quickwit_config::{QuickwitConfig, SearcherConfig};
use quickwit_control_plane::control_plane::ControlPlane;
use quickwit_control_plane::scheduler::IndexingScheduler;
use quickwit_control_plane::{
start_control_plane_service, ControlPlaneServiceClient, IndexerNodeInfo, IndexerPool,
start_indexing_scheduler, ControlPlaneServiceClient, IndexerNodeInfo, IndexerPool,
};
use quickwit_core::{IndexService, IndexServiceError};
use quickwit_indexing::actors::IndexingService;
Expand Down Expand Up @@ -274,7 +275,7 @@ pub async fn serve_quickwit(
.contains(&QuickwitService::ControlPlane)
{
let cluster_change_stream = cluster.ready_nodes_change_stream().await;
let control_plane_mailbox = setup_control_plane_service(
let control_plane_mailbox = start_control_plane(
&universe,
cluster.cluster_id().to_string(),
cluster.self_node_id().to_string(),
Expand Down Expand Up @@ -522,7 +523,29 @@ async fn setup_searcher(
Ok((search_job_placer, search_service))
}

async fn setup_control_plane_service(
async fn start_control_plane(
universe: &Universe,
cluster_id: String,
self_node_id: String,
cluster_change_stream: impl Stream<Item = ClusterChange> + Send + 'static,
indexing_service: Option<Mailbox<IndexingService>>,
metastore: Arc<dyn Metastore>,
) -> anyhow::Result<Mailbox<ControlPlane>> {
let scheduler = setup_indexer_scheduler(
universe,
cluster_id,
self_node_id,
cluster_change_stream,
indexing_service,
metastore,
)
.await?;
let control_plane = ControlPlane::new(scheduler);
let (control_plane_mailbox, _) = universe.spawn_builder().spawn(control_plane);
Ok(control_plane_mailbox)
}

async fn setup_indexer_scheduler(
universe: &Universe,
cluster_id: String,
self_node_id: String,
Expand All @@ -531,7 +554,7 @@ async fn setup_control_plane_service(
metastore: Arc<dyn Metastore>,
) -> anyhow::Result<Mailbox<IndexingScheduler>> {
let indexer_pool = IndexerPool::default();
let indexing_scheduler = start_control_plane_service(
let indexing_scheduler = start_indexing_scheduler(
cluster_id,
self_node_id,
universe,
Expand Down

0 comments on commit 015733a

Please sign in to comment.