From ef46493c31abf774751b52d9d72f751791d13415 Mon Sep 17 00:00:00 2001 From: saartochner Date: Sun, 22 Jan 2023 10:46:11 +0200 Subject: [PATCH 1/4] move the tracer-specific logic into a well-marked module and don't run code in global time if not lambda --- .secrets.baseline | 8 +- src/lumigo_tracer/__init__.py | 5 +- src/lumigo_tracer/auto_instrument_handler.py | 19 +- src/lumigo_tracer/event/event_trigger.py | 8 +- src/lumigo_tracer/extension/extension.py | 3 +- src/lumigo_tracer/lambda_tracer/__init__.py | 0 .../lambda_tracer/global_scope_exec.py | 14 + .../lambda_tracer/lambda_reporter.py | 293 ++++++++++++++++++ .../{ => lambda_tracer}/spans_container.py | 21 +- .../{ => lambda_tracer}/tracer.py | 4 +- src/lumigo_tracer/lumigo_utils.py | 270 +--------------- src/lumigo_tracer/user_utils.py | 2 +- src/lumigo_tracer/wrappers/__init__.py | 6 +- .../wrappers/aiohttp/aiohttp_wrapper.py | 2 +- .../wrappers/http/sync_http_wrappers.py | 4 +- .../wrappers/pymongo/pymongo_wrapper.py | 2 +- .../wrappers/redis/redis_wrapper.py | 2 +- .../wrappers/sql/sqlalchemy_wrapper.py | 2 +- src/test/component/test_component.py | 6 +- src/test/conftest.py | 16 +- src/test/unit/auto_tag/test_auto_tag_event.py | 2 +- .../lambda_tracer/test_global_scope_exec.py | 18 ++ .../lambda_tracer/test_lambda_reporter.py | 234 ++++++++++++++ .../test_spans_container.py | 10 +- .../unit/{ => lambda_tracer}/test_tracer.py | 17 +- src/test/unit/test_lumigo_utils.py | 225 +------------- src/test/unit/test_user_utils.py | 2 +- .../wrappers/aiohttp/test_aiohttp_wrapper.py | 2 +- src/test/unit/wrappers/conftest.py | 8 + .../wrappers/http/test_sync_http_wrappers.py | 2 +- .../http/test_sync_http_wrappers_threads.py | 2 +- .../wrappers/pymongo/test_pymongo_wrapper.py | 2 +- .../unit/wrappers/redis/test_redis_wrapper.py | 2 +- .../wrappers/sql/test_sqlalchemy_wrapper.py | 4 +- 34 files changed, 660 insertions(+), 557 deletions(-) create mode 100644 src/lumigo_tracer/lambda_tracer/__init__.py create mode 100644 src/lumigo_tracer/lambda_tracer/global_scope_exec.py create mode 100644 src/lumigo_tracer/lambda_tracer/lambda_reporter.py rename src/lumigo_tracer/{ => lambda_tracer}/spans_container.py (96%) rename src/lumigo_tracer/{ => lambda_tracer}/tracer.py (97%) create mode 100644 src/test/unit/lambda_tracer/test_global_scope_exec.py create mode 100644 src/test/unit/lambda_tracer/test_lambda_reporter.py rename src/test/unit/{ => lambda_tracer}/test_spans_container.py (96%) rename src/test/unit/{ => lambda_tracer}/test_tracer.py (96%) create mode 100644 src/test/unit/wrappers/conftest.py diff --git a/.secrets.baseline b/.secrets.baseline index d49b2440..35cbefeb 100644 --- a/.secrets.baseline +++ b/.secrets.baseline @@ -115,14 +115,14 @@ "filename": "src/lumigo_tracer/lumigo_utils.py", "hashed_secret": "116b0dcbb1dfb3f19c902ebfc29f35d908cec6a4", "is_verified": false, - "line_number": 69 + "line_number": 48 }, { "type": "Secret Keyword", "filename": "src/lumigo_tracer/lumigo_utils.py", "hashed_secret": "2a93870f6e6a2158cb9631b349e75d8f65fd81af", "is_verified": false, - "line_number": 70 + "line_number": 49 } ], "src/test/unit/event/test_event_dumper.py": [ @@ -184,7 +184,7 @@ "filename": "src/test/unit/test_lumigo_utils.py", "hashed_secret": "f32b67c7e26342af42efabc674d441dca0a281c5", "is_verified": false, - "line_number": 263 + "line_number": 199 } ], "src/test/unit/test_tracer.py": [ @@ -220,5 +220,5 @@ } ] }, - "generated_at": "2022-11-08T16:26:37Z" + "generated_at": "2023-01-22T08:40:16Z" } diff --git a/src/lumigo_tracer/__init__.py b/src/lumigo_tracer/__init__.py index 18a6c87e..f12b9bd5 100644 --- a/src/lumigo_tracer/__init__.py +++ b/src/lumigo_tracer/__init__.py @@ -1,4 +1,4 @@ -from .tracer import lumigo_tracer, LumigoChalice # noqa +from lumigo_tracer.lambda_tracer.tracer import lumigo_tracer, LumigoChalice # noqa from .user_utils import ( # noqa report_error, add_execution_tag, @@ -9,3 +9,6 @@ error, ) from .auto_instrument_handler import _handler # noqa +from .lambda_tracer.global_scope_exec import global_scope_exec + +global_scope_exec() diff --git a/src/lumigo_tracer/auto_instrument_handler.py b/src/lumigo_tracer/auto_instrument_handler.py index d4f9c868..f4764a64 100644 --- a/src/lumigo_tracer/auto_instrument_handler.py +++ b/src/lumigo_tracer/auto_instrument_handler.py @@ -1,5 +1,7 @@ import os +from lumigo_tracer.lumigo_utils import is_aws_environment + try: # Try to import AWS's current _get_handler logic from bootstrap import _get_handler as aws_get_handler @@ -27,8 +29,15 @@ def _handler(*args, **kwargs): # type: ignore[no-untyped-def] return original_handler(*args, **kwargs) -try: - # import handler during runtime initialization, as usual. - get_original_handler() -except Exception: - pass +def prefetch_handler_import() -> None: + """ + This function imports the handler. + When we call it in the global scope, it will be executed during the lambda initialization, + thus will mimic the usual behavior. + """ + if not is_aws_environment(): + return + try: + get_original_handler() + except Exception: + pass diff --git a/src/lumigo_tracer/event/event_trigger.py b/src/lumigo_tracer/event/event_trigger.py index 655aee2c..26d0fde8 100644 --- a/src/lumigo_tracer/event/event_trigger.py +++ b/src/lumigo_tracer/event/event_trigger.py @@ -6,8 +6,8 @@ from lumigo_tracer.lumigo_utils import Configuration, get_logger -def _recursive_parse_trigger_by( - message: Dict[Any, Any], parent_id: Optional[str], level: int +def recursive_parse_trigger( + message: Dict[Any, Any], parent_id: Optional[str] = None, level: int = 0 ) -> List[TriggerType]: triggers = [] if level >= Configuration.chained_services_max_depth: @@ -27,7 +27,7 @@ def _recursive_parse_trigger_by( if INNER_MESSAGES_MAGIC_PATTERN.search(sub_message): # We want to load only relevant messages, so first run a quick scan triggers.extend( - _recursive_parse_trigger_by( + recursive_parse_trigger( json.loads(sub_message), parent_id=current_trigger_id, level=level + 1 ) ) @@ -36,4 +36,4 @@ def _recursive_parse_trigger_by( def parse_triggers(event: Dict[Any, Any]) -> List[Dict[Any, Any]]: - return _recursive_parse_trigger_by(event, parent_id=None, level=0) + return recursive_parse_trigger(event, parent_id=None, level=0) diff --git a/src/lumigo_tracer/extension/extension.py b/src/lumigo_tracer/extension/extension.py index 3e64c061..efa0f5d6 100644 --- a/src/lumigo_tracer/extension/extension.py +++ b/src/lumigo_tracer/extension/extension.py @@ -8,6 +8,7 @@ from lumigo_tracer import lumigo_utils from lumigo_tracer.extension.lambda_service import LambdaService from lumigo_tracer.extension.sampler import Sampler +from lumigo_tracer.lambda_tracer import lambda_reporter from lumigo_tracer.lumigo_utils import lumigo_safe_execute SPAN_TYPE = "extensionExecutionEnd" @@ -74,4 +75,4 @@ def _finish_previous_invocation(self, current_bandwidth: Optional[int]): # type cpuUsageTime=[s.dump() for s in self.sampler.get_cpu_samples()], memoryUsage=[s.dump() for s in self.sampler.get_memory_samples()], ) - lumigo_utils.report_json(os.environ.get("AWS_REGION", "us-east-1"), msgs=[asdict(span)]) + lambda_reporter.report_json(os.environ.get("AWS_REGION", "us-east-1"), msgs=[asdict(span)]) diff --git a/src/lumigo_tracer/lambda_tracer/__init__.py b/src/lumigo_tracer/lambda_tracer/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/lumigo_tracer/lambda_tracer/global_scope_exec.py b/src/lumigo_tracer/lambda_tracer/global_scope_exec.py new file mode 100644 index 00000000..6719142c --- /dev/null +++ b/src/lumigo_tracer/lambda_tracer/global_scope_exec.py @@ -0,0 +1,14 @@ +from lumigo_tracer import auto_instrument_handler +from lumigo_tracer.lambda_tracer import lambda_reporter +from lumigo_tracer.lumigo_utils import is_aws_environment +from lumigo_tracer.wrappers import wrap + + +def global_scope_exec() -> None: + if is_aws_environment(): + # Connection to edge: build the session + lambda_reporter.establish_connection_global() + # auto_instrument: import handler during runtime initialization, as usual. + auto_instrument_handler.prefetch_handler_import() + # follow requests to third party services + wrap() diff --git a/src/lumigo_tracer/lambda_tracer/lambda_reporter.py b/src/lumigo_tracer/lambda_tracer/lambda_reporter.py new file mode 100644 index 00000000..ac025866 --- /dev/null +++ b/src/lumigo_tracer/lambda_tracer/lambda_reporter.py @@ -0,0 +1,293 @@ +import os +import uuid +import time +import random +import socket +import datetime +import http.client +from pathlib import Path +from typing import Optional, List, Dict, Any, Union +from base64 import b64encode +from functools import lru_cache + +from lumigo_tracer.lumigo_utils import ( + InternalState, + get_logger, + Configuration, + EDGE_HOST, + aws_dump, + internal_analytics_message, + lumigo_safe_execute, + warn_client, + should_use_tracer_extension, + is_span_has_error, + get_region, +) + +try: + import botocore + import boto3 +except Exception: + botocore = None + boto3 = None + +EDGE_PATH = "/api/spans" +HTTPS_PREFIX = "https://" +SECONDS_TO_TIMEOUT = 0.5 +EDGE_TIMEOUT = float(os.environ.get("LUMIGO_EDGE_TIMEOUT", SECONDS_TO_TIMEOUT)) +MAX_SIZE_FOR_REQUEST: int = int(os.environ.get("LUMIGO_MAX_SIZE_FOR_REQUEST", 1024 * 500)) +MAX_NUMBER_OF_SPANS: int = int(os.environ.get("LUMIGO_MAX_NUMBER_OF_SPANS", 2000)) +TOO_BIG_SPANS_THRESHOLD = 5 +NUMBER_OF_SPANS_IN_REPORT_OPTIMIZATION = 200 +COOLDOWN_AFTER_TIMEOUT_DURATION = datetime.timedelta(seconds=10) +CHINA_REGION = "cn-northwest-1" +LUMIGO_SPANS_DIR = "/tmp/lumigo-spans" + +edge_kinesis_boto_client = None +edge_connection = None + + +def establish_connection_global() -> None: + global edge_connection + try: + # Try to establish the connection in initialization + if ( + os.environ.get("LUMIGO_INITIALIZATION_CONNECTION", "").lower() != "false" + and get_region() != CHINA_REGION # noqa + ): + edge_connection = establish_connection() + if edge_connection: + edge_connection.connect() + except socket.timeout: + InternalState.mark_timeout_to_edge() + except Exception: + pass + + +def should_report_to_edge() -> bool: + if not InternalState.timeout_on_connection: + return True + time_diff = datetime.datetime.now() - InternalState.timeout_on_connection + return time_diff > COOLDOWN_AFTER_TIMEOUT_DURATION + + +def establish_connection(host: Optional[str] = None) -> Optional[http.client.HTTPSConnection]: + try: + if not host: + host = get_edge_host(os.environ.get("AWS_REGION")) + return http.client.HTTPSConnection(host, timeout=EDGE_TIMEOUT) + except Exception as e: + get_logger().exception(f"Could not establish connection to {host}", exc_info=e) + return None + + +@lru_cache(maxsize=1) +def get_edge_host(region: Optional[str] = None) -> str: + host = Configuration.host or EDGE_HOST.format(region=region or get_region()) + if host.startswith(HTTPS_PREFIX): + host = host[len(HTTPS_PREFIX) :] # noqa: E203 + if host.endswith(EDGE_PATH): + host = host[: -len(EDGE_PATH)] + return host + + +def report_json( + region: Optional[str], + msgs: List[Dict[Any, Any]], + should_retry: bool = True, + is_start_span: bool = False, +) -> int: + """ + This function sends the information back to the edge. + + :param region: The region to use as default if not configured otherwise. + :param msgs: the message to send. + :param should_retry: False to disable the default retry on unsuccessful sending + :param is_start_span: a flag to indicate if this is the start_span + of spans that will be written + :return: The duration of reporting (in milliseconds), + or 0 if we didn't send (due to configuration or fail). + """ + if not should_report_to_edge(): + get_logger().info("Skip sending messages due to previous timeout") + return 0 + if not Configuration.should_report: + return 0 + get_logger().info(f"reporting the messages: {msgs[:10]}") + try: + prune_trace: bool = not os.environ.get("LUMIGO_PRUNE_TRACE_OFF", "").lower() == "true" + to_send = _create_request_body(msgs, prune_trace).encode() + except Exception as e: + get_logger().exception("Failed to create request: A span was lost.", exc_info=e) + return 0 + if should_use_tracer_extension(): + with lumigo_safe_execute("report json file: writing spans to file"): + write_spans_to_files(spans=msgs, is_start_span=is_start_span) + return 0 + if region == CHINA_REGION: + return _publish_spans_to_kinesis(to_send, CHINA_REGION) + host = None + global edge_connection + with lumigo_safe_execute("report json: establish connection"): + host = get_edge_host(region) + duration = 0 + if not edge_connection or edge_connection.host != host: + edge_connection = establish_connection(host) + if not edge_connection: + get_logger().warning("Can not establish connection. Skip sending span.") + return duration + try: + start_time = time.time() + edge_connection.request( + "POST", + EDGE_PATH, + to_send, + headers={ + "Content-Type": "application/json", + "Authorization": Configuration.token or "", + }, + ) + response = edge_connection.getresponse() + response.read() # We must read the response to keep the connection available + duration = int((time.time() - start_time) * 1000) + get_logger().info(f"successful reporting, code: {getattr(response, 'code', 'unknown')}") + except socket.timeout: + get_logger().exception(f"Timeout while connecting to {host}") + InternalState.mark_timeout_to_edge() + internal_analytics_message("report: socket.timeout") + except Exception as e: + if should_retry: + get_logger().info(f"Could not report to {host}: ({str(e)}). Retrying.") + edge_connection = establish_connection(host) + report_json(region, msgs, should_retry=False) + else: + get_logger().exception("Could not report: A span was lost.", exc_info=e) + internal_analytics_message(f"report: {type(e)}") + return duration + + +def _create_request_body( + msgs: List[dict], # type: ignore[type-arg] + prune_size_flag: bool, + max_size: int = MAX_SIZE_FOR_REQUEST, + too_big_spans_threshold: int = TOO_BIG_SPANS_THRESHOLD, +) -> str: + + if not prune_size_flag or ( + len(msgs) < NUMBER_OF_SPANS_IN_REPORT_OPTIMIZATION + and _get_event_base64_size(msgs) < max_size # noqa + ): + return aws_dump(msgs)[:max_size] + + end_span = msgs[-1] + ordered_spans = sorted(msgs[:-1], key=is_span_has_error, reverse=True) + + spans_to_send: list = [end_span] # type: ignore[type-arg] + current_size = _get_event_base64_size(end_span) + too_big_spans = 0 + for span in ordered_spans: + span_size = _get_event_base64_size(span) + if current_size + span_size < max_size: + spans_to_send.append(span) + current_size += span_size + else: + # This is an optimization step. If the spans are too big, don't try to send them. + too_big_spans += 1 + if too_big_spans == too_big_spans_threshold: + break + return aws_dump(spans_to_send)[:max_size] + + +def write_spans_to_files( + spans: List[Dict[Any, Any]], max_spans: int = MAX_NUMBER_OF_SPANS, is_start_span: bool = True +) -> None: + to_send = spans[:max_spans] + if is_start_span: + get_logger().info("Creating start span file") + write_extension_file(to_send, "span") + else: + get_logger().info("Creating end span file") + write_extension_file(to_send, "end") + + +def write_extension_file(data: List[Dict], span_type: str): # type: ignore[no-untyped-def,type-arg] + Path(get_extension_dir()).mkdir(parents=True, exist_ok=True) + to_send = aws_dump(data).encode() + file_path = get_span_file_name(span_type) + with open(file_path, "wb") as span_file: + span_file.write(to_send) + get_logger().info(f"Wrote span to file to [{file_path}][{len(to_send)}]") + + +def get_extension_dir() -> str: + return (os.environ.get("LUMIGO_EXTENSION_SPANS_DIR_KEY") or LUMIGO_SPANS_DIR).lower() + + +def get_span_file_name(span_type: str): # type: ignore[no-untyped-def] + unique_name = str(uuid.uuid4()) + return os.path.join(get_extension_dir(), f"{unique_name}_{span_type}") + + +def _publish_spans_to_kinesis(to_send: bytes, region: str) -> int: + start_time = time.time() + try: + get_logger().info("Sending spans to Kinesis") + if not Configuration.edge_kinesis_aws_access_key_id: + get_logger().error("Missing edge_kinesis_aws_access_key_id, can't publish the spans") + return 0 + if not Configuration.edge_kinesis_aws_secret_access_key: + get_logger().error( + "Missing edge_kinesis_aws_secret_access_key, can't publish the spans" + ) + return 0 + _send_data_to_kinesis( + stream_name=Configuration.edge_kinesis_stream_name, + to_send=to_send, + region=region, + aws_access_key_id=Configuration.edge_kinesis_aws_access_key_id, + aws_secret_access_key=Configuration.edge_kinesis_aws_secret_access_key, + ) + except Exception as err: + get_logger().exception("Failed to send spans to Kinesis", exc_info=err) + warn_client(f"Failed to send spans to Kinesis: {err}") + return int((time.time() - start_time) * 1000) + + +def _send_data_to_kinesis( # type: ignore[no-untyped-def] + stream_name: str, + to_send: bytes, + region: str, + aws_access_key_id: str, + aws_secret_access_key: str, +): + if not boto3: + get_logger().error("boto3 is missing. Unable to send to Kinesis.") + return None + client = _get_edge_kinesis_boto_client( + region=region, + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + ) + client.put_record(Data=to_send, StreamName=stream_name, PartitionKey=str(random.random())) + get_logger().info("Successful sending to Kinesis") + + +def _get_edge_kinesis_boto_client(region: str, aws_access_key_id: str, aws_secret_access_key: str): # type: ignore[no-untyped-def] + global edge_kinesis_boto_client + if not edge_kinesis_boto_client or _is_edge_kinesis_connection_cache_disabled(): + edge_kinesis_boto_client = boto3.client( + "kinesis", + region_name=region, + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + config=botocore.config.Config(retries={"max_attempts": 1, "mode": "standard"}), + ) + return edge_kinesis_boto_client + + +def _is_edge_kinesis_connection_cache_disabled() -> bool: + return os.environ.get("LUMIGO_KINESIS_SHOULD_REUSE_CONNECTION", "").lower() == "false" + + +def _get_event_base64_size(event: Union[Dict[Any, Any], List[Dict[Any, Any]]]) -> int: + return len(b64encode(aws_dump(event).encode())) diff --git a/src/lumigo_tracer/spans_container.py b/src/lumigo_tracer/lambda_tracer/spans_container.py similarity index 96% rename from src/lumigo_tracer/spans_container.py rename to src/lumigo_tracer/lambda_tracer/spans_container.py index 4fdcc791..0c5725dc 100644 --- a/src/lumigo_tracer/spans_container.py +++ b/src/lumigo_tracer/lambda_tracer/spans_container.py @@ -16,25 +16,24 @@ EXECUTION_TAGS_KEY, get_timeout_buffer, get_logger, - _is_span_has_error, + is_span_has_error, create_step_function_span, get_current_ms_time, get_region, is_provision_concurrency_initialization, get_stacktrace, - write_extension_file, should_use_tracer_extension, MANUAL_TRACES_KEY, lumigo_safe_execute, is_python_37, ) -from lumigo_tracer import lumigo_utils +from lumigo_tracer.lambda_tracer import lambda_reporter from lumigo_tracer.event.event_dumper import EventDumper from lumigo_tracer.w3c_context import add_w3c_trace_propagator from lumigo_tracer.event.event_trigger import parse_triggers from lumigo_tracer.parsing_utils import parse_trace_id, safe_split_get, recursive_json_join -_VERSION_PATH = os.path.join(os.path.dirname(__file__), "VERSION") +_VERSION_PATH = os.path.join(os.path.dirname(__file__), "../VERSION") MAX_LAMBDA_TIME = 15 * 60 * 1000 FUNCTION_TYPE = "function" ENRICHMENT_TYPE = "enrichment" @@ -140,7 +139,7 @@ def generate_enrichment_span(self) -> Optional[Dict[str, Union[str, int]]]: def start(self, event=None, context=None): # type: ignore[no-untyped-def] to_send = self._generate_start_span() if not Configuration.send_only_if_error: - report_duration = lumigo_utils.report_json( + report_duration = lambda_reporter.report_json( region=self.region, msgs=[to_send], is_start_span=True ) self.function_span["reporter_rtt"] = report_duration @@ -159,7 +158,7 @@ def handle_timeout(self, *args): # type: ignore[no-untyped-def] self.span_ids_to_send.clear() if Configuration.send_only_if_error: to_send.append(self._generate_start_span()) - lumigo_utils.report_json(region=self.region, msgs=to_send) + lambda_reporter.report_json(region=self.region, msgs=to_send) def start_timeout_timer(self, context=None) -> None: # type: ignore[no-untyped-def] if Configuration.timeout_timer: @@ -306,11 +305,11 @@ def end(self, ret_val=None, event: Optional[dict] = None, context=None) -> Optio "Could not serialize the return value of the lambda", exc_info=True ) self.function_span.update({"return_value": parsed_ret_val}) - if _is_span_has_error(self.function_span): + if is_span_has_error(self.function_span): self._set_error_extra_data(event) spans_contain_errors: bool = any( - _is_span_has_error(s) for s in self.spans.values() - ) or _is_span_has_error(self.function_span) + is_span_has_error(s) for s in self.spans.values() + ) or is_span_has_error(self.function_span) if (not Configuration.send_only_if_error) or spans_contain_errors: to_send = [self.function_span] + [ @@ -319,13 +318,13 @@ def end(self, ret_val=None, event: Optional[dict] = None, context=None) -> Optio enrichment_span = self.generate_enrichment_span() if enrichment_span: to_send.append(enrichment_span) - reported_rtt = lumigo_utils.report_json(region=self.region, msgs=to_send) + reported_rtt = lambda_reporter.report_json(region=self.region, msgs=to_send) else: get_logger().debug( "No Spans were sent, `Configuration.send_only_if_error` is on and no span has error" ) if should_use_tracer_extension(): - write_extension_file([{}], "stop") + lambda_reporter.write_extension_file([{}], "stop") return reported_rtt def _set_error_extra_data(self, event): # type: ignore[no-untyped-def] diff --git a/src/lumigo_tracer/tracer.py b/src/lumigo_tracer/lambda_tracer/tracer.py similarity index 97% rename from src/lumigo_tracer/tracer.py rename to src/lumigo_tracer/lambda_tracer/tracer.py index adc2cb60..3678e756 100644 --- a/src/lumigo_tracer/tracer.py +++ b/src/lumigo_tracer/lambda_tracer/tracer.py @@ -9,8 +9,7 @@ is_aws_environment, is_kill_switch_on, ) -from lumigo_tracer.spans_container import SpansContainer, TimeoutMechanism -from lumigo_tracer.wrappers import wrap +from lumigo_tracer.lambda_tracer.spans_container import SpansContainer, TimeoutMechanism CONTEXT_WRAPPED_BY_LUMIGO_KEY = "_wrapped_by_lumigo" @@ -37,7 +36,6 @@ def _add_wrap_flag_to_context(*args): # type: ignore[no-untyped-def] def _lumigo_tracer(func): # type: ignore[no-untyped-def] if is_kill_switch_on(): return func - wrap() @wraps(func) def lambda_wrapper(*args, **kwargs): # type: ignore[no-untyped-def] diff --git a/src/lumigo_tracer/lumigo_utils.py b/src/lumigo_tracer/lumigo_utils.py index 9f8760b6..fc974214 100644 --- a/src/lumigo_tracer/lumigo_utils.py +++ b/src/lumigo_tracer/lumigo_utils.py @@ -3,8 +3,6 @@ import uuid import time import json -import random -import socket import base64 import logging import decimal @@ -12,39 +10,20 @@ import inspect import datetime import traceback -import http.client -from pathlib import Path -from base64 import b64encode from collections import OrderedDict from contextlib import contextmanager from functools import reduce, lru_cache from typing import Union, List, Optional, Dict, Any, Tuple, Pattern, TypeVar LUMIGO_DOMAINS_SCRUBBER_KEY = "LUMIGO_DOMAINS_SCRUBBER" - -try: - import botocore - import boto3 -except Exception: - botocore = None - boto3 = None - EXECUTION_TAGS_KEY = "lumigo_execution_tags_no_scrub" MANUAL_TRACES_KEY = "manualTraces" EDGE_SUFFIX = "golumigo.com" EDGE_HOST = "{region}.lumigo-tracer-edge." + EDGE_SUFFIX -EDGE_PATH = "/api/spans" -HTTPS_PREFIX = "https://" LOG_FORMAT = "#LUMIGO# - %(levelname)s - %(asctime)s - %(message)s" -SECONDS_TO_TIMEOUT = 0.5 -COOLDOWN_AFTER_TIMEOUT_DURATION = datetime.timedelta(seconds=10) LUMIGO_EVENT_KEY = "_lumigo" STEP_FUNCTION_UID_KEY = "step_function_uid" # number of spans that are too big to enter the reported message before break -TOO_BIG_SPANS_THRESHOLD = 5 -MAX_SIZE_FOR_REQUEST: int = int(os.environ.get("LUMIGO_MAX_SIZE_FOR_REQUEST", 1024 * 500)) -MAX_NUMBER_OF_SPANS: int = int(os.environ.get("LUMIGO_MAX_NUMBER_OF_SPANS", 2000)) -EDGE_TIMEOUT = float(os.environ.get("LUMIGO_EDGE_TIMEOUT", SECONDS_TO_TIMEOUT)) MAX_VARS_SIZE = 100_000 MAX_VAR_LEN = 1024 DEFAULT_MAX_ENTRY_SIZE = 2048 @@ -73,16 +52,13 @@ WARN_CLIENT_PREFIX = "Lumigo Warning" INTERNAL_ANALYTICS_PREFIX = "Lumigo Analytic Log" TRUNCATE_SUFFIX = "...[too long]" -NUMBER_OF_SPANS_IN_REPORT_OPTIMIZATION = 200 DEFAULT_KEY_DEPTH = 4 LUMIGO_TOKEN_KEY = "LUMIGO_TRACER_TOKEN" LUMIGO_USE_TRACER_EXTENSION = "LUMIGO_USE_TRACER_EXTENSION" -LUMIGO_SPANS_DIR = "/tmp/lumigo-spans" KILL_SWITCH = "LUMIGO_SWITCH_OFF" ERROR_SIZE_LIMIT_MULTIPLIER = 2 -CHINA_REGION = "cn-northwest-1" EDGE_KINESIS_STREAM_NAME = "prod_trc-inges-edge_edge-kinesis-stream" -STACKTRACE_LINE_TO_DROP = "lumigo_tracer/tracer.py" +STACKTRACE_LINE_TO_DROP = "lumigo_tracer/lambda_tracer/tracer.py" Container = TypeVar("Container", dict, list) # type: ignore[type-arg,type-arg] DEFAULT_AUTO_TAG_KEY = "LUMIGO_AUTO_TAG" SKIP_COLLECTING_HTTP_BODY_KEY = "LUMIGO_SKIP_COLLECTING_HTTP_BODY" @@ -93,18 +69,11 @@ _logger: Dict[str, logging.Logger] = {} -edge_kinesis_boto_client = None -edge_connection = None - def should_use_tracer_extension() -> bool: return (os.environ.get(LUMIGO_USE_TRACER_EXTENSION) or "false").lower() == "true" -def get_extension_dir() -> str: - return (os.environ.get("LUMIGO_EXTENSION_SPANS_DIR_KEY") or LUMIGO_SPANS_DIR).lower() - - def get_region() -> str: return os.environ.get("AWS_REGION") or "UNKNOWN" @@ -122,13 +91,6 @@ def reset(): # type: ignore[no-untyped-def] def mark_timeout_to_edge(): # type: ignore[no-untyped-def] InternalState.timeout_on_connection = datetime.datetime.now() - @staticmethod - def should_report_to_edge() -> bool: - if not InternalState.timeout_on_connection: - return True - time_diff = datetime.datetime.now() - InternalState.timeout_on_connection - return time_diff > COOLDOWN_AFTER_TIMEOUT_DURATION - class Configuration: should_report: bool = True @@ -280,7 +242,7 @@ def config( ) -def _is_span_has_error(span: dict) -> bool: # type: ignore[type-arg] +def is_span_has_error(span: dict) -> bool: # type: ignore[type-arg] return ( span.get("error") is not None # noqa or span.get("info", {}).get("httpInfo", {}).get("response", {}).get("statusCode", 0) # noqa @@ -289,218 +251,6 @@ def _is_span_has_error(span: dict) -> bool: # type: ignore[type-arg] ) -def _get_event_base64_size(event) -> int: # type: ignore[no-untyped-def] - return len(b64encode(aws_dump(event).encode())) - - -def _create_request_body( - msgs: List[dict], # type: ignore[type-arg] - prune_size_flag: bool, - max_size: int = MAX_SIZE_FOR_REQUEST, - too_big_spans_threshold: int = TOO_BIG_SPANS_THRESHOLD, -) -> str: - - if not prune_size_flag or ( - len(msgs) < NUMBER_OF_SPANS_IN_REPORT_OPTIMIZATION - and _get_event_base64_size(msgs) < max_size # noqa - ): - return aws_dump(msgs)[:max_size] - - end_span = msgs[-1] - ordered_spans = sorted(msgs[:-1], key=_is_span_has_error, reverse=True) - - spans_to_send: list = [end_span] # type: ignore[type-arg] - current_size = _get_event_base64_size(end_span) - too_big_spans = 0 - for span in ordered_spans: - span_size = _get_event_base64_size(span) - if current_size + span_size < max_size: - spans_to_send.append(span) - current_size += span_size - else: - # This is an optimization step. If the spans are too big, don't try to send them. - too_big_spans += 1 - if too_big_spans == too_big_spans_threshold: - break - return aws_dump(spans_to_send)[:max_size] - - -def establish_connection(host=None): # type: ignore[no-untyped-def] - try: - if not host: - host = get_edge_host(os.environ.get("AWS_REGION")) - return http.client.HTTPSConnection(host, timeout=EDGE_TIMEOUT) - except Exception as e: - get_logger().exception(f"Could not establish connection to {host}", exc_info=e) - return None - - -@lru_cache(maxsize=1) -def get_edge_host(region: Optional[str] = None) -> str: - host = Configuration.host or EDGE_HOST.format(region=region or get_region()) - if host.startswith(HTTPS_PREFIX): - host = host[len(HTTPS_PREFIX) :] # noqa: E203 - if host.endswith(EDGE_PATH): - host = host[: -len(EDGE_PATH)] - return host - - -def report_json( # type: ignore[no-untyped-def] - region: Optional[str], msgs: List[dict], should_retry: bool = True, is_start_span=False # type: ignore[type-arg] -) -> int: - """ - This function sends the information back to the edge. - - :param region: The region to use as default if not configured otherwise. - :param msgs: the message to send. - :param should_retry: False to disable the default retry on unsuccessful sending - :param is_start_span: a flag to indicate if this is the start_span - of spans that will be written - :return: The duration of reporting (in milliseconds), - or 0 if we didn't send (due to configuration or fail). - """ - if not InternalState.should_report_to_edge(): - get_logger().info("Skip sending messages due to previous timeout") - return 0 - if not Configuration.should_report: - return 0 - get_logger().info(f"reporting the messages: {msgs[:10]}") - try: - prune_trace: bool = not os.environ.get("LUMIGO_PRUNE_TRACE_OFF", "").lower() == "true" - to_send = _create_request_body(msgs, prune_trace).encode() - except Exception as e: - get_logger().exception("Failed to create request: A span was lost.", exc_info=e) - return 0 - if should_use_tracer_extension(): - with lumigo_safe_execute("report json file: writing spans to file"): - write_spans_to_files(spans=msgs, is_start_span=is_start_span) - return 0 - if region == CHINA_REGION: - return _publish_spans_to_kinesis(to_send, CHINA_REGION) - host = None - global edge_connection - with lumigo_safe_execute("report json: establish connection"): - host = get_edge_host(region) - duration = 0 - if not edge_connection or edge_connection.host != host: - edge_connection = establish_connection(host) - if not edge_connection: - get_logger().warning("Can not establish connection. Skip sending span.") - return duration - try: - start_time = time.time() - edge_connection.request( - "POST", - EDGE_PATH, - to_send, - headers={"Content-Type": "application/json", "Authorization": Configuration.token}, - ) - response = edge_connection.getresponse() - response.read() # We must read the response to keep the connection available - duration = int((time.time() - start_time) * 1000) - get_logger().info(f"successful reporting, code: {getattr(response, 'code', 'unknown')}") - except socket.timeout: - get_logger().exception(f"Timeout while connecting to {host}") - InternalState.mark_timeout_to_edge() - internal_analytics_message("report: socket.timeout") - except Exception as e: - if should_retry: - get_logger().info(f"Could not report to {host}: ({str(e)}). Retrying.") - edge_connection = establish_connection(host) - report_json(region, msgs, should_retry=False) - else: - get_logger().exception("Could not report: A span was lost.", exc_info=e) - internal_analytics_message(f"report: {type(e)}") - return duration - - -def get_span_file_name(span_type: str): # type: ignore[no-untyped-def] - unique_name = str(uuid.uuid4()) - return os.path.join(get_extension_dir(), f"{unique_name}_{span_type}") - - -def write_extension_file(data: List[Dict], span_type: str): # type: ignore[no-untyped-def,type-arg] - Path(get_extension_dir()).mkdir(parents=True, exist_ok=True) - to_send = aws_dump(data).encode() - file_path = get_span_file_name(span_type) - with open(file_path, "wb") as span_file: - span_file.write(to_send) - get_logger().info(f"Wrote span to file to [{file_path}][{len(to_send)}]") - - -def write_spans_to_files( # type: ignore[no-untyped-def] - spans: List[Dict], max_spans=MAX_NUMBER_OF_SPANS, is_start_span=True # type: ignore[type-arg] -) -> None: - to_send = spans[:max_spans] - if is_start_span: - get_logger().info("Creating start span file") - write_extension_file(to_send, "span") - else: - get_logger().info("Creating end span file") - write_extension_file(to_send, "end") - - -def _publish_spans_to_kinesis(to_send: bytes, region: str) -> int: - start_time = time.time() - try: - get_logger().info("Sending spans to Kinesis") - if not Configuration.edge_kinesis_aws_access_key_id: - get_logger().error("Missing edge_kinesis_aws_access_key_id, can't publish the spans") - return 0 - if not Configuration.edge_kinesis_aws_secret_access_key: - get_logger().error( - "Missing edge_kinesis_aws_secret_access_key, can't publish the spans" - ) - return 0 - _send_data_to_kinesis( - stream_name=Configuration.edge_kinesis_stream_name, - to_send=to_send, - region=region, - aws_access_key_id=Configuration.edge_kinesis_aws_access_key_id, - aws_secret_access_key=Configuration.edge_kinesis_aws_secret_access_key, - ) - except Exception as err: - get_logger().exception("Failed to send spans to Kinesis", exc_info=err) - warn_client(f"Failed to send spans to Kinesis: {err}") - return int((time.time() - start_time) * 1000) - - -def _is_edge_kinesis_connection_cache_disabled() -> bool: - return os.environ.get("LUMIGO_KINESIS_SHOULD_REUSE_CONNECTION", "").lower() == "false" - - -def _get_edge_kinesis_boto_client(region: str, aws_access_key_id: str, aws_secret_access_key: str): # type: ignore[no-untyped-def] - global edge_kinesis_boto_client - if not edge_kinesis_boto_client or _is_edge_kinesis_connection_cache_disabled(): - edge_kinesis_boto_client = boto3.client( - "kinesis", - region_name=region, - aws_access_key_id=aws_access_key_id, - aws_secret_access_key=aws_secret_access_key, - config=botocore.config.Config(retries={"max_attempts": 1, "mode": "standard"}), - ) - return edge_kinesis_boto_client - - -def _send_data_to_kinesis( # type: ignore[no-untyped-def] - stream_name: str, - to_send: bytes, - region: str, - aws_access_key_id: str, - aws_secret_access_key: str, -): - if not boto3: - get_logger().error("boto3 is missing. Unable to send to Kinesis.") - return None - client = _get_edge_kinesis_boto_client( - region=region, - aws_access_key_id=aws_access_key_id, - aws_secret_access_key=aws_secret_access_key, - ) - client.put_record(Data=to_send, StreamName=stream_name, PartitionKey=str(random.random())) - get_logger().info("Successful sending to Kinesis") - - def get_logger(logger_name="lumigo"): # type: ignore[no-untyped-def] """ This function returns lumigo's logger. @@ -533,7 +283,7 @@ def lumigo_safe_execute(part_name="", severity=logging.ERROR): # type: ignore[n ) -def is_aws_environment(): # type: ignore[no-untyped-def] +def is_aws_environment() -> bool: """ :return: heuristically determine rather we're running on an aws environment. """ @@ -860,19 +610,5 @@ def is_python_37() -> bool: return os.environ.get("AWS_EXECUTION_ENV") == "AWS_Lambda_python3.7" -try: - # Try to establish the connection in initialization - if ( - os.environ.get("LUMIGO_INITIALIZATION_CONNECTION", "").lower() != "false" - and get_region() != CHINA_REGION # noqa - ): - edge_connection = establish_connection() - edge_connection.connect() -except socket.timeout: - InternalState.mark_timeout_to_edge() -except Exception: - pass - - def is_lambda_traced() -> bool: return (not is_kill_switch_on()) and is_aws_environment() diff --git a/src/lumigo_tracer/user_utils.py b/src/lumigo_tracer/user_utils.py index 0517ec2c..40f4ad0c 100644 --- a/src/lumigo_tracer/user_utils.py +++ b/src/lumigo_tracer/user_utils.py @@ -3,7 +3,7 @@ from contextlib import contextmanager from typing import Dict, Optional -from lumigo_tracer.spans_container import SpansContainer +from lumigo_tracer.lambda_tracer.spans_container import SpansContainer from lumigo_tracer.lumigo_utils import warn_client, is_lambda_traced LUMIGO_REPORT_ERROR_STRING = "[LUMIGO_LOG]" diff --git a/src/lumigo_tracer/wrappers/__init__.py b/src/lumigo_tracer/wrappers/__init__.py index 8be6d47b..d268aa42 100644 --- a/src/lumigo_tracer/wrappers/__init__.py +++ b/src/lumigo_tracer/wrappers/__init__.py @@ -3,12 +3,14 @@ from .redis.redis_wrapper import wrap_redis from .sql.sqlalchemy_wrapper import wrap_sqlalchemy from .aiohttp.aiohttp_wrapper import wrap_aiohttp - +from ..lumigo_utils import is_aws_environment already_wrapped = False -def wrap(force: bool = False): # type: ignore[no-untyped-def] +def wrap(force: bool = False) -> None: + if not is_aws_environment(): + return global already_wrapped if not already_wrapped: # Never wrap http calls twice - it will create duplicate body diff --git a/src/lumigo_tracer/wrappers/aiohttp/aiohttp_wrapper.py b/src/lumigo_tracer/wrappers/aiohttp/aiohttp_wrapper.py index e8ad5bcd..4faf0c1b 100644 --- a/src/lumigo_tracer/wrappers/aiohttp/aiohttp_wrapper.py +++ b/src/lumigo_tracer/wrappers/aiohttp/aiohttp_wrapper.py @@ -1,6 +1,6 @@ from lumigo_tracer.lumigo_utils import lumigo_safe_execute, get_logger, concat_old_body_to_new from lumigo_tracer.libs.wrapt import wrap_function_wrapper -from lumigo_tracer.spans_container import SpansContainer +from lumigo_tracer.lambda_tracer.spans_container import SpansContainer from lumigo_tracer.wrappers.http.http_data_classes import HttpRequest from lumigo_tracer.wrappers.http.sync_http_wrappers import add_request_event, update_event_response diff --git a/src/lumigo_tracer/wrappers/http/sync_http_wrappers.py b/src/lumigo_tracer/wrappers/http/sync_http_wrappers.py index 185090b5..21ec178a 100644 --- a/src/lumigo_tracer/wrappers/http/sync_http_wrappers.py +++ b/src/lumigo_tracer/wrappers/http/sync_http_wrappers.py @@ -7,11 +7,12 @@ import logging import random +from lumigo_tracer.lambda_tracer.lambda_reporter import get_edge_host from lumigo_tracer.wrappers.http.http_data_classes import HttpRequest, HttpState from lumigo_tracer.parsing_utils import safe_get_list, recursive_json_join from lumigo_tracer.wrappers.http.http_parser import get_parser, HTTP_TYPE from lumigo_tracer.libs.wrapt import wrap_function_wrapper -from lumigo_tracer.spans_container import SpansContainer +from lumigo_tracer.lambda_tracer.spans_container import SpansContainer from lumigo_tracer.lumigo_utils import ( get_logger, lumigo_safe_execute, @@ -20,7 +21,6 @@ lumigo_dumps, get_size_upper_bound, is_error_code, - get_edge_host, TRUNCATE_SUFFIX, concat_old_body_to_new, EDGE_SUFFIX, diff --git a/src/lumigo_tracer/wrappers/pymongo/pymongo_wrapper.py b/src/lumigo_tracer/wrappers/pymongo/pymongo_wrapper.py index 557b2ab4..eb516a81 100644 --- a/src/lumigo_tracer/wrappers/pymongo/pymongo_wrapper.py +++ b/src/lumigo_tracer/wrappers/pymongo/pymongo_wrapper.py @@ -7,7 +7,7 @@ lumigo_dumps, get_current_ms_time, ) -from lumigo_tracer.spans_container import SpansContainer +from lumigo_tracer.lambda_tracer.spans_container import SpansContainer try: from pymongo import monitoring diff --git a/src/lumigo_tracer/wrappers/redis/redis_wrapper.py b/src/lumigo_tracer/wrappers/redis/redis_wrapper.py index 661bdddf..7251a204 100644 --- a/src/lumigo_tracer/wrappers/redis/redis_wrapper.py +++ b/src/lumigo_tracer/wrappers/redis/redis_wrapper.py @@ -10,7 +10,7 @@ lumigo_dumps, get_current_ms_time, ) -from lumigo_tracer.spans_container import SpansContainer +from lumigo_tracer.lambda_tracer.spans_container import SpansContainer REDIS_SPAN = "redis" diff --git a/src/lumigo_tracer/wrappers/sql/sqlalchemy_wrapper.py b/src/lumigo_tracer/wrappers/sql/sqlalchemy_wrapper.py index e6df8dea..de65ff9f 100644 --- a/src/lumigo_tracer/wrappers/sql/sqlalchemy_wrapper.py +++ b/src/lumigo_tracer/wrappers/sql/sqlalchemy_wrapper.py @@ -10,7 +10,7 @@ lumigo_dumps, get_current_ms_time, ) -from lumigo_tracer.spans_container import SpansContainer +from lumigo_tracer.lambda_tracer.spans_container import SpansContainer try: from sqlalchemy.event import listen diff --git a/src/test/component/test_component.py b/src/test/component/test_component.py index b7762ebc..ba6557d0 100644 --- a/src/test/component/test_component.py +++ b/src/test/component/test_component.py @@ -5,9 +5,9 @@ import subprocess import http.client -from lumigo_tracer.tracer import lumigo_tracer +from lumigo_tracer.lambda_tracer.tracer import lumigo_tracer from lumigo_tracer.lumigo_utils import md5hash -from lumigo_tracer.spans_container import SpansContainer +from lumigo_tracer.lambda_tracer.spans_container import SpansContainer TOKEN = "t_10faa5e13e7844aaa1234" @@ -22,7 +22,7 @@ def serverless_yaml(): @pytest.fixture(autouse=True) -def aws_env_variables(monkeypatch): +def aws_env_variables(monkeypatch, aws_environment): """ When running in AWS Lambda, there are some environment variables that AWS creates and the tracer uses. This fixture creates those environment variables. diff --git a/src/test/conftest.py b/src/test/conftest.py index a2e8058e..a23631f2 100644 --- a/src/test/conftest.py +++ b/src/test/conftest.py @@ -8,14 +8,15 @@ import pytest from lumigo_tracer import lumigo_utils +from lumigo_tracer.lambda_tracer import lambda_reporter +from lumigo_tracer.lambda_tracer.lambda_reporter import get_edge_host from lumigo_tracer.lumigo_utils import ( Configuration, get_omitting_regex, get_logger, - get_edge_host, InternalState, ) -from lumigo_tracer.spans_container import SpansContainer +from lumigo_tracer.lambda_tracer.spans_container import SpansContainer from lumigo_tracer.wrappers.http.http_data_classes import HttpState USE_TRACER_EXTENSION = "LUMIGO_USE_TRACER_EXTENSION" @@ -26,9 +27,9 @@ def reporter_mock(monkeypatch, request): if request.node.get_closest_marker("dont_mock_lumigo_utils_reporter"): return lumigo_utils.Configuration.should_report = False - reporter_mock = mock.Mock(lumigo_utils.report_json) + reporter_mock = mock.Mock(lambda_reporter.report_json) reporter_mock.return_value = 123 - monkeypatch.setattr(lumigo_utils, "report_json", reporter_mock) + monkeypatch.setattr(lambda_reporter, "report_json", reporter_mock) return reporter_mock @@ -46,7 +47,7 @@ def with_extension(monkeypatch): def remove_caches(monkeypatch): get_omitting_regex.cache_clear() get_edge_host.cache_clear() - monkeypatch.setattr(lumigo_utils, "edge_kinesis_boto_client", None) + monkeypatch.setattr(lambda_reporter, "edge_kinesis_boto_client", None) @pytest.yield_fixture(autouse=True) @@ -103,6 +104,11 @@ def context(): return SimpleNamespace(aws_request_id="1234", get_remaining_time_in_millis=lambda: 1000 * 2) +@pytest.fixture +def aws_environment(monkeypatch): + monkeypatch.setenv("AWS_LAMBDA_FUNCTION_VERSION", "true") + + @pytest.fixture(autouse=True) def extension_clean(): yield diff --git a/src/test/unit/auto_tag/test_auto_tag_event.py b/src/test/unit/auto_tag/test_auto_tag_event.py index 0c39eebd..a20be787 100644 --- a/src/test/unit/auto_tag/test_auto_tag_event.py +++ b/src/test/unit/auto_tag/test_auto_tag_event.py @@ -8,7 +8,7 @@ AutoTagEvent, ConfigurationHandler, ) -from lumigo_tracer.spans_container import SpansContainer +from lumigo_tracer.lambda_tracer.spans_container import SpansContainer from lumigo_tracer.lumigo_utils import Configuration diff --git a/src/test/unit/lambda_tracer/test_global_scope_exec.py b/src/test/unit/lambda_tracer/test_global_scope_exec.py new file mode 100644 index 00000000..11897bb6 --- /dev/null +++ b/src/test/unit/lambda_tracer/test_global_scope_exec.py @@ -0,0 +1,18 @@ +from lumigo_tracer.lambda_tracer.global_scope_exec import global_scope_exec +from lumigo_tracer.lambda_tracer import lambda_reporter +from lumigo_tracer import wrappers + + +def test_global_scope_preparation_called_only_in_lambda(monkeypatch): + lambda_reporter.edge_connection = None + wrappers.already_wrapped = False + + monkeypatch.delenv("AWS_LAMBDA_FUNCTION_VERSION", raising=False) + global_scope_exec() + assert lambda_reporter.edge_connection is None + assert not wrappers.already_wrapped + + monkeypatch.setenv("AWS_LAMBDA_FUNCTION_VERSION", "true") + global_scope_exec() + assert lambda_reporter.edge_connection is not None + assert wrappers.already_wrapped diff --git a/src/test/unit/lambda_tracer/test_lambda_reporter.py b/src/test/unit/lambda_tracer/test_lambda_reporter.py new file mode 100644 index 00000000..192225dc --- /dev/null +++ b/src/test/unit/lambda_tracer/test_lambda_reporter.py @@ -0,0 +1,234 @@ +import os +import uuid +import json +import socket +import logging +import datetime +import http.client +import importlib.util +from unittest.mock import Mock + +import boto3 +from mock import MagicMock +import pytest + +from lumigo_tracer import lumigo_utils +from lumigo_tracer.lambda_tracer import lambda_reporter +from lumigo_tracer.lambda_tracer.lambda_reporter import ( + _create_request_body, + _get_event_base64_size, + EDGE_PATH, + get_edge_host, + report_json, + CHINA_REGION, + get_extension_dir, + establish_connection, +) +from lumigo_tracer.lumigo_utils import ( + Configuration, + InternalState, +) + + +@pytest.fixture +def dummy_span(): + return {"dummy": "dummy"} + + +@pytest.fixture +def function_end_span(): + return {"dummy_end": "dummy_end"} + + +@pytest.fixture +def error_span(): + return {"dummy": "dummy", "error": "Error"} + + +def test_create_request_body_default(dummy_span): + assert _create_request_body([dummy_span], False) == json.dumps([dummy_span]) + + +def test_create_request_body_not_effecting_small_events(dummy_span): + assert _create_request_body([dummy_span], True, 1_000_000) == json.dumps([dummy_span]) + + +def test_create_request_body_keep_function_span_and_filter_other_spans( + dummy_span, function_end_span +): + expected_result = [dummy_span, dummy_span, dummy_span, function_end_span] + size = _get_event_base64_size(expected_result) + assert _create_request_body(expected_result * 2, True, size) == json.dumps( + [function_end_span, dummy_span, dummy_span, dummy_span] + ) + + +def test_create_request_body_take_error_first(dummy_span, error_span, function_end_span): + expected_result = [function_end_span, error_span, dummy_span, dummy_span] + input = [ + dummy_span, + dummy_span, + dummy_span, + dummy_span, + dummy_span, + error_span, + function_end_span, + ] + size = _get_event_base64_size(expected_result) + assert _create_request_body(input, True, size) == json.dumps(expected_result) + + +@pytest.mark.parametrize( + ["arg", "host"], + [("https://a.com", "a.com"), (f"https://b.com{EDGE_PATH}", "b.com"), ("h.com", "h.com")], +) +def test_get_edge_host(arg, host, monkeypatch): + monkeypatch.setattr(Configuration, "host", arg) + assert get_edge_host("region") == host + + +def test_report_json_extension_spans_mode(monkeypatch, reporter_mock, tmpdir): + extension_dir = tmpdir.mkdir("tmp") + monkeypatch.setattr(uuid, "uuid4", lambda *args, **kwargs: "span_name") + monkeypatch.setattr(Configuration, "should_report", True) + monkeypatch.setenv("LUMIGO_USE_TRACER_EXTENSION", "TRUE") + monkeypatch.setenv("LUMIGO_EXTENSION_SPANS_DIR_KEY", extension_dir) + mocked_urandom = MagicMock(hex=MagicMock(return_value="my_mocked_data")) + monkeypatch.setattr(os, "urandom", lambda *args, **kwargs: mocked_urandom) + + start_span = [{"span": "true"}] + report_json(region=None, msgs=start_span, is_start_span=True) + + spans = [] + size_factor = 100 + for i in range(size_factor): + spans.append( + { + i: "a" * size_factor, + } + ) + report_json(region=None, msgs=spans, is_start_span=False) + start_path_path = f"{get_extension_dir()}/span_name_span" + end_path_path = f"{get_extension_dir()}/span_name_end" + start_file_content = json.loads(open(start_path_path, "r").read()) + end_file_content = json.loads(open(end_path_path, "r").read()) + assert start_span == start_file_content + assert json.dumps(end_file_content) == json.dumps(spans) + + +@pytest.mark.parametrize( + "errors, final_log", [(ValueError, "ERROR"), ([ValueError, Mock()], "INFO")] +) +def test_report_json_retry(monkeypatch, reporter_mock, caplog, errors, final_log): + reporter_mock.side_effect = report_json + monkeypatch.setattr(Configuration, "host", "force_reconnect") + monkeypatch.setattr(Configuration, "should_report", True) + monkeypatch.setattr(http.client, "HTTPSConnection", Mock()) + http.client.HTTPSConnection("force_reconnect").getresponse.side_effect = errors + + report_json(None, [{"a": "b"}]) + + assert caplog.records[-1].levelname == final_log + + +def test_report_json_fast_failure_after_timeout(monkeypatch, reporter_mock, caplog): + reporter_mock.side_effect = report_json + monkeypatch.setattr(Configuration, "host", "host") + monkeypatch.setattr(Configuration, "should_report", True) + monkeypatch.setattr(http.client, "HTTPSConnection", Mock()) + http.client.HTTPSConnection("force_reconnect").getresponse.side_effect = socket.timeout + + assert report_json(None, [{"a": "b"}]) == 0 + assert caplog.records[-1].msg == "Timeout while connecting to host" + + assert report_json(None, [{"a": "b"}]) == 0 + assert caplog.records[-1].msg == "Skip sending messages due to previous timeout" + + InternalState.timeout_on_connection = datetime.datetime(2016, 1, 1) + assert report_json(None, [{"a": "b"}]) == 0 + assert caplog.records[-1].msg == "Timeout while connecting to host" + + +def test_report_json_china_missing_access_key_id(monkeypatch, reporter_mock, caplog): + monkeypatch.setattr(Configuration, "should_report", True) + reporter_mock.side_effect = report_json + assert report_json(CHINA_REGION, [{"a": "b"}]) == 0 + assert any( + "edge_kinesis_aws_access_key_id" in record.message and record.levelname == "ERROR" + for record in caplog.records + ) + + +def test_report_json_china_missing_secret_access_key(monkeypatch, reporter_mock, caplog): + monkeypatch.setattr(Configuration, "should_report", True) + monkeypatch.setattr(Configuration, "edge_kinesis_aws_access_key_id", "my_value") + reporter_mock.side_effect = report_json + assert report_json(CHINA_REGION, [{"a": "b"}]) == 0 + assert any( + "edge_kinesis_aws_secret_access_key" in record.message and record.levelname == "ERROR" + for record in caplog.records + ) + + +def test_report_json_china_no_boto(monkeypatch, reporter_mock, caplog): + reporter_mock.side_effect = report_json + monkeypatch.setattr(Configuration, "should_report", True) + monkeypatch.setattr(Configuration, "edge_kinesis_aws_access_key_id", "my_value") + monkeypatch.setattr(Configuration, "edge_kinesis_aws_secret_access_key", "my_value") + monkeypatch.setattr(lambda_reporter, "boto3", None) + + report_json(CHINA_REGION, [{"a": "b"}]) + + assert any( + "boto3 is missing. Unable to send to Kinesis" in record.message + and record.levelname == "ERROR" # noqa + for record in caplog.records + ) + + +def test_report_json_china_on_error_no_exception_and_notify_user(capsys, monkeypatch): + monkeypatch.setattr(Configuration, "should_report", True) + monkeypatch.setattr(Configuration, "edge_kinesis_aws_access_key_id", "my_value") + monkeypatch.setattr(Configuration, "edge_kinesis_aws_secret_access_key", "my_value") + monkeypatch.setattr(boto3, "client", MagicMock(side_effect=Exception)) + lumigo_utils.get_logger().setLevel(logging.CRITICAL) + + report_json(CHINA_REGION, [{"a": "b"}]) + + assert "Failed to send spans" in capsys.readouterr().out + + +def test_china_shouldnt_establish_http_connection(monkeypatch): + monkeypatch.setenv("AWS_REGION", CHINA_REGION) + # Reload a duplicate of lambda_reporter + spec = importlib.util.find_spec("lumigo_tracer.lambda_tracer.lambda_reporter") + lumigo_utils_reloaded = importlib.util.module_from_spec(spec) + spec.loader.exec_module(lumigo_utils_reloaded) + establish_connection() + + assert lumigo_utils_reloaded.edge_connection is None + + +def test_china_with_env_variable_shouldnt_reuse_boto3_connection(monkeypatch): + monkeypatch.setenv("LUMIGO_KINESIS_SHOULD_REUSE_CONNECTION", "false") + monkeypatch.setattr(Configuration, "should_report", True) + monkeypatch.setattr(Configuration, "edge_kinesis_aws_access_key_id", "my_value") + monkeypatch.setattr(Configuration, "edge_kinesis_aws_secret_access_key", "my_value") + monkeypatch.setattr(boto3, "client", MagicMock()) + + report_json(CHINA_REGION, [{"a": "b"}]) + report_json(CHINA_REGION, [{"a": "b"}]) + + assert boto3.client.call_count == 2 + + +def test_china_reuse_boto3_connection(monkeypatch): + monkeypatch.setattr(Configuration, "should_report", True) + monkeypatch.setattr(Configuration, "edge_kinesis_aws_access_key_id", "my_value") + monkeypatch.setattr(Configuration, "edge_kinesis_aws_secret_access_key", "my_value") + monkeypatch.setattr(boto3, "client", MagicMock()) + + report_json(CHINA_REGION, [{"a": "b"}]) + report_json(CHINA_REGION, [{"a": "b"}]) + + boto3.client.assert_called_once() diff --git a/src/test/unit/test_spans_container.py b/src/test/unit/lambda_tracer/test_spans_container.py similarity index 96% rename from src/test/unit/test_spans_container.py rename to src/test/unit/lambda_tracer/test_spans_container.py index 54bb31cc..49b2e7b4 100644 --- a/src/test/unit/test_spans_container.py +++ b/src/test/unit/lambda_tracer/test_spans_container.py @@ -9,9 +9,11 @@ import pytest -from lumigo_tracer import lumigo_utils, add_execution_tag +from lumigo_tracer import add_execution_tag +from lumigo_tracer.lambda_tracer import lambda_reporter +from lumigo_tracer.lambda_tracer.lambda_reporter import get_extension_dir from lumigo_tracer.wrappers.http.http_parser import HTTP_TYPE -from lumigo_tracer.spans_container import ( +from lumigo_tracer.lambda_tracer.spans_container import ( SpansContainer, TimeoutMechanism, FUNCTION_TYPE, @@ -53,7 +55,7 @@ def test_spans_container_not_send_start_span_on_send_only_on_errors_mode(monkeyp def test_start(monkeypatch): lumigo_utils_mock = mock.Mock() monkeypatch.setenv("LUMIGO_USE_TRACER_EXTENSION", "true") - monkeypatch.setattr(lumigo_utils, "write_extension_file", lumigo_utils_mock) + monkeypatch.setattr(lambda_reporter, "write_extension_file", lumigo_utils_mock) monkeypatch.setattr(SpansContainer, "_generate_start_span", lambda *args, **kwargs: {"a": "a"}) monkeypatch.setattr(Configuration, "should_report", True) SpansContainer().start() @@ -95,7 +97,7 @@ def only_if_error(dummy_span, monkeypatch, tmpdir): SpansContainer.get_span().add_span(dummy_span) reported_ttl = SpansContainer.get_span().end({}) - stop_path_path = f"{lumigo_utils.get_extension_dir()}/span_name_stop" + stop_path_path = f"{get_extension_dir()}/span_name_stop" return reported_ttl, stop_path_path diff --git a/src/test/unit/test_tracer.py b/src/test/unit/lambda_tracer/test_tracer.py similarity index 96% rename from src/test/unit/test_tracer.py rename to src/test/unit/lambda_tracer/test_tracer.py index 0040e780..611d5e1f 100644 --- a/src/test/unit/test_tracer.py +++ b/src/test/unit/lambda_tracer/test_tracer.py @@ -15,20 +15,19 @@ from capturer import CaptureOutput from lumigo_tracer import lumigo_tracer, LumigoChalice, add_execution_tag -from lumigo_tracer import lumigo_utils +from lumigo_tracer.lambda_tracer import lambda_reporter +from lumigo_tracer.lambda_tracer.lambda_reporter import _create_request_body, report_json from lumigo_tracer.lumigo_utils import ( Configuration, STEP_FUNCTION_UID_KEY, LUMIGO_EVENT_KEY, - _create_request_body, EXECUTION_TAGS_KEY, - report_json, EDGE_KINESIS_STREAM_NAME, SKIP_COLLECTING_HTTP_BODY_KEY, LUMIGO_PROPAGATE_W3C, ) -from lumigo_tracer.spans_container import SpansContainer, ENRICHMENT_TYPE +from lumigo_tracer.lambda_tracer.spans_container import SpansContainer, ENRICHMENT_TYPE from moto import mock_kinesis from lumigo_tracer.w3c_context import TRACEPARENT_HEADER_NAME @@ -57,7 +56,7 @@ def lambda_test_function(event, context): assert first_send[0]["maxFinishTime"] -@pytest.mark.parametrize("token", ["t_", "", "10faa5e13e7844aaa1234"]) +@pytest.mark.parametrize("token", ["t_", "", "123456789101112"]) def test_lambda_wrapper_validate_token_format_not_valid(context, capsys, token): """ This test checks that the token has a valid format (sends warning since all inputs are invalid) @@ -394,7 +393,7 @@ def lambda_test_function(event, context): d = {"a": "b", "myPassword": "123"} conn = http.client.HTTPConnection("www.google.com") conn.request("POST", "/", json.dumps(d)) - return {"secret_password": "lumigo rulz"} + return {"secret_password": "lumigo rulz"} # pragma: allowlist secret lambda_test_function({"key": "24"}, context) span = SpansContainer.get_span() @@ -495,11 +494,11 @@ def lambda_test_function(event, context): @mock_kinesis def test_china(context, reporter_mock, monkeypatch): china_region_for_test = "ap-east-1" # Moto doesn't work for China - monkeypatch.setattr(lumigo_utils, "CHINA_REGION", china_region_for_test) + monkeypatch.setattr(lambda_reporter, "CHINA_REGION", china_region_for_test) monkeypatch.setenv("AWS_REGION", china_region_for_test) reporter_mock.side_effect = report_json # Override the conftest's monkeypatch access_key_id = "my_access_key_id" - secret_access_key = "my_secret_access_key" + secret_access_key = "my_secret_access_key" # pragma: allowlist secret # Create edge Kinesis client = boto3.client( "kinesis", @@ -563,7 +562,7 @@ def wrapped(event, context): wrapped({}, context) stacktrace = SpansContainer.get_span().function_span["error"]["stacktrace"] - assert "lumigo_tracer/tracer.py" not in stacktrace + assert "lumigo_tracer/lambda_tracer/tracer.py" not in stacktrace line_dropper = re.compile(r"\d{3}") from_lumigo = line_dropper.sub("-", stacktrace) original = line_dropper.sub("-", traceback.format_tb(e.value.__traceback__)[1]) diff --git a/src/test/unit/test_lumigo_utils.py b/src/test/unit/test_lumigo_utils.py index 46b12b64..f096a623 100644 --- a/src/test/unit/test_lumigo_utils.py +++ b/src/test/unit/test_lumigo_utils.py @@ -1,24 +1,13 @@ -import importlib.util import inspect import logging -import os -import uuid from collections import OrderedDict from decimal import Decimal import datetime -import http.client -import socket -from unittest.mock import Mock - -import boto3 -from mock import MagicMock import pytest + from lumigo_tracer import lumigo_utils from lumigo_tracer.lumigo_utils import ( - _create_request_body, - _is_span_has_error, - _get_event_base64_size, MAX_VARS_SIZE, format_frames, _truncate_locals, @@ -35,40 +24,20 @@ SKIP_SCRUBBING_KEYS, get_timeout_buffer, lumigo_dumps, - get_edge_host, - EDGE_PATH, - report_json, is_kill_switch_on, KILL_SWITCH, is_error_code, get_size_upper_bound, is_aws_arn, - CHINA_REGION, internal_analytics_message, INTERNAL_ANALYTICS_PREFIX, - InternalState, concat_old_body_to_new, TRUNCATE_SUFFIX, DEFAULT_AUTO_TAG_KEY, lumigo_safe_execute, is_python_37, + is_span_has_error, ) -import json - - -@pytest.fixture -def dummy_span(): - return {"dummy": "dummy"} - - -@pytest.fixture -def function_end_span(): - return {"dummy_end": "dummy_end"} - - -@pytest.fixture -def error_span(): - return {"dummy": "dummy", "error": "Error"} @pytest.mark.parametrize( @@ -81,40 +50,7 @@ def error_span(): ], ) def test_is_span_has_error(input_span, expected_is_error): - assert _is_span_has_error(input_span) is expected_is_error - - -def test_create_request_body_default(dummy_span): - assert _create_request_body([dummy_span], False) == json.dumps([dummy_span]) - - -def test_create_request_body_not_effecting_small_events(dummy_span): - assert _create_request_body([dummy_span], True, 1_000_000) == json.dumps([dummy_span]) - - -def test_create_request_body_keep_function_span_and_filter_other_spans( - dummy_span, function_end_span -): - expected_result = [dummy_span, dummy_span, dummy_span, function_end_span] - size = _get_event_base64_size(expected_result) - assert _create_request_body(expected_result * 2, True, size) == json.dumps( - [function_end_span, dummy_span, dummy_span, dummy_span] - ) - - -def test_create_request_body_take_error_first(dummy_span, error_span, function_end_span): - expected_result = [function_end_span, error_span, dummy_span, dummy_span] - input = [ - dummy_span, - dummy_span, - dummy_span, - dummy_span, - dummy_span, - error_span, - function_end_span, - ] - size = _get_event_base64_size(expected_result) - assert _create_request_body(input, True, size) == json.dumps(expected_result) + assert is_span_has_error(input_span) is expected_is_error @pytest.mark.parametrize( @@ -464,161 +400,6 @@ def test_get_timeout_buffer(remaining_time, conf, expected): assert get_timeout_buffer(remaining_time) == expected -@pytest.mark.parametrize( - ["arg", "host"], - [("https://a.com", "a.com"), (f"https://b.com{EDGE_PATH}", "b.com"), ("h.com", "h.com")], -) -def test_get_edge_host(arg, host, monkeypatch): - monkeypatch.setattr(Configuration, "host", arg) - assert get_edge_host("region") == host - - -def test_report_json_extension_spans_mode(monkeypatch, reporter_mock, tmpdir): - extension_dir = tmpdir.mkdir("tmp") - monkeypatch.setattr(uuid, "uuid4", lambda *args, **kwargs: "span_name") - monkeypatch.setattr(Configuration, "should_report", True) - monkeypatch.setenv("LUMIGO_USE_TRACER_EXTENSION", "TRUE") - monkeypatch.setenv("LUMIGO_EXTENSION_SPANS_DIR_KEY", extension_dir) - mocked_urandom = MagicMock(hex=MagicMock(return_value="my_mocked_data")) - monkeypatch.setattr(os, "urandom", lambda *args, **kwargs: mocked_urandom) - - start_span = [{"span": "true"}] - report_json(region=None, msgs=start_span, is_start_span=True) - - spans = [] - size_factor = 100 - for i in range(size_factor): - spans.append( - { - i: "a" * size_factor, - } - ) - report_json(region=None, msgs=spans, is_start_span=False) - start_path_path = f"{lumigo_utils.get_extension_dir()}/span_name_span" - end_path_path = f"{lumigo_utils.get_extension_dir()}/span_name_end" - start_file_content = json.loads(open(start_path_path, "r").read()) - end_file_content = json.loads(open(end_path_path, "r").read()) - assert start_span == start_file_content - assert json.dumps(end_file_content) == json.dumps(spans) - - -@pytest.mark.parametrize( - "errors, final_log", [(ValueError, "ERROR"), ([ValueError, Mock()], "INFO")] -) -def test_report_json_retry(monkeypatch, reporter_mock, caplog, errors, final_log): - reporter_mock.side_effect = report_json - monkeypatch.setattr(Configuration, "host", "force_reconnect") - monkeypatch.setattr(Configuration, "should_report", True) - monkeypatch.setattr(http.client, "HTTPSConnection", Mock()) - http.client.HTTPSConnection("force_reconnect").getresponse.side_effect = errors - - report_json(None, [{"a": "b"}]) - - assert caplog.records[-1].levelname == final_log - - -def test_report_json_fast_failure_after_timeout(monkeypatch, reporter_mock, caplog): - reporter_mock.side_effect = report_json - monkeypatch.setattr(Configuration, "host", "host") - monkeypatch.setattr(Configuration, "should_report", True) - monkeypatch.setattr(http.client, "HTTPSConnection", Mock()) - http.client.HTTPSConnection("force_reconnect").getresponse.side_effect = socket.timeout - - assert report_json(None, [{"a": "b"}]) == 0 - assert caplog.records[-1].msg == "Timeout while connecting to host" - - assert report_json(None, [{"a": "b"}]) == 0 - assert caplog.records[-1].msg == "Skip sending messages due to previous timeout" - - InternalState.timeout_on_connection = datetime.datetime(2016, 1, 1) - assert report_json(None, [{"a": "b"}]) == 0 - assert caplog.records[-1].msg == "Timeout while connecting to host" - - -def test_report_json_china_missing_access_key_id(monkeypatch, reporter_mock, caplog): - monkeypatch.setattr(Configuration, "should_report", True) - reporter_mock.side_effect = report_json - assert report_json(CHINA_REGION, [{"a": "b"}]) == 0 - assert any( - "edge_kinesis_aws_access_key_id" in record.message and record.levelname == "ERROR" - for record in caplog.records - ) - - -def test_report_json_china_missing_secret_access_key(monkeypatch, reporter_mock, caplog): - monkeypatch.setattr(Configuration, "should_report", True) - monkeypatch.setattr(Configuration, "edge_kinesis_aws_access_key_id", "my_value") - reporter_mock.side_effect = report_json - assert report_json(CHINA_REGION, [{"a": "b"}]) == 0 - assert any( - "edge_kinesis_aws_secret_access_key" in record.message and record.levelname == "ERROR" - for record in caplog.records - ) - - -def test_report_json_china_no_boto(monkeypatch, reporter_mock, caplog): - reporter_mock.side_effect = report_json - monkeypatch.setattr(Configuration, "should_report", True) - monkeypatch.setattr(Configuration, "edge_kinesis_aws_access_key_id", "my_value") - monkeypatch.setattr(Configuration, "edge_kinesis_aws_secret_access_key", "my_value") - monkeypatch.setattr(lumigo_utils, "boto3", None) - - report_json(CHINA_REGION, [{"a": "b"}]) - - assert any( - "boto3 is missing. Unable to send to Kinesis" in record.message - and record.levelname == "ERROR" # noqa - for record in caplog.records - ) - - -def test_report_json_china_on_error_no_exception_and_notify_user(capsys, monkeypatch): - monkeypatch.setattr(Configuration, "should_report", True) - monkeypatch.setattr(Configuration, "edge_kinesis_aws_access_key_id", "my_value") - monkeypatch.setattr(Configuration, "edge_kinesis_aws_secret_access_key", "my_value") - monkeypatch.setattr(boto3, "client", MagicMock(side_effect=Exception)) - lumigo_utils.get_logger().setLevel(logging.CRITICAL) - - report_json(CHINA_REGION, [{"a": "b"}]) - - assert "Failed to send spans" in capsys.readouterr().out - - -def test_china_shouldnt_establish_http_connection(monkeypatch): - monkeypatch.setenv("AWS_REGION", CHINA_REGION) - # Reload a duplicate of lumigo_utils - spec = importlib.util.find_spec("lumigo_tracer.lumigo_utils") - lumigo_utils_reloaded = importlib.util.module_from_spec(spec) - spec.loader.exec_module(lumigo_utils_reloaded) - - assert lumigo_utils_reloaded.edge_connection is None - - -def test_china_with_env_variable_shouldnt_reuse_boto3_connection(monkeypatch): - monkeypatch.setenv("LUMIGO_KINESIS_SHOULD_REUSE_CONNECTION", "false") - monkeypatch.setattr(Configuration, "should_report", True) - monkeypatch.setattr(Configuration, "edge_kinesis_aws_access_key_id", "my_value") - monkeypatch.setattr(Configuration, "edge_kinesis_aws_secret_access_key", "my_value") - monkeypatch.setattr(boto3, "client", MagicMock()) - - report_json(CHINA_REGION, [{"a": "b"}]) - report_json(CHINA_REGION, [{"a": "b"}]) - - assert boto3.client.call_count == 2 - - -def test_china_reuse_boto3_connection(monkeypatch): - monkeypatch.setattr(Configuration, "should_report", True) - monkeypatch.setattr(Configuration, "edge_kinesis_aws_access_key_id", "my_value") - monkeypatch.setattr(Configuration, "edge_kinesis_aws_secret_access_key", "my_value") - monkeypatch.setattr(boto3, "client", MagicMock()) - - report_json(CHINA_REGION, [{"a": "b"}]) - report_json(CHINA_REGION, [{"a": "b"}]) - - boto3.client.assert_called_once() - - @pytest.mark.parametrize("env, expected", [("True", True), ("other", False), ("123", False)]) def test_is_kill_switch_on(monkeypatch, env, expected): monkeypatch.setenv(KILL_SWITCH, env) diff --git a/src/test/unit/test_user_utils.py b/src/test/unit/test_user_utils.py index 39a019bc..e4d4cf51 100644 --- a/src/test/unit/test_user_utils.py +++ b/src/test/unit/test_user_utils.py @@ -1,7 +1,7 @@ import time import pytest -from lumigo_tracer.spans_container import SpansContainer +from lumigo_tracer.lambda_tracer.spans_container import SpansContainer from lumigo_tracer.user_utils import ( warn, info, diff --git a/src/test/unit/wrappers/aiohttp/test_aiohttp_wrapper.py b/src/test/unit/wrappers/aiohttp/test_aiohttp_wrapper.py index 8daffae2..04f0c0ff 100644 --- a/src/test/unit/wrappers/aiohttp/test_aiohttp_wrapper.py +++ b/src/test/unit/wrappers/aiohttp/test_aiohttp_wrapper.py @@ -5,7 +5,7 @@ import pytest import lumigo_tracer -from lumigo_tracer.spans_container import SpansContainer +from lumigo_tracer.lambda_tracer.spans_container import SpansContainer def test_aiohttp_happy_flow(context, token): diff --git a/src/test/unit/wrappers/conftest.py b/src/test/unit/wrappers/conftest.py new file mode 100644 index 00000000..32502a53 --- /dev/null +++ b/src/test/unit/wrappers/conftest.py @@ -0,0 +1,8 @@ +import pytest + +from lumigo_tracer.wrappers import wrap + + +@pytest.fixture(autouse=True) +def wrap_everything(aws_environment): + wrap() diff --git a/src/test/unit/wrappers/http/test_sync_http_wrappers.py b/src/test/unit/wrappers/http/test_sync_http_wrappers.py index ee49cff2..47d08d5d 100644 --- a/src/test/unit/wrappers/http/test_sync_http_wrappers.py +++ b/src/test/unit/wrappers/http/test_sync_http_wrappers.py @@ -23,7 +23,7 @@ TRUNCATE_SUFFIX, ) from lumigo_tracer.wrappers.http.http_parser import Parser -from lumigo_tracer.spans_container import SpansContainer +from lumigo_tracer.lambda_tracer.spans_container import SpansContainer from lumigo_tracer.wrappers.http.http_data_classes import HttpRequest from lumigo_tracer.wrappers.http.sync_http_wrappers import ( add_request_event, diff --git a/src/test/unit/wrappers/http/test_sync_http_wrappers_threads.py b/src/test/unit/wrappers/http/test_sync_http_wrappers_threads.py index 5c4cbaf7..db18d287 100644 --- a/src/test/unit/wrappers/http/test_sync_http_wrappers_threads.py +++ b/src/test/unit/wrappers/http/test_sync_http_wrappers_threads.py @@ -6,7 +6,7 @@ import json import lumigo_tracer -from lumigo_tracer.spans_container import SpansContainer +from lumigo_tracer.lambda_tracer.spans_container import SpansContainer COUNT = 5 diff --git a/src/test/unit/wrappers/pymongo/test_pymongo_wrapper.py b/src/test/unit/wrappers/pymongo/test_pymongo_wrapper.py index d3deee32..f8447cd0 100644 --- a/src/test/unit/wrappers/pymongo/test_pymongo_wrapper.py +++ b/src/test/unit/wrappers/pymongo/test_pymongo_wrapper.py @@ -2,7 +2,7 @@ import pytest -from lumigo_tracer.spans_container import SpansContainer +from lumigo_tracer.lambda_tracer.spans_container import SpansContainer from lumigo_tracer.wrappers.pymongo.pymongo_wrapper import LumigoMongoMonitoring diff --git a/src/test/unit/wrappers/redis/test_redis_wrapper.py b/src/test/unit/wrappers/redis/test_redis_wrapper.py index 5b264710..947cd6be 100644 --- a/src/test/unit/wrappers/redis/test_redis_wrapper.py +++ b/src/test/unit/wrappers/redis/test_redis_wrapper.py @@ -3,7 +3,7 @@ import pytest -from lumigo_tracer.spans_container import SpansContainer +from lumigo_tracer.lambda_tracer.spans_container import SpansContainer from lumigo_tracer.wrappers.redis.redis_wrapper import execute_command_wrapper, execute_wrapper FUNCTION_RESULT = "Result" diff --git a/src/test/unit/wrappers/sql/test_sqlalchemy_wrapper.py b/src/test/unit/wrappers/sql/test_sqlalchemy_wrapper.py index d9ff7190..526858bd 100644 --- a/src/test/unit/wrappers/sql/test_sqlalchemy_wrapper.py +++ b/src/test/unit/wrappers/sql/test_sqlalchemy_wrapper.py @@ -7,8 +7,8 @@ from sqlalchemy.sql import select from lumigo_tracer.lumigo_utils import DEFAULT_MAX_ENTRY_SIZE -from lumigo_tracer.spans_container import SpansContainer -from lumigo_tracer.tracer import lumigo_tracer +from lumigo_tracer.lambda_tracer.spans_container import SpansContainer +from lumigo_tracer.lambda_tracer.tracer import lumigo_tracer TOKEN = "t_10faa5e13e7844aaa1234" From 0bdd12de0896b18292f2c01c119ee736be33486d Mon Sep 17 00:00:00 2001 From: saartochner Date: Sun, 22 Jan 2023 11:00:46 +0200 Subject: [PATCH 2/4] move the tracer-specific logic into a well-marked module and don't run code in global time if not lambda --- src/test/component/test_component.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/test/component/test_component.py b/src/test/component/test_component.py index ba6557d0..99ee196b 100644 --- a/src/test/component/test_component.py +++ b/src/test/component/test_component.py @@ -5,6 +5,7 @@ import subprocess import http.client +from lumigo_tracer import global_scope_exec from lumigo_tracer.lambda_tracer.tracer import lumigo_tracer from lumigo_tracer.lumigo_utils import md5hash from lumigo_tracer.lambda_tracer.spans_container import SpansContainer @@ -31,6 +32,7 @@ def aws_env_variables(monkeypatch, aws_environment): "_X_AMZN_TRACE_ID", "RequestId: 4365921c-fc6d-4745-9f00-9fe9c516ede5 Root=1-000044d4-c3881e0c19c02c5e6ffa8f9e;Parent=37cf579525dfb3ba;Sampled=0", ) + global_scope_exec() @pytest.fixture From bbf9b18f96635e79109dc711942a89661ef98357 Mon Sep 17 00:00:00 2001 From: saartochner Date: Sun, 22 Jan 2023 11:19:18 +0200 Subject: [PATCH 3/4] move the tracer-specific logic into a well-marked module and don't run code in global time if not lambda --- src/lumigo_tracer/lambda_tracer/global_scope_exec.py | 4 ++-- src/test/unit/lambda_tracer/test_global_scope_exec.py | 4 ---- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/src/lumigo_tracer/lambda_tracer/global_scope_exec.py b/src/lumigo_tracer/lambda_tracer/global_scope_exec.py index 6719142c..4cc17891 100644 --- a/src/lumigo_tracer/lambda_tracer/global_scope_exec.py +++ b/src/lumigo_tracer/lambda_tracer/global_scope_exec.py @@ -1,7 +1,7 @@ from lumigo_tracer import auto_instrument_handler from lumigo_tracer.lambda_tracer import lambda_reporter from lumigo_tracer.lumigo_utils import is_aws_environment -from lumigo_tracer.wrappers import wrap +from lumigo_tracer import wrappers def global_scope_exec() -> None: @@ -11,4 +11,4 @@ def global_scope_exec() -> None: # auto_instrument: import handler during runtime initialization, as usual. auto_instrument_handler.prefetch_handler_import() # follow requests to third party services - wrap() + wrappers.wrap() diff --git a/src/test/unit/lambda_tracer/test_global_scope_exec.py b/src/test/unit/lambda_tracer/test_global_scope_exec.py index 11897bb6..9e8e316b 100644 --- a/src/test/unit/lambda_tracer/test_global_scope_exec.py +++ b/src/test/unit/lambda_tracer/test_global_scope_exec.py @@ -1,18 +1,14 @@ from lumigo_tracer.lambda_tracer.global_scope_exec import global_scope_exec from lumigo_tracer.lambda_tracer import lambda_reporter -from lumigo_tracer import wrappers def test_global_scope_preparation_called_only_in_lambda(monkeypatch): lambda_reporter.edge_connection = None - wrappers.already_wrapped = False monkeypatch.delenv("AWS_LAMBDA_FUNCTION_VERSION", raising=False) global_scope_exec() assert lambda_reporter.edge_connection is None - assert not wrappers.already_wrapped monkeypatch.setenv("AWS_LAMBDA_FUNCTION_VERSION", "true") global_scope_exec() assert lambda_reporter.edge_connection is not None - assert wrappers.already_wrapped From df455de59a7e46708cce3093267e8b9ed7a150fb Mon Sep 17 00:00:00 2001 From: saartochner Date: Sun, 22 Jan 2023 12:15:31 +0200 Subject: [PATCH 4/4] support ReceiveMessage trigger in sqs parser --- .secrets.baseline | 6 +-- .../event/trigger_parsing/sqs_parser.py | 22 ++++++--- src/test/unit/event/test_event_trigger.py | 45 +++++++++++++++++++ 3 files changed, 63 insertions(+), 10 deletions(-) diff --git a/.secrets.baseline b/.secrets.baseline index 35cbefeb..a39ae665 100644 --- a/.secrets.baseline +++ b/.secrets.baseline @@ -168,14 +168,14 @@ "filename": "src/test/unit/event/test_event_trigger.py", "hashed_secret": "885bb9903f72e004ff2974807b70e7c970d3e6d5", "is_verified": false, - "line_number": 567 + "line_number": 612 }, { "type": "Hex High Entropy String", "filename": "src/test/unit/event/test_event_trigger.py", "hashed_secret": "3fae06dc55a618caed1d794dcd512bfe7e76c9f1", "is_verified": false, - "line_number": 612 + "line_number": 657 } ], "src/test/unit/test_lumigo_utils.py": [ @@ -220,5 +220,5 @@ } ] }, - "generated_at": "2023-01-22T08:40:16Z" + "generated_at": "2023-01-22T10:15:06Z" } diff --git a/src/lumigo_tracer/event/trigger_parsing/sqs_parser.py b/src/lumigo_tracer/event/trigger_parsing/sqs_parser.py index a836112a..8427eb2d 100644 --- a/src/lumigo_tracer/event/trigger_parsing/sqs_parser.py +++ b/src/lumigo_tracer/event/trigger_parsing/sqs_parser.py @@ -10,32 +10,40 @@ class SqsEventTriggerParser(EventTriggerParser): @staticmethod def _should_handle(event: Dict[Any, Any]) -> bool: - return bool(event.get("Records", [{}])[0].get("eventSource") == "aws:sqs") + return bool(event.get("Records", [{}])[0].get("eventSource") == "aws:sqs") or bool( + event.get("service_name") == "sqs" and event.get("operation_name") == "ReceiveMessage" + ) + + @staticmethod + def _get_messages(event: Dict[Any, Any]) -> List[Dict[Any, Any]]: + return event.get("Records", []) + event.get("Messages", []) # type: ignore @staticmethod def handle(event: Dict[Any, Any], target_id: Optional[str]) -> TriggerType: + messages = SqsEventTriggerParser._get_messages(event) message_ids = [] - for record in event.get("Records", []): - record_message_id = record.get("messageId") + for record in messages: + record_message_id = record.get("messageId") or record.get("MessageId") if not record_message_id: continue message_ids.append(record_message_id) + arn = event.get("Records", [{}])[0].get("eventSourceARN") or "Unknown" return EventTriggerParser.build_trigger( target_id=target_id, resource_type="sqs", from_message_ids=message_ids, extra={ - ExtraKeys.ARN: event["Records"][0]["eventSourceARN"], - ExtraKeys.RECORDS_NUM: len(event["Records"]), + ExtraKeys.ARN: arn, + ExtraKeys.RECORDS_NUM: len(messages), }, ) @staticmethod def extract_inner(event: Dict[Any, Any]) -> List[str]: inner_messages = [] - for record in event.get("Records", []): - body = record.get("body") + for record in SqsEventTriggerParser._get_messages(event): + body = record.get("body") or record.get("Body") if isinstance(body, str): inner_messages.append(body) return inner_messages diff --git a/src/test/unit/event/test_event_trigger.py b/src/test/unit/event/test_event_trigger.py index 290959ed..04e8082d 100644 --- a/src/test/unit/event/test_event_trigger.py +++ b/src/test/unit/event/test_event_trigger.py @@ -238,6 +238,51 @@ }, ], ), + ( # SNS-SQS example trigger (ReceiveMessage) + { + "service_name": "sqs", + "operation_name": "ReceiveMessage", + "Messages": [ + { + "MessageId": "aaaa-aaaa-aaaa-aaaa", + "ReceiptHandle": "ReceiptHandle", + "MD5OfBody": "123456789", + "Body": '{\n "Type" : "Notification",\n "MessageId" : "bbbb-bbbb-bbbb-bbbb",\n "TopicArn" : "arn:aws:sns:us-west-2:1234567891011:inner-sns",\n "Message" : "{}",\n "Timestamp" : "2023-01-22T09:43:08.651Z",\n "SignatureVersion" : "1",\n "Signature" : "Signature",\n "SigningCertURL" : "https://sns.us-west-2.amazonaws.com/SimpleNotificationService-123456789.pem",\n "UnsubscribeURL" : "https://sns.us-west-2.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-west-2:1234567891011:inner-sns-bbb-ccc"\n}', + } + ], + "ResponseMetadata": { + "RequestId": "RequestId", + "HTTPStatusCode": 200, + "HTTPHeaders": { + "x-amzn-requestid": "x-amzn-requestid", + "date": "Sun, 22 Jan 2023 09:44:25 GMT", + "content-type": "text/xml", + "content-length": "1998", + }, + "RetryAttempts": 0, + }, + }, + [ + { + "extra": { + "arn": "Unknown", + "recordsNum": 1, + }, + "fromMessageIds": [ + "aaaa-aaaa-aaaa-aaaa", + ], + "triggeredBy": "sqs", + }, + { + "extra": { + "arn": "arn:aws:sns:us-west-2:1234567891011:inner-sns", + "recordsNum": 1, + }, + "fromMessageIds": ["bbbb-bbbb-bbbb-bbbb"], + "triggeredBy": "sns", + }, + ], + ), ( # SQS that is *not* SNS-SQS (not SimpleNotificationService) { "Records": [