Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lambda search endpoints using a warp adapter #4805

Merged
merged 6 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions distribution/lambda/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ bench-index:
done

bench-search-term:
export QW_LAMBDA_LOG_SPAN_BOUNDARIES=true
export QW_LAMBDA_ENABLE_VERBOSE_JSON_LOGS=true
mem_sizes=( 1024 2048 4096 8192 )
for mem_size in "$${mem_sizes[@]}"
do
Expand All @@ -121,7 +121,7 @@ bench-search-term:
done

bench-search-histogram:
export QW_LAMBDA_LOG_SPAN_BOUNDARIES=true
export QW_LAMBDA_ENABLE_VERBOSE_JSON_LOGS=true
mem_sizes=( 1024 2048 4096 8192 )
for mem_size in "$${mem_sizes[@]}"
do
Expand All @@ -133,15 +133,13 @@ bench-search-histogram:
bench-search:
for run in {1..30}
do
export QW_LAMBDA_DISABLE_SEARCH_CACHE=true
$(MAKE) bench-search-term
$(MAKE) bench-search-histogram
export QW_LAMBDA_DISABLE_SEARCH_CACHE=false
export QW_LAMBDA_PARTIAL_REQUEST_CACHE_CAPACITY=0
$(MAKE) bench-search-term
$(MAKE) bench-search-histogram
export QW_LAMBDA_DISABLE_SEARCH_CACHE=false
export QW_LAMBDA_PARTIAL_REQUEST_CACHE_CAPACITY=64MB
$(MAKE) bench-search-term
$(MAKE) bench-search-histogram
done

test-mock-data-endpoints:
python -c 'from cdk import cli; cli.test_mock_data_endpoints()'
107 changes: 87 additions & 20 deletions distribution/lambda/cdk/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from dataclasses import dataclass
from functools import cache
from io import BytesIO
from urllib.parse import urlparse

import boto3
import botocore.config
Expand All @@ -29,6 +30,8 @@
retries={"max_attempts": 0}, read_timeout=60 * 15
)
session = boto3.Session(region_name=region)
mock_sales_index_id = "mock-sales"
hdfs_logs_index_id = "hdfs-logs"


@cache
Expand All @@ -39,19 +42,27 @@ def _get_cloudformation_output_value(stack_name: str, export_name: str) -> str:
print(f"Stack {stack_name} not identified uniquely, found {stacks}")
outputs = stacks[0]["Outputs"]
for output in outputs:
if output["ExportName"] == export_name:
if "ExportName" in output and output["ExportName"] == export_name:
return output["OutputValue"]
else:
print(f"Export name {export_name} not found in stack {stack_name}")
exit(1)


def _decompress_if_gzip(payload: bytes, headers: dict) -> str:
if headers.get("content-encoding", "") == "gzip":
return gzip.GzipFile(mode="rb", fileobj=BytesIO(payload)).read().decode()
else:
return payload.decode()


@dataclass
class LambdaResult:
function_error: str
log_tail: str
payload: str
raw_size_bytes: int
status_code: int

@staticmethod
def from_lambda_response(lambda_resp: dict) -> "LambdaResult":
Expand All @@ -61,28 +72,28 @@ def from_lambda_response(lambda_resp: dict) -> "LambdaResult":
log_tail=base64.b64decode(lambda_resp["LogResult"]).decode(),
payload=payload,
raw_size_bytes=len(payload),
status_code=0,
)

@staticmethod
def from_lambda_gateway_response(lambda_resp: dict) -> "LambdaResult":
gw_str = lambda_resp["Payload"].read().decode()
gw_obj = json.loads(gw_str)
payload = gw_obj["body"]
if gw_obj["isBase64Encoded"]:
if "body" in gw_obj:
payload = gw_obj["body"]
status_code = gw_obj["statusCode"]
else:
payload = gw_str
status_code = -1
if gw_obj.get("isBase64Encoded", False):
dec_payload = base64.b64decode(payload)
if gw_obj.get("headers", {}).get("content-encoding", "") == "gzip":
payload = (
gzip.GzipFile(mode="rb", fileobj=BytesIO(dec_payload))
.read()
.decode()
)
else:
payload = dec_payload.decode()
payload = _decompress_if_gzip(dec_payload, gw_obj.get("headers", {}))
return LambdaResult(
function_error=lambda_resp.get("FunctionError", ""),
log_tail=base64.b64decode(lambda_resp["LogResult"]).decode(),
payload=payload,
raw_size_bytes=len(gw_str),
status_code=status_code,
)

