Skip to content

Commit

Permalink
feat(botocore): patch stepfunctions (#7514)
Browse files Browse the repository at this point in the history
Patch botocore stepfunctions start-execution and start-sync-execution.
We want to be able to add trace context to the `input` so the
stepfunction logs trace reducer can link stepfunction traces to upstream
traces.

## Checklist

- [x] Change(s) are motivated and described in the PR description.
- [x] Testing strategy is described if automated tests are not included
in the PR.
- [x] Risk is outlined (performance impact, potential for breakage,
maintainability, etc).
- [x] Change is maintainable (easy to change, telemetry, documentation).
- [x] [Library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
are followed. If no release note is required, add label
`changelog/no-changelog`.
- [x] Documentation is included (in-code, generated user docs, [public
corp docs](https://github.com/DataDog/documentation/)).
- [x] Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

## Reviewer Checklist

- [x] Title is accurate.
- [x] No unnecessary changes are introduced.
- [x] Description motivates each change.
- [x] Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes unless absolutely necessary.
- [x] Testing strategy adequately addresses listed risk(s).
- [x] Change is maintainable (easy to change, telemetry, documentation).
- [x] Release note makes sense to a user of the library.
- [x] Reviewer has explicitly acknowledged and discussed the performance
implications of this PR as reported in the benchmarks PR comment.
- [x] Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)
- [x] If this PR touches code that signs or publishes builds or
packages, or handles credentials of any kind, I've requested a review
from `@DataDog/security-design-and-guidance`.
- [x] This PR doesn't touch any of that.

---------

Co-authored-by: Tahir H. Butt <[email protected]>
Co-authored-by: Zachary Groves <[email protected]>
Co-authored-by: Emmett Butler <[email protected]>
  • Loading branch information
4 people authored Jan 12, 2024
1 parent 7c75365 commit f7fa041
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 0 deletions.
16 changes: 16 additions & 0 deletions ddtrace/contrib/botocore/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
from .services.sqs import inject_trace_to_sqs_or_sns_batch_message
from .services.sqs import inject_trace_to_sqs_or_sns_message
from .services.sqs import patched_sqs_api_call
from .services.stepfunctions import inject_trace_to_stepfunction_input
from .services.stepfunctions import patched_stepfunction_api_call
from .utils import inject_trace_to_client_context
from .utils import inject_trace_to_eventbridge_detail
from .utils import set_patched_api_call_span_tags
Expand Down Expand Up @@ -150,6 +152,10 @@ def patched_api_call(original_func, instance, args, kwargs):
kwargs=kwargs,
function_vars=function_vars,
)
elif endpoint_name == "states":
return patched_stepfunction_api_call(
original_func=original_func, instance=instance, args=args, kwargs=kwargs, function_vars=function_vars
)
else:
# this is the default patched api call
return patched_api_call_fallback(
Expand Down Expand Up @@ -220,6 +226,16 @@ def patched_api_call_fallback(original_func, instance, args, kwargs, function_va
cloud_service="sns",
direction=SpanDirection.OUTBOUND,
)
if endpoint_name == "states" and (
operation == "StartExecution" or operation == "StartSyncExecution"
):
inject_trace_to_stepfunction_input(params, span)
span.name = schematize_cloud_messaging_operation(
trace_operation,
cloud_provider="aws",
cloud_service="stepfunctions",
direction=SpanDirection.OUTBOUND,
)
except Exception:
log.warning("Unable to inject trace context", exc_info=True)

Expand Down
108 changes: 108 additions & 0 deletions ddtrace/contrib/botocore/services/stepfunctions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import json
from typing import Any # noqa:F401
from typing import Dict # noqa:F401

import botocore.exceptions

from ddtrace import Span # noqa:F401
from ddtrace import config
from ddtrace.ext import http
from ddtrace.propagation.http import HTTPPropagator

from ....ext import SpanTypes
from ....internal.logger import get_logger
from ....internal.schema import SpanDirection
from ....internal.schema import schematize_cloud_messaging_operation
from ....internal.schema import schematize_service_name
from ..utils import set_patched_api_call_span_tags
from ..utils import set_response_metadata_tags


log = get_logger(__name__)


def inject_trace_to_stepfunction_input(params, span):
# type: (Any, Span) -> None
"""
:params: contains the params for the current botocore action
:span: the span which provides the trace context to be propagated
Inject the trace headers into the StepFunction input if the input is a JSON string
"""
if "input" not in params:
log.warning("Unable to inject context. The StepFunction input had no input.")
return

if params["input"] is None:
log.warning("Unable to inject context. The StepFunction input was None.")
return

elif isinstance(params["input"], dict):
if "_datadog" in params["input"]:
log.warning("Input already has trace context.")
return
params["input"]["_datadog"] = {}
HTTPPropagator.inject(span.context, params["input"]["_datadog"])
return

elif isinstance(params["input"], str):
try:
input_obj = json.loads(params["input"])
except ValueError:
log.warning("Input is not a valid JSON string")
return

if isinstance(input_obj, dict):
input_obj["_datadog"] = {}
HTTPPropagator.inject(span.context, input_obj["_datadog"])
input_json = json.dumps(input_obj)

params["input"] = input_json
return
else:
log.warning("Unable to inject context. The StepFunction input was not a dict.")
return

else:
log.warning("Unable to inject context. The StepFunction input was not a dict or a JSON string.")


def patched_stepfunction_api_call(original_func, instance, args, kwargs: Dict, function_vars: Dict):
params = function_vars.get("params")
trace_operation = function_vars.get("trace_operation")
pin = function_vars.get("pin")
endpoint_name = function_vars.get("endpoint_name")
operation = function_vars.get("operation")

with pin.tracer.trace(
trace_operation,
service=schematize_service_name("{}.{}".format(pin.service, endpoint_name)),
span_type=SpanTypes.HTTP,
) as span:
set_patched_api_call_span_tags(span, instance, args, params, endpoint_name, operation)

if args:
if config.botocore["distributed_tracing"]:
try:
if endpoint_name == "states" and operation in {"StartExecution", "StartSyncExecution"}:
inject_trace_to_stepfunction_input(params, span)
span.name = schematize_cloud_messaging_operation(
trace_operation,
cloud_provider="aws",
cloud_service="stepfunctions",
direction=SpanDirection.OUTBOUND,
)
except Exception:
log.warning("Unable to inject trace context", exc_info=True)

try:
return original_func(*args, **kwargs)
except botocore.exceptions.ClientError as e:
set_response_metadata_tags(span, e.response)

# If we have a status code, and the status code is not an error,
# then ignore the exception being raised
status_code = span.get_tag(http.STATUS_CODE)
if status_code and not config.botocore.operations[span.resource].is_error_code(int(status_code)):
span._ignore_exception(botocore.exceptions.ClientError)
raise
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
features:
- |
botocore: Add the ability to inject trace context into the input field of botocore stepfunction start_execution and
start_sync_execution calls.
56 changes: 56 additions & 0 deletions tests/contrib/botocore/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from moto import mock_s3
from moto import mock_sns
from moto import mock_sqs
from moto import mock_stepfunctions
import pytest

from tests.utils import get_128_bit_trace_id_from_headers
Expand Down Expand Up @@ -885,6 +886,61 @@ def test_schematized_unspecified_service_sqs_client_v1(self):
assert spans[2].service == DEFAULT_SPAN_SERVICE_NAME
assert spans[2].name == "aws.sqs.receive"

@mock_stepfunctions
def test_stepfunctions_send_start_execution_trace_injection(self):
sf = self.session.create_client("stepfunctions", region_name="us-west-2", endpoint_url="http://localhost:4566")
sf.create_state_machine(
name="lincoln",
definition='{"StartAt": "HelloWorld","States": {"HelloWorld": {"Type": "Pass","End": true}}}',
roleArn="arn:aws:iam::012345678901:role/DummyRole",
)
Pin(service=self.TEST_SERVICE, tracer=self.tracer).onto(sf)
sf.start_execution(
stateMachineArn="arn:aws:states:us-west-2:000000000000:stateMachine:lincoln", input='{"baz":1}'
)
# I've tried to find a way to make Moto show me the input to the execution, but can't get that to work.
spans = self.get_spans()
assert spans
span = spans[0]
assert span.name == "states.command" # This confirms our patch is working
sf.delete_state_machine(stateMachineArn="arn:aws:states:us-west-2:000000000000:stateMachine:lincoln")

@mock_stepfunctions
def test_stepfunctions_send_start_execution_trace_injection_with_array_input(self):
sf = self.session.create_client("stepfunctions", region_name="us-west-2", endpoint_url="http://localhost:4566")
sf.create_state_machine(
name="miller",
definition='{"StartAt": "HelloWorld","States": {"HelloWorld": {"Type": "Pass","End": true}}}',
roleArn="arn:aws:iam::012345678901:role/DummyRole",
)
Pin(service=self.TEST_SERVICE, tracer=self.tracer).onto(sf)
sf.start_execution(
stateMachineArn="arn:aws:states:us-west-2:000000000000:stateMachine:miller", input='["one", "two", "three"]'
)
# I've tried to find a way to make Moto show me the input to the execution, but can't get that to work.
spans = self.get_spans()
assert spans
span = spans[0]
assert span.name == "states.command" # This confirms our patch is working
sf.delete_state_machine(stateMachineArn="arn:aws:states:us-west-2:000000000000:stateMachine:miller")

@mock_stepfunctions
def test_stepfunctions_send_start_execution_trace_injection_with_true_input(self):
sf = self.session.create_client("stepfunctions", region_name="us-west-2", endpoint_url="http://localhost:4566")
sf.create_state_machine(
name="hobart",
definition='{"StartAt": "HelloWorld","States": {"HelloWorld": {"Type": "Pass","End": true}}}',
roleArn="arn:aws:iam::012345678901:role/DummyRole",
)
Pin(service=self.TEST_SERVICE, tracer=self.tracer).onto(sf)
sf.start_execution(stateMachineArn="arn:aws:states:us-west-2:000000000000:stateMachine:hobart", input="true")
# I've tried to find a way to make Moto show me the input to the execution, but can't get that to work.
spans = self.get_spans()
assert spans
span = spans[0]
assert span.name == "states.command" # This confirms our patch is working
sf.delete_state_machine(stateMachineArn="arn:aws:states:us-west-2:000000000000:stateMachine:hobart")

def _test_kinesis_client(self):
client = self.session.create_client("kinesis", region_name="us-east-1")
stream_name = "test"
Expand Down

0 comments on commit f7fa041

Please sign in to comment.