Skip to content

Commit

Permalink
Add changes from review's suggestions.
Browse files Browse the repository at this point in the history
  • Loading branch information
fmassot committed Oct 20, 2023
1 parent 148460a commit 77c80df
Show file tree
Hide file tree
Showing 35 changed files with 389 additions and 667 deletions.
22 changes: 11 additions & 11 deletions quickwit/quickwit-cli/tests/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ use quickwit_common::rand::append_random_suffix;
use quickwit_common::uri::Uri;
use quickwit_config::{SourceInputFormat, CLI_INGEST_SOURCE_ID};
use quickwit_metastore::{
ListSplitsResponseExt, MetastoreResolver, MetastoreServiceExt, SplitState,
StageSplitsRequestExt,
ListSplitsRequestExt, ListSplitsResponseExt, MetastoreResolver, MetastoreServiceExt,
SplitState, StageSplitsRequestExt,
};
use quickwit_proto::metastore::{
DeleteSplitsRequest, EntityKind, IndexMetadataRequest, ListAllSplitsRequest,
DeleteSplitsRequest, EntityKind, IndexMetadataRequest, ListSplitsRequest,
MarkSplitsForDeletionRequest, MetastoreError, MetastoreService, StageSplitsRequest,
};
use serde_json::{json, Number, Value};
Expand Down Expand Up @@ -252,7 +252,7 @@ async fn test_ingest_docs_cli() {
let splits: Vec<_> = test_env
.metastore()
.await
.list_all_splits(ListAllSplitsRequest::from(&index_uid))
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid).unwrap())
.await
.unwrap()
.deserialize_splits()
Expand Down Expand Up @@ -670,7 +670,7 @@ async fn test_garbage_collect_cli_no_grace() {
};

let splits = metastore
.list_all_splits(ListAllSplitsRequest::from(&index_uid))
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap())
.await
.unwrap()
.deserialize_splits()
Expand Down Expand Up @@ -721,7 +721,7 @@ async fn test_garbage_collect_cli_no_grace() {
let mut metastore = refresh_metastore(metastore).await.unwrap();
assert_eq!(
metastore
.list_all_splits(ListAllSplitsRequest::from(&index_uid))
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid).unwrap())
.await
.unwrap()
.deserialize_splits()
Expand Down Expand Up @@ -783,7 +783,7 @@ async fn test_garbage_collect_index_cli() {
.unwrap();

let splits = metastore
.list_all_splits(ListAllSplitsRequest::from(&index_uid))
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap())
.await
.unwrap()
.deserialize_splits()
Expand All @@ -802,7 +802,7 @@ async fn test_garbage_collect_index_cli() {
// Split should still exists within grace period.
let mut metastore = refresh_metastore(metastore).await.unwrap();
let splits = metastore
.list_all_splits(ListAllSplitsRequest::from(&index_uid))
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap())
.await
.unwrap()
.deserialize_splits()
Expand Down Expand Up @@ -843,7 +843,7 @@ async fn test_garbage_collect_index_cli() {

let mut metastore = refresh_metastore(metastore).await.unwrap();
let splits = metastore
.list_all_splits(ListAllSplitsRequest::from(&index_uid))
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap())
.await
.unwrap()
.deserialize_splits()
Expand All @@ -858,7 +858,7 @@ async fn test_garbage_collect_index_cli() {
// Staged splits should still exist within grace period.
let mut metastore = refresh_metastore(metastore).await.unwrap();
let splits = metastore
.list_all_splits(ListAllSplitsRequest::from(&index_uid))
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap())
.await
.unwrap()
.deserialize_splits()
Expand All @@ -876,7 +876,7 @@ async fn test_garbage_collect_index_cli() {

let mut metastore = refresh_metastore(metastore).await.unwrap();
let splits = metastore
.list_all_splits(ListAllSplitsRequest::from(&index_uid))
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid).unwrap())
.await
.unwrap()
.deserialize_splits()
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-codegen/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ syn = { workspace = true }
tonic-build = { workspace = true }

[dev-dependencies]
serde = { workspace = true }
futures = { workspace = true }
serde = { workspace = true }
1 change: 0 additions & 1 deletion quickwit/quickwit-common/src/tower/event_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use tower::{Layer, Service};
use crate::pubsub::{Event, EventBroker};

#[derive(Clone)]

