Skip to content

Commit

Permalink
Migrate metastore to code generated metastore service (#3898)
Browse files Browse the repository at this point in the history
* Migrate the metastore trait to the code generated metastore service and client.

* Add changes from review's suggestions.

* Last clean before merge.

* Readd metastore storage connectivity check.
  • Loading branch information
fmassot authored Oct 21, 2023
1 parent b392a1d commit 4d83afd
Show file tree
Hide file tree
Showing 95 changed files with 9,526 additions and 6,750 deletions.
3 changes: 3 additions & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 18 additions & 11 deletions quickwit/quickwit-cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ use quickwit_config::{
DEFAULT_QW_CONFIG_PATH,
};
use quickwit_indexing::check_source_connectivity;
use quickwit_metastore::{Metastore, MetastoreResolver};
use quickwit_metastore::{IndexMetadataResponseExt, MetastoreResolver};
use quickwit_proto::metastore::{IndexMetadataRequest, MetastoreService, MetastoreServiceClient};
use quickwit_rest_client::models::Timeout;
use quickwit_rest_client::rest_client::{QuickwitClient, QuickwitClientBuilder, DEFAULT_BASE_URL};
use quickwit_storage::{load_file, StorageResolver};
Expand Down Expand Up @@ -256,23 +257,29 @@ fn get_resolvers(
/// Optionally, it takes a `SourceConfig` that will be checked instead
/// of the index's sources.
pub async fn run_index_checklist(
metastore: &dyn Metastore,
metastore: &mut MetastoreServiceClient,
storage_resolver: &StorageResolver,
index_id: &str,
source_config_opt: Option<&SourceConfig>,
) -> anyhow::Result<()> {
let mut checks: Vec<(&str, anyhow::Result<()>)> = Vec::new();

// The metastore is file-backed, so we must check the storage first.
if !metastore.uri().protocol().is_database() {
let metastore_storage = storage_resolver.resolve(metastore.uri()).await?;
checks.push((
"metastore storage",
metastore_storage.check_connectivity().await,
));
for metastore_endpoint in metastore.endpoints() {
// If it's not a database, the metastore is file-backed. To display a nicer message to the
// user, we check the metastore storage connectivity before the mestastore check
// connectivity which will check the storage anyway.
if !metastore_endpoint.protocol().is_database() {
let metastore_storage = storage_resolver.resolve(&metastore_endpoint).await?;
checks.push((
"metastore storage",
metastore_storage.check_connectivity().await,
));
}
}
checks.push(("metastore", metastore.check_connectivity().await));
let index_metadata = metastore.index_metadata(index_id).await?;
let index_metadata = metastore
.index_metadata(IndexMetadataRequest::for_index_id(index_id.to_string()))
.await?
.deserialize_index_metadata()?;
let index_storage = storage_resolver.resolve(index_metadata.index_uri()).await?;
checks.push(("index storage", index_storage.check_connectivity().await));

Expand Down
26 changes: 15 additions & 11 deletions quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use std::io::{stdout, IsTerminal, Stdout, Write};
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::{env, fmt, io};

Expand All @@ -49,7 +48,8 @@ use quickwit_indexing::models::{
};
use quickwit_indexing::IndexingPipeline;
use quickwit_ingest::IngesterPool;
use quickwit_metastore::Metastore;
use quickwit_metastore::IndexMetadataResponseExt;
use quickwit_proto::metastore::{IndexMetadataRequest, MetastoreService, MetastoreServiceClient};
use quickwit_proto::search::SearchResponse;
use quickwit_proto::NodeId;
use quickwit_search::{single_node_search, SearchResponseRest};
Expand Down Expand Up @@ -409,7 +409,7 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
let config = load_node_config(&args.config_uri).await?;
let (storage_resolver, metastore_resolver) =
get_resolvers(&config.storage_configs, &config.metastore_configs);
let metastore = metastore_resolver.resolve(&config.metastore_uri).await?;
let mut metastore = metastore_resolver.resolve(&config.metastore_uri).await?;

let source_params = if let Some(filepath) = args.input_path_opt.as_ref() {
SourceParams::file(filepath)
Expand All @@ -429,15 +429,15 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
input_format: args.input_format,
};
run_index_checklist(
&*metastore,
&mut metastore,
&storage_resolver,
&args.index_id,
Some(&source_config),
)
.await?;

if args.overwrite {
let index_service = IndexService::new(metastore.clone(), storage_resolver.clone());
let mut index_service = IndexService::new(metastore.clone(), storage_resolver.clone());
index_service.clear_index(&args.index_id).await?;
}
// The indexing service needs to update its cluster chitchat state so that the control plane is
Expand Down Expand Up @@ -533,7 +533,8 @@ pub async fn local_search_cli(args: LocalSearchArgs) -> anyhow::Result<()> {
let config = load_node_config(&args.config_uri).await?;
let (storage_resolver, metastore_resolver) =
get_resolvers(&config.storage_configs, &config.metastore_configs);
let metastore: Arc<dyn Metastore> = metastore_resolver.resolve(&config.metastore_uri).await?;
let metastore: MetastoreServiceClient =
metastore_resolver.resolve(&config.metastore_uri).await?;
let aggs = args
.aggregation
.map(|agg_string| serde_json::from_str(&agg_string))
Expand Down Expand Up @@ -568,8 +569,8 @@ pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> {
let config = load_node_config(&args.config_uri).await?;
let (storage_resolver, metastore_resolver) =
get_resolvers(&config.storage_configs, &config.metastore_configs);
let metastore = metastore_resolver.resolve(&config.metastore_uri).await?;
run_index_checklist(&*metastore, &storage_resolver, &args.index_id, None).await?;
let mut metastore = metastore_resolver.resolve(&config.metastore_uri).await?;
run_index_checklist(&mut metastore, &storage_resolver, &args.index_id, None).await?;
// The indexing service needs to update its cluster chitchat state so that the control plane is
// aware of the running tasks. We thus create a fake cluster to instantiate the indexing service
// and avoid impacting potential control plane running on the cluster.
Expand Down Expand Up @@ -657,7 +658,7 @@ pub async fn garbage_collect_index_cli(args: GarbageCollectIndexArgs) -> anyhow:
let (storage_resolver, metastore_resolver) =
get_resolvers(&config.storage_configs, &config.metastore_configs);
let metastore = metastore_resolver.resolve(&config.metastore_uri).await?;
let index_service = IndexService::new(metastore, storage_resolver);
let mut index_service = IndexService::new(metastore, storage_resolver);
let removal_info = index_service
.garbage_collect_index(&args.index_id, args.grace_period, args.dry_run)
.await?;
Expand Down Expand Up @@ -721,8 +722,11 @@ async fn extract_split_cli(args: ExtractSplitArgs) -> anyhow::Result<()> {
let config = load_node_config(&args.config_uri).await?;
let (storage_resolver, metastore_resolver) =
get_resolvers(&config.storage_configs, &config.metastore_configs);
let metastore = metastore_resolver.resolve(&config.metastore_uri).await?;
let index_metadata = metastore.index_metadata(&args.index_id).await?;
let mut metastore = metastore_resolver.resolve(&config.metastore_uri).await?;
let index_metadata = metastore
.index_metadata(IndexMetadataRequest::for_index_id(args.index_id))
.await?
.deserialize_index_metadata()?;
let index_storage = storage_resolver.resolve(index_metadata.index_uri()).await?;
let split_file = PathBuf::from(format!("{}.split", args.split_id));
let split_data = index_storage.get_all(split_file.as_path()).await?;
Expand Down
Loading

0 comments on commit 4d83afd

Please sign in to comment.