diff --git a/CHANGELOG.md b/CHANGELOG.md index 4f76e9b1b3..75b03e4fd2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#664](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/664)) - `opentelemetry-instrumentation-botocore` Fix span injection for lambda invoke ([#663](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/663)) +- `opentelemetry-instrumentation-botocore` Introduce instrumentation extensions + ([#718](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/718)) ### Changed diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py index 438af1131c..e4be18466c 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py @@ -80,7 +80,7 @@ def response_hook(span, service_name, operation_name, result): import json import logging -from typing import Any, Collection, Dict, Optional, Tuple +from typing import Any, Callable, Collection, Dict, Optional, Tuple from botocore.client import BaseClient from botocore.endpoint import Endpoint @@ -88,6 +88,7 @@ def response_hook(span, service_name, operation_name, result): from wrapt import wrap_function_wrapper from opentelemetry import context as context_api +from opentelemetry.instrumentation.botocore.extensions import _find_extension from opentelemetry.instrumentation.botocore.extensions.types import ( _AwsSdkCallContext, ) @@ -190,6 +191,10 @@ def _patched_api_call(self, original_func, instance, args, kwargs): if call_context is None: return original_func(*args, **kwargs) + extension = _find_extension(call_context) + if not extension.should_trace_service_call(): + return original_func(*args, **kwargs) + attributes = { SpanAttributes.RPC_SYSTEM: "aws-api", SpanAttributes.RPC_SERVICE: call_context.service_id, @@ -198,6 +203,8 @@ def _patched_api_call(self, original_func, instance, args, kwargs): "aws.region": call_context.region, } + _safe_invoke(extension.extract_attributes, attributes) + with self._tracer.start_as_current_span( call_context.span_name, kind=call_context.span_kind, @@ -208,6 +215,7 @@ def _patched_api_call(self, original_func, instance, args, kwargs): BotocoreInstrumentor._patch_lambda_invoke(call_context.params) _set_api_call_attributes(span, call_context) + _safe_invoke(extension.before_service_call, span) self._call_request_hook(span, call_context) token = context_api.attach( @@ -220,11 +228,14 @@ def _patched_api_call(self, original_func, instance, args, kwargs): except ClientError as error: result = getattr(error, "response", None) _apply_response_attributes(span, result) + _safe_invoke(extension.on_error, span, error) raise else: _apply_response_attributes(span, result) + _safe_invoke(extension.on_success, span, result) finally: context_api.detach(token) + _safe_invoke(extension.after_service_call) self._call_response_hook(span, call_context, result) @@ -254,8 +265,6 @@ def _set_api_call_attributes(span, call_context: _AwsSdkCallContext): if not span.is_recording(): return - if "QueueUrl" in call_context.params: - span.set_attribute("aws.queue_url", call_context.params["QueueUrl"]) if "TableName" in call_context.params: span.set_attribute("aws.table_name", call_context.params["TableName"]) @@ -309,3 +318,14 @@ def _determine_call_context( # extracting essential attributes ('service' and 'operation') failed. logger.error("Error when initializing call context", exc_info=ex) return None + + +def _safe_invoke(function: Callable, *args): + function_name = "" + try: + function_name = function.__name__ + function(*args) + except Exception as ex: # pylint:disable=broad-except + logger.error( + "Error when invoking function '%s'", function_name, exc_info=ex + ) diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/__init__.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/__init__.py index e69de29bb2..66ec526392 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/__init__.py @@ -0,0 +1,35 @@ +import importlib +import logging + +from opentelemetry.instrumentation.botocore.extensions.types import ( + _AwsSdkCallContext, + _AwsSdkExtension, +) + +_logger = logging.getLogger(__name__) + + +def _lazy_load(module, cls): + def loader(): + imported_mod = importlib.import_module(module, __name__) + return getattr(imported_mod, cls, None) + + return loader + + +_KNOWN_EXTENSIONS = { + "sqs": _lazy_load(".sqs", "_SqsExtension"), +} + + +def _find_extension(call_context: _AwsSdkCallContext) -> _AwsSdkExtension: + try: + loader = _KNOWN_EXTENSIONS.get(call_context.service) + if loader is None: + return _AwsSdkExtension(call_context) + + extension_cls = loader() + return extension_cls(call_context) + except Exception as ex: # pylint: disable=broad-except + _logger.error("Error when loading extension: %s", ex) + return _AwsSdkExtension(call_context) diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/sqs.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/sqs.py new file mode 100644 index 0000000000..408ef458fe --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/sqs.py @@ -0,0 +1,12 @@ +from opentelemetry.instrumentation.botocore.extensions.types import ( + _AttributeMapT, + _AwsSdkExtension, +) + + +class _SqsExtension(_AwsSdkExtension): + def extract_attributes(self, attributes: _AttributeMapT): + queue_url = self._call_context.params.get("QueueUrl") + if queue_url: + # TODO: update when semantic conventions exist + attributes["aws.queue_url"] = queue_url diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/types.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/types.py index c4ab588352..360911f642 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/types.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/types.py @@ -2,12 +2,17 @@ from typing import Any, Dict, Optional, Tuple from opentelemetry.trace import SpanKind +from opentelemetry.trace.span import Span +from opentelemetry.util.types import AttributeValue _logger = logging.getLogger(__name__) _BotoClientT = "botocore.client.BaseClient" +_BotoResultT = Dict[str, Any] +_BotoClientErrorT = "botocore.exceptions.ClientError" _OperationParamsT = Dict[str, Any] +_AttributeMapT = Dict[str, AttributeValue] class _AwsSdkCallContext: @@ -70,3 +75,49 @@ def _get_attr(obj, name: str, default=None): except AttributeError: _logger.warning("Could not get attribute '%s'", name) return default + + +class _AwsSdkExtension: + def __init__(self, call_context: _AwsSdkCallContext): + self._call_context = call_context + + def should_trace_service_call(self) -> bool: # pylint:disable=no-self-use + """Returns if the AWS SDK service call should be traced or not + + Extensions might override this function to disable tracing for certain + operations. + """ + return True + + def extract_attributes(self, attributes: _AttributeMapT): + """Callback which gets invoked before the span is created. + + Extensions might override this function to extract additional attributes. + """ + + def before_service_call(self, span: Span): + """Callback which gets invoked after the span is created but before the + AWS SDK service is called. + + Extensions might override this function e.g. for injecting the span into + a carrier. + """ + + def on_success(self, span: Span, result: _BotoResultT): + """Callback that gets invoked when the AWS SDK call returns + successfully. + + Extensions might override this function e.g. to extract and set response + attributes on the span. + """ + + def on_error(self, span: Span, exception: _BotoClientErrorT): + """Callback that gets invoked when the AWS SDK service call raises a + ClientError. + """ + + def after_service_call(self): + """Callback that gets invoked after the AWS SDK service was called. + + Extensions might override this function to do some cleanup tasks. + """