Skip to content

Commit

Permalink
feat: add support for kinesis using span links (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
povilasv authored May 8, 2024
1 parent 5e82f2d commit 56cefa3
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ def custom_event_context_extractor(lambda_event):
from opentelemetry.trace.span import INVALID_SPAN_ID
import json
import typing
import base64
#import traceback
#import tracemalloc

Expand Down Expand Up @@ -481,6 +482,43 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
except Exception as ex:
pass

kinesisTriggerSpan = None
try:
if lambda_event["Records"][0]["eventSource"] == "aws:kinesis":
links = []

for record in lambda_event["Records"]:
if record.get("kinesis") is None:
continue
data = record["kinesis"].get("data")
if data is not None:
decoded_bytes = base64.b64decode(data)
decoded_string = decoded_bytes.decode('utf-8')
data = json.loads(decoded_string)
ctx = get_global_textmap().extract(carrier=data.get("_context"))
span_ctx = get_current_span(ctx).get_span_context()
if span_ctx.span_id != INVALID_SPAN_ID:
links.append(Link(span_ctx))
span_kind = SpanKind.INTERNAL
span_name = orig_handler_name
kinesisTriggerSpan = tracer.start_span(span_name, context=parent_context, kind=SpanKind.CONSUMER, links=links)
kinesisTriggerSpan.set_attribute(SpanAttributes.FAAS_TRIGGER, "pubsub")
kinesisTriggerSpan.set_attribute("faas.trigger.type", "Kinesis")

parent_context = set_span_in_context(kinesisTriggerSpan)

if lambda_event["Records"][0]["kinesis"] and lambda_event["Records"][0]["kinesis"].get("data"):
decoded_bytes = base64.b64decode(lambda_event["Records"][0]["kinesis"].get("data"))
decoded_string = decoded_bytes.decode('utf-8')
data = json.loads(decoded_string)

kinesisTriggerSpan.set_attribute(
"rpc.request.body",
limit_string_size(data),
)
except Exception as e:
pass

dynamoTriggerSpan = None
try:
if lambda_event["Records"][0]["eventSource"] == "aws:dynamodb":
Expand Down Expand Up @@ -677,6 +715,17 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
pass
snsTriggerSpan.end()

if lambda_event and kinesisTriggerSpan is not None:
try:
if isinstance(result, dict) and result.get("ResponseMetadata"):
if result["ResponseMetadata"].get("HTTPStatusCode"):
kinesisTriggerSpan.set_attribute(
SpanAttributes.HTTP_STATUS_CODE,
result["ResponseMetadata"]["HTTPStatusCode"],
)
except Exception:
pass
kinesisTriggerSpan.end()

if lambda_event and dynamoTriggerSpan is not None:
try:
Expand Down Expand Up @@ -754,6 +803,8 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
cognitoTriggerSpan.end()
if eventBridgeTriggerSpan is not None:
eventBridgeTriggerSpan.end()
if kinesisTriggerSpan is not None:
kinesisTriggerSpan.end()

now = time.time()
_tracer_provider = tracer_provider or get_tracer_provider()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,15 +168,6 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
return original_func(*args, **kwargs)

#print("patched_api")
#print(json.dumps(instance, indent=4, sort_keys=True, default=str))
#print("args")
#print(args)
#print(json.dumps(args, indent=4, sort_keys=True, default=str))
#print("kwargs")
#print(kwargs)
#print(json.dumps(kwargs, indent=4, sort_keys=True, default=str))

call_context = _determine_call_context(instance, args)
if call_context is None:
return original_func(*args, **kwargs)
Expand Down Expand Up @@ -215,14 +206,14 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
elif call_context.service == "events" and call_context.operation == "PutEvents":
call_context.span_kind = SpanKind.PRODUCER
attributes["rpc.request.payload"] = limit_string_size(json.dumps(call_context.params, default=str))
elif call_context.service == "kinesis" and (call_context.operation == "PutRecord" or call_context.operation == "PutRecords"):
call_context.span_kind = SpanKind.PRODUCER
attributes["rpc.request.payload"] = limit_string_size(json.dumps(call_context.params, default=str))
else:
attributes["rpc.request.payload"] = limit_string_size(json.dumps(call_context.params, default=str))
except Exception as ex:
pass

#print("here")
#print(call_context.operation)
#print(json.dumps(call_context.params, indent=4, sort_keys=True, default=str))
_safe_invoke(extension.extract_attributes, attributes)

with self._tracer.start_as_current_span(
Expand Down Expand Up @@ -289,11 +280,18 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
except Exception as ex:
pass

result = None
try:
#print("calling original func")
#print(json.dumps(args, indent=4, sort_keys=True, default=str))
if call_context.service == "kinesis" and call_context.operation == "PutRecord":
if args[1].get("Data") is not None:
detailJson = json.loads(args[1].get("Data"))
detailJson['_context'] = {}
inject(carrier = detailJson['_context'])
args[1]["Data"] = json.dumps(detailJson)
except Exception as e:
pass

result = None
try:
result = original_func(*args, **kwargs)
except ClientError as error:
result = getattr(error, "response", None)
Expand Down

0 comments on commit 56cefa3

Please sign in to comment.