Skip to content

Commit

Permalink
Add debounce to shard prune request
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Sep 4, 2024
1 parent 43b202f commit b08930c
Show file tree
Hide file tree
Showing 14 changed files with 383 additions and 130 deletions.
49 changes: 45 additions & 4 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ use quickwit_ingest::{IngesterPool, LocalShardsUpdate};
use quickwit_metastore::{CreateIndexRequestExt, CreateIndexResponseExt, IndexMetadataResponseExt};
use quickwit_proto::control_plane::{
AdviseResetShardsRequest, AdviseResetShardsResponse, ControlPlaneError, ControlPlaneResult,
GetOrCreateOpenShardsRequest, GetOrCreateOpenShardsResponse, GetOrCreateOpenShardsSubrequest,
DebouncedPruneShardsRequest, GetOrCreateOpenShardsRequest, GetOrCreateOpenShardsResponse,
GetOrCreateOpenShardsSubrequest,
};
use quickwit_proto::indexing::ShardPositionsUpdate;
use quickwit_proto::metastore::{
Expand All @@ -51,7 +52,7 @@ use quickwit_proto::metastore::{
IndexMetadataResponse, IndexTemplateMatch, MetastoreError, MetastoreResult, MetastoreService,
MetastoreServiceClient, ToggleSourceRequest, UpdateIndexRequest,
};
use quickwit_proto::types::{IndexUid, NodeId, ShardId, SourceUid};
use quickwit_proto::types::{IndexId, IndexUid, NodeId, ShardId, SourceId, SourceUid};
use serde::Serialize;
use serde_json::{json, Value as JsonValue};
use tokio::sync::watch;
Expand All @@ -71,13 +72,16 @@ pub(crate) const CONTROL_PLAN_LOOP_INTERVAL: Duration = if cfg!(any(test, featur
Duration::from_secs(5)
};

/// Minimum period between two identical shard pruning operations.
const PRUNE_SHARDS_COOLDOWN_PERIOD: Duration = Duration::from_secs(120);

/// Minimum period between two rebuild plan operations.
const REBUILD_PLAN_COOLDOWN_PERIOD: Duration = Duration::from_secs(2);

#[derive(Debug)]
struct ControlPlanLoop;

#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
struct RebuildPlan;

pub struct ControlPlane {
Expand All @@ -94,6 +98,7 @@ pub struct ControlPlane {
ingest_controller: IngestController,
metastore: MetastoreServiceClient,
model: ControlPlaneModel,
prune_shards_debouncers: HashMap<(IndexId, SourceId), Debouncer>,
rebuild_plan_debouncer: Debouncer,
readiness_tx: watch::Sender<bool>,
// Disables the control loop. This is useful for unit testing.
Expand Down Expand Up @@ -177,6 +182,7 @@ impl ControlPlane {
ingest_controller,
metastore: metastore.clone(),
model: Default::default(),
prune_shards_debouncers: HashMap::new(),
rebuild_plan_debouncer: Debouncer::new(REBUILD_PLAN_COOLDOWN_PERIOD),
readiness_tx,
disable_control_loop,
Expand Down Expand Up @@ -387,7 +393,7 @@ impl ControlPlane {
.next_rebuild_tracker
.next_rebuild_waiter();
self.rebuild_plan_debouncer
.self_send_with_cooldown::<RebuildPlan>(ctx);
.self_send_with_cooldown(ctx, RebuildPlan);
next_rebuild_waiter
}
}
Expand Down Expand Up @@ -769,6 +775,41 @@ impl Handler<DeleteSourceRequest> for ControlPlane {
}
}

#[async_trait]
impl Handler<DebouncedPruneShardsRequest> for ControlPlane {
    type Reply = ControlPlaneResult<EmptyResponse>;

    /// Handles a shard pruning request, rate-limiting it per `(index ID, source ID)` pair.
    ///
    /// - When `execute_now` is `true`, the wrapped request is forwarded to the metastore right
    ///   away. A metastore failure is propagated through `?` as the handler's
    ///   [`ActorExitStatus`] error, not as a `ControlPlaneError` reply.
    /// - When `execute_now` is `false`, the request is handed to the debouncer associated with
    ///   its `(index ID, source ID)` key (created on first use with
    ///   `PRUNE_SHARDS_COOLDOWN_PERIOD`). The debouncer is asked to send the same request back
    ///   to this actor with `execute_now: true`. The reply is returned immediately, without
    ///   waiting for the pruning to happen.
    ///
    /// # Panics
    ///
    /// Panics if `request.request` is `None`.
    async fn handle(
        &mut self,
        request: DebouncedPruneShardsRequest,
        ctx: &ActorContext<Self>,
    ) -> Result<ControlPlaneResult<EmptyResponse>, ActorExitStatus> {
        // The inner request is an optional field of the message, but a debounced prune request
        // without a payload is meaningless: state the invariant instead of a bare `unwrap()`.
        let metastore_request = request
            .request
            .expect("`DebouncedPruneShardsRequest` should always wrap a `PruneShardsRequest`");

        if request.execute_now {
            // NOTE(review): `?` turns a metastore failure into an `ActorExitStatus` error for
            // this handler. Confirm that a failed prune is really meant to surface that way
            // instead of being returned to the caller as a `ControlPlaneError`.
            self.metastore
                .prune_shards(metastore_request)
                .await
                .context("failed to prune shards")?;
            return Ok(Ok(EmptyResponse {}));
        }

        // One debouncer per (index, source) so that pruning one source does not delay another.
        // NOTE(review): entries are only ever inserted by this handler, never removed here.
        let debounce_key = (
            metastore_request.index_uid().index_id.clone(),
            metastore_request.source_id.clone(),
        );
        let debouncer = self
            .prune_shards_debouncers
            .entry(debounce_key)
            .or_insert_with(|| Debouncer::new(PRUNE_SHARDS_COOLDOWN_PERIOD));

        // The message handed to the debouncer is flagged `execute_now` so that, once it comes
        // back to this handler, it takes the branch above instead of being debounced again.
        let execute_now_request = DebouncedPruneShardsRequest {
            request: Some(metastore_request),
            execute_now: true,
        };
        debouncer.self_send_with_cooldown(ctx, execute_now_request);
        Ok(Ok(EmptyResponse {}))
    }
}

// This is neither a proxied call nor a metastore callback.
#[async_trait]
impl Handler<GetOrCreateOpenShardsRequest> for ControlPlane {
Expand Down
23 changes: 12 additions & 11 deletions quickwit/quickwit-control-plane/src/debouncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,25 +91,25 @@ impl Debouncer {
previous_state
}

fn emit_message<A, M>(&self, ctx: &ActorContext<A>)
fn emit_message<A, M>(&self, ctx: &ActorContext<A>, message: M)
where
A: Actor + Handler<M> + DeferableReplyHandler<M>,
M: Default + std::fmt::Debug + Send + Sync + 'static,
M: std::fmt::Debug + Send + Sync + 'static,
{
let _ = ctx.mailbox().send_message_with_high_priority(M::default());
let _ = ctx.mailbox().send_message_with_high_priority(message);
}

fn schedule_post_cooldown_callback<A, M>(&self, ctx: &ActorContext<A>)
fn schedule_post_cooldown_callback<A, M>(&self, ctx: &ActorContext<A>, message: M)
where
A: Actor + Handler<M> + DeferableReplyHandler<M>,
M: Default + std::fmt::Debug + Send + Sync + 'static,
M: Clone + std::fmt::Debug + Send + Sync + 'static,
{
let ctx_clone = ctx.clone();
let self_clone = self.clone();
let callback = move || {
let previous_state = self_clone.accept_transition(Transition::CooldownExpired);
if previous_state == DebouncerState::CooldownScheduled {
self_clone.self_send_with_cooldown(&ctx_clone);
self_clone.self_send_with_cooldown(&ctx_clone, message);
}
};
ctx.spawn_ctx()
Expand All @@ -119,14 +119,15 @@ impl Debouncer {
pub fn self_send_with_cooldown<M>(
&self,
ctx: &ActorContext<impl Handler<M> + DeferableReplyHandler<M>>,
message: M,
) where
M: Default + std::fmt::Debug + Send + Sync + 'static,
M: Clone + std::fmt::Debug + Send + Sync + 'static,
{
let cooldown_state = self.accept_transition(Transition::Emit);
match cooldown_state {
DebouncerState::NoCooldown => {
self.emit_message(ctx);
self.schedule_post_cooldown_callback(ctx);
self.emit_message(ctx, message.clone());
self.schedule_post_cooldown_callback(ctx, message);
}
DebouncerState::CooldownNotScheduled | DebouncerState::CooldownScheduled => {}
}
Expand Down Expand Up @@ -156,7 +157,7 @@ mod tests {
}
}

#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
struct Increment;

#[derive(Debug)]
Expand Down Expand Up @@ -194,7 +195,7 @@ mod tests {
_message: DebouncedIncrement,
ctx: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
self.debouncer.self_send_with_cooldown::<Increment>(ctx);
self.debouncer.self_send_with_cooldown(ctx, Increment);
Ok(())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ use std::fmt;

use async_trait::async_trait;
use quickwit_common::uri::Uri;
use quickwit_proto::control_plane::{ControlPlaneService, ControlPlaneServiceClient};
use quickwit_proto::control_plane::{
ControlPlaneService, ControlPlaneServiceClient, DebouncedPruneShardsRequest,
};
use quickwit_proto::metastore::{
AcquireShardsRequest, AcquireShardsResponse, AddSourceRequest, CreateIndexRequest,
CreateIndexResponse, CreateIndexTemplateRequest, DeleteIndexRequest,
Expand All @@ -35,8 +37,8 @@ use quickwit_proto::metastore::{
ListShardsRequest, ListShardsResponse, ListSplitsRequest, ListSplitsResponse,
ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreResult, MetastoreService,
MetastoreServiceClient, MetastoreServiceStream, OpenShardsRequest, OpenShardsResponse,
PruneShardsRequest, PruneShardsResponse, PublishSplitsRequest, ResetSourceCheckpointRequest,
StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest, UpdateSplitsDeleteOpstampRequest,
PruneShardsRequest, PublishSplitsRequest, ResetSourceCheckpointRequest, StageSplitsRequest,
ToggleSourceRequest, UpdateIndexRequest, UpdateSplitsDeleteOpstampRequest,
UpdateSplitsDeleteOpstampResponse,
};

Expand Down Expand Up @@ -120,6 +122,15 @@ impl MetastoreService for ControlPlaneMetastore {
Ok(response)
}

/// Proxies the prune request through the control plane instead of calling the metastore
/// directly. The request is wrapped with `execute_now: false`, leaving it to the control plane
/// to decide when the pruning is actually executed.
async fn prune_shards(&self, request: PruneShardsRequest) -> MetastoreResult<EmptyResponse> {
    // `?` also converts the control plane error into the metastore error type.
    let empty_response = self
        .control_plane
        .prune_shards(DebouncedPruneShardsRequest {
            request: Some(request),
            execute_now: false,
        })
        .await?;
    Ok(empty_response)
}

// Other metastore API calls.

async fn index_metadata(
Expand Down Expand Up @@ -237,14 +248,6 @@ impl MetastoreService for ControlPlaneMetastore {
self.metastore.delete_shards(request).await
}

async fn prune_shards(
&self,
request: PruneShardsRequest,
) -> MetastoreResult<PruneShardsResponse> {
// TODO this call should go through the control plane which should apply debounce
self.metastore.prune_shards(request).await
}

// Index Template API

async fn create_index_template(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ use quickwit_proto::metastore::{
AcquireShardsRequest, AcquireShardsResponse, DeleteQuery, DeleteShardsRequest,
DeleteShardsResponse, DeleteTask, EntityKind, ListShardsSubrequest, ListShardsSubresponse,
MetastoreError, MetastoreResult, OpenShardSubrequest, OpenShardSubresponse, PruneShardsRequest,
PruneShardsResponse,
};
use quickwit_proto::types::{IndexUid, PublishToken, SourceId, SplitId};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -657,7 +656,7 @@ impl FileBackedIndex {
pub(crate) fn prune_shards(
&mut self,
request: PruneShardsRequest,
) -> MetastoreResult<MutationOccurred<PruneShardsResponse>> {
) -> MetastoreResult<MutationOccurred<()>> {
self.get_shards_for_source_mut(&request.source_id)?
.prune_shards(request)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use quickwit_proto::ingest::{Shard, ShardState};
use quickwit_proto::metastore::{
AcquireShardsRequest, AcquireShardsResponse, DeleteShardsRequest, DeleteShardsResponse,
EntityKind, ListShardsSubrequest, ListShardsSubresponse, MetastoreError, MetastoreResult,
OpenShardSubrequest, OpenShardSubresponse, PruneShardsRequest, PruneShardsResponse,
OpenShardSubrequest, OpenShardSubresponse, PruneShardsRequest,
};
use quickwit_proto::types::{queue_id, IndexUid, Position, PublishToken, ShardId, SourceId};
use time::OffsetDateTime;
Expand Down Expand Up @@ -242,7 +242,7 @@ impl Shards {
pub(super) fn prune_shards(
&mut self,
request: PruneShardsRequest,
) -> MetastoreResult<MutationOccurred<PruneShardsResponse>> {
) -> MetastoreResult<MutationOccurred<()>> {
let initial_shard_count = self.shards.len();

if let Some(max_age) = request.max_age {
Expand All @@ -267,14 +267,10 @@ impl Shards {
}
}
}
let response = PruneShardsResponse {
index_uid: request.index_uid,
source_id: request.source_id,
};
if initial_shard_count > self.shards.len() {
Ok(MutationOccurred::Yes(response))
Ok(MutationOccurred::Yes(()))
} else {
Ok(MutationOccurred::No(response))
Ok(MutationOccurred::No(()))
}
}

Expand Down Expand Up @@ -646,23 +642,19 @@ mod tests {
max_age: None,
max_count: None,
};
let MutationOccurred::No(response) = shards.prune_shards(request).unwrap() else {
let MutationOccurred::No(()) = shards.prune_shards(request).unwrap() else {
panic!("expected `MutationOccurred::No`");
};
assert_eq!(response.index_uid(), &index_uid);
assert_eq!(response.source_id, source_id);

let request = PruneShardsRequest {
index_uid: Some(index_uid.clone()),
source_id: source_id.clone(),
max_age: Some(50),
max_count: None,
};
let MutationOccurred::No(response) = shards.prune_shards(request).unwrap() else {
let MutationOccurred::No(()) = shards.prune_shards(request).unwrap() else {
panic!("expected `MutationOccurred::No`");
};
assert_eq!(response.index_uid(), &index_uid);
assert_eq!(response.source_id, source_id);

let current_timestamp = OffsetDateTime::now_utc().unix_timestamp();
shards.shards.insert(
Expand Down Expand Up @@ -696,22 +688,18 @@ mod tests {
max_age: Some(150),
max_count: None,
};
let MutationOccurred::Yes(response) = shards.prune_shards(request).unwrap() else {
let MutationOccurred::Yes(()) = shards.prune_shards(request).unwrap() else {
panic!("expected `MutationOccurred::Yes`");
};
assert_eq!(response.index_uid(), &index_uid);
assert_eq!(response.source_id, source_id);

let request = PruneShardsRequest {
index_uid: Some(index_uid.clone()),
source_id: source_id.clone(),
max_age: Some(150),
max_count: None,
};
let MutationOccurred::No(response) = shards.prune_shards(request).unwrap() else {
let MutationOccurred::No(()) = shards.prune_shards(request).unwrap() else {
panic!("expected `MutationOccurred::No`");
};
assert_eq!(response.index_uid(), &index_uid);
assert_eq!(response.source_id, source_id);
}
}
16 changes: 6 additions & 10 deletions quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ use quickwit_proto::metastore::{
ListShardsRequest, ListShardsResponse, ListSplitsRequest, ListSplitsResponse,
ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError, MetastoreResult,
MetastoreService, MetastoreServiceStream, OpenShardSubrequest, OpenShardsRequest,
OpenShardsResponse, PruneShardsRequest, PruneShardsResponse, PublishSplitsRequest,
ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest,
UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse,
OpenShardsResponse, PruneShardsRequest, PublishSplitsRequest, ResetSourceCheckpointRequest,
StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest, UpdateSplitsDeleteOpstampRequest,
UpdateSplitsDeleteOpstampResponse,
};
use quickwit_proto::types::{IndexId, IndexUid};
use quickwit_storage::Storage;
Expand Down Expand Up @@ -892,15 +892,11 @@ impl MetastoreService for FileBackedMetastore {
Ok(response)
}

async fn prune_shards(
&self,
request: PruneShardsRequest,
) -> MetastoreResult<PruneShardsResponse> {
async fn prune_shards(&self, request: PruneShardsRequest) -> MetastoreResult<EmptyResponse> {
let index_uid = request.index_uid().clone();
let response = self
.mutate(&index_uid, |index| index.prune_shards(request))
self.mutate(&index_uid, |index| index.prune_shards(request))
.await?;
Ok(response)
Ok(EmptyResponse {})
}

async fn list_shards(&self, request: ListShardsRequest) -> MetastoreResult<ListShardsResponse> {
Expand Down
17 changes: 4 additions & 13 deletions quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,8 @@ use quickwit_proto::metastore::{
ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError,
MetastoreResult, MetastoreService, MetastoreServiceStream, OpenShardSubrequest,
OpenShardSubresponse, OpenShardsRequest, OpenShardsResponse, PruneShardsRequest,
PruneShardsResponse, PublishSplitsRequest, ResetSourceCheckpointRequest, StageSplitsRequest,
ToggleSourceRequest, UpdateIndexRequest, UpdateSplitsDeleteOpstampRequest,
UpdateSplitsDeleteOpstampResponse,
PublishSplitsRequest, ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest,
UpdateIndexRequest, UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse,
};
use quickwit_proto::types::{IndexId, IndexUid, Position, PublishToken, ShardId, SourceId};
use sea_query::{Alias, Asterisk, Expr, Func, PostgresQueryBuilder, Query, UnionType};
Expand Down Expand Up @@ -1488,10 +1487,7 @@ impl MetastoreService for PostgresqlMetastore {
Ok(response)
}

async fn prune_shards(
&self,
request: PruneShardsRequest,
) -> MetastoreResult<PruneShardsResponse> {
async fn prune_shards(&self, request: PruneShardsRequest) -> MetastoreResult<EmptyResponse> {
const PRUNE_AGE_SHARDS_QUERY: &str = include_str!("queries/shards/prune_age.sql");
const PRUNE_COUNT_SHARDS_QUERY: &str = include_str!("queries/shards/prune_count.sql");

Expand All @@ -1513,12 +1509,7 @@ impl MetastoreService for PostgresqlMetastore {
.execute(&self.connection_pool)
.await?;
}

let response = PruneShardsResponse {
index_uid: request.index_uid,
source_id: request.source_id,
};
Ok(response)
Ok(EmptyResponse {})
}

// Index Template API
Expand Down
Loading

0 comments on commit b08930c

Please sign in to comment.