From ee1bc72b6ac18cc2a4de101078d196891bb4a6f5 Mon Sep 17 00:00:00 2001 From: Ran Nozik Date: Sat, 18 Dec 2021 18:35:03 +0200 Subject: [PATCH 1/6] feat: support older pika versions --- instrumentation/README.md | 2 +- .../instrumentation/pika/package.py | 2 +- .../instrumentation/pika/pika_instrumentor.py | 31 ++++++++++++++++--- .../tests/test_pika_instrumentation.py | 8 +++-- .../instrumentation/bootstrap_gen.py | 2 +- 5 files changed, 35 insertions(+), 10 deletions(-) diff --git a/instrumentation/README.md b/instrumentation/README.md index fb8a9638b5..9349836203 100644 --- a/instrumentation/README.md +++ b/instrumentation/README.md @@ -20,7 +20,7 @@ | [opentelemetry-instrumentation-jinja2](./opentelemetry-instrumentation-jinja2) | jinja2 >= 2.7, < 4.0 | | [opentelemetry-instrumentation-logging](./opentelemetry-instrumentation-logging) | logging | | [opentelemetry-instrumentation-mysql](./opentelemetry-instrumentation-mysql) | mysql-connector-python ~= 8.0 | -| [opentelemetry-instrumentation-pika](./opentelemetry-instrumentation-pika) | pika >= 1.1.0 | +| [opentelemetry-instrumentation-pika](./opentelemetry-instrumentation-pika) | pika >= 0.12.0 | | [opentelemetry-instrumentation-psycopg2](./opentelemetry-instrumentation-psycopg2) | psycopg2 >= 2.7.3.1 | | [opentelemetry-instrumentation-pymemcache](./opentelemetry-instrumentation-pymemcache) | pymemcache ~= 1.3 | | [opentelemetry-instrumentation-pymongo](./opentelemetry-instrumentation-pymongo) | pymongo ~= 3.1 | diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/package.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/package.py index 27ceebbac7..52c0863b58 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/package.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/package.py @@ -13,4 +13,4 @@ # limitations under the License. from typing import Collection -_instruments: Collection[str] = ("pika >= 1.1.0",) +_instruments: Collection[str] = ("pika >= 0.12.0",) diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py index 072ee56cfd..879ad0609d 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py @@ -14,7 +14,9 @@ from logging import getLogger from typing import Any, Collection, Dict, Optional +import pkg_resources import wrapt +from packaging import version from pika.adapters import BlockingConnection from pika.adapters.blocking_connection import BlockingChannel @@ -41,8 +43,12 @@ def _instrument_blocking_channel_consumers( consume_hook: utils.HookT = utils.dummy_callback, ) -> Any: for consumer_tag, consumer_info in channel._consumer_infos.items(): + callback_attr = ( + PikaInstrumentor._consumer_callback_attribute_name() + ) + consumer_callback = getattr(consumer_info, callback_attr) decorated_callback = utils._decorate_callback( - consumer_info.on_message_callback, + consumer_callback, tracer, consumer_tag, consume_hook, @@ -51,9 +57,9 @@ def _instrument_blocking_channel_consumers( setattr( decorated_callback, "_original_callback", - consumer_info.on_message_callback, + consumer_callback, ) - consumer_info.on_message_callback = decorated_callback + setattr(consumer_info, callback_attr, decorated_callback) @staticmethod def _instrument_basic_publish( @@ -126,12 +132,27 @@ def uninstrument_channel(channel: BlockingChannel) -> None: return for consumers_tag, client_info in channel._consumer_infos.items(): - if hasattr(client_info.on_message_callback, "_original_callback"): + callback_attr = ( + PikaInstrumentor._consumer_callback_attribute_name() + ) + consumer_callback = getattr(client_info, callback_attr) + if hasattr(consumer_callback, "_original_callback"): channel._consumer_infos[ consumers_tag - ] = client_info.on_message_callback._original_callback + ] = consumer_callback._original_callback PikaInstrumentor._uninstrument_channel_functions(channel) + @staticmethod + def _consumer_callback_attribute_name() -> str: + pika_version = version.parse( + pkg_resources.get_distribution("pika").version + ) + return ( + "on_message_callback" + if pika_version >= version.parse("1.0.0") + else "consumer_cb" + ) + def _decorate_channel_function( self, tracer_provider: Optional[TracerProvider], diff --git a/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py b/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py index 410ccf069f..3e949c43ab 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py @@ -26,7 +26,8 @@ class TestPika(TestCase): def setUp(self) -> None: self.channel = mock.MagicMock(spec=Channel) consumer_info = mock.MagicMock() - consumer_info.on_message_callback = mock.MagicMock() + callback_attr = PikaInstrumentor._consumer_callback_attribute_name() + setattr(consumer_info, callback_attr, mock.MagicMock()) self.channel._consumer_infos = {"consumer-tag": consumer_info} self.mock_callback = mock.MagicMock() @@ -72,8 +73,11 @@ def test_instrument_consumers( self, decorate_callback: mock.MagicMock ) -> None: tracer = mock.MagicMock(spec=Tracer) + callback_attr = PikaInstrumentor._consumer_callback_attribute_name() expected_decoration_calls = [ - mock.call(value.on_message_callback, tracer, key, dummy_callback) + mock.call( + getattr(value, callback_attr), tracer, key, dummy_callback + ) for key, value in self.channel._consumer_infos.items() ] PikaInstrumentor._instrument_blocking_channel_consumers( diff --git a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py index 8e551cd106..9c74505d8d 100644 --- a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py +++ b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py @@ -81,7 +81,7 @@ "instrumentation": "opentelemetry-instrumentation-mysql==0.27b0", }, "pika": { - "library": "pika >= 1.1.0", + "library": "pika >= 0.12.0", "instrumentation": "opentelemetry-instrumentation-pika==0.27b0", }, "psycopg2": { From a82ebb1ceacfcd70c666f39ff211bbbcab7448e6 Mon Sep 17 00:00:00 2001 From: Ran Nozik Date: Sun, 19 Dec 2021 21:28:26 +0200 Subject: [PATCH 2/6] update tox.ini --- tox.ini | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/tox.ini b/tox.ini index 97673d1a47..0f2ca1e949 100644 --- a/tox.ini +++ b/tox.ini @@ -182,8 +182,8 @@ envlist = pypy3-test-propagator-ot-trace ; opentelemetry-instrumentation-pika - py3{6,7,8,9,10}-test-instrumentation-pika - pypy3-test-instrumentation-pika + py3{6,7,8,9,10}-test-instrumentation-pika{0,1} + pypy3-test-instrumentation-pika{0,1} lint docker-tests @@ -216,6 +216,8 @@ deps = sqlalchemy11: sqlalchemy>=1.1,<1.2 sqlalchemy14: aiosqlite sqlalchemy14: sqlalchemy~=1.4 + pika0: pika>=0.12.0,<1.0.0 + pika1: pika>=1.0.0 ; FIXME: add coverage testing ; FIXME: add mypy testing @@ -249,7 +251,7 @@ changedir = test-instrumentation-jinja2: instrumentation/opentelemetry-instrumentation-jinja2/tests test-instrumentation-logging: instrumentation/opentelemetry-instrumentation-logging/tests test-instrumentation-mysql: instrumentation/opentelemetry-instrumentation-mysql/tests - test-instrumentation-pika: instrumentation/opentelemetry-instrumentation-pika/tests + test-instrumentation-pika{0,1}: instrumentation/opentelemetry-instrumentation-pika/tests test-instrumentation-psycopg2: instrumentation/opentelemetry-instrumentation-psycopg2/tests test-instrumentation-pymemcache: instrumentation/opentelemetry-instrumentation-pymemcache/tests test-instrumentation-pymongo: instrumentation/opentelemetry-instrumentation-pymongo/tests @@ -286,7 +288,7 @@ commands_pre = celery: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-celery[test] - pika: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-pika[test] + pika{0,1}: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-pika[test] grpc: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-grpc[test] From 773c289f5c4f70a5780297dced9dc588d08029c0 Mon Sep 17 00:00:00 2001 From: Ran Nozik Date: Sun, 19 Dec 2021 22:15:52 +0200 Subject: [PATCH 3/6] update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 34355907c2..b55882845a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - `opentelemetry-instrumentation-aws-lambda` Adds support for configurable flush timeout via `OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TIMEOUT` property. ([#825](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/825)) +- `opentelemetry-instrumentation-pika` Adds support for versions between `0.12.0` to `1.0.0`. ([#837](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/837)) ### Fixed From fdba606e196cc1f07c00ba84896fc9e49e3f5ae0 Mon Sep 17 00:00:00 2001 From: Ran Nozik Date: Mon, 20 Dec 2021 17:51:47 +0200 Subject: [PATCH 4/6] take version from pika --- .../instrumentation/pika/pika_instrumentor.py | 32 ++++++++----------- .../tests/test_pika_instrumentation.py | 4 +-- 2 files changed, 16 insertions(+), 20 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py index 879ad0609d..cd1be944f8 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py @@ -14,7 +14,7 @@ from logging import getLogger from typing import Any, Collection, Dict, Optional -import pkg_resources +import pika import wrapt from packaging import version from pika.adapters import BlockingConnection @@ -34,7 +34,18 @@ _FUNCTIONS_TO_UNINSTRUMENT = ["basic_publish"] +def _consumer_callback_attribute_name() -> str: + pika_version = version.parse(pika.__version__) + return ( + "on_message_callback" + if pika_version >= version.parse("1.0.0") + else "consumer_cb" + ) + + class PikaInstrumentor(BaseInstrumentor): # type: ignore + CONSUMER_CALLBACK_ATTR = _consumer_callback_attribute_name() + # pylint: disable=attribute-defined-outside-init @staticmethod def _instrument_blocking_channel_consumers( @@ -43,9 +54,7 @@ def _instrument_blocking_channel_consumers( consume_hook: utils.HookT = utils.dummy_callback, ) -> Any: for consumer_tag, consumer_info in channel._consumer_infos.items(): - callback_attr = ( - PikaInstrumentor._consumer_callback_attribute_name() - ) + callback_attr = PikaInstrumentor.CONSUMER_CALLBACK_ATTR consumer_callback = getattr(consumer_info, callback_attr) decorated_callback = utils._decorate_callback( consumer_callback, @@ -132,9 +141,7 @@ def uninstrument_channel(channel: BlockingChannel) -> None: return for consumers_tag, client_info in channel._consumer_infos.items(): - callback_attr = ( - PikaInstrumentor._consumer_callback_attribute_name() - ) + callback_attr = PikaInstrumentor.CONSUMER_CALLBACK_ATTR consumer_callback = getattr(client_info, callback_attr) if hasattr(consumer_callback, "_original_callback"): channel._consumer_infos[ @@ -142,17 +149,6 @@ def uninstrument_channel(channel: BlockingChannel) -> None: ] = consumer_callback._original_callback PikaInstrumentor._uninstrument_channel_functions(channel) - @staticmethod - def _consumer_callback_attribute_name() -> str: - pika_version = version.parse( - pkg_resources.get_distribution("pika").version - ) - return ( - "on_message_callback" - if pika_version >= version.parse("1.0.0") - else "consumer_cb" - ) - def _decorate_channel_function( self, tracer_provider: Optional[TracerProvider], diff --git a/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py b/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py index 3e949c43ab..3c176f21d0 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py @@ -26,7 +26,7 @@ class TestPika(TestCase): def setUp(self) -> None: self.channel = mock.MagicMock(spec=Channel) consumer_info = mock.MagicMock() - callback_attr = PikaInstrumentor._consumer_callback_attribute_name() + callback_attr = PikaInstrumentor.CONSUMER_CALLBACK_ATTR setattr(consumer_info, callback_attr, mock.MagicMock()) self.channel._consumer_infos = {"consumer-tag": consumer_info} self.mock_callback = mock.MagicMock() @@ -73,7 +73,7 @@ def test_instrument_consumers( self, decorate_callback: mock.MagicMock ) -> None: tracer = mock.MagicMock(spec=Tracer) - callback_attr = PikaInstrumentor._consumer_callback_attribute_name() + callback_attr = PikaInstrumentor.CONSUMER_CALLBACK_ATTR expected_decoration_calls = [ mock.call( getattr(value, callback_attr), tracer, key, dummy_callback From 68f978330d15fa9947afa6cd7147ae0831be2c58 Mon Sep 17 00:00:00 2001 From: Ran Nozik Date: Mon, 20 Dec 2021 18:38:59 +0200 Subject: [PATCH 5/6] avoid exception when property name changes --- .../opentelemetry/instrumentation/pika/pika_instrumentor.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py index cd1be944f8..b09c3a0f9c 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py @@ -55,7 +55,9 @@ def _instrument_blocking_channel_consumers( ) -> Any: for consumer_tag, consumer_info in channel._consumer_infos.items(): callback_attr = PikaInstrumentor.CONSUMER_CALLBACK_ATTR - consumer_callback = getattr(consumer_info, callback_attr) + consumer_callback = getattr(consumer_info, callback_attr, None) + if consumer_callback is None: + continue decorated_callback = utils._decorate_callback( consumer_callback, tracer, @@ -142,7 +144,7 @@ def uninstrument_channel(channel: BlockingChannel) -> None: for consumers_tag, client_info in channel._consumer_infos.items(): callback_attr = PikaInstrumentor.CONSUMER_CALLBACK_ATTR - consumer_callback = getattr(client_info, callback_attr) + consumer_callback = getattr(client_info, callback_attr, None) if hasattr(consumer_callback, "_original_callback"): channel._consumer_infos[ consumers_tag From e85a2c82110ed2e794eb0289c338fba74c3d11c3 Mon Sep 17 00:00:00 2001 From: Ran Nozik Date: Mon, 20 Dec 2021 19:48:50 +0200 Subject: [PATCH 6/6] add callback attr name test --- .../tests/test_pika_instrumentation.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py b/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py index 3c176f21d0..6e154c04f9 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py @@ -18,6 +18,9 @@ from wrapt import BoundFunctionWrapper from opentelemetry.instrumentation.pika import PikaInstrumentor +from opentelemetry.instrumentation.pika.pika_instrumentor import ( + _consumer_callback_attribute_name, +) from opentelemetry.instrumentation.pika.utils import dummy_callback from opentelemetry.trace import Tracer @@ -113,3 +116,13 @@ def test_uninstrument_channel_functions(self) -> None: self.channel.basic_publish._original_function = original_function PikaInstrumentor._uninstrument_channel_functions(self.channel) self.assertEqual(self.channel.basic_publish, original_function) + + def test_consumer_callback_attribute_name(self) -> None: + with mock.patch("pika.__version__", "1.0.0"): + self.assertEqual( + _consumer_callback_attribute_name(), "on_message_callback" + ) + with mock.patch("pika.__version__", "0.12.0"): + self.assertEqual( + _consumer_callback_attribute_name(), "consumer_cb" + )