Skip to content

Commit

Permalink
feat: add support for payload sizelimiting for aws-lambda
Browse files Browse the repository at this point in the history
  • Loading branch information
povilasv committed Oct 19, 2023
1 parent df8a761 commit d918671
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def custom_event_context_extractor(lambda_event):
from opentelemetry.context.context import Context
from opentelemetry.instrumentation.aws_lambda.package import _instruments
from opentelemetry.instrumentation.aws_lambda.version import __version__
from opentelemetry.instrumentation.aws_lambda.utils import limit_string_size
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.metrics import MeterProvider, get_meter_provider
Expand Down Expand Up @@ -234,7 +235,7 @@ def _set_api_gateway_v1_proxy_attributes(
if lambda_event.get("body"):
span.set_attribute(
"http.request.body",
lambda_event.get("body"),
limit_string_size(lambda_event.get("body")),
)

if lambda_event.get("headers"):
Expand Down Expand Up @@ -286,7 +287,7 @@ def _set_api_gateway_v2_proxy_attributes(
if lambda_event.get("body"):
span.set_attribute(
"http.request.body",
lambda_event.get("body"),
limit_string_size(lambda_event.get("body")),
)

span.set_attribute(
Expand Down Expand Up @@ -414,7 +415,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
if lambda_event["Records"][0].get("s3"):
s3TriggerSpan.set_attribute(
"rpc.request.body",
json.dumps(lambda_event["Records"][0].get("s3")),
limit_string_size(json.dumps(lambda_event["Records"][0].get("s3"))),
)
except Exception as ex:
pass
Expand All @@ -441,7 +442,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
if lambda_event["Records"][0].get("body"):
sqsTriggerSpan.set_attribute(
"rpc.request.body",
lambda_event["Records"][0].get("body"),
limit_string_size(lambda_event["Records"][0].get("body")),
)

except Exception as ex:
Expand All @@ -461,7 +462,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
if lambda_event["Records"][0]["Sns"] and lambda_event["Records"][0]["Sns"].get("Message"):
snsTriggerSpan.set_attribute(
"rpc.request.body",
lambda_event["Records"][0]["Sns"].get("Message"),
limit_string_size(lambda_event["Records"][0]["Sns"].get("Message")),
)
except Exception as ex:
pass
Expand All @@ -482,7 +483,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
if lambda_event["Records"][0].get("dynamodb"):
dynamoTriggerSpan.set_attribute(
"rpc.request.body",
json.dumps(lambda_event["Records"][0].get("dynamodb")),
limit_string_size(json.dumps(lambda_event["Records"][0].get("dynamodb"))),
)
except Exception as ex:
pass
Expand All @@ -503,7 +504,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
if lambda_event["datasetRecords"]:
cognitoTriggerSpan.set_attribute(
"rpc.request.body",
json.dumps(lambda_event["datasetRecords"]),
limit_string_size(json.dumps(lambda_event["datasetRecords"])),
)
except Exception as ex:
pass
Expand All @@ -528,7 +529,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches

eventBridgeTriggerSpan.set_attribute(
"rpc.request.body",
json.dumps(lambda_event),
limit_string_size(json.dumps(lambda_event)),
)
except Exception as ex:
pass
Expand Down Expand Up @@ -578,7 +579,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
if isinstance(result, dict) and result.get("body"):
apiGwSpan.set_attribute(
"http.response.body",
result.get("body"),
limit_string_size(result.get("body")),
)
if lambda_event.get("headers"):
for key, value in lambda_event.get("headers").items():
Expand All @@ -594,10 +595,13 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
if lambda_event["Records"][0]["eventSource"] == "aws:sqs":
span.set_attribute(SpanAttributes.FAAS_TRIGGER, "pubsub")
span.set_attribute("messaging.message",
lambda_event["Records"])
except Exception:
limit_string_size(lambda_event["Records"]))
except Exception as ex:
#print(traceback.format_exc())
#print("exception")
#print(ex)
pass
except Exception:
except Exception as ex:
# TODO check why we get exception
#print(traceback.format_exc())
#print("exception")
Expand All @@ -618,7 +622,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
if isinstance(result, dict) and result.get("body"):
s3TriggerSpan.set_attribute(
"rpc.response.body",
result.get("body"),
limit_string_size(result.get("body")),
)
except Exception:
pass
Expand All @@ -635,7 +639,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
if isinstance(result, dict) and result.get("body"):
sqsTriggerSpan.set_attribute(
"rpc.response.body",
result.get("body"),
limit_string_size(result.get("body")),
)
except Exception:
pass
Expand All @@ -651,7 +655,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
if isinstance(result, dict) and result.get("body"):
snsTriggerSpan.set_attribute(
"rpc.response.body",
result.get("body"),
limit_string_size(result.get("body")),
)
except Exception:
pass
Expand All @@ -668,7 +672,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
if isinstance(result, dict) and result.get("body"):
dynamoTriggerSpan.set_attribute(
"rpc.response.body",
result.get("body"),
limit_string_size(result.get("body")),
)
except Exception:
pass
Expand All @@ -684,7 +688,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
if isinstance(result, dict) and result.get("body"):
cognitoTriggerSpan.set_attribute(
"rpc.response.body",
result.get("body"),
limit_string_size(result.get("body")),
)
except Exception:
pass
Expand All @@ -700,7 +704,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
if isinstance(result, dict) and result.get("body"):
eventBridgeTriggerSpan.set_attribute(
"rpc.response.body",
result.get("body"),
limit_string_size(result.get("body")),
)
except Exception:
pass
Expand Down Expand Up @@ -866,4 +870,4 @@ def keys(
self, carrier: typing.Mapping[str, textmap.CarrierValT]
) -> typing.List[str]:
"""Keys implementation that returns all keys from a dictionary."""
return list(carrier.keys())
return list(carrier.keys())
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def response_hook(span, service_name, operation_name, result):
import json
import io
import os
from opentelemetry.instrumentation.botocore.utils import limit_string_size, get_payload_size_limit
from typing import Any, Callable, Collection, Dict, Optional, Tuple

from botocore.client import BaseClient
Expand Down Expand Up @@ -134,7 +135,6 @@ def __init__(self):
super().__init__()
self.request_hook = None
self.response_hook = None
self.payload_size_limit = 51200

def instrumentation_dependencies(self) -> Collection[str]:
return _instruments
Expand All @@ -147,12 +147,6 @@ def _instrument(self, **kwargs):

self.request_hook = kwargs.get("request_hook")
self.response_hook = kwargs.get("response_hook")
try:
self.payload_size_limit = int(os.environ.get("OTEL_PAYLOAD_SIZE_LIMIT", 51200))
except ValueError:
logger.error(
"OTEL_PAYLOAD_SIZE_LIMIT is not a number"
)
wrap_function_wrapper(
"botocore.client",
"BaseClient._make_api_call",
Expand Down Expand Up @@ -201,28 +195,28 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
if call_context.operation == "ListObjects":
bucket = call_context.params.get("Bucket")
if bucket is not None:
attributes["rpc.request.payload"] = bucket
attributes["rpc.request.payload"] = limit_string_size(bucket)
elif call_context.operation == "PutObject":
body = call_context.params.get("Body")
if body is not None:
attributes["rpc.request.payload"] = limit_string_size(self.payload_size_limit, body.decode('ascii'))
attributes["rpc.request.payload"] = limit_string_size(body.decode('ascii'))
elif call_context.operation == "PutItem":
body = call_context.params.get("Item")
if body is not None:
attributes["rpc.request.payload"] = limit_string_size(self.payload_size_limit, json.dumps(body, default=str))
attributes["rpc.request.payload"] = limit_string_size(json.dumps(body, default=str))
elif call_context.operation == "GetItem":
body = call_context.params.get("Key")
if body is not None:
attributes["rpc.request.payload"] = limit_string_size(self.payload_size_limit,json.dumps(body, default=str))
attributes["rpc.request.payload"] = limit_string_size(json.dumps(body, default=str))
elif call_context.operation == "Publish":
body = call_context.params.get("Message")
if body is not None:
attributes["rpc.request.payload"] = limit_string_size(self.payload_size_limit,json.dumps(body, default=str))
attributes["rpc.request.payload"] = limit_string_size(json.dumps(body, default=str))
elif call_context.service == "events" and call_context.operation == "PutEvents":
call_context.span_kind = SpanKind.PRODUCER
attributes["rpc.request.payload"] = limit_string_size(self.payload_size_limit, json.dumps(call_context.params, default=str))
attributes["rpc.request.payload"] = limit_string_size(json.dumps(call_context.params, default=str))
else:
attributes["rpc.request.payload"] = limit_string_size(self.payload_size_limit, json.dumps(call_context.params, default=str))
attributes["rpc.request.payload"] = limit_string_size(json.dumps(call_context.params, default=str))
except Exception as ex:
pass

Expand Down Expand Up @@ -303,11 +297,11 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
result = original_func(*args, **kwargs)
except ClientError as error:
result = getattr(error, "response", None)
_apply_response_attributes(span, result, self.payload_size_limit)
_apply_response_attributes(span, result)
_safe_invoke(extension.on_error, span, error)
raise
else:
_apply_response_attributes(span, result, self.payload_size_limit)
_apply_response_attributes(span, result)
_safe_invoke(extension.on_success, span, result)
finally:
context_api.detach(token)
Expand Down Expand Up @@ -337,7 +331,7 @@ def _call_response_hook(
)


def _apply_response_attributes(span: Span, result, payload_size_limit):
def _apply_response_attributes(span: Span, result):
if result is None or not span.is_recording():
return

Expand Down Expand Up @@ -380,15 +374,15 @@ def _apply_response_attributes(span: Span, result, payload_size_limit):
body = result.get("Body")
if buckets is not None:
span.set_attribute(
"rpc.response.payload", json.dumps([b.get("Name") for b in buckets]))
"rpc.response.payload", limit_string_size(json.dumps([b.get("Name") for b in buckets])))
elif content is not None:
span.set_attribute(
"rpc.response.payload", json.dumps([b.get("Key") for b in content]))
"rpc.response.payload", limit_string_size(json.dumps([b.get("Key") for b in content])))
elif body is not None:
pass
else:
span.set_attribute(
"rpc.response.payload", json.dumps(result, default=str))
"rpc.response.payload", limit_string_size(json.dumps(result, default=str)))
#elif body is not None:
# try:
# d = {x: result[x] for x in result if x != "Body"}
Expand All @@ -406,20 +400,20 @@ def _apply_response_attributes(span: Span, result, payload_size_limit):
# except Exception as ex:
# pass
# Lambda Invoke
elif result.get("Payload") is not None and result.get("Payload")._content_length is not None and int(result.get("Payload")._content_length) < payload_size_limit:
elif result.get("Payload") is not None and result.get("Payload")._content_length is not None and int(result.get("Payload")._content_length) < get_payload_size_limit():
length = result.get("Payload")._content_length
strbody = result.get("Payload").read()
result.get("Payload").close()
span.set_attribute(
"rpc.response.payload", strbody)
"rpc.response.payload", limit_string_size(strbody))
result['Payload'] = StreamingBody(io.BytesIO(strbody), content_length=length)
# DynamoDB get item
elif server == "Server":
span.set_attribute(
"rpc.response.payload", json.dumps(result, default=str))
"rpc.response.payload", limit_string_size(json.dumps(result, default=str)))
else:
span.set_attribute(
"rpc.response.payload", json.dumps(result, default=str))
"rpc.response.payload", limit_string_size(json.dumps(result, default=str)))
except Exception as ex:
pass

