Skip to content

Commit

Permalink
Fix race condition related to debouncing. (#4689)
Browse files Browse the repository at this point in the history
* Fix race condition related to debouncing.

In Ingest v1, queues are created on apply plans.
This means that right after creating an index, documents cannot be
inserted.

This was true before, but was dependent on a race condition. It
became more apparent after we added debouncing of control plane apply plans.

This PR makes it possible for callers of the debounced rebuild-plan code
to be notified of, and to wait for, a rebuild being completed and
applied.

* Apply suggestions from code review

Co-authored-by: Adrien Guillo <[email protected]>
  • Loading branch information
fulmicoton and guilload authored Mar 9, 2024
1 parent b290a10 commit 2c94074
Show file tree
Hide file tree
Showing 11 changed files with 178 additions and 38 deletions.
55 changes: 37 additions & 18 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ use std::time::Duration;
use anyhow::Context;
use async_trait::async_trait;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use futures::{Future, StreamExt};
use quickwit_actors::{
Actor, ActorContext, ActorExitStatus, ActorHandle, Handler, Mailbox, Supervisor, Universe,
WeakMailbox,
Actor, ActorContext, ActorExitStatus, ActorHandle, DeferableReplyHandler, Handler, Mailbox,
Supervisor, Universe, WeakMailbox,
};
use quickwit_cluster::{
ClusterChange, ClusterChangeStream, ClusterChangeStreamFactory, ClusterNode,
Expand Down Expand Up @@ -165,7 +165,7 @@ impl Actor for ControlPlane {
.await
.context("failed to initialize the model")?;

self.rebuild_plan_debounced(ctx);
let _rebuild_plan_waiter = self.rebuild_plan_debounced(ctx);

self.ingest_controller.sync_with_all_ingesters(&self.model);

Expand Down Expand Up @@ -313,9 +313,17 @@ impl ControlPlane {
///
/// This method includes some debouncing logic. Every call will be followed by a cooldown
/// period.
fn rebuild_plan_debounced(&mut self, ctx: &ActorContext<Self>) {
///
/// This method returns a future that can be awaited to ensure that the relevant rebuild plan
/// operation has been executed.
fn rebuild_plan_debounced(&mut self, ctx: &ActorContext<Self>) -> impl Future<Output = ()> {
let next_rebuild_waiter = self
.indexing_scheduler
.next_rebuild_tracker
.next_rebuild_waiter();
self.rebuild_plan_debouncer
.self_send_with_cooldown::<RebuildPlan>(ctx);
next_rebuild_waiter
}
}

Expand Down Expand Up @@ -370,7 +378,7 @@ impl Handler<ShardPositionsUpdate> for ControlPlane {
ctx.progress(),
)
.await?;
self.rebuild_plan_debounced(ctx);
let _rebuild_plan_waiter = self.rebuild_plan_debounced(ctx);
Ok(())
}
}
Expand Down Expand Up @@ -448,17 +456,21 @@ fn convert_metastore_error<T>(
// This handler is a metastore call proxied through the control plane: we must first forward the
// request to the metastore, and then act on the event.
#[async_trait]
impl Handler<CreateIndexRequest> for ControlPlane {
impl DeferableReplyHandler<CreateIndexRequest> for ControlPlane {
type Reply = ControlPlaneResult<CreateIndexResponse>;

async fn handle(
async fn handle_message(
&mut self,
request: CreateIndexRequest,
reply: impl FnOnce(Self::Reply) + Send + Sync + 'static,
ctx: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
) -> Result<(), ActorExitStatus> {
let response = match self.metastore.create_index(request).await {
Ok(response) => response,
Err(metastore_error) => return convert_metastore_error(metastore_error),
Err(metastore_error) => {
reply(convert_metastore_error(metastore_error)?);
return Ok(());
}
};
let index_metadata: IndexMetadata =
match serde_utils::from_json_str(&response.index_metadata_json) {
Expand All @@ -474,9 +486,16 @@ impl Handler<CreateIndexRequest> for ControlPlane {
self.model.add_index(index_metadata);

if should_rebuild_plan {
self.rebuild_plan_debounced(ctx);
let rebuild_plan_notifier = self.rebuild_plan_debounced(ctx);
tokio::task::spawn(async move {
rebuild_plan_notifier.await;
reply(Ok(response));
});
} else {
reply(Ok(response));
}
Ok(Ok(response))

Ok(())
}
}

Expand Down Expand Up @@ -510,7 +529,7 @@ impl Handler<DeleteIndexRequest> for ControlPlane {

// TODO: Refine the event. Notify index will have the effect to reload the entire state from
// the metastore. We should update the state of the control plane.
self.rebuild_plan_debounced(ctx);
let _rebuild_plan_waiter = self.rebuild_plan_debounced(ctx);

let response = EmptyResponse {};
Ok(Ok(response))
Expand Down Expand Up @@ -546,7 +565,7 @@ impl Handler<AddSourceRequest> for ControlPlane {

// TODO: Refine the event. Notify index will have the effect to reload the entire state from
// the metastore. We should update the state of the control plane.
self.rebuild_plan_debounced(ctx);
let _rebuild_plan_waiter = self.rebuild_plan_debounced(ctx);

let response = EmptyResponse {};
Ok(Ok(response))
Expand Down Expand Up @@ -574,7 +593,7 @@ impl Handler<ToggleSourceRequest> for ControlPlane {
let mutation_occured = self.model.toggle_source(&index_uid, &source_id, enable)?;

if mutation_occured {
self.rebuild_plan_debounced(ctx);
let _rebuild_plan_waiter = self.rebuild_plan_debounced(ctx);
}
Ok(Ok(EmptyResponse {}))
}
Expand Down Expand Up @@ -622,7 +641,7 @@ impl Handler<DeleteSourceRequest> for ControlPlane {

self.model.delete_source(&source_uid);

self.rebuild_plan_debounced(ctx);
let _rebuild_plan_waiter = self.rebuild_plan_debounced(ctx);
let response = EmptyResponse {};

Ok(Ok(response))
Expand Down Expand Up @@ -658,7 +677,7 @@ impl Handler<GetOrCreateOpenShardsRequest> for ControlPlane {
return Ok(Err(control_plane_error));
}
};
self.rebuild_plan_debounced(ctx);
let _rebuild_plan_waiter = self.rebuild_plan_debounced(ctx);
Ok(Ok(response))
}
}
Expand Down Expand Up @@ -692,7 +711,7 @@ impl Handler<LocalShardsUpdate> for ControlPlane {
self.ingest_controller
.handle_local_shards_update(local_shards_update, &mut self.model, ctx.progress())
.await;
self.rebuild_plan_debounced(ctx);
let _rebuild_plan_waiter = self.rebuild_plan_debounced(ctx);
Ok(Ok(()))
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright (C) 2024 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at [email protected].
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::sync::{Arc, Mutex};

/// Makes it possible to wait for the completion of the next rebuild.
///
/// Every rebuild started through `start_rebuild` is assigned a generation number. The highest
/// completed generation is published on a `tokio::sync::watch` channel, which the futures
/// returned by `next_rebuild_waiter` observe.
pub struct RebuildNotifier {
/// Sender half of the watch channel carrying the last processed generation. It is shared
/// (behind `Arc<Mutex<..>>`) with every `NotifyChangeOnDrop` created by `start_rebuild`.
generation_processed_tx: Arc<Mutex<tokio::sync::watch::Sender<usize>>>,
/// Receiver half of the same channel. `next_rebuild_waiter` clones it for each waiter.
generation_processed_rx: tokio::sync::watch::Receiver<usize>,
/// Generation that the next call to `start_rebuild` will hand out. `Default` initializes it
/// to 1, while the channel value starts at 0.
generation: usize,
}

impl Default for RebuildNotifier {
    /// Creates a notifier for which no rebuild has been processed yet.
    ///
    /// The watch channel is seeded with generation 0 and the first rebuild will be assigned
    /// generation 1, so a waiter created before any rebuild has to wait for that first rebuild.
    fn default() -> Self {
        let (processed_tx, processed_rx) = tokio::sync::watch::channel(0);
        let shared_processed_tx = Arc::new(Mutex::new(processed_tx));
        Self {
            generation_processed_tx: shared_processed_tx,
            generation_processed_rx: processed_rx,
            generation: 1,
        }
    }
}

impl RebuildNotifier {
    /// Returns a future that resolves when the next rebuild is completed.
    ///
    /// "Next" refers to the rebuild that will be started by the first call to
    /// [`Self::start_rebuild`] made after this method returns. If an ongoing build T exists,
    /// the future will not resolve upon build T's completion. It only resolves upon build
    /// T+1's completion, or that of any subsequent build.
    ///
    /// The future also resolves if waiting on the watch channel fails, which tokio documents
    /// as the sender having been dropped: no further notification could arrive in that case.
    ///
    /// This method only clones the receiver and copies the generation counter, so a shared
    /// borrow of the notifier is sufficient.
    pub fn next_rebuild_waiter(&self) -> impl std::future::Future<Output = ()> {
        let mut processed_rx = self.generation_processed_rx.clone();
        // Generation that the next `start_rebuild` call will hand out.
        let awaited_generation = self.generation;
        async move {
            // The borrow taken in the loop condition is released before the `.await` below.
            while *processed_rx.borrow() < awaited_generation {
                if processed_rx.changed().await.is_err() {
                    return;
                }
            }
        }
    }

    /// Starts a new rebuild.
    ///
    /// The rebuild is assigned the current generation and the counter is then advanced, so
    /// that waiters created from now on target the following rebuild.
    ///
    /// The returned guard publishes its generation when it is dropped. Since it is wrapped in
    /// an `Arc`, this happens once the last clone goes away.
    pub fn start_rebuild(&mut self) -> Arc<NotifyChangeOnDrop> {
        let generation = self.generation;
        self.generation += 1;
        let notify_on_drop = NotifyChangeOnDrop {
            generation,
            generation_processed_tx: Arc::clone(&self.generation_processed_tx),
        };
        Arc::new(notify_on_drop)
    }
}

/// Guard associated with one rebuild, created by `RebuildNotifier::start_rebuild`.
///
/// Its `Drop` implementation publishes `generation` on the watch channel, unless a higher
/// generation has already been published.
pub struct NotifyChangeOnDrop {
/// Generation assigned to the rebuild this guard stands for.
generation: usize,
/// Sender half of the watch channel, shared with the `RebuildNotifier` that created this
/// guard.
generation_processed_tx: Arc<Mutex<tokio::sync::watch::Sender<usize>>>,
}

impl Drop for NotifyChangeOnDrop {
    /// Publishes this guard's generation as processed.
    ///
    /// Nothing is sent if a strictly higher generation has already been published, so the
    /// value carried by the channel never goes backwards. The mutex is held across the check
    /// and the send, which keeps the two steps from interleaving with another guard's drop.
    fn drop(&mut self) {
        let processed_tx = self.generation_processed_tx.lock().unwrap();
        // Copy the value out so that the channel borrow is released before sending.
        let last_processed_generation = *processed_tx.borrow();
        if last_processed_generation <= self.generation {
            // A send error is deliberately ignored here.
            let _ = processed_tx.send(self.generation);
        }
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    /// A waiter created before a rebuild starts resolves once that rebuild's guard is dropped.
    #[tokio::test]
    async fn test_change_tracker() {
        let mut notifier = RebuildNotifier::default();
        let rebuild_done = notifier.next_rebuild_waiter();
        let rebuild_guard = notifier.start_rebuild();
        drop(rebuild_guard);
        rebuild_done.await;
    }

    /// Waiters created while a rebuild is ongoing are not released by that rebuild: they
    /// only resolve once the following rebuild has completed.
    #[tokio::test]
    async fn test_change_tracker_ongoing_is_not_good() {
        let mut notifier = RebuildNotifier::default();
        let ongoing_rebuild_guard = notifier.start_rebuild();
        let first_waiter = notifier.next_rebuild_waiter();
        let second_waiter = notifier.next_rebuild_waiter();
        drop(ongoing_rebuild_guard);
        let following_rebuild_guard = notifier.start_rebuild();
        // The ongoing rebuild is done but the following one is not: the waiter must time out.
        let first_waiter_res =
            tokio::time::timeout(Duration::from_millis(100), first_waiter).await;
        assert!(first_waiter_res.is_err());
        drop(following_rebuild_guard);
        second_waiter.await;
    }

    /// A single rebuild completion releases every waiter that was created before it started.
    #[tokio::test]
    async fn test_change_tracker_all_waiters_are_notified() {
        let mut notifier = RebuildNotifier::default();
        let first_waiter = notifier.next_rebuild_waiter();
        let second_waiter = notifier.next_rebuild_waiter();
        let rebuild_guard = notifier.start_rebuild();
        drop(rebuild_guard);
        first_waiter.await;
        second_waiter.await;
    }
}
14 changes: 12 additions & 2 deletions quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

mod change_tracker;
mod scheduling;

use std::cmp::Ordering;
use std::fmt;
use std::num::NonZeroU32;
use std::sync::Arc;
use std::time::{Duration, Instant};

use fnv::{FnvHashMap, FnvHashSet};
Expand All @@ -36,6 +38,7 @@ use serde::Serialize;
use tracing::{debug, info, warn};

use crate::indexing_plan::PhysicalIndexingPlan;
use crate::indexing_scheduler::change_tracker::{NotifyChangeOnDrop, RebuildNotifier};
use crate::indexing_scheduler::scheduling::build_physical_indexing_plan;
use crate::model::ControlPlaneModel;
use crate::{IndexerNodeInfo, IndexerPool};
Expand Down Expand Up @@ -100,6 +103,7 @@ pub struct IndexingScheduler {
self_node_id: NodeId,
indexer_pool: IndexerPool,
state: IndexingSchedulerState,
pub(crate) next_rebuild_tracker: RebuildNotifier,
}

impl fmt::Debug for IndexingScheduler {
Expand Down Expand Up @@ -187,6 +191,7 @@ impl IndexingScheduler {
self_node_id,
indexer_pool,
state: IndexingSchedulerState::default(),
next_rebuild_tracker: RebuildNotifier::default(),
}
}

Expand All @@ -202,6 +207,8 @@ impl IndexingScheduler {
pub(crate) fn rebuild_plan(&mut self, model: &ControlPlaneModel) {
crate::metrics::CONTROL_PLANE_METRICS.schedule_total.inc();

let notify_on_drop = self.next_rebuild_tracker.start_rebuild();

let sources = get_sources_to_schedule(model);

let indexers: Vec<IndexerNodeInfo> = self.get_indexers_from_indexer_pool();
Expand Down Expand Up @@ -239,7 +246,7 @@ impl IndexingScheduler {
return;
}
}
self.apply_physical_indexing_plan(&indexers, new_physical_plan);
self.apply_physical_indexing_plan(&indexers, new_physical_plan, Some(notify_on_drop));
self.state.num_schedule_indexing_plan += 1;
}

Expand Down Expand Up @@ -283,7 +290,7 @@ impl IndexingScheduler {
} else if !indexing_plans_diff.has_same_tasks() {
// Some nodes may have not received their tasks, apply it again.
info!(plans_diff=?indexing_plans_diff, "running tasks and last applied tasks differ: reapply last plan");
self.apply_physical_indexing_plan(&indexers, last_applied_plan.clone());
self.apply_physical_indexing_plan(&indexers, last_applied_plan.clone(), None);
}
}

Expand All @@ -295,12 +302,14 @@ impl IndexingScheduler {
&mut self,
indexers: &[IndexerNodeInfo],
new_physical_plan: PhysicalIndexingPlan,
notify_on_drop: Option<Arc<NotifyChangeOnDrop>>,
) {
debug!(new_physical_plan=?new_physical_plan, "apply physical indexing plan");
for (node_id, indexing_tasks) in new_physical_plan.indexing_tasks_per_indexer() {
// We don't want to block on a slow indexer so we apply this change asynchronously
// TODO not blocking is cool, but we need to make sure there is not accumulation
// possible here.
let notify_on_drop = notify_on_drop.clone();
tokio::spawn({
let indexer = indexers
.iter()
Expand All @@ -322,6 +331,7 @@ impl IndexingScheduler {
"failed to apply indexing plan to indexer"
);
}
drop(notify_on_drop);
}
});
}
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-ingest/src/ingest_api_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use quickwit_actors::{
};
use quickwit_common::runtimes::RuntimeType;
use quickwit_common::tower::Cost;
use tracing::info;
use tracing::{error, info};
use ulid::Ulid;

use crate::metrics::INGEST_METRICS;
Expand Down Expand Up @@ -153,6 +153,7 @@ impl IngestApiService {
.find(|index_id| !self.queues.queue_exists(index_id));

if let Some(index_id) = first_non_existing_queue_opt {
error!(index_id=%index_id, "failed to find index");
return Err(IngestServiceError::IndexNotFound {
index_id: index_id.to_string(),
});
Expand Down
Loading

0 comments on commit 2c94074

Please sign in to comment.