From d52f42fb7f36c5448395f4cf842523ee9f3e01c9 Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Thu, 26 Sep 2024 18:00:31 +0500 Subject: [PATCH] feat(opentelemetry-instrumentation-aiokafka): wrap getone instead of anext, add tests (#2874) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * add tests * add to CHANGELOG * add tests for baggage * wrap getone instead of __anext__ * split sync and async tests (fix review) * add dimastbk to component_owners.yml for aiokafka * Update CHANGELOG.md --------- Co-authored-by: Emídio Neto <9735060+emdneto@users.noreply.github.com> Co-authored-by: Riccardo Magliocchetti --- .github/component_owners.yml | 3 + CHANGELOG.md | 6 + .../instrumentation/aiokafka/__init__.py | 8 +- .../instrumentation/aiokafka/utils.py | 2 +- .../tests/test_instrumentation.py | 290 +++++++++++++++++- .../tests/test_utils.py | 4 +- tox.ini | 1 + 7 files changed, 302 insertions(+), 12 deletions(-) diff --git a/.github/component_owners.yml b/.github/component_owners.yml index e3ca06b450..dcc1013476 100644 --- a/.github/component_owners.yml +++ b/.github/component_owners.yml @@ -61,5 +61,8 @@ components: instrumentation/opentelemetry-instrumentation-psycopg: - federicobond + instrumentation/opentelemetry-instrumentation-aiokafka: + - dimastbk + processor/opentelemetry-processor-baggage: - codeboten diff --git a/CHANGELOG.md b/CHANGELOG.md index a060517cdf..30440c2dce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `opentelemetry-instrumentation-aiokafka` Add instrumentor and auto instrumentation support for aiokafka ([#2082](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2082)) + +### Fixed + +- `opentelemetry-instrumentation-aiokafka` Wrap `AIOKafkaConsumer.getone()` instead of `AIOKafkaConsumer.__anext__` + ([#2874](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2874)) + ## Version 1.27.0/0.48b0 () ### Added diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/__init__.py b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/__init__.py index 5b2b0cd0e8..7d994be622 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/__init__.py @@ -76,7 +76,7 @@ async def async_consume_hook(span, record, args, kwargs): from opentelemetry import trace from opentelemetry.instrumentation.aiokafka.package import _instruments from opentelemetry.instrumentation.aiokafka.utils import ( - _wrap_anext, + _wrap_getone, _wrap_send, ) from opentelemetry.instrumentation.aiokafka.version import __version__ @@ -126,10 +126,10 @@ def _instrument(self, **kwargs): ) wrap_function_wrapper( aiokafka.AIOKafkaConsumer, - "__anext__", - _wrap_anext(tracer, async_consume_hook), + "getone", + _wrap_getone(tracer, async_consume_hook), ) def _uninstrument(self, **kwargs): unwrap(aiokafka.AIOKafkaProducer, "send") - unwrap(aiokafka.AIOKafkaConsumer, "__anext__") + unwrap(aiokafka.AIOKafkaConsumer, "getone") diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py index 3c54ce1500..cae0d97717 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py @@ -335,7 +335,7 @@ async def _create_consumer_span( context.detach(token) -def _wrap_anext( +def _wrap_getone( tracer: Tracer, async_consume_hook: ConsumeHookT ) -> Callable[..., Awaitable[aiokafka.ConsumerRecord]]: async def _traced_next( diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py index 1c4e5e3d10..8211566239 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py @@ -11,15 +11,29 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from unittest import TestCase -from aiokafka import AIOKafkaConsumer, AIOKafkaProducer +import uuid +from typing import Any, List, Sequence, Tuple +from unittest import IsolatedAsyncioTestCase, TestCase, mock + +from aiokafka import ( + AIOKafkaConsumer, + AIOKafkaProducer, + ConsumerRecord, + TopicPartition, +) from wrapt import BoundFunctionWrapper +from opentelemetry import baggage, context from opentelemetry.instrumentation.aiokafka import AIOKafkaInstrumentor +from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.semconv._incubating.attributes import messaging_attributes +from opentelemetry.semconv.attributes import server_attributes +from opentelemetry.test.test_base import TestBase +from opentelemetry.trace import SpanKind, format_trace_id, set_span_in_context -class TestAIOKafka(TestCase): +class TestAIOKafkaInstrumentor(TestCase): def test_instrument_api(self) -> None: instrumentation = AIOKafkaInstrumentor() @@ -28,7 +42,7 @@ def test_instrument_api(self) -> None: isinstance(AIOKafkaProducer.send, BoundFunctionWrapper) ) self.assertTrue( - isinstance(AIOKafkaConsumer.__anext__, BoundFunctionWrapper) + isinstance(AIOKafkaConsumer.getone, BoundFunctionWrapper) ) instrumentation.uninstrument() @@ -36,5 +50,271 @@ def test_instrument_api(self) -> None: isinstance(AIOKafkaProducer.send, BoundFunctionWrapper) ) self.assertFalse( - isinstance(AIOKafkaConsumer.__anext__, BoundFunctionWrapper) + isinstance(AIOKafkaConsumer.getone, BoundFunctionWrapper) + ) + + +class TestAIOKafkaInstrumentation(TestBase, IsolatedAsyncioTestCase): + @staticmethod + def consumer_record_factory( + number: int, headers: Tuple[Tuple[str, bytes], ...] + ) -> ConsumerRecord: + return ConsumerRecord( + f"topic_{number}", + number, + number, + number, + number, + f"key_{number}".encode(), + f"value_{number}".encode(), + None, + number, + number, + headers=headers, + ) + + @staticmethod + async def consumer_factory(**consumer_kwargs: Any) -> AIOKafkaConsumer: + consumer = AIOKafkaConsumer(**consumer_kwargs) + + consumer._client.bootstrap = mock.AsyncMock() + consumer._client._wait_on_metadata = mock.AsyncMock() + + await consumer.start() + + consumer._fetcher.next_record = mock.AsyncMock() + + return consumer + + @staticmethod + async def producer_factory() -> AIOKafkaProducer: + producer = AIOKafkaProducer(api_version="1.0") + + producer.client._wait_on_metadata = mock.AsyncMock() + producer.client.bootstrap = mock.AsyncMock() + producer._message_accumulator.add_message = mock.AsyncMock() + producer._sender.start = mock.AsyncMock() + producer._partition = mock.Mock(return_value=1) + + await producer.start() + + return producer + + async def test_getone(self) -> None: + AIOKafkaInstrumentor().uninstrument() + AIOKafkaInstrumentor().instrument(tracer_provider=self.tracer_provider) + + client_id = str(uuid.uuid4()) + group_id = str(uuid.uuid4()) + consumer = await self.consumer_factory( + client_id=client_id, group_id=group_id + ) + next_record_mock: mock.AsyncMock = consumer._fetcher.next_record + + expected_spans = [ + { + "name": "topic_1 receive", + "kind": SpanKind.CONSUMER, + "attributes": { + messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value, + server_attributes.SERVER_ADDRESS: '"localhost"', + messaging_attributes.MESSAGING_CLIENT_ID: client_id, + messaging_attributes.MESSAGING_DESTINATION_NAME: "topic_1", + messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID: "1", + messaging_attributes.MESSAGING_KAFKA_MESSAGE_KEY: "key_1", + messaging_attributes.MESSAGING_CONSUMER_GROUP_NAME: group_id, + messaging_attributes.MESSAGING_OPERATION_NAME: "receive", + messaging_attributes.MESSAGING_OPERATION_TYPE: messaging_attributes.MessagingOperationTypeValues.RECEIVE.value, + messaging_attributes.MESSAGING_KAFKA_MESSAGE_OFFSET: 1, + messaging_attributes.MESSAGING_MESSAGE_ID: "topic_1.1.1", + }, + }, + { + "name": "topic_2 receive", + "kind": SpanKind.CONSUMER, + "attributes": { + messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value, + server_attributes.SERVER_ADDRESS: '"localhost"', + messaging_attributes.MESSAGING_CLIENT_ID: client_id, + messaging_attributes.MESSAGING_DESTINATION_NAME: "topic_2", + messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID: "2", + messaging_attributes.MESSAGING_KAFKA_MESSAGE_KEY: "key_2", + messaging_attributes.MESSAGING_CONSUMER_GROUP_NAME: group_id, + messaging_attributes.MESSAGING_OPERATION_NAME: "receive", + messaging_attributes.MESSAGING_OPERATION_TYPE: messaging_attributes.MessagingOperationTypeValues.RECEIVE.value, + messaging_attributes.MESSAGING_KAFKA_MESSAGE_OFFSET: 2, + messaging_attributes.MESSAGING_MESSAGE_ID: "topic_2.2.2", + }, + }, + ] + self.memory_exporter.clear() + + next_record_mock.side_effect = [ + self.consumer_record_factory( + 1, + headers=( + ( + "traceparent", + b"00-03afa25236b8cd948fa853d67038ac79-405ff022e8247c46-01", + ), + ), + ), + self.consumer_record_factory(2, headers=()), + ] + + await consumer.getone() + next_record_mock.assert_awaited_with(()) + + first_span = self.memory_exporter.get_finished_spans()[0] + self.assertEqual( + format_trace_id(first_span.get_span_context().trace_id), + "03afa25236b8cd948fa853d67038ac79", + ) + + await consumer.getone() + next_record_mock.assert_awaited_with(()) + + span_list = self.memory_exporter.get_finished_spans() + self._compare_spans(span_list, expected_spans) + + async def test_getone_baggage(self) -> None: + received_baggage = None + + async def async_consume_hook(span, *_) -> None: + nonlocal received_baggage + received_baggage = baggage.get_all(set_span_in_context(span)) + + AIOKafkaInstrumentor().uninstrument() + AIOKafkaInstrumentor().instrument( + tracer_provider=self.tracer_provider, + async_consume_hook=async_consume_hook, + ) + + consumer = await self.consumer_factory() + next_record_mock: mock.AsyncMock = consumer._fetcher.next_record + + self.memory_exporter.clear() + + next_record_mock.side_effect = [ + self.consumer_record_factory( + 1, + headers=( + ( + "traceparent", + b"00-03afa25236b8cd948fa853d67038ac79-405ff022e8247c46-01", + ), + ("baggage", b"foo=bar"), + ), + ), + ] + + await consumer.getone() + next_record_mock.assert_awaited_with(()) + + self.assertEqual(received_baggage, {"foo": "bar"}) + + async def test_getone_consume_hook(self) -> None: + async_consume_hook_mock = mock.AsyncMock() + + AIOKafkaInstrumentor().uninstrument() + AIOKafkaInstrumentor().instrument( + tracer_provider=self.tracer_provider, + async_consume_hook=async_consume_hook_mock, + ) + + consumer = await self.consumer_factory() + next_record_mock: mock.AsyncMock = consumer._fetcher.next_record + + next_record_mock.side_effect = [ + self.consumer_record_factory(1, headers=()) + ] + + await consumer.getone() + + async_consume_hook_mock.assert_awaited_once() + + async def test_send(self) -> None: + AIOKafkaInstrumentor().uninstrument() + AIOKafkaInstrumentor().instrument(tracer_provider=self.tracer_provider) + + producer = await self.producer_factory() + add_message_mock: mock.AsyncMock = ( + producer._message_accumulator.add_message + ) + + tracer = self.tracer_provider.get_tracer(__name__) + with tracer.start_as_current_span("test_span") as span: + await producer.send("topic_1", b"value_1") + + add_message_mock.assert_awaited_with( + TopicPartition(topic="topic_1", partition=1), + None, + b"value_1", + 40.0, + timestamp_ms=None, + headers=[("traceparent", mock.ANY)], ) + add_message_mock.call_args_list[0].kwargs["headers"][0][1].startswith( + f"00-{format_trace_id(span.get_span_context().trace_id)}-".encode() + ) + + await producer.send("topic_2", b"value_2") + add_message_mock.assert_awaited_with( + TopicPartition(topic="topic_2", partition=1), + None, + b"value_2", + 40.0, + timestamp_ms=None, + headers=[("traceparent", mock.ANY)], + ) + + async def test_send_baggage(self) -> None: + AIOKafkaInstrumentor().uninstrument() + AIOKafkaInstrumentor().instrument(tracer_provider=self.tracer_provider) + + producer = await self.producer_factory() + add_message_mock: mock.AsyncMock = ( + producer._message_accumulator.add_message + ) + + tracer = self.tracer_provider.get_tracer(__name__) + ctx = baggage.set_baggage("foo", "bar") + context.attach(ctx) + + with tracer.start_as_current_span("test_span", context=ctx): + await producer.send("topic_1", b"value_1") + + add_message_mock.assert_awaited_with( + TopicPartition(topic="topic_1", partition=1), + None, + b"value_1", + 40.0, + timestamp_ms=None, + headers=[("traceparent", mock.ANY), ("baggage", b"foo=bar")], + ) + + async def test_send_produce_hook(self) -> None: + async_produce_hook_mock = mock.AsyncMock() + + AIOKafkaInstrumentor().uninstrument() + AIOKafkaInstrumentor().instrument( + tracer_provider=self.tracer_provider, + async_produce_hook=async_produce_hook_mock, + ) + + producer = await self.producer_factory() + + await producer.send("topic_1", b"value_1") + + async_produce_hook_mock.assert_awaited_once() + + def _compare_spans( + self, spans: Sequence[ReadableSpan], expected_spans: List[dict] + ) -> None: + self.assertEqual(len(spans), len(expected_spans)) + for span, expected_span in zip(spans, expected_spans): + self.assertEqual(expected_span["name"], span.name) + self.assertEqual(expected_span["kind"], span.kind) + self.assertEqual( + expected_span["attributes"], dict(span.attributes) + ) diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py index b1b2792608..09a8655309 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py @@ -23,7 +23,7 @@ _create_consumer_span, _extract_send_partition, _get_span_name, - _wrap_anext, + _wrap_getone, _wrap_send, ) from opentelemetry.trace import SpanKind @@ -187,7 +187,7 @@ async def test_wrap_next( original_next_callback = mock.AsyncMock() kafka_consumer = mock.MagicMock() - wrapped_next = _wrap_anext(tracer, consume_hook) + wrapped_next = _wrap_getone(tracer, consume_hook) record = await wrapped_next( original_next_callback, kafka_consumer, self.args, self.kwargs ) diff --git a/tox.ini b/tox.ini index e0a5a71131..2b0995d8fc 100644 --- a/tox.ini +++ b/tox.ini @@ -444,6 +444,7 @@ commands_pre = aiokafka: pip install opentelemetry-api@{env:CORE_REPO}\#egg=opentelemetry-api&subdirectory=opentelemetry-api aiokafka: pip install opentelemetry-semantic-conventions@{env:CORE_REPO}\#egg=opentelemetry-semantic-conventions&subdirectory=opentelemetry-semantic-conventions aiokafka: pip install opentelemetry-sdk@{env:CORE_REPO}\#egg=opentelemetry-sdk&subdirectory=opentelemetry-sdk + aiokafka: pip install opentelemetry-test-utils@{env:CORE_REPO}\#egg=opentelemetry-test-utils&subdirectory=tests/opentelemetry-test-utils aiokafka: pip install -r {toxinidir}/instrumentation/opentelemetry-instrumentation-aiokafka/test-requirements.txt kafka-python: pip install opentelemetry-api@{env:CORE_REPO}\#egg=opentelemetry-api&subdirectory=opentelemetry-api