Skip to content

Commit

Permalink
feat: add support for payload sizelimiting for aws-lambda
Browse files Browse the repository at this point in the history
  • Loading branch information
povilasv committed Oct 19, 2023
1 parent df8a761 commit 2276a86
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ def _determine_parent_context(


def _set_api_gateway_v1_proxy_attributes(
lambda_event: Any, span: Span
lambda_event: Any, span: Span, payload_size_limit: int
) -> Span:
"""Sets HTTP attributes for REST APIs and v1 HTTP APIs
Expand All @@ -234,7 +234,7 @@ def _set_api_gateway_v1_proxy_attributes(
if lambda_event.get("body"):
span.set_attribute(
"http.request.body",
lambda_event.get("body"),
limit_string_size(payload_size_limit, lambda_event.get("body")),
)

if lambda_event.get("headers"):
Expand Down Expand Up @@ -270,7 +270,7 @@ def _set_api_gateway_v1_proxy_attributes(


def _set_api_gateway_v2_proxy_attributes(
lambda_event: Any, span: Span
lambda_event: Any, span: Span, payload_size_limit: int
) -> Span:
"""Sets HTTP attributes for v2 HTTP APIs
Expand All @@ -286,7 +286,7 @@ def _set_api_gateway_v2_proxy_attributes(
if lambda_event.get("body"):
span.set_attribute(
"http.request.body",
lambda_event.get("body"),
limit_string_size(payload_size_limit, lambda_event.get("body")),
)

span.set_attribute(
Expand Down Expand Up @@ -340,6 +340,14 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
[wrapped_module_name, wrapped_function_name]
)

payload_size_limit = 51200
try:
payload_size_limit = int(os.environ.get("OTEL_PAYLOAD_SIZE_LIMIT", 51200))
except ValueError:
logger.error(
"OTEL_PAYLOAD_SIZE_LIMIT is not a number"
)

lambda_event = args[0]

parent_context = _determine_parent_context(
Expand Down Expand Up @@ -414,7 +422,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
if lambda_event["Records"][0].get("s3"):
s3TriggerSpan.set_attribute(
"rpc.request.body",
json.dumps(lambda_event["Records"][0].get("s3")),
limit_string_size(payload_size_limit, json.dumps(lambda_event["Records"][0].get("s3"))),
)
except Exception as ex:
pass
Expand All @@ -441,7 +449,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
if lambda_event["Records"][0].get("body"):
sqsTriggerSpan.set_attribute(
"rpc.request.body",
lambda_event["Records"][0].get("body"),
limit_string_size(payload_size_limit, lambda_event["Records"][0].get("body")),
)

except Exception as ex:
Expand All @@ -461,7 +469,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
if lambda_event["Records"][0]["Sns"] and lambda_event["Records"][0]["Sns"].get("Message"):
snsTriggerSpan.set_attribute(
"rpc.request.body",
lambda_event["Records"][0]["Sns"].get("Message"),
limit_string_size(payload_size_limit, lambda_event["Records"][0]["Sns"].get("Message")),
)
except Exception as ex:
pass
Expand All @@ -482,7 +490,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
if lambda_event["Records"][0].get("dynamodb"):
dynamoTriggerSpan.set_attribute(
"rpc.request.body",
json.dumps(lambda_event["Records"][0].get("dynamodb")),
limit_string_size(payload_size_limit, json.dumps(lambda_event["Records"][0].get("dynamodb"))),
)
except Exception as ex:
pass
Expand All @@ -503,7 +511,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
if lambda_event["datasetRecords"]:
cognitoTriggerSpan.set_attribute(
"rpc.request.body",
json.dumps(lambda_event["datasetRecords"]),
limit_string_size(payload_size_limit, json.dumps(lambda_event["datasetRecords"])),
)
except Exception as ex:
pass
Expand All @@ -528,7 +536,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches

eventBridgeTriggerSpan.set_attribute(
"rpc.request.body",
json.dumps(lambda_event),
limit_string_size(payload_size_limit, json.dumps(lambda_event)),
)
except Exception as ex:
pass
Expand Down Expand Up @@ -566,9 +574,9 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
apiGwSpan.set_attribute(SpanAttributes.FAAS_TRIGGER, "http")

if lambda_event.get("version") == "2.0":
_set_api_gateway_v2_proxy_attributes(lambda_event, apiGwSpan)
_set_api_gateway_v2_proxy_attributes(lambda_event, apiGwSpan, payload_size_limit)
else:
_set_api_gateway_v1_proxy_attributes(lambda_event, apiGwSpan)
_set_api_gateway_v1_proxy_attributes(lambda_event, apiGwSpan, payload_size_limit)

if isinstance(result, dict) and result.get("statusCode"):
apiGwSpan.set_attribute(
Expand All @@ -578,7 +586,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
if isinstance(result, dict) and result.get("body"):
apiGwSpan.set_attribute(
"http.response.body",
result.get("body"),
limit_string_size(payload_size_limit, result.get("body")),
)
if lambda_event.get("headers"):
for key, value in lambda_event.get("headers").items():
Expand All @@ -594,7 +602,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
if lambda_event["Records"][0]["eventSource"] == "aws:sqs":
span.set_attribute(SpanAttributes.FAAS_TRIGGER, "pubsub")
span.set_attribute("messaging.message",
lambda_event["Records"])
limit_string_size(payload_size_limit, lambda_event["Records"]))
except Exception:
pass
except Exception:
Expand All @@ -618,7 +626,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
if isinstance(result, dict) and result.get("body"):
s3TriggerSpan.set_attribute(
"rpc.response.body",
result.get("body"),
limit_string_size(payload_size_limit, result.get("body")),
)
except Exception:
pass
Expand All @@ -635,7 +643,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
if isinstance(result, dict) and result.get("body"):
sqsTriggerSpan.set_attribute(
"rpc.response.body",
result.get("body"),
limit_string_size(payload_size_limit, result.get("body")),
)
except Exception:
pass
Expand All @@ -651,7 +659,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
if isinstance(result, dict) and result.get("body"):
snsTriggerSpan.set_attribute(
"rpc.response.body",
result.get("body"),
limit_string_size(payload_size_limit, result.get("body")),
)
except Exception:
pass
Expand All @@ -668,7 +676,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
if isinstance(result, dict) and result.get("body"):
dynamoTriggerSpan.set_attribute(
"rpc.response.body",
result.get("body"),
limit_string_size(payload_size_limit, result.get("body")),
)
except Exception:
pass
Expand All @@ -684,7 +692,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
if isinstance(result, dict) and result.get("body"):
cognitoTriggerSpan.set_attribute(
"rpc.response.body",
result.get("body"),
limit_string_size(payload_size_limit, result.get("body")),
)
except Exception:
pass
Expand All @@ -700,7 +708,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
if isinstance(result, dict) and result.get("body"):
eventBridgeTriggerSpan.set_attribute(
"rpc.response.body",
result.get("body"),
limit_string_size(payload_size_limit, result.get("body")),
)
except Exception:
pass
Expand Down Expand Up @@ -867,3 +875,10 @@ def keys(
) -> typing.List[str]:
"""Keys implementation that returns all keys from a dictionary."""
return list(carrier.keys())

def limit_string_size(max_size: int, s: str) -> str:
if len(s) > max_size:
return s[:max_size]
else:
return s

Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
if call_context.operation == "ListObjects":
bucket = call_context.params.get("Bucket")
if bucket is not None:
attributes["rpc.request.payload"] = bucket
attributes["rpc.request.payload"] = limit_string_size(self.payload_size_limit, bucket)
elif call_context.operation == "PutObject":
body = call_context.params.get("Body")
if body is not None:
Expand Down Expand Up @@ -380,15 +380,15 @@ def _apply_response_attributes(span: Span, result, payload_size_limit):
body = result.get("Body")
if buckets is not None:
span.set_attribute(
"rpc.response.payload", json.dumps([b.get("Name") for b in buckets]))
"rpc.response.payload", limit_string_size(payload_size_limit, json.dumps([b.get("Name") for b in buckets])))
elif content is not None:
span.set_attribute(
"rpc.response.payload", json.dumps([b.get("Key") for b in content]))
"rpc.response.payload", limit_string_size(payload_size_limit, json.dumps([b.get("Key") for b in content])))
elif body is not None:
pass
else:
span.set_attribute(
"rpc.response.payload", json.dumps(result, default=str))
"rpc.response.payload", limit_string_size(payload_size_limit, json.dumps(result, default=str)))
#elif body is not None:
# try:
# d = {x: result[x] for x in result if x != "Body"}
Expand All @@ -411,15 +411,15 @@ def _apply_response_attributes(span: Span, result, payload_size_limit):
strbody = result.get("Payload").read()
result.get("Payload").close()
span.set_attribute(
"rpc.response.payload", strbody)
"rpc.response.payload", limit_string_size(payload_size_limit, strbody))
result['Payload'] = StreamingBody(io.BytesIO(strbody), content_length=length)
# DynamoDB get item
elif server == "Server":
span.set_attribute(
"rpc.response.payload", json.dumps(result, default=str))
"rpc.response.payload", limit_string_size(payload_size_limit, json.dumps(result, default=str)))
else:
span.set_attribute(
"rpc.response.payload", json.dumps(result, default=str))
"rpc.response.payload", limit_string_size(payload_size_limit, json.dumps(result, default=str)))
except Exception as ex:
pass

