Skip to content

Commit

Permalink
Merge branch 'main' of github.com:samoii/quickwit into aws-s3-sse
Browse files Browse the repository at this point in the history
  • Loading branch information
samoii committed Sep 7, 2024
2 parents 78ec724 + 79acfe4 commit 183de31
Show file tree
Hide file tree
Showing 8 changed files with 26 additions and 12 deletions.
2 changes: 1 addition & 1 deletion docs/configuration/node-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ searcher:
Example:

```yaml
searcher:
jaeger:
enable_endpoint: true
```

Expand Down
9 changes: 6 additions & 3 deletions quickwit/quickwit-index-management/src/garbage_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ pub async fn run_garbage_collect(
)
.await
}
#[instrument(skip(storages, metastore, progress_opt))]
#[instrument(skip(index_uids, storages, metastore, progress_opt), fields(num_indexes=%index_uids.len()))]
/// Removes any splits marked for deletion which haven't been
/// updated after `updated_before_timestamp` in batches of 1000 splits.
///
Expand All @@ -183,6 +183,7 @@ async fn delete_splits_marked_for_deletion(
let mut failed_splits = Vec::new();

'outer: loop {
let mut exit = false;
let query = ListSplitsQuery::try_from_index_uids(index_uids.clone())?
.with_split_state(SplitState::MarkedForDeletion)
.with_update_timestamp_lte(updated_before_timestamp)
Expand Down Expand Up @@ -248,11 +249,13 @@ async fn delete_splits_marked_for_deletion(
Err(delete_splits_error) => {
failed_splits.extend(delete_splits_error.storage_failures);
failed_splits.extend(delete_splits_error.metastore_failures);
break;
exit = true;
}
}
}
if num_splits_to_delete < DELETE_SPLITS_BATCH_SIZE {
if num_splits_to_delete < DELETE_SPLITS_BATCH_SIZE || exit {
// stop the gc if this was the last batch or we encountered an error
// (otherwise we might try deleting the same splits in an endless loop)
break;
}
}
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/source/gcp_pubsub_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ impl Source for GcpPubSubSource {
) -> Result<Duration, ActorExitStatus> {
let now = Instant::now();
let mut batch_builder = BatchBuilder::new(SourceType::PubSub);
let deadline = time::sleep(EMIT_BATCHES_TIMEOUT);
let deadline = time::sleep(*EMIT_BATCHES_TIMEOUT);
tokio::pin!(deadline);
// TODO: ensure we ACK the message after being commit: at least once
// TODO: ensure we increase_ack_deadline for the items
Expand Down
3 changes: 1 addition & 2 deletions quickwit/quickwit-indexing/src/source/ingest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,8 +470,7 @@ impl Source for IngestSource {
let mut batch_builder = BatchBuilder::new(SourceType::IngestV2);

let now = time::Instant::now();
let deadline = now + EMIT_BATCHES_TIMEOUT;

let deadline = now + *EMIT_BATCHES_TIMEOUT;
loop {
match time::timeout_at(deadline, self.fetch_stream.next()).await {
Ok(Ok(fetch_message)) => match fetch_message.message {
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/source/kafka_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ impl Source for KafkaSource {
) -> Result<Duration, ActorExitStatus> {
let now = Instant::now();
let mut batch_builder = BatchBuilder::new(SourceType::Kafka);
let deadline = time::sleep(EMIT_BATCHES_TIMEOUT);
let deadline = time::sleep(*EMIT_BATCHES_TIMEOUT);
tokio::pin!(deadline);

loop {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ impl Source for KinesisSource {
ctx: &SourceContext,
) -> Result<Duration, ActorExitStatus> {
let mut batch_builder = BatchBuilder::new(SourceType::Kinesis);
let deadline = time::sleep(EMIT_BATCHES_TIMEOUT);
let deadline = time::sleep(*EMIT_BATCHES_TIMEOUT);
tokio::pin!(deadline);

loop {
Expand Down
16 changes: 14 additions & 2 deletions quickwit/quickwit-indexing/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ pub use gcp_pubsub_source::{GcpPubSubSource, GcpPubSubSourceFactory};
pub use kafka_source::{KafkaSource, KafkaSourceFactory};
#[cfg(feature = "kinesis")]
pub use kinesis::kinesis_source::{KinesisSource, KinesisSourceFactory};
use once_cell::sync::OnceCell;
use once_cell::sync::{Lazy, OnceCell};
#[cfg(feature = "pulsar")]
pub use pulsar_source::{PulsarSource, PulsarSourceFactory};
#[cfg(feature = "sqs")]
Expand Down Expand Up @@ -138,7 +138,19 @@ use crate::source::ingest_api_source::IngestApiSourceFactory;
/// 5MB seems like a good one size fits all value.
const BATCH_NUM_BYTES_LIMIT: u64 = ByteSize::mib(5).as_u64();

const EMIT_BATCHES_TIMEOUT: Duration = Duration::from_millis(if cfg!(test) { 100 } else { 1_000 });
/// Timeout used by the sources as the deadline for emitting a batch, computed lazily on
/// first access.
///
/// The value is 100ms when compiled for tests (`cfg(test)` or the `testsuite` feature) and
/// 1s otherwise. In both cases it is compared against `quickwit_actors::HEARTBEAT`:
/// test builds panic if the timeout is not strictly shorter than the heartbeat, whereas
/// other builds only log an error when the heartbeat is shorter than the timeout.
static EMIT_BATCHES_TIMEOUT: Lazy<Duration> = Lazy::new(|| {
    let is_test_build = cfg!(any(test, feature = "testsuite"));
    let timeout = Duration::from_millis(if is_test_build { 100 } else { 1_000 });
    if is_test_build {
        // Hard failure: a heartbeat at or below the batch timeout aborts the test build.
        assert!(timeout < *quickwit_actors::HEARTBEAT);
    } else if *quickwit_actors::HEARTBEAT < timeout {
        // Soft failure: report the misconfiguration but keep running with the fixed timeout.
        // NOTE(review): presumably the heartbeat is driven by the `QW_ACTOR_HEARTBEAT_SECS`
        // setting named in the message; confirm against `quickwit_actors`.
        error!("QW_ACTOR_HEARTBEAT_SECS smaller than batch timeout");
    }
    timeout
});

/// Runtime configuration used during execution of a source actor.
#[derive(Clone)]
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/source/pulsar_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ impl Source for PulsarSource {
) -> Result<Duration, ActorExitStatus> {
let now = Instant::now();
let mut batch_builder = BatchBuilder::new(SourceType::Pulsar);
let deadline = time::sleep(EMIT_BATCHES_TIMEOUT);
let deadline = time::sleep(*EMIT_BATCHES_TIMEOUT);
tokio::pin!(deadline);

loop {
Expand Down

0 comments on commit 183de31

Please sign in to comment.