def extract_report(self) -> str:
Expand All @@ -108,12 +119,13 @@ def _format_lambda_output(
if lambda_result.function_error != "":
print("\n## FUNCTION ERROR:")
print(lambda_result.function_error)
print("\n## LOG TAIL:")
print(lambda_result.log_tail)
print("\n## RAW RESPONSE SIZE (BYTES):")
ratio = lambda_result.raw_size_bytes / len(lambda_result.payload)
print(f"{lambda_result.raw_size_bytes} ({ratio:.1f}x the final payload)")
print("\n## RESPONSE:")
if len(lambda_result.payload) == 0:
ratio = "empty payload"
else:
ratio = f"{(lambda_result.raw_size_bytes / len(lambda_result.payload)):.1f}x the final payload"
print(f"{lambda_result.raw_size_bytes} ({ratio})")
print(f"\n## RESPONSE [{lambda_result.status_code}]:")
payload_size = len(lambda_result.payload)
print(lambda_result.payload[:max_resp_size])
if payload_size > max_resp_size:
Expand Down Expand Up @@ -184,6 +196,7 @@ def invoke_hdfs_indexer() -> LambdaResult:

def _invoke_searcher(
stack_name: str,
index_id: str,
function_export_name: str,
payload: str,
download_logs: bool,
Expand All @@ -198,9 +211,14 @@ def _invoke_searcher(
LogType="Tail",
Payload=json.dumps(
{
"headers": {"Content-Type": "application/json"},
"resource": f"/api/v1/{index_id}/search",
"path": f"/api/v1/{index_id}/search",
"httpMethod": "POST",
"headers": {
"Content-Type": "application/json",
},
"requestContext": {
"http": {"method": "POST"},
"httpMethod": "POST",
},
"body": payload,
"isBase64Encoded": False,
Expand All @@ -218,6 +236,7 @@ def _invoke_searcher(
def invoke_hdfs_searcher(payload: str, download_logs: bool = True) -> LambdaResult:
return _invoke_searcher(
app.HDFS_STACK_NAME,
hdfs_logs_index_id,
hdfs_stack.SEARCHER_FUNCTION_NAME_EXPORT_NAME,
payload,
download_logs,
Expand Down Expand Up @@ -249,7 +268,6 @@ def get_logs(
last_event_id = event["eventId"]
yield event["message"]
if event["message"].startswith("REPORT"):
print(event["message"])
lower_time_bound = int(event["timestamp"])
last_event_id = "REPORT"
break
Expand Down Expand Up @@ -277,13 +295,15 @@ def download_logs_to_file(request_id: str, function_name: str, invoke_start: flo
int(invoke_start * 1000),
):
f.write(log)
print(f"Logs written to lambda.{request_id}.log")
except Exception as e:
print(f"Failed to download logs: {e}")


def invoke_mock_data_searcher():
_invoke_searcher(
app.MOCK_DATA_STACK_NAME,
mock_sales_index_id,
mock_data_stack.SEARCHER_FUNCTION_NAME_EXPORT_NAME,
"""{"query": "id:1", "sort_by": "ts", "max_hits": 10}""",
True,
Expand Down Expand Up @@ -321,7 +341,9 @@ def print_mock_data_metastore():
app.MOCK_DATA_STACK_NAME, mock_data_stack.INDEX_STORE_BUCKET_NAME_EXPORT_NAME
)
s3 = session.client("s3")
response = s3.get_object(Bucket=bucket_name, Key="index/mock-sales/metastore.json")
response = s3.get_object(
Bucket=bucket_name, Key=f"index/{mock_sales_index_id}/metastore.json"
)
print(response["Body"].read().decode())


Expand Down Expand Up @@ -387,3 +409,48 @@ def benchmark_hdfs_search(payload: str):
with open(f"lambda-bench.log", "a+") as f:
f.write(json.dumps(bench_result))
f.write("\n")


def test_mock_data_endpoints():
    """Smoke-test the deployed mock-data API Gateway endpoints.

    Resolves the gateway URL from the CloudFormation stack outputs, then
    issues a series of search requests (Quickwit and Elasticsearch-compatible
    APIs), printing each status/body and exiting the process with code 1 on
    the first unexpected status.
    """
    apigw_url = _get_cloudformation_output_value(
        app.MOCK_DATA_STACK_NAME, mock_data_stack.API_GATEWAY_EXPORT_NAME
    )

    def req(method, path, body=None, expected_status=200):
        # One HTTPS request per call; authenticated via the SEARCHER_API_KEY
        # environment variable (sent as the x-api-key header).
        conn = http.client.HTTPSConnection(urlparse(apigw_url).netloc)
        conn.request(
            method,
            path,
            body,
            headers={"x-api-key": os.getenv("SEARCHER_API_KEY")},
        )
        response = conn.getresponse()
        print(f"{method} {path}")
        headers = {k: v for (k, v) in response.getheaders()}
        # Gateway responses may be gzip-compressed; decode before printing.
        body = _decompress_if_gzip(response.read(), headers)
        if response.status != expected_status:
            print(f"[{response.status}] => {body}")
            exit(1)
        else:
            # Truncate the (re-serialized) JSON body to 100 chars for readability.
            print(f"[{response.status}] => {json.dumps(json.loads(body))[0:100]}")

    req("GET", f"/api/v1/{mock_sales_index_id}/search?query=animal")
    req(
        "POST",
        f"/api/v1/{mock_sales_index_id}/search",
        '{"query":"quantity:>5", "max_hits": 10}',
    )
    req("GET", f"/api/v1/_elastic/{mock_sales_index_id}/_search?q=animal")
    req(
        "POST",
        f"/api/v1/_elastic/{mock_sales_index_id}/_search",
        '{"query":{"bool":{"must":[{"range":{"quantity":{"gt":5}}}]}},"size":10}',
    )
    req("GET", f"/api/v1/_elastic/{mock_sales_index_id}/_field_caps?fields=quantity")
    # expected errors
    req(
        "GET",
        # `query` is not a valid ES parameter (should be `q`) -> 400
        f"/api/v1/_elastic/{mock_sales_index_id}/_search?query=animal",
        expected_status=400,
    )
    req("GET", f"/api/v1/_elastic/_search?q=animal", expected_status=501)
15 changes: 10 additions & 5 deletions distribution/lambda/cdk/stacks/examples/mock_data_stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
SEARCHER_FUNCTION_NAME_EXPORT_NAME = "mock-data-searcher-function-name"
INDEX_STORE_BUCKET_NAME_EXPORT_NAME = "mock-data-index-store-bucket-name"
SOURCE_BUCKET_NAME_EXPORT_NAME = "mock-data-source-bucket-name"
API_GATEWAY_EXPORT_NAME = "mock-data-api-gateway-url"


class Source(Construct):
Expand Down Expand Up @@ -98,11 +99,12 @@ def __init__(
searcher_integration = aws_apigateway.LambdaIntegration(
qw_svc.searcher.lambda_function
)
search_resource = (
api.root.add_resource("v1").add_resource(index_id).add_resource("search")
)
search_resource = api.root.add_resource("v1").add_resource("{proxy+}")
search_resource.add_method("POST", searcher_integration, api_key_required=True)
api_deployment = aws_apigateway.Deployment(self, "api-deployment", api=api)
search_resource.add_method("GET", searcher_integration, api_key_required=True)
# Change the deployment id (api-deployment-x) each time the API changes,
# otherwise changes are not deployed.
api_deployment = aws_apigateway.Deployment(self, "api-deployment-1", api=api)
api_stage = aws_apigateway.Stage(
self, "api", deployment=api_deployment, stage_name="api"
)
Expand All @@ -122,7 +124,10 @@ def __init__(
api.deployment_stage = api_stage

aws_cdk.CfnOutput(
self, "search-api-url", value=api.url.rstrip("/") + search_resource.path
self,
"search-api-url",
value=api.url.rstrip("/") + search_resource.path,
export_name=API_GATEWAY_EXPORT_NAME,
)


Expand Down
2 changes: 1 addition & 1 deletion distribution/lambda/resources/hdfs-logs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Index config file for hdfs-logs dataset.
#

version: 0.6
version: 0.7

index_id: hdfs-logs

Expand Down
2 changes: 1 addition & 1 deletion distribution/lambda/resources/mock-sales.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Index config file for mock-sales data generator.
#

version: 0.6
version: 0.7

index_id: mock-sales

Expand Down
Loading
Loading