Skip to content

Commit

Permalink
Migrate the metastore trait to the code generated metastore service a…
Browse files Browse the repository at this point in the history
…nd client.
  • Loading branch information
fmassot committed Oct 18, 2023
1 parent 359a2f2 commit 148460a
Show file tree
Hide file tree
Showing 95 changed files with 9,572 additions and 6,369 deletions.
3 changes: 3 additions & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 9 additions & 11 deletions quickwit/quickwit-cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ use quickwit_config::{
DEFAULT_QW_CONFIG_PATH,
};
use quickwit_indexing::check_source_connectivity;
use quickwit_metastore::{Metastore, MetastoreResolver};
use quickwit_metastore::{IndexMetadataResponseExt, MetastoreResolver};
use quickwit_proto::metastore::{IndexMetadataRequest, MetastoreService, MetastoreServiceClient};
use quickwit_rest_client::models::Timeout;
use quickwit_rest_client::rest_client::{QuickwitClient, QuickwitClientBuilder, DEFAULT_BASE_URL};
use quickwit_storage::{load_file, StorageResolver};
Expand Down Expand Up @@ -256,23 +257,20 @@ fn get_resolvers(
/// Optionally, it takes a `SourceConfig` that will be checked instead
/// of the index's sources.
pub async fn run_index_checklist(
metastore: &dyn Metastore,
metastore: &mut MetastoreServiceClient,
storage_resolver: &StorageResolver,
index_id: &str,
source_config_opt: Option<&SourceConfig>,
) -> anyhow::Result<()> {
let mut checks: Vec<(&str, anyhow::Result<()>)> = Vec::new();

// The metastore is file-backed, so we must check the storage first.
if !metastore.uri().protocol().is_database() {
let metastore_storage = storage_resolver.resolve(metastore.uri()).await?;
checks.push((
"metastore storage",
metastore_storage.check_connectivity().await,
));
}
checks.push(("metastore", metastore.check_connectivity().await));
let index_metadata = metastore.index_metadata(index_id).await?;
let index_metadata = metastore
.index_metadata(IndexMetadataRequest {
index_id: index_id.to_string(),
})
.await?
.deserialize_index_metadata()?;
let index_storage = storage_resolver.resolve(index_metadata.index_uri()).await?;
checks.push(("index storage", index_storage.check_connectivity().await));

Expand Down
28 changes: 17 additions & 11 deletions quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use std::io::{stdout, IsTerminal, Stdout, Write};
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::{env, fmt, io};

Expand All @@ -49,7 +48,8 @@ use quickwit_indexing::models::{
};
use quickwit_indexing::IndexingPipeline;
use quickwit_ingest::IngesterPool;
use quickwit_metastore::Metastore;
use quickwit_metastore::IndexMetadataResponseExt;
use quickwit_proto::metastore::{IndexMetadataRequest, MetastoreService, MetastoreServiceClient};
use quickwit_proto::search::SearchResponse;
use quickwit_search::{single_node_search, SearchResponseRest};
use quickwit_serve::{
Expand Down Expand Up @@ -408,7 +408,7 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
let config = load_node_config(&args.config_uri).await?;
let (storage_resolver, metastore_resolver) =
get_resolvers(&config.storage_configs, &config.metastore_configs);
let metastore = metastore_resolver.resolve(&config.metastore_uri).await?;
let mut metastore = metastore_resolver.resolve(&config.metastore_uri).await?;

let source_params = if let Some(filepath) = args.input_path_opt.as_ref() {
SourceParams::file(filepath)
Expand All @@ -428,15 +428,15 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
input_format: args.input_format,
};
run_index_checklist(
&*metastore,
&mut metastore,
&storage_resolver,
&args.index_id,
Some(&source_config),
)
.await?;

if args.overwrite {
let index_service = IndexService::new(metastore.clone(), storage_resolver.clone());
let mut index_service = IndexService::new(metastore.clone(), storage_resolver.clone());
index_service.clear_index(&args.index_id).await?;
}
// The indexing service needs to update its cluster chitchat state so that the control plane is
Expand Down Expand Up @@ -532,7 +532,8 @@ pub async fn local_search_cli(args: LocalSearchArgs) -> anyhow::Result<()> {
let config = load_node_config(&args.config_uri).await?;
let (storage_resolver, metastore_resolver) =
get_resolvers(&config.storage_configs, &config.metastore_configs);
let metastore: Arc<dyn Metastore> = metastore_resolver.resolve(&config.metastore_uri).await?;
let metastore: MetastoreServiceClient =
metastore_resolver.resolve(&config.metastore_uri).await?;
let aggs = args
.aggregation
.map(|agg_string| serde_json::from_str(&agg_string))
Expand Down Expand Up @@ -567,8 +568,8 @@ pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> {
let config = load_node_config(&args.config_uri).await?;
let (storage_resolver, metastore_resolver) =
get_resolvers(&config.storage_configs, &config.metastore_configs);
let metastore = metastore_resolver.resolve(&config.metastore_uri).await?;
run_index_checklist(&*metastore, &storage_resolver, &args.index_id, None).await?;
let mut metastore = metastore_resolver.resolve(&config.metastore_uri).await?;
run_index_checklist(&mut metastore, &storage_resolver, &args.index_id, None).await?;
// The indexing service needs to update its cluster chitchat state so that the control plane is
// aware of the running tasks. We thus create a fake cluster to instantiate the indexing service
// and avoid impacting potential control plane running on the cluster.
Expand Down Expand Up @@ -656,7 +657,7 @@ pub async fn garbage_collect_index_cli(args: GarbageCollectIndexArgs) -> anyhow:
let (storage_resolver, metastore_resolver) =
get_resolvers(&config.storage_configs, &config.metastore_configs);
let metastore = metastore_resolver.resolve(&config.metastore_uri).await?;
let index_service = IndexService::new(metastore, storage_resolver);
let mut index_service = IndexService::new(metastore, storage_resolver);
let removal_info = index_service
.garbage_collect_index(&args.index_id, args.grace_period, args.dry_run)
.await?;
Expand Down Expand Up @@ -720,8 +721,13 @@ async fn extract_split_cli(args: ExtractSplitArgs) -> anyhow::Result<()> {
let config = load_node_config(&args.config_uri).await?;
let (storage_resolver, metastore_resolver) =
get_resolvers(&config.storage_configs, &config.metastore_configs);
let metastore = metastore_resolver.resolve(&config.metastore_uri).await?;
let index_metadata = metastore.index_metadata(&args.index_id).await?;
let mut metastore = metastore_resolver.resolve(&config.metastore_uri).await?;
let index_metadata = metastore
.index_metadata(IndexMetadataRequest {
index_id: args.index_id,
})
.await?
.deserialize_index_metadata()?;
let index_storage = storage_resolver.resolve(index_metadata.index_uri()).await?;
let split_file = PathBuf::from(format!("{}.split", args.split_id));
let split_data = index_storage.get_all(split_file.as_path()).await?;
Expand Down
Loading

0 comments on commit 148460a

Please sign in to comment.