Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate metastore to code generated metastore service #3898

Merged
merged 4 commits into from
Oct 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 18 additions & 11 deletions quickwit/quickwit-cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ use quickwit_config::{
DEFAULT_QW_CONFIG_PATH,
};
use quickwit_indexing::check_source_connectivity;
use quickwit_metastore::{Metastore, MetastoreResolver};
use quickwit_metastore::{IndexMetadataResponseExt, MetastoreResolver};
use quickwit_proto::metastore::{IndexMetadataRequest, MetastoreService, MetastoreServiceClient};
use quickwit_rest_client::models::Timeout;
use quickwit_rest_client::rest_client::{QuickwitClient, QuickwitClientBuilder, DEFAULT_BASE_URL};
use quickwit_storage::{load_file, StorageResolver};
Expand Down Expand Up @@ -256,23 +257,29 @@ fn get_resolvers(
/// Optionally, it takes a `SourceConfig` that will be checked instead
/// of the index's sources.
pub async fn run_index_checklist(
metastore: &dyn Metastore,
metastore: &mut MetastoreServiceClient,
storage_resolver: &StorageResolver,
index_id: &str,
source_config_opt: Option<&SourceConfig>,
) -> anyhow::Result<()> {
let mut checks: Vec<(&str, anyhow::Result<()>)> = Vec::new();

// The metastore is file-backed, so we must check the storage first.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check is useless. The check connectivity of the metastore will do the right check.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leave it. The checklist will tell you it's a metastore issue, whereas it stems from the storage.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, will change the comment then.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer to do the right check in the metastore method and make sure the error displayed is clean.

if !metastore.uri().protocol().is_database() {
let metastore_storage = storage_resolver.resolve(metastore.uri()).await?;
checks.push((
"metastore storage",
metastore_storage.check_connectivity().await,
));
for metastore_endpoint in metastore.endpoints() {
// If it's not a database, the metastore is file-backed. To display a nicer message to the
// user, we check the metastore storage connectivity before the mestastore check
// connectivity which will check the storage anyway.
if !metastore_endpoint.protocol().is_database() {
let metastore_storage = storage_resolver.resolve(&metastore_endpoint).await?;
checks.push((
"metastore storage",
metastore_storage.check_connectivity().await,
));
}
}
checks.push(("metastore", metastore.check_connectivity().await));
let index_metadata = metastore.index_metadata(index_id).await?;
let index_metadata = metastore
.index_metadata(IndexMetadataRequest::for_index_id(index_id.to_string()))
.await?
.deserialize_index_metadata()?;
let index_storage = storage_resolver.resolve(index_metadata.index_uri()).await?;
checks.push(("index storage", index_storage.check_connectivity().await));

Expand Down
26 changes: 15 additions & 11 deletions quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use std::io::{stdout, IsTerminal, Stdout, Write};
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::{env, fmt, io};

Expand All @@ -49,7 +48,8 @@ use quickwit_indexing::models::{
};
use quickwit_indexing::IndexingPipeline;
use quickwit_ingest::IngesterPool;
use quickwit_metastore::Metastore;
use quickwit_metastore::IndexMetadataResponseExt;
use quickwit_proto::metastore::{IndexMetadataRequest, MetastoreService, MetastoreServiceClient};
use quickwit_proto::search::SearchResponse;
use quickwit_proto::NodeId;
use quickwit_search::{single_node_search, SearchResponseRest};
Expand Down Expand Up @@ -409,7 +409,7 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
let config = load_node_config(&args.config_uri).await?;
let (storage_resolver, metastore_resolver) =
get_resolvers(&config.storage_configs, &config.metastore_configs);
let metastore = metastore_resolver.resolve(&config.metastore_uri).await?;
let mut metastore = metastore_resolver.resolve(&config.metastore_uri).await?;

let source_params = if let Some(filepath) = args.input_path_opt.as_ref() {
SourceParams::file(filepath)
Expand All @@ -429,15 +429,15 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
input_format: args.input_format,
};
run_index_checklist(
&*metastore,
&mut metastore,
&storage_resolver,
&args.index_id,
Some(&source_config),
)
.await?;

if args.overwrite {
let index_service = IndexService::new(metastore.clone(), storage_resolver.clone());
let mut index_service = IndexService::new(metastore.clone(), storage_resolver.clone());
index_service.clear_index(&args.index_id).await?;
}
// The indexing service needs to update its cluster chitchat state so that the control plane is
Expand Down Expand Up @@ -533,7 +533,8 @@ pub async fn local_search_cli(args: LocalSearchArgs) -> anyhow::Result<()> {
let config = load_node_config(&args.config_uri).await?;
let (storage_resolver, metastore_resolver) =
get_resolvers(&config.storage_configs, &config.metastore_configs);
let metastore: Arc<dyn Metastore> = metastore_resolver.resolve(&config.metastore_uri).await?;
let metastore: MetastoreServiceClient =
metastore_resolver.resolve(&config.metastore_uri).await?;
let aggs = args
.aggregation
.map(|agg_string| serde_json::from_str(&agg_string))
Expand Down Expand Up @@ -568,8 +569,8 @@ pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> {
let config = load_node_config(&args.config_uri).await?;
let (storage_resolver, metastore_resolver) =
get_resolvers(&config.storage_configs, &config.metastore_configs);
let metastore = metastore_resolver.resolve(&config.metastore_uri).await?;
run_index_checklist(&*metastore, &storage_resolver, &args.index_id, None).await?;
let mut metastore = metastore_resolver.resolve(&config.metastore_uri).await?;
run_index_checklist(&mut metastore, &storage_resolver, &args.index_id, None).await?;
// The indexing service needs to update its cluster chitchat state so that the control plane is
// aware of the running tasks. We thus create a fake cluster to instantiate the indexing service
// and avoid impacting potential control plane running on the cluster.
Expand Down Expand Up @@ -657,7 +658,7 @@ pub async fn garbage_collect_index_cli(args: GarbageCollectIndexArgs) -> anyhow:
let (storage_resolver, metastore_resolver) =
get_resolvers(&config.storage_configs, &config.metastore_configs);
let metastore = metastore_resolver.resolve(&config.metastore_uri).await?;
let index_service = IndexService::new(metastore, storage_resolver);
let mut index_service = IndexService::new(metastore, storage_resolver);
let removal_info = index_service
.garbage_collect_index(&args.index_id, args.grace_period, args.dry_run)
.await?;
Expand Down Expand Up @@ -721,8 +722,11 @@ async fn extract_split_cli(args: ExtractSplitArgs) -> anyhow::Result<()> {
let config = load_node_config(&args.config_uri).await?;
let (storage_resolver, metastore_resolver) =
get_resolvers(&config.storage_configs, &config.metastore_configs);
let metastore = metastore_resolver.resolve(&config.metastore_uri).await?;
let index_metadata = metastore.index_metadata(&args.index_id).await?;
let mut metastore = metastore_resolver.resolve(&config.metastore_uri).await?;
let index_metadata = metastore
.index_metadata(IndexMetadataRequest::for_index_id(args.index_id))
.await?
.deserialize_index_metadata()?;
let index_storage = storage_resolver.resolve(index_metadata.index_uri()).await?;
let split_file = PathBuf::from(format!("{}.split", args.split_id));
let split_data = index_storage.get_all(split_file.as_path()).await?;
Expand Down
Loading