Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(botocore): refactor kinesis module #8937

Merged
merged 9 commits into from
Apr 15, 2024
19 changes: 15 additions & 4 deletions ddtrace/_trace/trace_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def __getattribute__(self, name):

def _get_parameters_for_new_span_directly_from_context(ctx: core.ExecutionContext) -> Dict[str, str]:
span_kwargs = {}
for parameter_name in {"span_type", "resource", "service"}:
for parameter_name in {"span_type", "resource", "service", "child_of", "activate"}:
parameter_value = ctx.get_item(parameter_name, traverse=False)
if parameter_value:
span_kwargs[parameter_name] = parameter_value
Expand All @@ -97,6 +97,7 @@ def _get_parameters_for_new_span_directly_from_context(ctx: core.ExecutionContex

def _start_span(ctx: core.ExecutionContext, call_trace: bool = True, **kwargs) -> Span:
span_kwargs = _get_parameters_for_new_span_directly_from_context(ctx)
call_trace = ctx.get_item("call_trace", call_trace)
tracer = (ctx.get_item("middleware") or ctx["pin"]).tracer
distributed_headers_config = ctx.get_item("distributed_headers_config")
if distributed_headers_config:
Expand Down Expand Up @@ -545,15 +546,22 @@ def _on_django_after_request_headers_post(

def _on_botocore_patched_api_call_started(ctx):
callback = ctx.get_item("context_started_callback")
span = ctx.get_item(ctx.get_item("call_key"))
callback(
ctx.get_item(ctx.get_item("call_key")),
span,
ctx.get_item("instance"),
ctx.get_item("args"),
ctx.get_item("params"),
ctx.get_item("endpoint_name"),
ctx.get_item("operation"),
)

# we need this since we may have ran the wrapped operation before starting the span
# we need to ensure the span start time is correct
start_ns = ctx.get_item("start_ns")
if start_ns is not None and ctx.get_item("func_run"):
span.start_ns = start_ns


def _on_botocore_patched_api_call_exception(ctx, response, exception_type, set_response_metadata_tags):
span = ctx.get_item("instrumented_api_call")
Expand All @@ -568,8 +576,7 @@ def _on_botocore_patched_api_call_exception(ctx, response, exception_type, set_r


def _on_botocore_patched_api_call_success(ctx, response, set_response_metadata_tags):
span = ctx.get_item("instrumented_api_call")
set_response_metadata_tags(span, response)
set_response_metadata_tags(ctx.get_item(ctx.get_item("call_key")), response)


def _on_botocore_trace_context_injection_prepared(
Expand Down Expand Up @@ -617,8 +624,11 @@ def listen():
core.on("django.after_request_headers.post", _on_django_after_request_headers_post)
core.on("botocore.patched_api_call.exception", _on_botocore_patched_api_call_exception)
core.on("botocore.patched_api_call.success", _on_botocore_patched_api_call_success)
core.on("botocore.patched_kinesis_api_call.success", _on_botocore_patched_api_call_success)
core.on("botocore.patched_kinesis_api_call.exception", _on_botocore_patched_api_call_exception)
core.on("botocore.prep_context_injection.post", _on_botocore_trace_context_injection_prepared)
core.on("botocore.patched_api_call.started", _on_botocore_patched_api_call_started)
core.on("botocore.patched_kinesis_api_call.started", _on_botocore_patched_api_call_started)

for context_name in (
"flask.call",
Expand All @@ -632,6 +642,7 @@ def listen():
"django.func.wrapped",
"botocore.instrumented_api_call",
"botocore.instrumented_lib_function",
"botocore.patched_kinesis_api_call",
):
core.on(f"context.started.start_span.{context_name}", _start_span)

Expand Down
55 changes: 28 additions & 27 deletions ddtrace/contrib/botocore/services/kinesis.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from ddtrace.internal.schema.span_attribute_schema import SpanDirection

from ....ext import SpanTypes
from ....ext import http
from ....internal.compat import time_ns
from ....internal.logger import get_logger
from ....internal.schema import schematize_cloud_messaging_operation
Expand Down Expand Up @@ -149,29 +148,35 @@ def patched_kinesis_api_call(original_func, instance, args, kwargs, function_var
- func_run_err: There was an error when calling the `getRecords` function
"""
if (func_run and message_received) or config.botocore.empty_poll_enabled or not func_run or func_run_err:
with pin.tracer.start_span(
trace_operation,
with core.context_with_data(
"botocore.patched_kinesis_api_call",
instance=instance,
args=args,
params=params,
endpoint_name=endpoint_name,
operation=operation,
service=schematize_service_name("{}.{}".format(pin.service, endpoint_name)),
call_trace=False,
context_started_callback=set_patched_api_call_span_tags,
pin=pin,
span_name=trace_operation,
span_type=SpanTypes.HTTP,
child_of=child_of if child_of is not None else pin.tracer.context_provider.active(),
activate=True,
) as span:
set_patched_api_call_span_tags(span, instance, args, params, endpoint_name, operation)
func_run=func_run,
start_ns=start_ns,
call_key="patched_kinesis_api_call",
) as ctx, ctx.get_item(ctx.get_item("call_key")) as span:
core.dispatch("botocore.patched_kinesis_api_call.started", [ctx])

# we need this since we may have ran the wrapped operation before starting the span
# we need to ensure the span start time is correct
if start_ns is not None and func_run:
span.start_ns = start_ns

if config.botocore["distributed_tracing"]:
try_inject_DD_context(
endpoint_name, operation, params, span, trace_operation, inject_trace_context=True
)

# we still want to inject DSM context (if DSM is enabled) even if distributed tracing is disabled
elif not config.botocore["distributed_tracing"] and config._data_streams_enabled:
if config.botocore["distributed_tracing"] or config._data_streams_enabled:
try_inject_DD_context(
endpoint_name, operation, params, span, trace_operation, inject_trace_context=False
endpoint_name,
operation,
params,
span,
trace_operation,
inject_trace_context=bool(config.botocore["distributed_tracing"]),
)

try:
Expand All @@ -184,18 +189,14 @@ def patched_kinesis_api_call(original_func, instance, args, kwargs, function_var
if func_run_err:
raise func_run_err

set_response_metadata_tags(span, result)
core.dispatch("botocore.patched_kinesis_api_call.success", [ctx, result, set_response_metadata_tags])
return result

except botocore.exceptions.ClientError as e:
# `ClientError.response` contains the result, so we can still grab response metadata
set_response_metadata_tags(span, e.response)

# If we have a status code, and the status code is not an error,
# then ignore the exception being raised
status_code = span.get_tag(http.STATUS_CODE)
if status_code and not config.botocore.operations[span.resource].is_error_code(int(status_code)):
span._ignore_exception(botocore.exceptions.ClientError)
core.dispatch(
"botocore.patched_kinesis_api_call.exception",
[ctx, e.response, botocore.exceptions.ClientError, set_response_metadata_tags],
)
raise
# return results in the case that we ran the function, but no records were returned and empty
# poll spans are disabled
Expand Down
3 changes: 0 additions & 3 deletions ddtrace/contrib/botocore/services/stepfunctions.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,6 @@ def inject_trace_to_stepfunction_input(params, span):
if isinstance(input_obj, dict):
input_obj["_datadog"] = {}
HTTPPropagator.inject(span.context, input_obj["_datadog"])
input_json = json.dumps(input_obj)

params["input"] = input_json
return
else:
log.warning("Unable to inject context. The StepFunction input was not a dict.")
Expand Down
Loading