From 0be7006aed2f627d56a63a9e101f08f176f674be Mon Sep 17 00:00:00 2001
From: Povilas Versockas
Date: Tue, 7 May 2024 16:09:19 +0300
Subject: [PATCH] feat: add support for kinesis using span links
---
.../instrumentation/aws_lambda/__init__.py | 51 +++++++++++++++++++
.../instrumentation/botocore/__init__.py | 28 +++++-----
2 files changed, 64 insertions(+), 15 deletions(-)
diff --git a/instrumentation/opentelemetry-instrumentation-aws-lambda/src/opentelemetry/instrumentation/aws_lambda/__init__.py b/instrumentation/opentelemetry-instrumentation-aws-lambda/src/opentelemetry/instrumentation/aws_lambda/__init__.py
index b8d0ccf72c..a6a4ece9b9 100644
--- a/instrumentation/opentelemetry-instrumentation-aws-lambda/src/opentelemetry/instrumentation/aws_lambda/__init__.py
+++ b/instrumentation/opentelemetry-instrumentation-aws-lambda/src/opentelemetry/instrumentation/aws_lambda/__init__.py
@@ -105,6 +105,7 @@ def custom_event_context_extractor(lambda_event):
from opentelemetry.trace.span import INVALID_SPAN_ID
import json
import typing
+import base64
#import traceback
#import tracemalloc
@@ -481,6 +482,43 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
except Exception as ex:
pass
+ kinesisTriggerSpan = None
+ try:
+ if lambda_event["Records"][0]["eventSource"] == "aws:kinesis":
+ links = []
+
+ for record in lambda_event["Records"]:
+ if record.get("kinesis") is None:
+ continue
+ data = record["kinesis"].get("data")
+ if data is not None:
+ decoded_bytes = base64.b64decode(data)
+ decoded_string = decoded_bytes.decode('utf-8')
+ data = json.loads(decoded_string)
+ ctx = get_global_textmap().extract(carrier=data.get("_context"))
+ span_ctx = get_current_span(ctx).get_span_context()
+ if span_ctx.span_id != INVALID_SPAN_ID:
+ links.append(Link(span_ctx))
+ span_kind = SpanKind.INTERNAL
+ span_name = orig_handler_name
+ kinesisTriggerSpan = tracer.start_span(span_name, context=parent_context, kind=SpanKind.CONSUMER, links=links)
+ kinesisTriggerSpan.set_attribute(SpanAttributes.FAAS_TRIGGER, "pubsub")
+ kinesisTriggerSpan.set_attribute("faas.trigger.type", "Kinesis")
+
+ parent_context = set_span_in_context(kinesisTriggerSpan)
+
+ if lambda_event["Records"][0]["kinesis"] and lambda_event["Records"][0]["kinesis"].get("data"):
+ decoded_bytes = base64.b64decode(lambda_event["Records"][0]["kinesis"].get("data"))
+ decoded_string = decoded_bytes.decode('utf-8')
+ data = json.loads(decoded_string)
+
+ kinesisTriggerSpan.set_attribute(
+ "rpc.request.body",
+ limit_string_size(data),
+ )
+ except Exception as e:
+ pass
+
dynamoTriggerSpan = None
try:
if lambda_event["Records"][0]["eventSource"] == "aws:dynamodb":
@@ -677,6 +715,17 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
pass
snsTriggerSpan.end()
+ if lambda_event and kinesisTriggerSpan is not None:
+ try:
+ if isinstance(result, dict) and result.get("ResponseMetadata"):
+ if result["ResponseMetadata"].get("HTTPStatusCode"):
+ kinesisTriggerSpan.set_attribute(
+ SpanAttributes.HTTP_STATUS_CODE,
+ result["ResponseMetadata"]["HTTPStatusCode"],
+ )
+ except Exception:
+ pass
+ kinesisTriggerSpan.end()
if lambda_event and dynamoTriggerSpan is not None:
try:
@@ -754,6 +803,8 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
cognitoTriggerSpan.end()
if eventBridgeTriggerSpan is not None:
eventBridgeTriggerSpan.end()
+ if kinesisTriggerSpan is not None:
+ kinesisTriggerSpan.end()
now = time.time()
_tracer_provider = tracer_provider or get_tracer_provider()
diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py
index 9f1c0ce67e..f409ab2d6e 100644
--- a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py
+++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py
@@ -168,15 +168,6 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
return original_func(*args, **kwargs)
- #print("patched_api")
- #print(json.dumps(instance, indent=4, sort_keys=True, default=str))
- #print("args")
- #print(args)
- #print(json.dumps(args, indent=4, sort_keys=True, default=str))
- #print("kwargs")
- #print(kwargs)
- #print(json.dumps(kwargs, indent=4, sort_keys=True, default=str))
-
call_context = _determine_call_context(instance, args)
if call_context is None:
return original_func(*args, **kwargs)
@@ -215,14 +206,14 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
elif call_context.service == "events" and call_context.operation == "PutEvents":
call_context.span_kind = SpanKind.PRODUCER
attributes["rpc.request.payload"] = limit_string_size(json.dumps(call_context.params, default=str))
+ elif call_context.service == "kinesis" and (call_context.operation == "PutRecord" or call_context.operation == "PutRecords"):
+ call_context.span_kind = SpanKind.PRODUCER
+ attributes["rpc.request.payload"] = limit_string_size(json.dumps(call_context.params, default=str))
else:
attributes["rpc.request.payload"] = limit_string_size(json.dumps(call_context.params, default=str))
except Exception as ex:
pass
- #print("here")
- #print(call_context.operation)
- #print(json.dumps(call_context.params, indent=4, sort_keys=True, default=str))
_safe_invoke(extension.extract_attributes, attributes)
with self._tracer.start_as_current_span(
@@ -289,11 +280,18 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
except Exception as ex:
pass
- result = None
try:
- #print("calling original func")
- #print(json.dumps(args, indent=4, sort_keys=True, default=str))
+ if call_context.service == "kinesis" and call_context.operation == "PutRecord":
+ if args[1].get("Data") is not None:
+ detailJson = json.loads(args[1].get("Data"))
+ detailJson['_context'] = {}
+ inject(carrier = detailJson['_context'])
+ args[1]["Data"] = json.dumps(detailJson)
+ except Exception as e:
+ pass
+ result = None
+ try:
result = original_func(*args, **kwargs)
except ClientError as error:
result = getattr(error, "response", None)