Expand Down Expand Up @@ -470,11 +470,4 @@ def set(
value: the value to set
"""
val = {"DataType": "String", "StringValue": value}
carrier[key] = val

def limit_string_size(max_size: int, s: str) -> str:
if len(s) > max_size:
return s[:max_size]
else:
return s

carrier[key] = val
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,9 @@ def _find_extension(call_context: _AwsSdkCallContext) -> _AwsSdkExtension:
except Exception as ex: # pylint: disable=broad-except
_logger.error("Error when loading extension: %s", ex)
return _AwsSdkExtension(call_context)

def limit_string_size(max_size: int, s: str) -> str:
if len(s) > max_size:
return s[:max_size]
else:
return s
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,19 @@ def operation_name(cls):
def extract_attributes(
cls, call_context: _AwsSdkCallContext, attributes: _AttributeMapT
):
payload_size_limit = 51200
try:
payload_size_limit = int(os.environ.get("OTEL_PAYLOAD_SIZE_LIMIT", 51200))
except ValueError:
pass

attributes[SpanAttributes.FAAS_INVOKED_PROVIDER] = "aws"
attributes[
SpanAttributes.FAAS_INVOKED_NAME
] = cls._parse_function_name(call_context)
attributes[SpanAttributes.FAAS_INVOKED_REGION] = call_context.region
if call_context.params.get("Payload") is not None:
attributes["rpc.request.body"] = call_context.params.get("Payload")
attributes["rpc.request.body"] = limit_string_size(payload_size_limit, call_context.params.get("Payload"))

@classmethod
def _parse_function_name(cls, call_context: _AwsSdkCallContext):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging

import os
from opentelemetry.instrumentation.botocore.extensions.types import (
_AttributeMapT,
_AwsSdkExtension,
Expand All @@ -28,6 +28,12 @@

class _SqsExtension(_AwsSdkExtension):
def extract_attributes(self, attributes: _AttributeMapT):
payload_size_limit = 51200
try:
payload_size_limit = int(os.environ.get("OTEL_PAYLOAD_SIZE_LIMIT", 51200))
except ValueError:
pass

queue_url = self._call_context.params.get("QueueUrl")
if queue_url:
# TODO: update when semantic conventions exist
Expand All @@ -36,7 +42,7 @@ def extract_attributes(self, attributes: _AttributeMapT):
attributes[SpanAttributes.MESSAGING_URL] = queue_url
payload = self._call_context.params.get("MessageBody")
if payload is not None:
attributes["rpc.request.payload"] = payload
attributes["rpc.request.payload"] = limit_string_size(payload_size_limit, payload)

try:
attributes[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,8 @@ def after_service_call(self):
Extensions might override this function to do some cleanup tasks.
"""
def limit_string_size(max_size: int, s: str) -> str:
if len(s) > max_size:
return s[:max_size]
else:
return s

0 comments on commit 2276a86

Please sign in to comment.