Skip to content

Commit

Permalink
Parse enhanced metrics from Lambda telemetry JSON logs.
Browse files Browse the repository at this point in the history
These get emitted instead of regular REPORT logs if the log format is set to JSON.
  • Loading branch information
dtcaciuc-ggs committed Oct 22, 2024
1 parent 4070f8b commit a51e969
Show file tree
Hide file tree
Showing 2 changed files with 200 additions and 1 deletion.
90 changes: 89 additions & 1 deletion aws/logs_monitoring/enhanced_lambda_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# under the Apache License Version 2.0.
# This product includes software developed at Datadog (https://www.datadoghq.com/).
# Copyright 2021 Datadog, Inc.
import json
import os
import logging
import re
Expand Down Expand Up @@ -202,8 +203,12 @@ def generate_enhanced_lambda_metrics(log, tags_cache):
if not is_lambda_log:
return []

    # Check if it's a Lambda lifecycle log that is emitted when the log format is set to JSON
parsed_metrics = parse_metrics_from_json_report_log(log_message)

# Check if this is a REPORT log
parsed_metrics = parse_metrics_from_report_log(log_message)
if not parsed_metrics:
parsed_metrics = parse_metrics_from_report_log(log_message)

# Check if this is a timeout
if not parsed_metrics:
Expand Down Expand Up @@ -254,6 +259,89 @@ def parse_lambda_tags_from_arn(arn):
]


MEMORY_ALLOCATED_RECORD_KEY = "memorySizeMB"
INIT_DURATION_RECORD_KEY = "initDurationMs"
BILLED_DURATION_RECORD_KEY = "billedDurationMs"
RUNTIME_METRICS_BY_RECORD_KEY = {
# Except INIT_DURATION_RECORD_KEY which is handled separately
"durationMs": DURATION_METRIC_NAME,
BILLED_DURATION_RECORD_KEY: BILLED_DURATION_METRIC_NAME,
"maxMemoryUsedMB": MAX_MEMORY_USED_METRIC_NAME,
}


def parse_metrics_from_json_report_log(log_message):
if not log_message.startswith("{"):
return []

try:
body = json.loads(log_message)
except json.JSONDecodeError:
return []

stage = body.get("type", "")
record = body.get("record", {})
record_metrics = record.get("metrics", {})

if stage != "platform.report" or not record_metrics:
return []

metrics = []

for record_key, metric_name in RUNTIME_METRICS_BY_RECORD_KEY.items():
metric_point_value = record_metrics[record_key]

if metric_name in METRIC_ADJUSTMENT_FACTORS:
metric_point_value *= METRIC_ADJUSTMENT_FACTORS[metric_name]

metrics.append(
DatadogMetricPoint(
f"{ENHANCED_METRICS_NAMESPACE_PREFIX}.{metric_name}",
metric_point_value,
)
)

tags = [
f"{MEMORY_ALLOCATED_FIELD_NAME}:{record_metrics[MEMORY_ALLOCATED_RECORD_KEY]}"
]

try:
init_duration = record_metrics[INIT_DURATION_RECORD_KEY]
except KeyError:
tags.append("cold_start:false")
else:
tags.append("cold_start:true")
metrics.append(
DatadogMetricPoint(
f"{ENHANCED_METRICS_NAMESPACE_PREFIX}.{INIT_DURATION_METRIC_NAME}",
init_duration * METRIC_ADJUSTMENT_FACTORS[INIT_DURATION_METRIC_NAME],
)
)

metrics.append(
DatadogMetricPoint(
f"{ENHANCED_METRICS_NAMESPACE_PREFIX}.{ESTIMATED_COST_METRIC_NAME}",
calculate_estimated_cost(
record_metrics[BILLED_DURATION_RECORD_KEY],
record_metrics[MEMORY_ALLOCATED_RECORD_KEY],
),
)
)

if record["status"] == "timeout":
metrics.append(
DatadogMetricPoint(
f"{ENHANCED_METRICS_NAMESPACE_PREFIX}.{TIMEOUTS_METRIC_NAME}",
1.0,
)
)

for metric in metrics:
metric.add_tags(tags)

return metrics


