Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix actor shutdown to await merges #4741

Merged
merged 2 commits into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions distribution/lambda/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ indexer-package-path:
searcher-package-path:
echo -n $(SEARCHER_PACKAGE_PATH)

bootstrap: package check-env
bootstrap:
cdk bootstrap aws://$$CDK_ACCOUNT/$$CDK_REGION

deploy-hdfs: package check-env
Expand All @@ -65,17 +65,20 @@ deploy-hdfs: package check-env
deploy-mock-data: package check-env
cdk deploy -a cdk/app.py MockDataStack

print-mock-data-metastore: check-env
python -c 'from cdk import cli; cli.print_mock_data_metastore()'

# address https://github.com/aws/aws-cdk/issues/20060
before-destroy:
mkdir -p cdk.out
touch $(INDEXER_PACKAGE_PATH)
touch $(SEARCHER_PACKAGE_PATH)

destroy-hdfs: before-destroy
destroy-hdfs: before-destroy check-env
python -c 'from cdk import cli; cli.empty_hdfs_bucket()'
cdk destroy --force -a cdk/app.py HdfsStack

destroy-mock-data: before-destroy
destroy-mock-data: before-destroy check-env
python -c 'from cdk import cli; cli.empty_mock_data_buckets()'
cdk destroy --force -a cdk/app.py MockDataStack

Expand Down Expand Up @@ -108,6 +111,7 @@ bench-index:
done

bench-search-term:
export QW_LAMBDA_LOG_SPAN_BOUNDARIES=true
mem_sizes=( 1024 2048 4096 8192 )
for mem_size in "$${mem_sizes[@]}"
do
Expand All @@ -117,6 +121,7 @@ bench-search-term:
done

bench-search-histogram:
export QW_LAMBDA_LOG_SPAN_BOUNDARIES=true
mem_sizes=( 1024 2048 4096 8192 )
for mem_size in "$${mem_sizes[@]}"
do
Expand Down
9 changes: 9 additions & 0 deletions distribution/lambda/cdk/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,15 @@ def empty_mock_data_buckets():
_clean_s3_bucket(bucket_name)


def print_mock_data_metastore():
bucket_name = _get_cloudformation_output_value(
app.MOCK_DATA_STACK_NAME, mock_data_stack.INDEX_STORE_BUCKET_NAME_EXPORT_NAME
)
s3 = session.client("s3")
response = s3.get_object(Bucket=bucket_name, Key="index/mock-sales/metastore.json")
print(response["Body"].read().decode())


