Issue/5474 merge pipeline finalization (#5475)
* Better shutdown of merge pipelines.

Before this PR, when the last indexing pipeline
of an index was shut down, the merge pipeline
associated with that index was abruptly shut down too.

This PR makes two changes to this behavior.
First, we wait for ongoing or pending merges to
be executed before shutting down the pipeline.

Second, the merge policy gets the opportunity to offer
a list of extra merges to run.

This functionality is introduced to help users
who migrated from Elasticsearch and rely on
daily indexes.

Closes #5474

* Apply suggestions from code review

Co-authored-by: Adrien Guillo <[email protected]>

* Sending the right message in indexing service

---------

Co-authored-by: Adrien Guillo <[email protected]>
Co-authored-by: Adrien Guillo <[email protected]>
3 people authored Oct 10, 2024
1 parent b65f5df commit 47c1bf3
Showing 9 changed files with 436 additions and 61 deletions.
19 changes: 19 additions & 0 deletions quickwit/quickwit-config/src/merge_policy_config.rs
@@ -21,6 +21,10 @@ use std::time::Duration;

use serde::{de, Deserialize, Deserializer, Serialize, Serializer};

fn is_zero(value: &usize) -> bool {
*value == 0
}

#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash, utoipa::ToSchema)]
#[serde(deny_unknown_fields)]
pub struct ConstWriteAmplificationMergePolicyConfig {
@@ -42,6 +46,15 @@ pub struct ConstWriteAmplificationMergePolicyConfig {
#[serde(deserialize_with = "parse_human_duration")]
#[serde(serialize_with = "serialize_duration")]
pub maturation_period: Duration,
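    /// Maximum number of extra merge operations that the merge policy may
    /// schedule when the pipeline is finalized. Defaults to 0 (no extra merges).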
#[serde(default)]
#[serde(skip_serializing_if = "is_zero")]
pub max_finalize_merge_operations: usize,
/// Splits with a number of docs higher than
/// `max_finalize_split_num_docs` will not be considered
/// for finalize split merge operations.
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub max_finalize_split_num_docs: Option<usize>,
}

impl Default for ConstWriteAmplificationMergePolicyConfig {
@@ -51,6 +64,8 @@ impl Default for ConstWriteAmplificationMergePolicyConfig {
merge_factor: default_merge_factor(),
max_merge_factor: default_max_merge_factor(),
maturation_period: default_maturation_period(),
max_finalize_merge_operations: 0,
max_finalize_split_num_docs: None,
}
}
}
@@ -146,6 +161,10 @@ impl Default for MergePolicyConfig {
}

impl MergePolicyConfig {
pub fn noop() -> Self {
MergePolicyConfig::Nop
}

pub fn validate(&self) -> anyhow::Result<()> {
let (merge_factor, max_merge_factor) = match self {
MergePolicyConfig::Nop => {
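For context, here is a minimal sketch of how the two new knobs could be set from code, relying on the `Default` implementation shown above. It assumes the struct is re-exported by the `quickwit_config` crate; the values are illustrative, not recommendations.

```rust
use quickwit_config::ConstWriteAmplificationMergePolicyConfig;

fn finalize_aware_merge_policy() -> ConstWriteAmplificationMergePolicyConfig {
    ConstWriteAmplificationMergePolicyConfig {
        // Allow up to 2 extra merge operations when the last indexing
        // pipeline of the index shuts down.
        max_finalize_merge_operations: 2,
        // Splits that already hold more than 10M docs are left out of
        // those finalize merges.
        max_finalize_split_num_docs: Some(10_000_000),
        // Keep the defaults for merge_factor, maturation_period, etc.
        ..Default::default()
    }
}
```

With the default `max_finalize_merge_operations: 0`, finalize merges stay disabled, and the `skip_serializing_if` attributes keep the serialized config unchanged.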
33 changes: 20 additions & 13 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
@@ -65,6 +65,7 @@ use tracing::{debug, error, info, warn};

use super::merge_pipeline::{MergePipeline, MergePipelineParams};
use super::{MergePlanner, MergeSchedulerService};
use crate::actors::merge_pipeline::FinishPendingMergesAndShutdownPipeline;
use crate::models::{DetachIndexingPipeline, DetachMergePipeline, ObservePipeline, SpawnPipeline};
use crate::source::{AssignShards, Assignment};
use crate::split_store::{LocalSplitStore, SplitStoreQuota};
@@ -504,21 +505,27 @@ impl IndexingService {
.merge_pipeline_handles
.remove_entry(&merge_pipeline_to_shutdown)
{
-            // We kill the merge pipeline to avoid waiting a merge operation to finish as it can
-            // be long.
+            // We gracefully shut down the merge pipeline, so we can complete the
+            // in-flight merges.
info!(
index_uid=%merge_pipeline_to_shutdown.index_uid,
source_id=%merge_pipeline_to_shutdown.source_id,
"no more indexing pipeline on this index and source, killing merge pipeline"
"shutting down orphan merge pipeline"
);
-            merge_pipeline_handle.handle.kill().await;
+            // The queue capacity of the merge pipeline is unbounded, so `.send_message(...)`
+            // should not block.
+            // We avoid using `.quit()` here because it waits for the actor to exit.
+            merge_pipeline_handle
+                .handle
+                .mailbox()
+                .send_message(FinishPendingMergesAndShutdownPipeline)
+                .await
+                .expect("merge pipeline mailbox should not be full");
}
}
-        // Finally remove the merge pipeline with an exit status.
+        // Finally, we remove the completed or failed merge pipelines.
         self.merge_pipeline_handles
-            .retain(|_, merge_pipeline_mailbox_handle| {
-                merge_pipeline_mailbox_handle.handle.state().is_running()
-            });
+            .retain(|_, merge_pipeline_handle| merge_pipeline_handle.handle.state().is_running());
self.counters.num_running_merge_pipelines = self.merge_pipeline_handles.len();
self.update_chitchat_running_plan().await;

@@ -543,23 +550,23 @@
immature_splits_opt: Option<Vec<SplitMetadata>>,
ctx: &ActorContext<Self>,
) -> Result<Mailbox<MergePlanner>, IndexingError> {
-        if let Some(merge_pipeline_mailbox_handle) = self
+        if let Some(merge_pipeline_handle) = self
             .merge_pipeline_handles
             .get(&merge_pipeline_params.pipeline_id)
         {
-            return Ok(merge_pipeline_mailbox_handle.mailbox.clone());
+            return Ok(merge_pipeline_handle.mailbox.clone());
}
let merge_pipeline_id = merge_pipeline_params.pipeline_id.clone();
let merge_pipeline =
MergePipeline::new(merge_pipeline_params, immature_splits_opt, ctx.spawn_ctx());
let merge_planner_mailbox = merge_pipeline.merge_planner_mailbox().clone();
let (_pipeline_mailbox, pipeline_handle) = ctx.spawn_actor().spawn(merge_pipeline);
-        let merge_pipeline_mailbox_handle = MergePipelineHandle {
+        let merge_pipeline_handle = MergePipelineHandle {
mailbox: merge_planner_mailbox.clone(),
handle: pipeline_handle,
};
         self.merge_pipeline_handles
-            .insert(merge_pipeline_id, merge_pipeline_mailbox_handle);
+            .insert(merge_pipeline_id, merge_pipeline_handle);
self.counters.num_running_merge_pipelines += 1;
Ok(merge_planner_mailbox)
}
@@ -1190,7 +1197,7 @@ mod tests {

#[tokio::test]
async fn test_indexing_service_apply_plan() {
-        const PARAMS_FINGERPRINT: u64 = 3865067856550546352u64;
+        const PARAMS_FINGERPRINT: u64 = 3865067856550546352;

quickwit_common::setup_logging_for_tests();
let transport = ChannelTransport::default();
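The change above replaces a hard `kill()` with a fire-and-forget shutdown message. Here is a minimal, self-contained sketch of that pattern using plain tokio channels rather than Quickwit's actor framework; `Msg` and `run_pipeline` are invented names for illustration.

```rust
use tokio::sync::mpsc;

enum Msg {
    Merge(u64),
    FinishPendingMergesAndShutdown,
}

async fn run_pipeline(mut inbox: mpsc::UnboundedReceiver<Msg>) {
    while let Some(msg) = inbox.recv().await {
        match msg {
            // Execute merges that were enqueued before the shutdown request.
            Msg::Merge(split_id) => println!("merging split {split_id}"),
            // The mailbox is FIFO: every merge enqueued before this message
            // has already been handled by now, so we can simply stop.
            Msg::FinishPendingMergesAndShutdown => break,
        }
    }
}

#[tokio::main]
async fn main() {
    let (mailbox, inbox) = mpsc::unbounded_channel();
    let pipeline = tokio::spawn(run_pipeline(inbox));
    mailbox.send(Msg::Merge(1)).unwrap();
    // On an unbounded mailbox, enqueueing never blocks: the sender does not
    // wait for the pipeline to exit, mirroring `send_message` vs `.quit()`.
    mailbox.send(Msg::FinishPendingMergesAndShutdown).unwrap();
    pipeline.await.unwrap();
}
```

This sketch only drains messages that are already queued; in the real pipeline, the `DisconnectMergePlanner` step below additionally cuts the publisher-to-planner feedback loop so no new merges are planned.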
122 changes: 99 additions & 23 deletions quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
@@ -42,7 +42,8 @@ use time::OffsetDateTime;
use tokio::sync::Semaphore;
use tracing::{debug, error, info, instrument};

-use super::MergeSchedulerService;
+use super::publisher::DisconnectMergePlanner;
+use super::{MergeSchedulerService, RunFinalizeMergePolicyAndQuit};
use crate::actors::indexing_pipeline::wait_duration_before_retry;
use crate::actors::merge_split_downloader::MergeSplitDownloader;
use crate::actors::publisher::PublisherType;
@@ -56,6 +57,22 @@ use crate::split_store::IndexingSplitStore;
/// concurrently.
static SPAWN_PIPELINE_SEMAPHORE: Semaphore = Semaphore::const_new(10);

/// Instructs the merge pipeline that it should stop itself.
/// Merges that have already been scheduled are not aborted.
///
/// In addition, the finalizer merge policy will be executed to schedule a few
/// additional merges.
///
/// After receiving `FinishPendingMergesAndShutdownPipeline`, the merge pipeline
/// loop will be disconnected. In other words, the connection from the merge
/// publisher to the merge planner will be cut, so that the merge pipeline
/// terminates naturally.
///
/// Supervision will still be active. However, it will not restart the pipeline
/// in case of failure; it will just kill all of the merge pipeline actors (for
/// instance, if one of the actors is stuck).
#[derive(Debug, Clone, Copy)]
pub struct FinishPendingMergesAndShutdownPipeline;

struct MergePipelineHandles {
merge_planner: ActorHandle<MergePlanner>,
merge_split_downloader: ActorHandle<MergeSplitDownloader>,
@@ -96,6 +113,8 @@ pub struct MergePipeline {
kill_switch: KillSwitch,
/// Immature splits passed to the merge planner the first time the pipeline is spawned.
initial_immature_splits_opt: Option<Vec<SplitMetadata>>,
// After it is set to true, we don't respawn pipeline actors if they fail.
shutdown_initiated: bool,
}

#[async_trait]
@@ -141,6 +160,7 @@ impl MergePipeline {
merge_planner_inbox,
merge_planner_mailbox,
initial_immature_splits_opt,
shutdown_initiated: false,
}
}

@@ -251,7 +271,7 @@
Some(self.merge_planner_mailbox.clone()),
None,
);
-        let (merge_publisher_mailbox, merge_publisher_handler) = ctx
+        let (merge_publisher_mailbox, merge_publisher_handle) = ctx
.spawn_actor()
.set_kill_switch(self.kill_switch.clone())
.set_backpressure_micros_counter(
@@ -271,15 +291,15 @@
self.params.max_concurrent_split_uploads,
self.params.event_broker.clone(),
);
-        let (merge_uploader_mailbox, merge_uploader_handler) = ctx
+        let (merge_uploader_mailbox, merge_uploader_handle) = ctx
.spawn_actor()
.set_kill_switch(self.kill_switch.clone())
.spawn(merge_uploader);

// Merge Packager
let tag_fields = self.params.doc_mapper.tag_named_fields()?;
let merge_packager = Packager::new("MergePackager", tag_fields, merge_uploader_mailbox);
-        let (merge_packager_mailbox, merge_packager_handler) = ctx
+        let (merge_packager_mailbox, merge_packager_handle) = ctx
.spawn_actor()
.set_kill_switch(self.kill_switch.clone())
.spawn(merge_packager);
@@ -300,7 +320,7 @@
merge_executor_io_controls,
merge_packager_mailbox,
);
-        let (merge_executor_mailbox, merge_executor_handler) = ctx
+        let (merge_executor_mailbox, merge_executor_handle) = ctx
.spawn_actor()
.set_kill_switch(self.kill_switch.clone())
.set_backpressure_micros_counter(
@@ -316,7 +336,7 @@
executor_mailbox: merge_executor_mailbox,
io_controls: split_downloader_io_controls,
};
-        let (merge_split_downloader_mailbox, merge_split_downloader_handler) = ctx
+        let (merge_split_downloader_mailbox, merge_split_downloader_handle) = ctx
.spawn_actor()
.set_kill_switch(self.kill_switch.clone())
.set_backpressure_micros_counter(
@@ -334,7 +354,7 @@
merge_split_downloader_mailbox,
self.params.merge_scheduler_service.clone(),
);
-        let (_, merge_planner_handler) = ctx
+        let (_, merge_planner_handle) = ctx
.spawn_actor()
.set_kill_switch(self.kill_switch.clone())
.set_mailboxes(
@@ -346,27 +366,27 @@
self.previous_generations_statistics = self.statistics.clone();
self.statistics.generation += 1;
self.handles_opt = Some(MergePipelineHandles {
-            merge_planner: merge_planner_handler,
-            merge_split_downloader: merge_split_downloader_handler,
-            merge_executor: merge_executor_handler,
-            merge_packager: merge_packager_handler,
-            merge_uploader: merge_uploader_handler,
-            merge_publisher: merge_publisher_handler,
+            merge_planner: merge_planner_handle,
+            merge_split_downloader: merge_split_downloader_handle,
+            merge_executor: merge_executor_handle,
+            merge_packager: merge_packager_handle,
+            merge_uploader: merge_uploader_handle,
+            merge_publisher: merge_publisher_handle,
next_check_for_progress: Instant::now() + *HEARTBEAT,
});
Ok(())
}

async fn terminate(&mut self) {
self.kill_switch.kill();
-        if let Some(handlers) = self.handles_opt.take() {
+        if let Some(handles) = self.handles_opt.take() {
tokio::join!(
-                handlers.merge_planner.kill(),
-                handlers.merge_split_downloader.kill(),
-                handlers.merge_executor.kill(),
-                handlers.merge_packager.kill(),
-                handlers.merge_uploader.kill(),
-                handlers.merge_publisher.kill(),
+                handles.merge_planner.kill(),
+                handles.merge_split_downloader.kill(),
+                handles.merge_executor.kill(),
+                handles.merge_packager.kill(),
+                handles.merge_uploader.kill(),
+                handles.merge_publisher.kill(),
);
}
}
@@ -412,6 +432,7 @@ impl MergePipeline {
ctx.schedule_self_msg(*quickwit_actors::HEARTBEAT, Spawn { retry_count: 0 });
}
Health::Success => {
info!(index_uid=%self.params.pipeline_id.index_uid, "merge pipeline success, shutting down");
return Err(ActorExitStatus::Success);
}
}
@@ -467,6 +488,45 @@ impl Handler&lt;SuperviseLoop&gt; for MergePipeline {
}
}

#[async_trait]
impl Handler<FinishPendingMergesAndShutdownPipeline> for MergePipeline {
type Reply = ();
async fn handle(
&mut self,
_: FinishPendingMergesAndShutdownPipeline,
_ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
info!(index_uid=%self.params.pipeline_id.index_uid, "shutdown merge pipeline initiated");
// From now on, we will not respawn the pipeline if it fails.
self.shutdown_initiated = true;
if let Some(handles) = &self.handles_opt {
// This disconnects the merge planner from the merge publisher,
// breaking the merge planner pipeline loop.
//
// As a result, the pipeline will naturally terminate
// once all of the pending / ongoing merge operations are completed.
let _ = handles
.merge_publisher
.mailbox()
.send_message(DisconnectMergePlanner)
.await;

// We also initiate the merge planner finalization routine.
// Depending on the merge policy, it may emit a few more merge
// operations.
let _ = handles
.merge_planner
.mailbox()
.send_message(RunFinalizeMergePolicyAndQuit)
.await;
} else {
// we won't respawn the pipeline in the future, so there is nothing
// to do here.
}
Ok(())
}
}

#[async_trait]
impl Handler<Spawn> for MergePipeline {
type Reply = ();
@@ -476,6 +536,9 @@ impl Handler&lt;Spawn&gt; for MergePipeline {
spawn: Spawn,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
if self.shutdown_initiated {
return Ok(());
}
if self.handles_opt.is_some() {
return Ok(());
}
@@ -530,6 +593,7 @@ mod tests {
use quickwit_storage::RamStorage;

use crate::actors::merge_pipeline::{MergePipeline, MergePipelineParams};
use crate::actors::{MergePlanner, Publisher};
use crate::merge_policy::default_merge_policy;
use crate::IndexingSplitStore;

@@ -576,12 +640,24 @@
event_broker: Default::default(),
};
let pipeline = MergePipeline::new(pipeline_params, None, universe.spawn_ctx());
let (_pipeline_mailbox, pipeline_handler) = universe.spawn_builder().spawn(pipeline);
let (pipeline_exit_status, pipeline_statistics) = pipeline_handler.quit().await;
let _merge_planner_mailbox = pipeline.merge_planner_mailbox().clone();
let (pipeline_mailbox, pipeline_handle) = universe.spawn_builder().spawn(pipeline);
pipeline_mailbox
.ask(super::FinishPendingMergesAndShutdownPipeline)
.await
.unwrap();

let (pipeline_exit_status, pipeline_statistics) = pipeline_handle.join().await;
assert_eq!(pipeline_statistics.generation, 1);
assert_eq!(pipeline_statistics.num_spawn_attempts, 1);
assert_eq!(pipeline_statistics.num_published_splits, 0);
-        assert!(matches!(pipeline_exit_status, ActorExitStatus::Quit));
+        assert!(matches!(pipeline_exit_status, ActorExitStatus::Success));

// Checking that the merge pipeline actors have been properly cleaned up.
assert!(universe.get_one::<MergePlanner>().is_none());
assert!(universe.get_one::<Publisher>().is_none());
assert!(universe.get_one::<MergePipeline>().is_none());

universe.assert_quit().await;
Ok(())
}
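For intuition, here is a hedged sketch of what a finalize pass honoring the two new config knobs might look like. It illustrates the idea only and is not the actual `MergePolicy` implementation; `Split` and `finalize_merge_ops` are made-up names.

```rust
struct Split {
    num_docs: usize,
}

/// Plans up to `max_finalize_merge_operations` extra merges at shutdown.
fn finalize_merge_ops(
    mut splits: Vec<Split>,
    max_finalize_merge_operations: usize,
    max_finalize_split_num_docs: Option<usize>,
    merge_factor: usize,
) -> Vec<Vec<Split>> {
    // Splits above the doc-count ceiling are not considered for finalize merges.
    if let Some(max_docs) = max_finalize_split_num_docs {
        splits.retain(|split| split.num_docs <= max_docs);
    }
    // Group the smallest splits first, `merge_factor` at a time, and stop
    // once the configured number of extra merge operations is reached.
    splits.sort_by_key(|split| split.num_docs);
    let mut merge_ops = Vec::new();
    while splits.len() >= 2 && merge_ops.len() < max_finalize_merge_operations {
        let group_len = splits.len().min(merge_factor);
        merge_ops.push(splits.drain(..group_len).collect());
    }
    merge_ops
}
```

With `max_finalize_merge_operations: 0` (the default), no extra merges are planned, which matches the pre-PR behavior except that in-flight merges are now allowed to finish.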