def parse_metrics_from_report_log(report_log_line):
"""Parses and returns metrics from the REPORT Lambda log
Expand Down
111 changes: 111 additions & 0 deletions aws/logs_monitoring/tests/test_enhanced_lambda_metrics.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import unittest
import os
from time import time
Expand All @@ -7,6 +8,7 @@

from enhanced_lambda_metrics import (
parse_metrics_from_report_log,
parse_metrics_from_json_report_log,
parse_lambda_tags_from_arn,
generate_enhanced_lambda_metrics,
create_out_of_memory_enhanced_metric,
Expand All @@ -32,6 +34,56 @@ class TestEnhancedLambdaMetrics(unittest.TestCase):
"XRAY TraceId: 1-5d83c0ad-b8eb33a0b1de97d804fac890\tSegmentId: 31255c3b19bd3637\t"
"Sampled: true"
)
standard_json_report = json.dumps(
{
"time": "2024-10-04T00:36:35.800Z",
"type": "platform.report",
"record": {
"requestId": "4d789d71-2f2c-4c66-a4b5-531a0223233d",
"metrics": {
"durationMs": 0.62,
"billedDurationMs": 100,
"memorySizeMB": 128,
"maxMemoryUsedMB": 51,
},
"status": "success",
},
}
)
cold_start_json_report = json.dumps(
{
"time": "2024-10-04T00:36:35.800Z",
"type": "platform.report",
"record": {
"requestId": "4d789d71-2f2c-4c66-a4b5-531a0223233d",
"metrics": {
"durationMs": 0.81,
"billedDurationMs": 100,
"memorySizeMB": 128,
"maxMemoryUsedMB": 90,
"initDurationMs": 1234,
},
"status": "success",
},
}
)
timeout_json_report = json.dumps(
{
"time": "2024-10-18T19:38:39.661Z",
"type": "platform.report",
"record": {
"requestId": "b441820a-ebe5-4d5f-93bc-f945707b0225",
"metrics": {
"durationMs": 30000.0,
"billedDurationMs": 30000,
"memorySizeMB": 128,
"maxMemoryUsedMB": 74,
"initDurationMs": 985.413,
},
"status": "timeout",
},
}
)

def test_parse_lambda_tags_from_arn(self):
verify_as_json(
Expand All @@ -57,6 +109,24 @@ def test_parse_metrics_from_report_with_xray(self):
parsed_metrics = parse_metrics_from_report_log(self.report_with_xray)
verify_as_json(parsed_metrics)

def test_parse_metrics_from_json_no_report(self):
# Ensure we ignore unrelated JSON logs
parsed_metrics = parse_metrics_from_json_report_log('{"message": "abcd"}')
assert parsed_metrics == []

def test_parse_metrics_from_json_report_log(self):
parsed_metrics = parse_metrics_from_json_report_log(self.standard_json_report)
# The timestamps are None because the timestamp is added after the metrics are parsed
verify_as_json(parsed_metrics)

def test_parse_metrics_from_cold_start_json_report_log(self):
parsed_metrics = parse_metrics_from_json_report_log(self.cold_start_json_report)
verify_as_json(parsed_metrics)

def test_parse_metrics_from_timeout_json_report_log(self):
parsed_metrics = parse_metrics_from_json_report_log(self.timeout_json_report)
verify_as_json(parsed_metrics)

def test_create_out_of_memory_enhanced_metric(self):
go_out_of_memory_error = "fatal error: runtime: out of memory"
self.assertEqual(
Expand Down Expand Up @@ -88,6 +158,47 @@ def test_create_out_of_memory_enhanced_metric(self):
success_message = "Success!"
self.assertEqual(len(create_out_of_memory_enhanced_metric(success_message)), 0)

def test_generate_enhanced_lambda_metrics_json(
self,
):
tags_cache = LambdaTagsCache("")
tags_cache.get = MagicMock(return_value=[])

logs_input = {
"message": json.dumps(
{
"time": "2024-10-04T00:36:35.800Z",
"type": "platform.report",
"record": {
"requestId": "4d789d71-2f2c-4c66-a4b5-531a0223233d",
"metrics": {
"durationMs": 3470.65,
"billedDurationMs": 3500,
"memorySizeMB": 128,
"maxMemoryUsedMB": 89,
},
"status": "success",
},
}
),
"aws": {
"awslogs": {
"logGroup": "/aws/lambda/post-coupon-prod-us",
"logStream": "2019/09/25/[$LATEST]d6c10ebbd9cb48dba94a7d9b874b49bb",
"owner": "172597598159",
},
"function_version": "$LATEST",
"invoked_function_arn": "arn:aws:lambda:us-east-1:172597598159:function:collect_logs_datadog_demo",
},
"lambda": {
"arn": "arn:aws:lambda:us-east-1:172597598159:function:post-coupon-prod-us"
},
"timestamp": 10000,
}

generated_metrics = generate_enhanced_lambda_metrics(logs_input, tags_cache)
verify_as_json(generated_metrics)

def test_generate_enhanced_lambda_metrics(self):
tags_cache = LambdaTagsCache("")
tags_cache.get = MagicMock(return_value=[])
Expand Down

0 comments on commit a51e969

Please sign in to comment.