diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index 5e8469ba26e..8f4c6b68b08 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -35,6 +35,8 @@ from .services.sqs import inject_trace_to_sqs_or_sns_batch_message from .services.sqs import inject_trace_to_sqs_or_sns_message from .services.sqs import patched_sqs_api_call +from .services.stepfunctions import inject_trace_to_stepfunction_input +from .services.stepfunctions import patched_stepfunction_api_call from .utils import inject_trace_to_client_context from .utils import inject_trace_to_eventbridge_detail from .utils import set_patched_api_call_span_tags @@ -150,6 +152,10 @@ def patched_api_call(original_func, instance, args, kwargs): kwargs=kwargs, function_vars=function_vars, ) + elif endpoint_name == "states": + return patched_stepfunction_api_call( + original_func=original_func, instance=instance, args=args, kwargs=kwargs, function_vars=function_vars + ) else: # this is the default patched api call return patched_api_call_fallback( @@ -220,6 +226,16 @@ def patched_api_call_fallback(original_func, instance, args, kwargs, function_va cloud_service="sns", direction=SpanDirection.OUTBOUND, ) + if endpoint_name == "states" and ( + operation == "StartExecution" or operation == "StartSyncExecution" + ): + inject_trace_to_stepfunction_input(params, span) + span.name = schematize_cloud_messaging_operation( + trace_operation, + cloud_provider="aws", + cloud_service="stepfunctions", + direction=SpanDirection.OUTBOUND, + ) except Exception: log.warning("Unable to inject trace context", exc_info=True) diff --git a/ddtrace/contrib/botocore/services/stepfunctions.py b/ddtrace/contrib/botocore/services/stepfunctions.py new file mode 100644 index 00000000000..7aa32a514e3 --- /dev/null +++ b/ddtrace/contrib/botocore/services/stepfunctions.py @@ -0,0 +1,108 @@ +import json +from typing import Any # noqa:F401 +from typing import Dict # noqa:F401 + +import botocore.exceptions + +from ddtrace import Span # noqa:F401 +from ddtrace import config +from ddtrace.ext import http +from ddtrace.propagation.http import HTTPPropagator + +from ....ext import SpanTypes +from ....internal.logger import get_logger +from ....internal.schema import SpanDirection +from ....internal.schema import schematize_cloud_messaging_operation +from ....internal.schema import schematize_service_name +from ..utils import set_patched_api_call_span_tags +from ..utils import set_response_metadata_tags + + +log = get_logger(__name__) + + +def inject_trace_to_stepfunction_input(params, span): + # type: (Any, Span) -> None + """ + :params: contains the params for the current botocore action + :span: the span which provides the trace context to be propagated + + Inject the trace headers into the StepFunction input if the input is a JSON string + """ + if "input" not in params: + log.warning("Unable to inject context. The StepFunction input had no input.") + return + + if params["input"] is None: + log.warning("Unable to inject context. The StepFunction input was None.") + return + + elif isinstance(params["input"], dict): + if "_datadog" in params["input"]: + log.warning("Input already has trace context.") + return + params["input"]["_datadog"] = {} + HTTPPropagator.inject(span.context, params["input"]["_datadog"]) + return + + elif isinstance(params["input"], str): + try: + input_obj = json.loads(params["input"]) + except ValueError: + log.warning("Input is not a valid JSON string") + return + + if isinstance(input_obj, dict): + input_obj["_datadog"] = {} + HTTPPropagator.inject(span.context, input_obj["_datadog"]) + input_json = json.dumps(input_obj) + + params["input"] = input_json + return + else: + log.warning("Unable to inject context. The StepFunction input was not a dict.") + return + + else: + log.warning("Unable to inject context. The StepFunction input was not a dict or a JSON string.") + + +def patched_stepfunction_api_call(original_func, instance, args, kwargs: Dict, function_vars: Dict): + params = function_vars.get("params") + trace_operation = function_vars.get("trace_operation") + pin = function_vars.get("pin") + endpoint_name = function_vars.get("endpoint_name") + operation = function_vars.get("operation") + + with pin.tracer.trace( + trace_operation, + service=schematize_service_name("{}.{}".format(pin.service, endpoint_name)), + span_type=SpanTypes.HTTP, + ) as span: + set_patched_api_call_span_tags(span, instance, args, params, endpoint_name, operation) + + if args: + if config.botocore["distributed_tracing"]: + try: + if endpoint_name == "states" and operation in {"StartExecution", "StartSyncExecution"}: + inject_trace_to_stepfunction_input(params, span) + span.name = schematize_cloud_messaging_operation( + trace_operation, + cloud_provider="aws", + cloud_service="stepfunctions", + direction=SpanDirection.OUTBOUND, + ) + except Exception: + log.warning("Unable to inject trace context", exc_info=True) + + try: + return original_func(*args, **kwargs) + except botocore.exceptions.ClientError as e: + set_response_metadata_tags(span, e.response) + + # If we have a status code, and the status code is not an error, + # then ignore the exception being raised + status_code = span.get_tag(http.STATUS_CODE) + if status_code and not config.botocore.operations[span.resource].is_error_code(int(status_code)): + span._ignore_exception(botocore.exceptions.ClientError) + raise diff --git a/releasenotes/notes/inject-botocore-stepfunction-start_execution-calls-95bed0ca2e1d006e.yaml b/releasenotes/notes/inject-botocore-stepfunction-start_execution-calls-95bed0ca2e1d006e.yaml new file mode 100644 index 00000000000..c1c226113a0 --- /dev/null +++ b/releasenotes/notes/inject-botocore-stepfunction-start_execution-calls-95bed0ca2e1d006e.yaml @@ -0,0 +1,5 @@ +--- +features: + - | + botocore: Add the ability to inject trace context into the input field of botocore stepfunction start_execution and + start_sync_execution calls. diff --git a/tests/contrib/botocore/test.py b/tests/contrib/botocore/test.py index 6232dba3e85..0724da3be26 100644 --- a/tests/contrib/botocore/test.py +++ b/tests/contrib/botocore/test.py @@ -18,6 +18,7 @@ from moto import mock_s3 from moto import mock_sns from moto import mock_sqs +from moto import mock_stepfunctions import pytest from tests.utils import get_128_bit_trace_id_from_headers @@ -885,6 +886,61 @@ def test_schematized_unspecified_service_sqs_client_v1(self): assert spans[2].service == DEFAULT_SPAN_SERVICE_NAME assert spans[2].name == "aws.sqs.receive" + @mock_stepfunctions + def test_stepfunctions_send_start_execution_trace_injection(self): + sf = self.session.create_client("stepfunctions", region_name="us-west-2", endpoint_url="http://localhost:4566") + sf.create_state_machine( + name="lincoln", + definition='{"StartAt": "HelloWorld","States": {"HelloWorld": {"Type": "Pass","End": true}}}', + roleArn="arn:aws:iam::012345678901:role/DummyRole", + ) + Pin(service=self.TEST_SERVICE, tracer=self.tracer).onto(sf) + sf.start_execution( + stateMachineArn="arn:aws:states:us-west-2:000000000000:stateMachine:lincoln", input='{"baz":1}' + ) + # I've tried to find a way to make Moto show me the input to the execution, but can't get that to work. + spans = self.get_spans() + assert spans + span = spans[0] + assert span.name == "states.command" # This confirms our patch is working + sf.delete_state_machine(stateMachineArn="arn:aws:states:us-west-2:000000000000:stateMachine:lincoln") + + @mock_stepfunctions + def test_stepfunctions_send_start_execution_trace_injection_with_array_input(self): + sf = self.session.create_client("stepfunctions", region_name="us-west-2", endpoint_url="http://localhost:4566") + sf.create_state_machine( + name="miller", + definition='{"StartAt": "HelloWorld","States": {"HelloWorld": {"Type": "Pass","End": true}}}', + roleArn="arn:aws:iam::012345678901:role/DummyRole", + ) + Pin(service=self.TEST_SERVICE, tracer=self.tracer).onto(sf) + sf.start_execution( + stateMachineArn="arn:aws:states:us-west-2:000000000000:stateMachine:miller", input='["one", "two", "three"]' + ) + # I've tried to find a way to make Moto show me the input to the execution, but can't get that to work. + spans = self.get_spans() + assert spans + span = spans[0] + assert span.name == "states.command" # This confirms our patch is working + sf.delete_state_machine(stateMachineArn="arn:aws:states:us-west-2:000000000000:stateMachine:miller") + + @mock_stepfunctions + def test_stepfunctions_send_start_execution_trace_injection_with_true_input(self): + sf = self.session.create_client("stepfunctions", region_name="us-west-2", endpoint_url="http://localhost:4566") + sf.create_state_machine( + name="hobart", + definition='{"StartAt": "HelloWorld","States": {"HelloWorld": {"Type": "Pass","End": true}}}', + roleArn="arn:aws:iam::012345678901:role/DummyRole", + ) + Pin(service=self.TEST_SERVICE, tracer=self.tracer).onto(sf) + sf.start_execution(stateMachineArn="arn:aws:states:us-west-2:000000000000:stateMachine:hobart", input="true") + # I've tried to find a way to make Moto show me the input to the execution, but can't get that to work. + spans = self.get_spans() + assert spans + span = spans[0] + assert span.name == "states.command" # This confirms our patch is working + sf.delete_state_machine(stateMachineArn="arn:aws:states:us-west-2:000000000000:stateMachine:hobart") + def _test_kinesis_client(self): client = self.session.create_client("kinesis", region_name="us-east-1") stream_name = "test"