pub struct EventListener<S> {
inner: S,
event_broker: EventBroker,
Expand Down
41 changes: 23 additions & 18 deletions quickwit/quickwit-common/src/tower/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,19 @@ use tracing::debug;

use crate::retry::{RetryParams, Retryable};

/// Retry requests based on a policy.
/// Taken from tower
/// FIXME(fmassot): I tried to use directly tower retry layer
/// but when trying to set it for the MetastoreService, the
/// compiler complains that the RetryLayer was not clonable.
/// I don't understand why.
/// Retry layer copy/pasted from `tower::retry::RetryLayer`
/// but which implements `Clone`.
impl<P, S> Layer<S> for RetryLayer<P>
where P: Clone
{
type Service = Retry<P, S>;

fn layer(&self, service: S) -> Self::Service {
let policy = self.policy.clone();
Retry::new(policy, service)
}
}

#[derive(Clone, Debug)]
pub struct RetryLayer<P> {
policy: P,
Expand All @@ -49,23 +56,21 @@ impl<P> RetryLayer<P> {
}
}

impl<P, S> Layer<S> for RetryLayer<P>
where P: Clone
{
type Service = Retry<P, S>;

fn layer(&self, service: S) -> Self::Service {
let policy = self.policy.clone();
Retry::new(policy, service)
}
}

#[derive(Debug, Clone, Copy, Default)]
#[derive(Debug, Copy, Default)]
pub struct RetryPolicy {
num_attempts: usize,
retry_params: RetryParams,
}

impl Clone for RetryPolicy {
fn clone(&self) -> Self {
Self {
num_attempts: self.num_attempts,
retry_params: self.retry_params,
}
}
}

impl From<RetryParams> for RetryPolicy {
fn from(retry_params: RetryParams) -> Self {
Self {
Expand Down
14 changes: 7 additions & 7 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ mod tests {
})
});
mock_metastore
.expect_list_indexes_metadatas()
.expect_list_indexes_metadata()
.returning(|_| {
Ok(ListIndexesMetadataResponse::try_from_indexes_metadata(Vec::new()).unwrap())
});
Expand Down Expand Up @@ -496,7 +496,7 @@ mod tests {
.withf(|delete_index_request| delete_index_request.index_uid == "test-index:0")
.returning(|_| Ok(EmptyResponse {}));
mock_metastore
.expect_list_indexes_metadatas()
.expect_list_indexes_metadata()
.returning(|_| {
Ok(ListIndexesMetadataResponse::try_from_indexes_metadata(Vec::new()).unwrap())
});
Expand Down Expand Up @@ -548,7 +548,7 @@ mod tests {
})
.returning(|_| Ok(EmptyResponse {}));
mock_metastore
.expect_list_indexes_metadatas()
.expect_list_indexes_metadata()
.returning(move |_| {
Ok(ListIndexesMetadataResponse::try_from_indexes_metadata(vec![
index_metadata.clone()
Expand Down Expand Up @@ -601,7 +601,7 @@ mod tests {
})
.returning(|_| Ok(EmptyResponse {}));
mock_metastore
.expect_list_indexes_metadatas()
.expect_list_indexes_metadata()
.returning(|_| {
Ok(ListIndexesMetadataResponse::try_from_indexes_metadata(Vec::new()).unwrap())
});
Expand Down Expand Up @@ -650,7 +650,7 @@ mod tests {
})
.returning(|_| Ok(EmptyResponse {}));
mock_metastore
.expect_list_indexes_metadatas()
.expect_list_indexes_metadata()
.returning(|_| {
Ok(ListIndexesMetadataResponse::try_from_indexes_metadata(Vec::new()).unwrap())
});
Expand Down Expand Up @@ -690,7 +690,7 @@ mod tests {

let mut mock_metastore = MetastoreServiceClient::mock();
mock_metastore
.expect_list_indexes_metadatas()
.expect_list_indexes_metadata()
.returning(|_| {
let mut index_metadata = IndexMetadata::for_test("test-index", "ram:///test-index");
let source_config = SourceConfig::for_test(INGEST_SOURCE_ID, SourceParams::void());
Expand Down Expand Up @@ -773,7 +773,7 @@ mod tests {
index_0.add_source(source.clone()).unwrap();

mock_metastore
.expect_list_indexes_metadatas()
.expect_list_indexes_metadata()
.times(2) // 1 for the first initialization, 1 after the respawn of the control plane.
.returning(|list_indexes_request: ListIndexesMetadataRequest| {
assert_eq!(list_indexes_request, ListIndexesMetadataRequest::all());
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-control-plane/src/control_plane_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl ControlPlaneModel {
let now = Instant::now();
self.clear();
let index_metadatas = progress
.protect_future(metastore.list_indexes_metadatas(ListIndexesMetadataRequest::all()))
.protect_future(metastore.list_indexes_metadata(ListIndexesMetadataRequest::all()))
.await?
.deserialize_indexes_metadata()?;
let num_indexes = index_metadatas.len();
Expand Down Expand Up @@ -685,7 +685,7 @@ mod tests {

let mut mock_metastore = MetastoreServiceClient::mock();
mock_metastore
.expect_list_indexes_metadatas()
.expect_list_indexes_metadata()
.returning(|request| {
assert_eq!(request, ListIndexesMetadataRequest::all());

Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-control-plane/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ async fn start_control_plane(
let mut index_metadata_2 = index_metadata_for_test(index_2, source_2, 1, 1);
index_metadata_2.create_timestamp = index_metadata_1.create_timestamp + 1;
let mut metastore = MetastoreServiceClient::mock();
metastore.expect_list_indexes_metadatas().returning(
metastore.expect_list_indexes_metadata().returning(
move |_list_indexes_request: quickwit_proto::metastore::ListIndexesMetadataRequest| {
let indexes_metadata = vec![index_metadata_2.clone(), index_metadata_1.clone()];
Ok(ListIndexesMetadataResponse::try_from_indexes_metadata(indexes_metadata).unwrap())
Expand Down
27 changes: 10 additions & 17 deletions quickwit/quickwit-index-management/src/garbage_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use quickwit_proto::metastore::{
DeleteSplitsRequest, ListSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError,
MetastoreService, MetastoreServiceClient,
};
use quickwit_proto::IndexUid;
use quickwit_proto::{IndexUid, SplitId};
use quickwit_storage::{BulkDeleteError, Storage};
use thiserror::Error;
use time::OffsetDateTime;
Expand Down Expand Up @@ -106,10 +106,7 @@ pub async fn run_garbage_collect(
metastore.list_splits(list_deletable_staged_request),
)
.await?
.deserialize_splits()?
.into_iter()
.map(|split| split.split_metadata)
.collect();
.deserialize_splits_metadata()?;

if dry_run {
let marked_for_deletion_query = ListSplitsQuery::for_index(index_uid.clone())
Expand All @@ -121,10 +118,7 @@ pub async fn run_garbage_collect(
metastore.list_splits(marked_for_deletion_request),
)
.await?
.deserialize_splits()?
.into_iter()
.map(|split| split.split_metadata)
.collect();
.deserialize_splits_metadata()?;
splits_marked_for_deletion.extend(deletable_staged_splits);

let candidate_entries: Vec<SplitInfo> = splits_marked_for_deletion
Expand All @@ -138,7 +132,7 @@ pub async fn run_garbage_collect(
}

// Schedule all eligible staged splits for delete
let split_ids: Vec<String> = deletable_staged_splits
let split_ids: Vec<SplitId> = deletable_staged_splits
.iter()
.map(|split| split.split_id.to_string())
.collect();
Expand Down Expand Up @@ -195,7 +189,7 @@ async fn delete_splits_marked_for_deletion(
let list_splits_request = match ListSplitsRequest::try_from_list_splits_query(query) {
Ok(request) => request,
Err(error) => {
error!(error = ?error, "Failed to build list splits request.");
error!(error = ?error, "failed to build list splits request");
break;
}
};
Expand Down Expand Up @@ -307,7 +301,7 @@ pub async fn delete_splits_from_storage_and_metastore(
}
};
if !successes.is_empty() {
let split_ids: Vec<String> = successes
let split_ids: Vec<SplitId> = successes
.iter()
.map(|split_info| split_info.split_id.to_string())
.collect();
Expand Down Expand Up @@ -359,8 +353,7 @@ mod tests {
StageSplitsRequestExt,
};
use quickwit_proto::metastore::{
CreateIndexRequest, EntityKind, ListAllSplitsRequest, ListSplitsResponse,
StageSplitsRequest,
CreateIndexRequest, EntityKind, ListSplitsResponse, StageSplitsRequest,
};
use quickwit_proto::IndexUid;
use quickwit_storage::{
Expand Down Expand Up @@ -631,7 +624,7 @@ mod tests {
assert!(storage.exists(split_path).await.unwrap());

let splits = metastore
.list_all_splits(ListAllSplitsRequest::from(&index_uid))
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap())
.await
.unwrap()
.deserialize_splits()
Expand All @@ -656,7 +649,7 @@ mod tests {
);
assert!(!storage.exists(split_path).await.unwrap());
assert!(metastore
.list_all_splits(ListAllSplitsRequest::from(&index_uid))
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid).unwrap())
.await
.unwrap()
.deserialize_splits()
Expand Down Expand Up @@ -749,7 +742,7 @@ mod tests {
assert_eq!(error.metastore_failures.len(), 0);

let splits = metastore
.list_all_splits(ListAllSplitsRequest::from(&index_uid))
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid).unwrap())
.await
.unwrap()
.deserialize_splits()
Expand Down
Loading

0 comments on commit 77c80df

Please sign in to comment.