@cache
def _git_commit():
return subprocess.run(
Expand Down
11 changes: 7 additions & 4 deletions distribution/lambda/cdk/stacks/examples/mock_data_stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from constructs import Construct
import yaml

from ..services.quickwit_service import QuickwitService
from ..services import quickwit_service

SEARCHER_FUNCTION_NAME_EXPORT_NAME = "mock-data-searcher-function-name"
INDEX_STORE_BUCKET_NAME_EXPORT_NAME = "mock-data-index-store-bucket-name"
Expand All @@ -28,7 +28,7 @@ def __init__(
scope: Construct,
construct_id: str,
index_id: str,
qw_svc: QuickwitService,
qw_svc: quickwit_service.QuickwitService,
**kwargs,
):
super().__init__(scope, construct_id, **kwargs)
Expand Down Expand Up @@ -83,7 +83,7 @@ def __init__(
scope: Construct,
construct_id: str,
index_id: str,
qw_svc: QuickwitService,
qw_svc: quickwit_service.QuickwitService,
api_key: str,
**kwargs,
) -> None:
Expand Down Expand Up @@ -149,12 +149,15 @@ def __init__(
"mock-data-index-config",
path=index_config_local_path,
)
qw_svc = QuickwitService(
lambda_env = quickwit_service.extract_local_env()
qw_svc = quickwit_service.QuickwitService(
self,
"Quickwit",
index_id=index_id,
index_config_bucket=index_config.s3_bucket_name,
index_config_key=index_config.s3_object_key,
indexer_environment=lambda_env,
searcher_environment=lambda_env,
indexer_package_location=indexer_package_location,
searcher_package_location=searcher_package_location,
)
Expand Down
4 changes: 3 additions & 1 deletion distribution/lambda/cdk/stacks/services/indexer_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ def __init__(
"QW_LAMBDA_INDEX_CONFIG_URI": f"s3://{index_config_bucket}/{index_config_key}",
**environment,
},
timeout=aws_cdk.Duration.minutes(15),
# use a strict timeout and retry policy to avoid unexpected costs
timeout=aws_cdk.Duration.minutes(1),
retry_attempts=0,
reserved_concurrent_executions=1,
memory_size=memory_size,
ephemeral_storage_size=aws_cdk.Size.gibibytes(10),
Expand Down
8 changes: 6 additions & 2 deletions distribution/lambda/cdk/stacks/services/quickwit_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@


def extract_local_env() -> dict[str, str]:
"""Extracts local environment variables that start with QW_LAMBDA_"""
return {k: os.environ[k] for k in os.environ.keys() if k.startswith("QW_LAMBDA_")}
"""Extracts local environment variables QW_LAMBDA_* and QW_DISABLE_TELEMETRY"""
return {
k: os.environ[k]
for k in os.environ.keys()
if (k.startswith("QW_LAMBDA_") or k == "QW_DISABLE_TELEMETRY")
rdettai marked this conversation as resolved.
Show resolved Hide resolved
}


class QuickwitService(Construct):
Expand Down
1,040 changes: 525 additions & 515 deletions distribution/lambda/poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion distribution/lambda/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ packages = [{include = "cdk"}]
[tool.poetry.dependencies]
python = ">=3.10,<4.0"
aws-cdk-lib = "^2.95.1"
cargo-lambda = "^0.21.0"
cargo-lambda = "^1.1.0"
constructs = ">=10.0.0,<11.0.0"
pyyaml = "^6.0.1"
black = "^24.3.0"
Expand Down
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions quickwit/quickwit-lambda/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ quickwit-doc-mapper = { workspace = true }
quickwit-index-management = { workspace = true }
quickwit-indexing = { workspace = true }
quickwit-ingest = { workspace = true }
quickwit-janitor = { workspace = true }
quickwit-metastore = { workspace = true }
quickwit-proto = { workspace = true }
quickwit-rest-client = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-lambda/src/bin/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use quickwit_lambda::logger;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
logger::setup_lambda_tracer()?;
logger::setup_lambda_tracer(tracing::Level::INFO)?;
let func = service_fn(handler);
lambda_runtime::run(func)
.await
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-lambda/src/bin/searcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use quickwit_lambda::searcher::handler;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
logger::setup_lambda_tracer()?;
logger::setup_lambda_tracer(tracing::Level::INFO)?;
let func = service_fn(handler);
run(func).await.map_err(|e| anyhow::anyhow!(e))
}
36 changes: 36 additions & 0 deletions quickwit/quickwit-lambda/src/environment.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (C) 2024 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at [email protected].
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::env::var;

use once_cell::sync::Lazy;

pub static INDEX_ID: Lazy<String> =
Lazy::new(|| var("QW_LAMBDA_INDEX_ID").expect("QW_LAMBDA_INDEX_ID must be set"));

/// Configure the fmt tracing subscriber to log span boundaries. This is very verbose and is
/// only used to generate advanced KPIs from Lambda runs (e.g for blogpost benchmarks)
pub static LOG_SPAN_BOUNDARIES: Lazy<bool> =
rdettai marked this conversation as resolved.
Show resolved Hide resolved
Lazy::new(|| var("QW_LAMBDA_LOG_SPAN_BOUNDARIES").is_ok_and(|v| v.as_str() == "true"));

pub static OPENTELEMETRY_URL: Lazy<Option<String>> =
Lazy::new(|| var("QW_LAMBDA_OPENTELEMETRY_URL").ok());

pub static OPENTELEMETRY_AUTHORIZATION: Lazy<Option<String>> =
Lazy::new(|| var("QW_LAMBDA_OPENTELEMETRY_AUTHORIZATION").ok());
7 changes: 4 additions & 3 deletions quickwit/quickwit-lambda/src/indexer/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use once_cell::sync::Lazy;

pub const CONFIGURATION_TEMPLATE: &str = "version: 0.6
node_id: lambda-indexer
cluster_id: lambda-ephemeral
metastore_uri: s3://${QW_LAMBDA_METASTORE_BUCKET}/index
default_index_root_uri: s3://${QW_LAMBDA_INDEX_BUCKET}/index
data_dir: /tmp
Expand All @@ -32,8 +33,8 @@ pub static INDEX_CONFIG_URI: Lazy<String> = Lazy::new(|| {
var("QW_LAMBDA_INDEX_CONFIG_URI").expect("QW_LAMBDA_INDEX_CONFIG_URI must be set")
});

pub static INDEX_ID: Lazy<String> =
Lazy::new(|| var("QW_LAMBDA_INDEX_ID").expect("QW_LAMBDA_INDEX_ID must be set"));

pub static DISABLE_MERGE: Lazy<bool> =
Lazy::new(|| var("QW_LAMBDA_DISABLE_MERGE").is_ok_and(|v| v.as_str() == "true"));

pub static DISABLE_JANITOR: Lazy<bool> =
Lazy::new(|| var("QW_LAMBDA_DISABLE_JANITOR").is_ok_and(|v| v.as_str() == "true"));
6 changes: 5 additions & 1 deletion quickwit/quickwit-lambda/src/indexer/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ use lambda_runtime::{Error, LambdaEvent};
use serde_json::Value;
use tracing::{debug_span, error, info, info_span, Instrument};

use super::environment::{DISABLE_MERGE, INDEX_CONFIG_URI, INDEX_ID};
use super::environment::{DISABLE_JANITOR, DISABLE_MERGE, INDEX_CONFIG_URI};
use super::ingest::{ingest, IngestArgs};
use super::model::IndexerEvent;
use crate::environment::INDEX_ID;
use crate::logger;
use crate::utils::LambdaContainerContext;

Expand All @@ -37,6 +38,8 @@ async fn indexer_handler(event: LambdaEvent<Value>) -> Result<Value, Error> {
input_format: quickwit_config::SourceInputFormat::Json,
overwrite: false,
vrl_script: None,
// TODO: instead of clearing the cache, we use a cache and set its max
// size with indexer_config.split_store_max_num_bytes
clear_cache: true,
})
.instrument(debug_span!(
Expand All @@ -45,6 +48,7 @@ async fn indexer_handler(event: LambdaEvent<Value>) -> Result<Value, Error> {
env.INDEX_CONFIG_URI = *INDEX_CONFIG_URI,
env.INDEX_ID = *INDEX_ID,
env.DISABLE_MERGE = *DISABLE_MERGE,
env.DISABLE_JANITOR = *DISABLE_JANITOR,
cold = container_ctx.cold,
container_id = container_ctx.container_id,
))
Expand Down
Loading
Loading