Expand Down Expand Up @@ -471,10 +465,3 @@ def set(
"""
val = {"DataType": "String", "StringValue": value}
carrier[key] = val

def limit_string_size(max_size: int, s: str) -> str:
if len(s) > max_size:
return s[:max_size]
else:
return s

Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

import importlib
import logging

from opentelemetry.instrumentation.botocore.extensions.types import (
_AwsSdkCallContext,
_AwsSdkExtension,
Expand Down Expand Up @@ -49,4 +48,4 @@ def _find_extension(call_context: _AwsSdkCallContext) -> _AwsSdkExtension:
return extension_cls(call_context)
except Exception as ex: # pylint: disable=broad-except
_logger.error("Error when loading extension: %s", ex)
return _AwsSdkExtension(call_context)
return _AwsSdkExtension(call_context)
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import json
import re
from typing import Dict
from opentelemetry.instrumentation.botocore.utils import limit_string_size
import os

from opentelemetry.instrumentation.botocore.extensions.types import (
_AttributeMapT,
Expand Down Expand Up @@ -67,7 +69,7 @@ def extract_attributes(
] = cls._parse_function_name(call_context)
attributes[SpanAttributes.FAAS_INVOKED_REGION] = call_context.region
if call_context.params.get("Payload") is not None:
attributes["rpc.request.body"] = call_context.params.get("Payload")
attributes["rpc.request.body"] = limit_string_size(call_context.params.get("Payload"))

@classmethod
def _parse_function_name(cls, call_context: _AwsSdkCallContext):
Expand Down Expand Up @@ -124,4 +126,4 @@ def before_service_call(self, span: Span):
if self._op is None:
return

self._op.before_service_call(self._call_context, span)
self._op.before_service_call(self._call_context, span)
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging

from opentelemetry.instrumentation.botocore.extensions.types import (
_AttributeMapT,
_AwsSdkExtension,
_BotoResultT,
)
from opentelemetry.instrumentation.botocore.utils import limit_string_size
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace.span import Span

Expand All @@ -28,6 +28,7 @@

class _SqsExtension(_AwsSdkExtension):
def extract_attributes(self, attributes: _AttributeMapT):

queue_url = self._call_context.params.get("QueueUrl")
if queue_url:
# TODO: update when semantic conventions exist
Expand All @@ -36,7 +37,7 @@ def extract_attributes(self, attributes: _AttributeMapT):
attributes[SpanAttributes.MESSAGING_URL] = queue_url
payload = self._call_context.params.get("MessageBody")
if payload is not None:
attributes["rpc.request.payload"] = payload
attributes["rpc.request.payload"] = limit_string_size(payload)

try:
attributes[
Expand Down
Loading

0 comments on commit d918671

Please sign in to comment.