diff --git a/distribution/lambda/Makefile b/distribution/lambda/Makefile index 92b9e04c84a..3ccd3633ea9 100644 --- a/distribution/lambda/Makefile +++ b/distribution/lambda/Makefile @@ -56,7 +56,7 @@ indexer-package-path: searcher-package-path: echo -n $(SEARCHER_PACKAGE_PATH) -bootstrap: package check-env +bootstrap: cdk bootstrap aws://$$CDK_ACCOUNT/$$CDK_REGION deploy-hdfs: package check-env @@ -65,17 +65,20 @@ deploy-hdfs: package check-env deploy-mock-data: package check-env cdk deploy -a cdk/app.py MockDataStack +print-mock-data-metastore: check-env + python -c 'from cdk import cli; cli.print_mock_data_metastore()' + # address https://github.com/aws/aws-cdk/issues/20060 before-destroy: mkdir -p cdk.out touch $(INDEXER_PACKAGE_PATH) touch $(SEARCHER_PACKAGE_PATH) -destroy-hdfs: before-destroy +destroy-hdfs: before-destroy check-env python -c 'from cdk import cli; cli.empty_hdfs_bucket()' cdk destroy --force -a cdk/app.py HdfsStack -destroy-mock-data: before-destroy +destroy-mock-data: before-destroy check-env python -c 'from cdk import cli; cli.empty_mock_data_buckets()' cdk destroy --force -a cdk/app.py MockDataStack @@ -108,6 +111,7 @@ bench-index: done bench-search-term: + export QW_LAMBDA_LOG_SPAN_BOUNDARIES=true mem_sizes=( 1024 2048 4096 8192 ) for mem_size in "$${mem_sizes[@]}" do @@ -117,6 +121,7 @@ bench-search-term: done bench-search-histogram: + export QW_LAMBDA_LOG_SPAN_BOUNDARIES=true mem_sizes=( 1024 2048 4096 8192 ) for mem_size in "$${mem_sizes[@]}" do diff --git a/distribution/lambda/cdk/cli.py b/distribution/lambda/cdk/cli.py index bfffc5f846a..0cb9d522770 100644 --- a/distribution/lambda/cdk/cli.py +++ b/distribution/lambda/cdk/cli.py @@ -316,6 +316,15 @@ def empty_mock_data_buckets(): _clean_s3_bucket(bucket_name) +def print_mock_data_metastore(): + bucket_name = _get_cloudformation_output_value( + app.MOCK_DATA_STACK_NAME, mock_data_stack.INDEX_STORE_BUCKET_NAME_EXPORT_NAME + ) + s3 = session.client("s3") + response = s3.get_object(Bucket=bucket_name, Key="index/mock-sales/metastore.json") + print(response["Body"].read().decode()) + + @cache def _git_commit(): return subprocess.run( diff --git a/distribution/lambda/cdk/stacks/examples/mock_data_stack.py b/distribution/lambda/cdk/stacks/examples/mock_data_stack.py index a83e4546075..4822e69723a 100644 --- a/distribution/lambda/cdk/stacks/examples/mock_data_stack.py +++ b/distribution/lambda/cdk/stacks/examples/mock_data_stack.py @@ -12,7 +12,7 @@ from constructs import Construct import yaml -from ..services.quickwit_service import QuickwitService +from ..services import quickwit_service SEARCHER_FUNCTION_NAME_EXPORT_NAME = "mock-data-searcher-function-name" INDEX_STORE_BUCKET_NAME_EXPORT_NAME = "mock-data-index-store-bucket-name" @@ -28,7 +28,7 @@ def __init__( scope: Construct, construct_id: str, index_id: str, - qw_svc: QuickwitService, + qw_svc: quickwit_service.QuickwitService, **kwargs, ): super().__init__(scope, construct_id, **kwargs) @@ -83,7 +83,7 @@ def __init__( scope: Construct, construct_id: str, index_id: str, - qw_svc: QuickwitService, + qw_svc: quickwit_service.QuickwitService, api_key: str, **kwargs, ) -> None: @@ -149,12 +149,15 @@ def __init__( "mock-data-index-config", path=index_config_local_path, ) - qw_svc = QuickwitService( + lambda_env = quickwit_service.extract_local_env() + qw_svc = quickwit_service.QuickwitService( self, "Quickwit", index_id=index_id, index_config_bucket=index_config.s3_bucket_name, index_config_key=index_config.s3_object_key, + indexer_environment=lambda_env, + searcher_environment=lambda_env, indexer_package_location=indexer_package_location, searcher_package_location=searcher_package_location, ) diff --git a/distribution/lambda/cdk/stacks/services/indexer_service.py b/distribution/lambda/cdk/stacks/services/indexer_service.py index 1dee9230e6f..65a32ffb8a6 100644 --- a/distribution/lambda/cdk/stacks/services/indexer_service.py +++ b/distribution/lambda/cdk/stacks/services/indexer_service.py @@ -32,7 +32,9 @@ def __init__( "QW_LAMBDA_INDEX_CONFIG_URI": f"s3://{index_config_bucket}/{index_config_key}", **environment, }, - timeout=aws_cdk.Duration.minutes(15), + # use a strict timeout and retry policy to avoid unexpected costs + timeout=aws_cdk.Duration.minutes(1), + retry_attempts=0, reserved_concurrent_executions=1, memory_size=memory_size, ephemeral_storage_size=aws_cdk.Size.gibibytes(10), diff --git a/distribution/lambda/cdk/stacks/services/quickwit_service.py b/distribution/lambda/cdk/stacks/services/quickwit_service.py index 2887983f1c1..d0505b63faf 100644 --- a/distribution/lambda/cdk/stacks/services/quickwit_service.py +++ b/distribution/lambda/cdk/stacks/services/quickwit_service.py @@ -12,8 +12,12 @@ def extract_local_env() -> dict[str, str]: - """Extracts local environment variables that start with QW_LAMBDA_""" - return {k: os.environ[k] for k in os.environ.keys() if k.startswith("QW_LAMBDA_")} + """Extracts local environment variables QW_LAMBDA_* and QW_DISABLE_TELEMETRY""" + return { + k: os.environ[k] + for k in os.environ.keys() + if (k.startswith("QW_LAMBDA_") or k == "QW_DISABLE_TELEMETRY") + } class QuickwitService(Construct): diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index dcb76b75023..9b1a194f690 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -6050,6 +6050,7 @@ dependencies = [ "quickwit-index-management", "quickwit-indexing", "quickwit-ingest", + "quickwit-janitor", "quickwit-metastore", "quickwit-proto", "quickwit-rest-client", diff --git a/quickwit/quickwit-codegen/example/src/codegen/hello.rs b/quickwit/quickwit-codegen/example/src/codegen/hello.rs index 8aa69f74e1f..cd799b68fdb 100644 --- a/quickwit/quickwit-codegen/example/src/codegen/hello.rs +++ b/quickwit/quickwit-codegen/example/src/codegen/hello.rs @@ -744,7 +744,7 @@ where .hello(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|error| error.into()) } async fn goodbye( &mut self, @@ -754,7 +754,7 @@ where .goodbye(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|error| error.into()) } async fn ping( &mut self, @@ -766,9 +766,9 @@ where .map(|response| { let streaming: tonic::Streaming<_> = response.into_inner(); let stream = quickwit_common::ServiceStream::from(streaming); - stream.map_err(crate::error::grpc_status_to_service_error) + stream.map_err(|error| error.into()) }) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|error| error.into()) } async fn check_connectivity(&mut self) -> anyhow::Result<()> { if self.connection_addrs_rx.borrow().len() == 0 { @@ -809,7 +809,7 @@ impl hello_grpc_server::HelloGrpc for HelloGrpcServerAdapter { .hello(request.into_inner()) .await .map(tonic::Response::new) - .map_err(crate::error::grpc_error_to_grpc_status) + .map_err(|error| error.into()) } async fn goodbye( &self, @@ -820,7 +820,7 @@ impl hello_grpc_server::HelloGrpc for HelloGrpcServerAdapter { .goodbye(request.into_inner()) .await .map(tonic::Response::new) - .map_err(crate::error::grpc_error_to_grpc_status) + .map_err(|error| error.into()) } type PingStream = quickwit_common::ServiceStream>; async fn ping( @@ -834,10 +834,8 @@ impl hello_grpc_server::HelloGrpc for HelloGrpcServerAdapter { quickwit_common::ServiceStream::from(streaming) }) .await - .map(|stream| tonic::Response::new( - stream.map_err(crate::error::grpc_error_to_grpc_status), - )) - .map_err(crate::error::grpc_error_to_grpc_status) + .map(|stream| tonic::Response::new(stream.map_err(|error| error.into()))) + .map_err(|error| error.into()) } } /// Generated client implementations. diff --git a/quickwit/quickwit-ingest/src/codegen/ingest_service.rs b/quickwit/quickwit-ingest/src/codegen/ingest_service.rs index 48449e20f74..c56424bf524 100644 --- a/quickwit/quickwit-ingest/src/codegen/ingest_service.rs +++ b/quickwit/quickwit-ingest/src/codegen/ingest_service.rs @@ -803,21 +803,21 @@ where .ingest(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|error| error.into()) } async fn fetch(&mut self, request: FetchRequest) -> crate::Result { self.inner .fetch(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|error| error.into()) } async fn tail(&mut self, request: TailRequest) -> crate::Result { self.inner .tail(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|error| error.into()) } } #[derive(Debug)] @@ -843,7 +843,7 @@ impl ingest_service_grpc_server::IngestServiceGrpc for IngestServiceGrpcServerAd .ingest(request.into_inner()) .await .map(tonic::Response::new) - .map_err(crate::error::grpc_error_to_grpc_status) + .map_err(|error| error.into()) } async fn fetch( &self, @@ -854,7 +854,7 @@ impl ingest_service_grpc_server::IngestServiceGrpc for IngestServiceGrpcServerAd .fetch(request.into_inner()) .await .map(tonic::Response::new) - .map_err(crate::error::grpc_error_to_grpc_status) + .map_err(|error| error.into()) } async fn tail( &self, @@ -865,7 +865,7 @@ impl ingest_service_grpc_server::IngestServiceGrpc for IngestServiceGrpcServerAd .tail(request.into_inner()) .await .map(tonic::Response::new) - .map_err(crate::error::grpc_error_to_grpc_status) + .map_err(|error| error.into()) } } /// Generated client implementations. diff --git a/quickwit/quickwit-lambda/Cargo.toml b/quickwit/quickwit-lambda/Cargo.toml index 66131048ba4..9abb75ac44c 100644 --- a/quickwit/quickwit-lambda/Cargo.toml +++ b/quickwit/quickwit-lambda/Cargo.toml @@ -51,6 +51,7 @@ quickwit-doc-mapper = { workspace = true } quickwit-index-management = { workspace = true } quickwit-indexing = { workspace = true } quickwit-ingest = { workspace = true } +quickwit-janitor = { workspace = true } quickwit-metastore = { workspace = true } quickwit-proto = { workspace = true } quickwit-rest-client = { workspace = true } diff --git a/quickwit/quickwit-lambda/src/bin/indexer.rs b/quickwit/quickwit-lambda/src/bin/indexer.rs index 4b8aea034e0..f31196955f1 100644 --- a/quickwit/quickwit-lambda/src/bin/indexer.rs +++ b/quickwit/quickwit-lambda/src/bin/indexer.rs @@ -23,7 +23,7 @@ use quickwit_lambda::logger; #[tokio::main] async fn main() -> anyhow::Result<()> { - logger::setup_lambda_tracer()?; + logger::setup_lambda_tracer(tracing::Level::INFO)?; let func = service_fn(handler); lambda_runtime::run(func) .await diff --git a/quickwit/quickwit-lambda/src/bin/searcher.rs b/quickwit/quickwit-lambda/src/bin/searcher.rs index eeef43d00bc..564ea4e6653 100644 --- a/quickwit/quickwit-lambda/src/bin/searcher.rs +++ b/quickwit/quickwit-lambda/src/bin/searcher.rs @@ -23,7 +23,7 @@ use quickwit_lambda::searcher::handler; #[tokio::main] async fn main() -> anyhow::Result<()> { - logger::setup_lambda_tracer()?; + logger::setup_lambda_tracer(tracing::Level::INFO)?; let func = service_fn(handler); run(func).await.map_err(|e| anyhow::anyhow!(e)) } diff --git a/quickwit/quickwit-lambda/src/environment.rs b/quickwit/quickwit-lambda/src/environment.rs new file mode 100644 index 00000000000..f279a7bc81b --- /dev/null +++ b/quickwit/quickwit-lambda/src/environment.rs @@ -0,0 +1,34 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::env::var; + +use once_cell::sync::Lazy; + +pub static INDEX_ID: Lazy = + Lazy::new(|| var("QW_LAMBDA_INDEX_ID").expect("QW_LAMBDA_INDEX_ID must be set")); + +pub static LOG_SPAN_BOUNDARIES: Lazy = + Lazy::new(|| var("QW_LAMBDA_LOG_SPAN_BOUNDARIES").is_ok_and(|v| v.as_str() == "true")); + +pub static OPENTELEMETRY_URL: Lazy> = + Lazy::new(|| var("QW_LAMBDA_OPENTELEMETRY_URL").ok()); + +pub static OPENTELEMETRY_AUTHORIZATION: Lazy> = + Lazy::new(|| var("QW_LAMBDA_OPENTELEMETRY_AUTHORIZATION").ok()); diff --git a/quickwit/quickwit-lambda/src/indexer/environment.rs b/quickwit/quickwit-lambda/src/indexer/environment.rs index 92f264a268c..0a7e2f19acd 100644 --- a/quickwit/quickwit-lambda/src/indexer/environment.rs +++ b/quickwit/quickwit-lambda/src/indexer/environment.rs @@ -23,6 +23,7 @@ use once_cell::sync::Lazy; pub const CONFIGURATION_TEMPLATE: &str = "version: 0.6 node_id: lambda-indexer +cluster_id: lambda-ephemeral metastore_uri: s3://${QW_LAMBDA_METASTORE_BUCKET}/index default_index_root_uri: s3://${QW_LAMBDA_INDEX_BUCKET}/index data_dir: /tmp @@ -32,8 +33,8 @@ pub static INDEX_CONFIG_URI: Lazy = Lazy::new(|| { var("QW_LAMBDA_INDEX_CONFIG_URI").expect("QW_LAMBDA_INDEX_CONFIG_URI must be set") }); -pub static INDEX_ID: Lazy = - Lazy::new(|| var("QW_LAMBDA_INDEX_ID").expect("QW_LAMBDA_INDEX_ID must be set")); - pub static DISABLE_MERGE: Lazy = Lazy::new(|| var("QW_LAMBDA_DISABLE_MERGE").is_ok_and(|v| v.as_str() == "true")); + +pub static DISABLE_JANITOR: Lazy = + Lazy::new(|| var("QW_LAMBDA_DISABLE_JANITOR").is_ok_and(|v| v.as_str() == "true")); diff --git a/quickwit/quickwit-lambda/src/indexer/handler.rs b/quickwit/quickwit-lambda/src/indexer/handler.rs index 9160b10b9b8..1282b0e54a4 100644 --- a/quickwit/quickwit-lambda/src/indexer/handler.rs +++ b/quickwit/quickwit-lambda/src/indexer/handler.rs @@ -21,9 +21,10 @@ use lambda_runtime::{Error, LambdaEvent}; use serde_json::Value; use tracing::{debug_span, error, info, info_span, Instrument}; -use super::environment::{DISABLE_MERGE, INDEX_CONFIG_URI, INDEX_ID}; +use super::environment::{DISABLE_JANITOR, DISABLE_MERGE, INDEX_CONFIG_URI}; use super::ingest::{ingest, IngestArgs}; use super::model::IndexerEvent; +use crate::environment::INDEX_ID; use crate::logger; use crate::utils::LambdaContainerContext; @@ -37,6 +38,8 @@ async fn indexer_handler(event: LambdaEvent) -> Result { input_format: quickwit_config::SourceInputFormat::Json, overwrite: false, vrl_script: None, + // TODO: instead of clearing the cache, we use a cache and set its max + // size with indexer_config.split_store_max_num_bytes clear_cache: true, }) .instrument(debug_span!( @@ -45,6 +48,7 @@ async fn indexer_handler(event: LambdaEvent) -> Result { env.INDEX_CONFIG_URI = *INDEX_CONFIG_URI, env.INDEX_ID = *INDEX_ID, env.DISABLE_MERGE = *DISABLE_MERGE, + env.DISABLE_JANITOR = *DISABLE_JANITOR, cold = container_ctx.cold, container_id = container_ctx.container_id, )) diff --git a/quickwit/quickwit-lambda/src/indexer/ingest.rs b/quickwit/quickwit-lambda/src/indexer/ingest/helpers.rs similarity index 54% rename from quickwit/quickwit-lambda/src/indexer/ingest.rs rename to quickwit/quickwit-lambda/src/indexer/ingest/helpers.rs index b449c81b84f..871c993fa1b 100644 --- a/quickwit/quickwit-lambda/src/indexer/ingest.rs +++ b/quickwit/quickwit-lambda/src/indexer/ingest/helpers.rs @@ -24,9 +24,8 @@ use std::path::{Path, PathBuf}; use anyhow::{bail, Context}; use chitchat::transport::ChannelTransport; use chitchat::FailureDetectorConfig; -use quickwit_actors::Universe; -use quickwit_cli::tool::start_statistics_reporting_loop; -use quickwit_cli::{run_index_checklist, start_actor_runtimes}; +use quickwit_actors::{ActorHandle, Mailbox, Universe}; +use quickwit_cli::run_index_checklist; use quickwit_cluster::{Cluster, ClusterMember}; use quickwit_common::pubsub::EventBroker; use quickwit_common::runtimes::RuntimesConfig; @@ -34,41 +33,43 @@ use quickwit_common::uri::Uri; use quickwit_config::merge_policy_config::MergePolicyConfig; use quickwit_config::service::QuickwitService; use quickwit_config::{ - load_index_config_from_user_config, ConfigFormat, IndexConfig, IndexerConfig, NodeConfig, - SourceConfig, SourceInputFormat, SourceParams, TransformConfig, CLI_SOURCE_ID, + load_index_config_from_user_config, ConfigFormat, IndexConfig, NodeConfig, SourceConfig, + SourceInputFormat, SourceParams, TransformConfig, CLI_SOURCE_ID, }; -use quickwit_index_management::{clear_cache_directory, IndexService}; -use quickwit_indexing::actors::{IndexingService, MergePipelineId, MergeSchedulerService}; -use quickwit_indexing::models::{ - DetachIndexingPipeline, DetachMergePipeline, IndexingStatistics, SpawnPipeline, +use quickwit_index_management::IndexService; +use quickwit_indexing::actors::{ + IndexingService, MergePipeline, MergePipelineId, MergeSchedulerService, }; +use quickwit_indexing::models::{DetachIndexingPipeline, DetachMergePipeline, SpawnPipeline}; +use quickwit_indexing::IndexingPipeline; use quickwit_ingest::IngesterPool; +use quickwit_janitor::{start_janitor_service, JanitorService}; use quickwit_metastore::CreateIndexRequestExt; use quickwit_proto::indexing::CpuCapacity; -use quickwit_proto::metastore::{CreateIndexRequest, MetastoreError, MetastoreService}; +use quickwit_proto::metastore::{ + CreateIndexRequest, MetastoreError, MetastoreService, MetastoreServiceClient, +}; use quickwit_proto::types::{NodeId, PipelineUid}; +use quickwit_search::SearchJobPlacer; use quickwit_storage::StorageResolver; use quickwit_telemetry::payload::{QuickwitFeature, QuickwitTelemetryInfo, TelemetryEvent}; use tracing::{debug, info, instrument}; -use super::environment::{CONFIGURATION_TEMPLATE, DISABLE_MERGE, INDEX_CONFIG_URI, INDEX_ID}; -use crate::utils::load_node_config; - -#[derive(Debug, Eq, PartialEq)] -pub struct IngestArgs { - pub input_path: PathBuf, - pub input_format: SourceInputFormat, - pub overwrite: bool, - pub vrl_script: Option, - pub clear_cache: bool, -} +use crate::environment::INDEX_ID; +use crate::indexer::environment::{DISABLE_JANITOR, DISABLE_MERGE, INDEX_CONFIG_URI}; -async fn create_empty_cluster(config: &NodeConfig) -> anyhow::Result { +/// The indexing service needs to update its cluster chitchat state so that the control plane is +/// aware of the running tasks. We thus create a fake cluster to instantiate the indexing service +/// and avoid impacting potential control plane running on the cluster. +pub(super) async fn create_empty_cluster( + config: &NodeConfig, + services: &[QuickwitService], +) -> anyhow::Result { let self_node = ClusterMember { node_id: NodeId::new(config.node_id.clone()), generation_id: quickwit_cluster::GenerationId::now(), is_ready: false, - enabled_services: HashSet::new(), + enabled_services: HashSet::from_iter(services.to_owned()), gossip_advertise_addr: config.gossip_advertise_addr, grpc_advertise_addr: config.grpc_advertise_addr, indexing_tasks: Vec::new(), @@ -88,7 +89,7 @@ async fn create_empty_cluster(config: &NodeConfig) -> anyhow::Result { } /// TODO refactor with `dir_and_filename` in file source -pub fn dir_and_filename(filepath: &Path) -> anyhow::Result<(Uri, &Path)> { +fn dir_and_filename(filepath: &Path) -> anyhow::Result<(Uri, &Path)> { let dir_uri: Uri = filepath .parent() .context("Parent directory could not be resolved")? @@ -102,7 +103,7 @@ pub fn dir_and_filename(filepath: &Path) -> anyhow::Result<(Uri, &Path)> { } #[instrument(level = "debug", skip(resolver))] -async fn load_index_config( +pub(super) async fn load_index_config( resolver: &StorageResolver, default_index_root_uri: &Uri, ) -> anyhow::Result { @@ -121,32 +122,43 @@ async fn load_index_config( Ok(index_config) } -pub async fn ingest(args: IngestArgs) -> anyhow::Result { - debug!(args=?args, "lambda-ingest"); - let (config, storage_resolver, mut metastore) = - load_node_config(CONFIGURATION_TEMPLATE).await?; +pub(super) async fn send_telemetry() { + let services: HashSet = + HashSet::from_iter([QuickwitService::Indexer.as_str().to_string()]); + let telemetry_info = + QuickwitTelemetryInfo::new(services, HashSet::from_iter([QuickwitFeature::AwsLambda])); + let _telemetry_handle_opt = quickwit_telemetry::start_telemetry_loop(telemetry_info); + quickwit_telemetry::send_telemetry_event(TelemetryEvent::RunCommand).await; +} - let source_params = SourceParams::file(args.input_path); - let transform_config = args - .vrl_script - .map(|vrl_script| TransformConfig::new(vrl_script, None)); - let source_config = SourceConfig { +pub(super) fn configure_source( + input_path: PathBuf, + input_format: SourceInputFormat, + vrl_script: Option, +) -> SourceConfig { + let source_params = SourceParams::file(input_path); + let transform_config = vrl_script.map(|vrl_script| TransformConfig::new(vrl_script, None)); + SourceConfig { source_id: CLI_SOURCE_ID.to_string(), max_num_pipelines_per_indexer: NonZeroUsize::new(1).expect("1 is always non-zero."), desired_num_pipelines: NonZeroUsize::new(1).expect("1 is always non-zero."), enabled: true, source_params, transform_config, - input_format: args.input_format, - }; + input_format, + } +} - let checklist_result = run_index_checklist( - &mut metastore, - &storage_resolver, - &INDEX_ID, - Some(&source_config), - ) - .await; +/// Check if the index exists, creating or overwriting it if necessary +pub(super) async fn init_index_if_necessary( + metastore: &mut MetastoreServiceClient, + storage_resolver: &StorageResolver, + source_config: &SourceConfig, + default_index_root_uri: &Uri, + overwrite: bool, +) -> anyhow::Result<()> { + let checklist_result = + run_index_checklist(metastore, storage_resolver, &INDEX_ID, Some(source_config)).await; if let Err(e) = checklist_result { let is_not_found = e .downcast_ref() @@ -159,8 +171,7 @@ pub async fn ingest(args: IngestArgs) -> anyhow::Result { index_config_uri = *INDEX_CONFIG_URI, "Index not found, creating it" ); - let index_config = - load_index_config(&storage_resolver, &config.default_index_root_uri).await?; + let index_config = load_index_config(storage_resolver, default_index_root_uri).await?; if index_config.index_id != *INDEX_ID { bail!( "Expected index ID was {} but config file had {}", @@ -171,8 +182,8 @@ pub async fn ingest(args: IngestArgs) -> anyhow::Result { metastore .create_index(CreateIndexRequest::try_from_index_config(&index_config)?) .await?; - debug!("index created"); - } else if args.overwrite { + info!("index created"); + } else if overwrite { info!( index_id = *INDEX_ID, "Overwrite enabled, clearing existing index", @@ -180,45 +191,69 @@ pub async fn ingest(args: IngestArgs) -> anyhow::Result { let mut index_service = IndexService::new(metastore.clone(), storage_resolver.clone()); index_service.clear_index(&INDEX_ID).await?; } - // The indexing service needs to update its cluster chitchat state so that the control plane is - // aware of the running tasks. We thus create a fake cluster to instantiate the indexing service - // and avoid impacting potential control plane running on the cluster. - let cluster = create_empty_cluster(&config).await?; - let indexer_config = IndexerConfig { - ..Default::default() - }; - let runtimes_config = RuntimesConfig::default(); - let services: HashSet = - HashSet::from_iter([QuickwitService::Indexer.as_str().to_string()]); - let telemetry_info = - QuickwitTelemetryInfo::new(services, HashSet::from_iter([QuickwitFeature::AwsLambda])); - let _telemetry_handle_opt = quickwit_telemetry::start_telemetry_loop(telemetry_info); - quickwit_telemetry::send_telemetry_event(TelemetryEvent::RunCommand).await; - start_actor_runtimes( - runtimes_config, - &HashSet::from_iter([QuickwitService::Indexer]), - )?; + Ok(()) +} + +pub(super) async fn spawn_services( + universe: &Universe, + cluster: Cluster, + metastore: MetastoreServiceClient, + storage_resolver: StorageResolver, + node_config: &NodeConfig, + runtime_config: RuntimesConfig, +) -> anyhow::Result<( + ActorHandle, + Option>, +)> { + let event_broker = EventBroker::default(); + + // spawn merge scheduler service let merge_scheduler_service = - MergeSchedulerService::new(indexer_config.merge_concurrency.get()); - let universe = Universe::new(); + MergeSchedulerService::new(node_config.indexer_config.merge_concurrency.get()); let (merge_scheduler_service_mailbox, _) = universe.spawn_builder().spawn(merge_scheduler_service); - let indexing_server = IndexingService::new( - config.node_id.clone(), - config.data_dir_path.clone(), - indexer_config, - runtimes_config.num_threads_blocking, + + // spawn indexer service + let indexing_service = IndexingService::new( + node_config.node_id.clone(), + node_config.data_dir_path.clone(), + node_config.indexer_config.clone(), + runtime_config.num_threads_blocking, cluster, - metastore, + metastore.clone(), None, - merge_scheduler_service_mailbox, + merge_scheduler_service_mailbox.clone(), IngesterPool::default(), - storage_resolver, - EventBroker::default(), + storage_resolver.clone(), + event_broker.clone(), ) .await?; - let (indexing_server_mailbox, indexing_server_handle) = - universe.spawn_builder().spawn(indexing_server); + let (_, indexing_service_handle) = universe.spawn_builder().spawn(indexing_service); + + // spawn janitor service + let janitor_service_opt = if *DISABLE_JANITOR { + None + } else { + Some( + start_janitor_service( + universe, + node_config, + metastore, + SearchJobPlacer::default(), + storage_resolver, + event_broker, + true, + ) + .await?, + ) + }; + Ok((indexing_service_handle, janitor_service_opt)) +} + +pub(super) async fn spawn_pipelines( + indexing_server_mailbox: &Mailbox, + source_config: SourceConfig, +) -> anyhow::Result<(ActorHandle, ActorHandle)> { let pipeline_id = indexing_server_mailbox .ask_for_res(SpawnPipeline { index_id: INDEX_ID.clone(), @@ -234,24 +269,28 @@ pub async fn ingest(args: IngestArgs) -> anyhow::Result { let indexing_pipeline_handle = indexing_server_mailbox .ask_for_res(DetachIndexingPipeline { pipeline_id }) .await?; - debug!("wait for indexing statistics"); - let statistics = start_statistics_reporting_loop(indexing_pipeline_handle, false).await?; - debug!("indexing completed, tear down actors"); - merge_pipeline_handle.quit().await; - universe - .send_exit_with_success(&indexing_server_mailbox) - .await?; - indexing_server_handle.join().await; - universe.quit().await; - - if args.clear_cache { - info!("clearing local cache directory"); - clear_cache_directory(&config.data_dir_path).await?; - info!("local cache directory cleared"); - } + Ok((indexing_pipeline_handle, merge_pipeline_handle)) +} - if statistics.num_invalid_docs > 0 { - bail!("Failed to ingest {} documents", statistics.num_invalid_docs) +pub(super) async fn wait_for_merges( + merge_pipeline_handle: ActorHandle, +) -> anyhow::Result<()> { + // TODO: find a way to stop the MergePlanner actor in the MergePipeline, + // otherwise a new merge might be scheduled after this loop. That shouldn't + // have any concrete impact as the merge will be immediately cancelled, but + // it might generate errors during the universe shutdown (i.e "Failed to + // acquire permit") + loop { + let state = merge_pipeline_handle.state(); + let obs = merge_pipeline_handle.observe().await; + debug!(state=?state, ongoing=obs.num_ongoing_merges, "merge pipeline state"); + if obs.num_ongoing_merges == 0 { + break; + } + // We tolerate a relatively low refresh rate because the indexer + // typically runs for longuer periods of times and merges happen only + // occasionally. + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; } - Ok(statistics) + Ok(()) } diff --git a/quickwit/quickwit-lambda/src/indexer/ingest/mod.rs b/quickwit/quickwit-lambda/src/indexer/ingest/mod.rs new file mode 100644 index 00000000000..927fc684252 --- /dev/null +++ b/quickwit/quickwit-lambda/src/indexer/ingest/mod.rs @@ -0,0 +1,119 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +mod helpers; + +use std::collections::HashSet; +use std::path::PathBuf; + +use anyhow::bail; +use helpers::{ + configure_source, create_empty_cluster, init_index_if_necessary, send_telemetry, + spawn_pipelines, spawn_services, +}; +use quickwit_actors::Universe; +use quickwit_cli::start_actor_runtimes; +use quickwit_cli::tool::start_statistics_reporting_loop; +use quickwit_common::runtimes::RuntimesConfig; +use quickwit_config::service::QuickwitService; +use quickwit_config::SourceInputFormat; +use quickwit_index_management::clear_cache_directory; +use quickwit_indexing::models::IndexingStatistics; +use tracing::{debug, info}; + +use crate::indexer::environment::CONFIGURATION_TEMPLATE; +use crate::indexer::ingest::helpers::wait_for_merges; +use crate::utils::load_node_config; + +#[derive(Debug, Eq, PartialEq)] +pub struct IngestArgs { + pub input_path: PathBuf, + pub input_format: SourceInputFormat, + pub overwrite: bool, + pub vrl_script: Option, + pub clear_cache: bool, +} + +pub async fn ingest(args: IngestArgs) -> anyhow::Result { + debug!(args=?args, "lambda-ingest"); + + send_telemetry().await; + + let (config, storage_resolver, mut metastore) = + load_node_config(CONFIGURATION_TEMPLATE).await?; + + let source_config = configure_source(args.input_path, args.input_format, args.vrl_script); + + init_index_if_necessary( + &mut metastore, + &storage_resolver, + &source_config, + &config.default_index_root_uri, + args.overwrite, + ) + .await?; + + let services = [QuickwitService::Indexer, QuickwitService::Janitor]; + let cluster = create_empty_cluster(&config, &services).await?; + let universe = Universe::new(); + let runtimes_config = RuntimesConfig::default(); + + start_actor_runtimes(runtimes_config, &HashSet::from_iter(services))?; + + let (indexing_service_handle, _janitor_service_guard) = spawn_services( + &universe, + cluster, + metastore.clone(), + storage_resolver.clone(), + &config, + runtimes_config, + ) + .await?; + + let (indexing_pipeline_handle, merge_pipeline_handle) = + spawn_pipelines(indexing_service_handle.mailbox(), source_config).await?; + + debug!("wait for indexing to complete"); + let statistics = start_statistics_reporting_loop(indexing_pipeline_handle, false).await?; + + debug!("wait for merges to complete"); + wait_for_merges(merge_pipeline_handle).await?; + + debug!("indexing completed, tearing down actors"); + // TODO: is it really necessary to terminate the indexing service? + // Quitting the universe should be enough. + universe + .send_exit_with_success(indexing_service_handle.mailbox()) + .await?; + indexing_service_handle.join().await; + debug!("quitting universe"); + universe.quit().await; + debug!("universe.quit() awaited"); + + if args.clear_cache { + info!("clearing local cache directory"); + clear_cache_directory(&config.data_dir_path).await?; + info!("local cache directory cleared"); + } + + if statistics.num_invalid_docs > 0 { + bail!("Failed to ingest {} documents", statistics.num_invalid_docs) + } + Ok(statistics) +} diff --git a/quickwit/quickwit-lambda/src/lib.rs b/quickwit/quickwit-lambda/src/lib.rs index 0bb10d0cd83..3e5de5cb592 100644 --- a/quickwit/quickwit-lambda/src/lib.rs +++ b/quickwit/quickwit-lambda/src/lib.rs @@ -17,6 +17,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +mod environment; pub mod indexer; pub mod logger; pub mod searcher; diff --git a/quickwit/quickwit-lambda/src/logger.rs b/quickwit/quickwit-lambda/src/logger.rs index 2a10c3ab6ed..632c0abc817 100644 --- a/quickwit/quickwit-lambda/src/logger.rs +++ b/quickwit/quickwit-lambda/src/logger.rs @@ -32,10 +32,12 @@ use tracing_subscriber::prelude::*; use tracing_subscriber::registry::LookupSpan; use tracing_subscriber::{EnvFilter, Layer}; +use crate::environment::{LOG_SPAN_BOUNDARIES, OPENTELEMETRY_AUTHORIZATION, OPENTELEMETRY_URL}; + static TRACER_PROVIDER: OnceCell> = OnceCell::new(); pub(crate) const RUNTIME_CONTEXT_SPAN: &str = "runtime_context"; -fn fmt_layer(level: Level, ansi: bool) -> impl Layer +fn fmt_layer(level: Level) -> impl Layer where S: for<'a> LookupSpan<'a>, S: tracing::Subscriber, @@ -59,11 +61,16 @@ where ), ) .json(); + let fmt_span = if *LOG_SPAN_BOUNDARIES { + FmtSpan::NEW | FmtSpan::CLOSE + } else { + FmtSpan::NONE + }; tracing_subscriber::fmt::layer::() - .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE) + .with_span_events(fmt_span) .event_format(event_format) .fmt_fields(JsonFields::default()) - .with_ansi(ansi) + .with_ansi(false) .with_filter(env_filter) } @@ -111,36 +118,28 @@ where .with_filter(env_filter) } -fn setup_logging_and_tracing( - level: Level, - ansi: bool, - build_info: &BuildInfo, -) -> anyhow::Result<()> { +pub fn setup_lambda_tracer(level: Level) -> anyhow::Result<()> { global::set_text_map_propagator(TraceContextPropagator::new()); let registry = tracing_subscriber::registry(); - let otlp_config = ( - std::env::var("QW_LAMBDA_OPENTELEMETRY_URL"), - std::env::var("QW_LAMBDA_OPENTELEMETRY_AUTHORIZATION"), - ); - if let (Ok(ot_url), Ok(ot_auth)) = otlp_config { + let build_info = BuildInfo::get(); + if let (Some(ot_url), Some(ot_auth)) = ( + OPENTELEMETRY_URL.clone(), + OPENTELEMETRY_AUTHORIZATION.clone(), + ) { registry - .with(fmt_layer(level, ansi)) + .with(fmt_layer(level)) .with(otlp_layer(ot_url, ot_auth, level, build_info)) .try_init() .context("Failed to set up tracing.")?; } else { registry - .with(fmt_layer(level, ansi)) + .with(fmt_layer(level)) .try_init() .context("Failed to set up tracing.")?; } Ok(()) } -pub fn setup_lambda_tracer() -> anyhow::Result<()> { - setup_logging_and_tracing(Level::INFO, false, BuildInfo::get()) -} - pub fn flush_tracer() { if let Some(Some(tracer_provider)) = TRACER_PROVIDER.get() { debug!("flush tracers"); diff --git a/quickwit/quickwit-lambda/src/searcher/environment.rs b/quickwit/quickwit-lambda/src/searcher/environment.rs index b995949dff9..2aee5752463 100644 --- a/quickwit/quickwit-lambda/src/searcher/environment.rs +++ b/quickwit/quickwit-lambda/src/searcher/environment.rs @@ -30,8 +30,5 @@ searcher: partial_request_cache_capacity: ${QW_LAMBDA_PARTIAL_REQUEST_CACHE_CAPACITY:-64M} "; -pub(crate) static INDEX_ID: Lazy = - Lazy::new(|| var("QW_LAMBDA_INDEX_ID").expect("QW_LAMBDA_INDEX_ID must be set")); - pub(crate) static DISABLE_SEARCH_CACHE: Lazy = Lazy::new(|| var("QW_LAMBDA_DISABLE_SEARCH_CACHE").is_ok_and(|v| v.as_str() == "true")); diff --git a/quickwit/quickwit-lambda/src/searcher/handler.rs b/quickwit/quickwit-lambda/src/searcher/handler.rs index e57de93bdac..b20e373e5bf 100644 --- a/quickwit/quickwit-lambda/src/searcher/handler.rs +++ b/quickwit/quickwit-lambda/src/searcher/handler.rs @@ -26,8 +26,9 @@ use quickwit_search::SearchResponseRest; use quickwit_serve::SearchRequestQueryString; use tracing::{debug_span, error, info_span, instrument, Instrument}; -use super::environment::{DISABLE_SEARCH_CACHE, INDEX_ID}; +use super::environment::DISABLE_SEARCH_CACHE; use super::search::{search, SearchArgs}; +use crate::environment::INDEX_ID; use crate::logger; use crate::utils::LambdaContainerContext; diff --git a/quickwit/quickwit-lambda/src/searcher/search.rs b/quickwit/quickwit-lambda/src/searcher/search.rs index 3d0e0d249f9..e03516fe8ef 100644 --- a/quickwit/quickwit-lambda/src/searcher/search.rs +++ b/quickwit/quickwit-lambda/src/searcher/search.rs @@ -35,7 +35,8 @@ use quickwit_telemetry::payload::{QuickwitFeature, QuickwitTelemetryInfo, Teleme use tokio::sync::OnceCell; use tracing::debug; -use super::environment::{CONFIGURATION_TEMPLATE, DISABLE_SEARCH_CACHE, INDEX_ID}; +use super::environment::{CONFIGURATION_TEMPLATE, DISABLE_SEARCH_CACHE}; +use crate::environment::INDEX_ID; use crate::utils::load_node_config; static LAMBDA_SEARCH_CACHE: OnceCell = OnceCell::const_new();