Skip to content

Commit

Permalink
botocore: Introduce instrumentation extensions (#718)
Browse files Browse the repository at this point in the history
* botocore: Introduce instrumentation extensions

* add extensions that are invoked before and after an AWS SDK
  service call to enrich the span with service specific request and
  response attirbutes
* move SQS specific parts to a separate extension

* changelog

Co-authored-by: Owais Lone <[email protected]>
Co-authored-by: Diego Hurtado <[email protected]>
  • Loading branch information
3 people authored Oct 12, 2021
1 parent b41a917 commit c3df816
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 3 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#664](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/664))
- `opentelemetry-instrumentation-botocore` Fix span injection for lambda invoke
([#663](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/663))
- `opentelemetry-instrumentation-botocore` Introduce instrumentation extensions
([#718](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/718))

### Changed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,15 @@ def response_hook(span, service_name, operation_name, result):

import json
import logging
from typing import Any, Collection, Dict, Optional, Tuple
from typing import Any, Callable, Collection, Dict, Optional, Tuple

from botocore.client import BaseClient
from botocore.endpoint import Endpoint
from botocore.exceptions import ClientError
from wrapt import wrap_function_wrapper

from opentelemetry import context as context_api
from opentelemetry.instrumentation.botocore.extensions import _find_extension
from opentelemetry.instrumentation.botocore.extensions.types import (
_AwsSdkCallContext,
)
Expand Down Expand Up @@ -190,6 +191,10 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
if call_context is None:
return original_func(*args, **kwargs)

extension = _find_extension(call_context)
if not extension.should_trace_service_call():
return original_func(*args, **kwargs)

attributes = {
SpanAttributes.RPC_SYSTEM: "aws-api",
SpanAttributes.RPC_SERVICE: call_context.service_id,
Expand All @@ -198,6 +203,8 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
"aws.region": call_context.region,
}

_safe_invoke(extension.extract_attributes, attributes)

with self._tracer.start_as_current_span(
call_context.span_name,
kind=call_context.span_kind,
Expand All @@ -208,6 +215,7 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
BotocoreInstrumentor._patch_lambda_invoke(call_context.params)

_set_api_call_attributes(span, call_context)
_safe_invoke(extension.before_service_call, span)
self._call_request_hook(span, call_context)

token = context_api.attach(
Expand All @@ -220,11 +228,14 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
except ClientError as error:
result = getattr(error, "response", None)
_apply_response_attributes(span, result)
_safe_invoke(extension.on_error, span, error)
raise
else:
_apply_response_attributes(span, result)
_safe_invoke(extension.on_success, span, result)
finally:
context_api.detach(token)
_safe_invoke(extension.after_service_call)

self._call_response_hook(span, call_context, result)

Expand Down Expand Up @@ -254,8 +265,6 @@ def _set_api_call_attributes(span, call_context: _AwsSdkCallContext):
if not span.is_recording():
return

if "QueueUrl" in call_context.params:
span.set_attribute("aws.queue_url", call_context.params["QueueUrl"])
if "TableName" in call_context.params:
span.set_attribute("aws.table_name", call_context.params["TableName"])

Expand Down Expand Up @@ -309,3 +318,14 @@ def _determine_call_context(
# extracting essential attributes ('service' and 'operation') failed.
logger.error("Error when initializing call context", exc_info=ex)
return None


def _safe_invoke(function: Callable, *args):
function_name = "<unknown>"
try:
function_name = function.__name__
function(*args)
except Exception as ex: # pylint:disable=broad-except
logger.error(
"Error when invoking function '%s'", function_name, exc_info=ex
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import importlib
import logging

from opentelemetry.instrumentation.botocore.extensions.types import (
_AwsSdkCallContext,
_AwsSdkExtension,
)

_logger = logging.getLogger(__name__)


def _lazy_load(module, cls):
def loader():
imported_mod = importlib.import_module(module, __name__)
return getattr(imported_mod, cls, None)

return loader


_KNOWN_EXTENSIONS = {
"sqs": _lazy_load(".sqs", "_SqsExtension"),
}


def _find_extension(call_context: _AwsSdkCallContext) -> _AwsSdkExtension:
try:
loader = _KNOWN_EXTENSIONS.get(call_context.service)
if loader is None:
return _AwsSdkExtension(call_context)

extension_cls = loader()
return extension_cls(call_context)
except Exception as ex: # pylint: disable=broad-except
_logger.error("Error when loading extension: %s", ex)
return _AwsSdkExtension(call_context)
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from opentelemetry.instrumentation.botocore.extensions.types import (
_AttributeMapT,
_AwsSdkExtension,
)


class _SqsExtension(_AwsSdkExtension):
def extract_attributes(self, attributes: _AttributeMapT):
queue_url = self._call_context.params.get("QueueUrl")
if queue_url:
# TODO: update when semantic conventions exist
attributes["aws.queue_url"] = queue_url
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@
from typing import Any, Dict, Optional, Tuple

from opentelemetry.trace import SpanKind
from opentelemetry.trace.span import Span
from opentelemetry.util.types import AttributeValue

_logger = logging.getLogger(__name__)

_BotoClientT = "botocore.client.BaseClient"
_BotoResultT = Dict[str, Any]
_BotoClientErrorT = "botocore.exceptions.ClientError"

_OperationParamsT = Dict[str, Any]
_AttributeMapT = Dict[str, AttributeValue]


class _AwsSdkCallContext:
Expand Down Expand Up @@ -70,3 +75,49 @@ def _get_attr(obj, name: str, default=None):
except AttributeError:
_logger.warning("Could not get attribute '%s'", name)
return default


class _AwsSdkExtension:
def __init__(self, call_context: _AwsSdkCallContext):
self._call_context = call_context

def should_trace_service_call(self) -> bool: # pylint:disable=no-self-use
"""Returns if the AWS SDK service call should be traced or not
Extensions might override this function to disable tracing for certain
operations.
"""
return True

def extract_attributes(self, attributes: _AttributeMapT):
"""Callback which gets invoked before the span is created.
Extensions might override this function to extract additional attributes.
"""

def before_service_call(self, span: Span):
"""Callback which gets invoked after the span is created but before the
AWS SDK service is called.
Extensions might override this function e.g. for injecting the span into
a carrier.
"""

def on_success(self, span: Span, result: _BotoResultT):
"""Callback that gets invoked when the AWS SDK call returns
successfully.
Extensions might override this function e.g. to extract and set response
attributes on the span.
"""

def on_error(self, span: Span, exception: _BotoClientErrorT):
"""Callback that gets invoked when the AWS SDK service call raises a
ClientError.
"""

def after_service_call(self):
"""Callback that gets invoked after the AWS SDK service was called.
Extensions might override this function to do some cleanup tasks.
"""

0 comments on commit c3df816

Please sign in to comment.