diff --git a/instrumentation/opentelemetry-instrumentation-aws-lambda/src/opentelemetry/instrumentation/aws_lambda/__init__.py b/instrumentation/opentelemetry-instrumentation-aws-lambda/src/opentelemetry/instrumentation/aws_lambda/__init__.py index b8d0ccf72c..a6a4ece9b9 100644 --- a/instrumentation/opentelemetry-instrumentation-aws-lambda/src/opentelemetry/instrumentation/aws_lambda/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-aws-lambda/src/opentelemetry/instrumentation/aws_lambda/__init__.py @@ -105,6 +105,7 @@ def custom_event_context_extractor(lambda_event): from opentelemetry.trace.span import INVALID_SPAN_ID import json import typing +import base64 #import traceback #import tracemalloc @@ -481,6 +482,43 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches except Exception as ex: pass + kinesisTriggerSpan = None + try: + if lambda_event["Records"][0]["eventSource"] == "aws:kinesis": + links = [] + + for record in lambda_event["Records"]: + if record.get("kinesis") is None: + continue + data = record["kinesis"].get("data") + if data is not None: + decoded_bytes = base64.b64decode(data) + decoded_string = decoded_bytes.decode('utf-8') + data = json.loads(decoded_string) + ctx = get_global_textmap().extract(carrier=data.get("_context")) + span_ctx = get_current_span(ctx).get_span_context() + if span_ctx.span_id != INVALID_SPAN_ID: + links.append(Link(span_ctx)) + span_kind = SpanKind.INTERNAL + span_name = orig_handler_name + kinesisTriggerSpan = tracer.start_span(span_name, context=parent_context, kind=SpanKind.CONSUMER, links=links) + kinesisTriggerSpan.set_attribute(SpanAttributes.FAAS_TRIGGER, "pubsub") + kinesisTriggerSpan.set_attribute("faas.trigger.type", "Kinesis") + + parent_context = set_span_in_context(kinesisTriggerSpan) + + if lambda_event["Records"][0]["kinesis"] and lambda_event["Records"][0]["kinesis"].get("data"): + decoded_bytes = base64.b64decode(lambda_event["Records"][0]["kinesis"].get("data")) + decoded_string = decoded_bytes.decode('utf-8') + data = json.loads(decoded_string) + + kinesisTriggerSpan.set_attribute( + "rpc.request.body", + limit_string_size(data), + ) + except Exception as e: + pass + dynamoTriggerSpan = None try: if lambda_event["Records"][0]["eventSource"] == "aws:dynamodb": @@ -677,6 +715,17 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches pass snsTriggerSpan.end() + if lambda_event and kinesisTriggerSpan is not None: + try: + if isinstance(result, dict) and result.get("ResponseMetadata"): + if result["ResponseMetadata"].get("HTTPStatusCode"): + kinesisTriggerSpan.set_attribute( + SpanAttributes.HTTP_STATUS_CODE, + result["ResponseMetadata"]["HTTPStatusCode"], + ) + except Exception: + pass + kinesisTriggerSpan.end() if lambda_event and dynamoTriggerSpan is not None: try: @@ -754,6 +803,8 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches cognitoTriggerSpan.end() if eventBridgeTriggerSpan is not None: eventBridgeTriggerSpan.end() + if kinesisTriggerSpan is not None: + kinesisTriggerSpan.end() now = time.time() _tracer_provider = tracer_provider or get_tracer_provider() diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py index 9f1c0ce67e..f409ab2d6e 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py @@ -168,15 +168,6 @@ def _patched_api_call(self, original_func, instance, args, kwargs): if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY): return original_func(*args, **kwargs) - #print("patched_api") - #print(json.dumps(instance, indent=4, sort_keys=True, default=str)) - #print("args") - #print(args) - #print(json.dumps(args, indent=4, sort_keys=True, default=str)) - #print("kwargs") - #print(kwargs) - #print(json.dumps(kwargs, indent=4, sort_keys=True, default=str)) - call_context = _determine_call_context(instance, args) if call_context is None: return original_func(*args, **kwargs) @@ -215,14 +206,14 @@ def _patched_api_call(self, original_func, instance, args, kwargs): elif call_context.service == "events" and call_context.operation == "PutEvents": call_context.span_kind = SpanKind.PRODUCER attributes["rpc.request.payload"] = limit_string_size(json.dumps(call_context.params, default=str)) + elif call_context.service == "kinesis" and (call_context.operation == "PutRecord" or call_context.operation == "PutRecords"): + call_context.span_kind = SpanKind.PRODUCER + attributes["rpc.request.payload"] = limit_string_size(json.dumps(call_context.params, default=str)) else: attributes["rpc.request.payload"] = limit_string_size(json.dumps(call_context.params, default=str)) except Exception as ex: pass - #print("here") - #print(call_context.operation) - #print(json.dumps(call_context.params, indent=4, sort_keys=True, default=str)) _safe_invoke(extension.extract_attributes, attributes) with self._tracer.start_as_current_span( @@ -289,11 +280,18 @@ def _patched_api_call(self, original_func, instance, args, kwargs): except Exception as ex: pass - result = None try: - #print("calling original func") - #print(json.dumps(args, indent=4, sort_keys=True, default=str)) + if call_context.service == "kinesis" and call_context.operation == "PutRecord": + if args[1].get("Data") is not None: + detailJson = json.loads(args[1].get("Data")) + detailJson['_context'] = {} + inject(carrier = detailJson['_context']) + args[1]["Data"] = json.dumps(detailJson) + except Exception as e: + pass + result = None + try: result = original_func(*args, **kwargs) except ClientError as error: result = getattr(error, "response", None)