From 8f04553a53fc0e7bf64e6f8cf56d4e83cc368119 Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Tue, 17 Sep 2024 01:35:34 +0500 Subject: [PATCH 1/7] add tests --- .../tests/test_instrumentation.py | 209 +++++++++++++++++- tox.ini | 1 + 2 files changed, 207 insertions(+), 3 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py index 1c4e5e3d10..43d1eac508 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py @@ -11,15 +11,46 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from unittest import TestCase -from aiokafka import AIOKafkaConsumer, AIOKafkaProducer +import uuid +from typing import List, Tuple +from unittest import IsolatedAsyncioTestCase, mock + +from aiokafka import ( + AIOKafkaConsumer, + AIOKafkaProducer, + ConsumerRecord, + TopicPartition, +) from wrapt import BoundFunctionWrapper from opentelemetry.instrumentation.aiokafka import AIOKafkaInstrumentor +from opentelemetry.sdk.trace import Span +from opentelemetry.semconv._incubating.attributes import messaging_attributes +from opentelemetry.semconv.attributes import server_attributes +from opentelemetry.test.test_base import TestBase +from opentelemetry.trace import SpanKind, format_trace_id + +class TestAIOKafka(TestBase, IsolatedAsyncioTestCase): + @staticmethod + def consumer_record_factory( + number: int, headers: Tuple[Tuple[str, bytes], ...] + ) -> ConsumerRecord: + return ConsumerRecord( + f"topic_{number}", + number, + number, + number, + number, + f"key_{number}".encode(), + f"value_{number}".encode(), + None, + number, + number, + headers=headers, + ) -class TestAIOKafka(TestCase): def test_instrument_api(self) -> None: instrumentation = AIOKafkaInstrumentor() @@ -38,3 +69,175 @@ def test_instrument_api(self) -> None: self.assertFalse( isinstance(AIOKafkaConsumer.__anext__, BoundFunctionWrapper) ) + + async def test_anext(self) -> None: + AIOKafkaInstrumentor().uninstrument() + AIOKafkaInstrumentor().instrument(tracer_provider=self.tracer_provider) + + client_id = str(uuid.uuid4()) + group_id = str(uuid.uuid4()) + consumer = AIOKafkaConsumer(client_id=client_id, group_id=group_id) + + expected_spans = [ + { + "name": "topic_1 receive", + "kind": SpanKind.CONSUMER, + "attributes": { + messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value, + server_attributes.SERVER_ADDRESS: '"localhost"', + messaging_attributes.MESSAGING_CLIENT_ID: client_id, + messaging_attributes.MESSAGING_DESTINATION_NAME: "topic_1", + messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID: "1", + messaging_attributes.MESSAGING_KAFKA_MESSAGE_KEY: "key_1", + messaging_attributes.MESSAGING_CONSUMER_GROUP_NAME: group_id, + messaging_attributes.MESSAGING_OPERATION_NAME: "receive", + messaging_attributes.MESSAGING_OPERATION_TYPE: messaging_attributes.MessagingOperationTypeValues.RECEIVE.value, + messaging_attributes.MESSAGING_KAFKA_MESSAGE_OFFSET: 1, + messaging_attributes.MESSAGING_MESSAGE_ID: "topic_1.1.1", + }, + }, + { + "name": "topic_2 receive", + "kind": SpanKind.CONSUMER, + "attributes": { + messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value, + server_attributes.SERVER_ADDRESS: '"localhost"', + messaging_attributes.MESSAGING_CLIENT_ID: client_id, + messaging_attributes.MESSAGING_DESTINATION_NAME: "topic_2", + messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID: "2", + messaging_attributes.MESSAGING_KAFKA_MESSAGE_KEY: "key_2", + messaging_attributes.MESSAGING_CONSUMER_GROUP_NAME: group_id, + messaging_attributes.MESSAGING_OPERATION_NAME: "receive", + messaging_attributes.MESSAGING_OPERATION_TYPE: messaging_attributes.MessagingOperationTypeValues.RECEIVE.value, + messaging_attributes.MESSAGING_KAFKA_MESSAGE_OFFSET: 2, + messaging_attributes.MESSAGING_MESSAGE_ID: "topic_2.2.2", + }, + }, + ] + self.memory_exporter.clear() + + getone_mock = mock.AsyncMock() + consumer.getone = getone_mock + + getone_mock.side_effect = [ + self.consumer_record_factory( + 1, + headers=( + ( + "traceparent", + b"00-03afa25236b8cd948fa853d67038ac79-405ff022e8247c46-01", + ), + ), + ), + self.consumer_record_factory(2, headers=()), + ] + + await consumer.__anext__() + getone_mock.assert_awaited_with() + + first_span = self.memory_exporter.get_finished_spans()[0] + self.assertEqual( + format_trace_id(first_span.get_span_context().trace_id), + "03afa25236b8cd948fa853d67038ac79", + ) + + await consumer.__anext__() + getone_mock.assert_awaited_with() + + span_list = self.memory_exporter.get_finished_spans() + self._compare_spans(span_list, expected_spans) + + async def test_anext_consumer_hook(self) -> None: + async_consume_hook_mock = mock.AsyncMock() + + AIOKafkaInstrumentor().uninstrument() + AIOKafkaInstrumentor().instrument( + tracer_provider=self.tracer_provider, + async_consume_hook=async_consume_hook_mock, + ) + + consumer = AIOKafkaConsumer() + + getone_mock = mock.AsyncMock() + consumer.getone = getone_mock + + getone_mock.side_effect = [self.consumer_record_factory(1, headers=())] + + await consumer.__anext__() + + async_consume_hook_mock.assert_awaited_once() + + async def test_send(self) -> None: + AIOKafkaInstrumentor().uninstrument() + AIOKafkaInstrumentor().instrument(tracer_provider=self.tracer_provider) + + producer = AIOKafkaProducer(api_version="1.0") + + add_message_mock = mock.AsyncMock() + producer.client._wait_on_metadata = mock.AsyncMock() + producer.client.bootstrap = mock.AsyncMock() + producer._message_accumulator.add_message = add_message_mock + producer._sender.start = mock.AsyncMock() + producer._partition = mock.Mock(return_value=1) + + await producer.start() + + tracer = self.tracer_provider.get_tracer(__name__) + with tracer.start_as_current_span("test_span") as span: + await producer.send("topic_1", b"value_1") + + add_message_mock.assert_awaited_with( + TopicPartition(topic="topic_1", partition=1), + None, + b"value_1", + 40.0, + timestamp_ms=None, + headers=[("traceparent", mock.ANY)], + ) + add_message_mock.call_args_list[0].kwargs["headers"][0][1].startswith( + f"00-{format_trace_id(span.get_span_context().trace_id)}-".encode() + ) + + await producer.send("topic_2", b"value_2") + add_message_mock.assert_awaited_with( + TopicPartition(topic="topic_2", partition=1), + None, + b"value_2", + 40.0, + timestamp_ms=None, + headers=[("traceparent", mock.ANY)], + ) + + async def test_send_produce_hook(self) -> None: + async_produce_hook_mock = mock.AsyncMock() + + AIOKafkaInstrumentor().uninstrument() + AIOKafkaInstrumentor().instrument( + tracer_provider=self.tracer_provider, + async_produce_hook=async_produce_hook_mock, + ) + + producer = AIOKafkaProducer(api_version="1.0") + + producer.client._wait_on_metadata = mock.AsyncMock() + producer.client.bootstrap = mock.AsyncMock() + producer._message_accumulator.add_message = mock.AsyncMock() + producer._sender.start = mock.AsyncMock() + producer._partition = mock.Mock(return_value=1) + + await producer.start() + + await producer.send("topic_1", b"value_1") + + async_produce_hook_mock.assert_awaited_once() + + def _compare_spans( + self, spans: List[Span], expected_spans: List[dict] + ) -> None: + self.assertEqual(len(spans), len(expected_spans)) + for span, expected_span in zip(spans, expected_spans): + self.assertEqual(expected_span["name"], span.name) + self.assertEqual(expected_span["kind"], span.kind) + self.assertEqual( + expected_span["attributes"], dict(span.attributes) + ) diff --git a/tox.ini b/tox.ini index f7e4ce55b3..4cc90351b4 100644 --- a/tox.ini +++ b/tox.ini @@ -444,6 +444,7 @@ commands_pre = aiokafka: pip install opentelemetry-api@{env:CORE_REPO}\#egg=opentelemetry-api&subdirectory=opentelemetry-api aiokafka: pip install opentelemetry-semantic-conventions@{env:CORE_REPO}\#egg=opentelemetry-semantic-conventions&subdirectory=opentelemetry-semantic-conventions aiokafka: pip install opentelemetry-sdk@{env:CORE_REPO}\#egg=opentelemetry-sdk&subdirectory=opentelemetry-sdk + aiokafka: pip install opentelemetry-test-utils@{env:CORE_REPO}\#egg=opentelemetry-test-utils&subdirectory=tests/opentelemetry-test-utils aiokafka: pip install -r {toxinidir}/instrumentation/opentelemetry-instrumentation-aiokafka/test-requirements.txt kafka-python: pip install opentelemetry-api@{env:CORE_REPO}\#egg=opentelemetry-api&subdirectory=opentelemetry-api From c048b0985b5e82c62525b5a5bf88c3af4cd21d3f Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Tue, 17 Sep 2024 01:41:01 +0500 Subject: [PATCH 2/7] add to CHANGELOG --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7fa1ab2ba3..bfe8415bdf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,7 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `opentelemetry-instrumentation-fastapi` Add autoinstrumentation mechanism tests. ([#2860](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2860)) - `opentelemetry-instrumentation-aiokafka` Add instrumentor and auto instrumentation support for aiokafka - ([#2082](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2082)) + ([#2082](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2082), [#2874](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2874)) ## Version 1.27.0/0.48b0 () From 58900035cc3ce8a83515e892e584be5c3ec4857d Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Tue, 17 Sep 2024 10:52:22 +0500 Subject: [PATCH 3/7] add tests for baggage --- .../tests/test_instrumentation.py | 109 ++++++++++++++---- 1 file changed, 89 insertions(+), 20 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py index 43d1eac508..175a2fc8eb 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py @@ -13,7 +13,7 @@ # limitations under the License. import uuid -from typing import List, Tuple +from typing import List, Sequence, Tuple from unittest import IsolatedAsyncioTestCase, mock from aiokafka import ( @@ -24,12 +24,13 @@ ) from wrapt import BoundFunctionWrapper +from opentelemetry import baggage, context from opentelemetry.instrumentation.aiokafka import AIOKafkaInstrumentor -from opentelemetry.sdk.trace import Span +from opentelemetry.sdk.trace import ReadableSpan from opentelemetry.semconv._incubating.attributes import messaging_attributes from opentelemetry.semconv.attributes import server_attributes from opentelemetry.test.test_base import TestBase -from opentelemetry.trace import SpanKind, format_trace_id +from opentelemetry.trace import SpanKind, format_trace_id, set_span_in_context class TestAIOKafka(TestBase, IsolatedAsyncioTestCase): @@ -51,6 +52,19 @@ def consumer_record_factory( headers=headers, ) + @staticmethod + def producer_factory() -> AIOKafkaProducer: + producer = AIOKafkaProducer(api_version="1.0") + + add_message_mock = mock.AsyncMock() + producer.client._wait_on_metadata = mock.AsyncMock() + producer.client.bootstrap = mock.AsyncMock() + producer._message_accumulator.add_message = add_message_mock + producer._sender.start = mock.AsyncMock() + producer._partition = mock.Mock(return_value=1) + + return producer + def test_instrument_api(self) -> None: instrumentation = AIOKafkaInstrumentor() @@ -147,7 +161,45 @@ async def test_anext(self) -> None: span_list = self.memory_exporter.get_finished_spans() self._compare_spans(span_list, expected_spans) - async def test_anext_consumer_hook(self) -> None: + async def test_anext_baggage(self) -> None: + received_baggage = None + + async def async_consume_hook(span, *_) -> None: + nonlocal received_baggage + received_baggage = baggage.get_all(set_span_in_context(span)) + + AIOKafkaInstrumentor().uninstrument() + AIOKafkaInstrumentor().instrument( + tracer_provider=self.tracer_provider, + async_consume_hook=async_consume_hook, + ) + + consumer = AIOKafkaConsumer() + + self.memory_exporter.clear() + + getone_mock = mock.AsyncMock() + consumer.getone = getone_mock + + getone_mock.side_effect = [ + self.consumer_record_factory( + 1, + headers=( + ( + "traceparent", + b"00-03afa25236b8cd948fa853d67038ac79-405ff022e8247c46-01", + ), + ("baggage", b"foo=bar"), + ), + ), + ] + + await consumer.__anext__() + getone_mock.assert_awaited_with() + + self.assertEqual(received_baggage, {"foo": "bar"}) + + async def test_anext_consume_hook(self) -> None: async_consume_hook_mock = mock.AsyncMock() AIOKafkaInstrumentor().uninstrument() @@ -171,14 +223,10 @@ async def test_send(self) -> None: AIOKafkaInstrumentor().uninstrument() AIOKafkaInstrumentor().instrument(tracer_provider=self.tracer_provider) - producer = AIOKafkaProducer(api_version="1.0") - - add_message_mock = mock.AsyncMock() - producer.client._wait_on_metadata = mock.AsyncMock() - producer.client.bootstrap = mock.AsyncMock() - producer._message_accumulator.add_message = add_message_mock - producer._sender.start = mock.AsyncMock() - producer._partition = mock.Mock(return_value=1) + producer = self.producer_factory() + add_message_mock: mock.AsyncMock = ( + producer._message_accumulator.add_message + ) await producer.start() @@ -208,6 +256,33 @@ async def test_send(self) -> None: headers=[("traceparent", mock.ANY)], ) + async def test_send_baggage(self) -> None: + AIOKafkaInstrumentor().uninstrument() + AIOKafkaInstrumentor().instrument(tracer_provider=self.tracer_provider) + + producer = self.producer_factory() + add_message_mock: mock.AsyncMock = ( + producer._message_accumulator.add_message + ) + + await producer.start() + + tracer = self.tracer_provider.get_tracer(__name__) + ctx = baggage.set_baggage("foo", "bar") + context.attach(ctx) + + with tracer.start_as_current_span("test_span", context=ctx): + await producer.send("topic_1", b"value_1") + + add_message_mock.assert_awaited_with( + TopicPartition(topic="topic_1", partition=1), + None, + b"value_1", + 40.0, + timestamp_ms=None, + headers=[("traceparent", mock.ANY), ("baggage", b"foo=bar")], + ) + async def test_send_produce_hook(self) -> None: async_produce_hook_mock = mock.AsyncMock() @@ -217,13 +292,7 @@ async def test_send_produce_hook(self) -> None: async_produce_hook=async_produce_hook_mock, ) - producer = AIOKafkaProducer(api_version="1.0") - - producer.client._wait_on_metadata = mock.AsyncMock() - producer.client.bootstrap = mock.AsyncMock() - producer._message_accumulator.add_message = mock.AsyncMock() - producer._sender.start = mock.AsyncMock() - producer._partition = mock.Mock(return_value=1) + producer = self.producer_factory() await producer.start() @@ -232,7 +301,7 @@ async def test_send_produce_hook(self) -> None: async_produce_hook_mock.assert_awaited_once() def _compare_spans( - self, spans: List[Span], expected_spans: List[dict] + self, spans: Sequence[ReadableSpan], expected_spans: List[dict] ) -> None: self.assertEqual(len(spans), len(expected_spans)) for span, expected_span in zip(spans, expected_spans): From 5372078ff60214ed3ca44ddf12cd79318f0e782d Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Tue, 17 Sep 2024 11:23:49 +0500 Subject: [PATCH 4/7] wrap getone instead of __anext__ --- .../instrumentation/aiokafka/__init__.py | 8 +- .../instrumentation/aiokafka/utils.py | 2 +- .../tests/test_instrumentation.py | 86 ++++++++++--------- .../tests/test_utils.py | 4 +- 4 files changed, 53 insertions(+), 47 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/__init__.py b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/__init__.py index 5b2b0cd0e8..7d994be622 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/__init__.py @@ -76,7 +76,7 @@ async def async_consume_hook(span, record, args, kwargs): from opentelemetry import trace from opentelemetry.instrumentation.aiokafka.package import _instruments from opentelemetry.instrumentation.aiokafka.utils import ( - _wrap_anext, + _wrap_getone, _wrap_send, ) from opentelemetry.instrumentation.aiokafka.version import __version__ @@ -126,10 +126,10 @@ def _instrument(self, **kwargs): ) wrap_function_wrapper( aiokafka.AIOKafkaConsumer, - "__anext__", - _wrap_anext(tracer, async_consume_hook), + "getone", + _wrap_getone(tracer, async_consume_hook), ) def _uninstrument(self, **kwargs): unwrap(aiokafka.AIOKafkaProducer, "send") - unwrap(aiokafka.AIOKafkaConsumer, "__anext__") + unwrap(aiokafka.AIOKafkaConsumer, "getone") diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py index 3c54ce1500..cae0d97717 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py @@ -335,7 +335,7 @@ async def _create_consumer_span( context.detach(token) -def _wrap_anext( +def _wrap_getone( tracer: Tracer, async_consume_hook: ConsumeHookT ) -> Callable[..., Awaitable[aiokafka.ConsumerRecord]]: async def _traced_next( diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py index 175a2fc8eb..3f8212f42f 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py @@ -13,7 +13,7 @@ # limitations under the License. import uuid -from typing import List, Sequence, Tuple +from typing import Any, List, Sequence, Tuple from unittest import IsolatedAsyncioTestCase, mock from aiokafka import ( @@ -53,16 +53,30 @@ def consumer_record_factory( ) @staticmethod - def producer_factory() -> AIOKafkaProducer: + async def consumer_factory(**consumer_kwargs: Any) -> AIOKafkaConsumer: + consumer = AIOKafkaConsumer(**consumer_kwargs) + + consumer._client.bootstrap = mock.AsyncMock() + consumer._client._wait_on_metadata = mock.AsyncMock() + + await consumer.start() + + consumer._fetcher.next_record = mock.AsyncMock() + + return consumer + + @staticmethod + async def producer_factory() -> AIOKafkaProducer: producer = AIOKafkaProducer(api_version="1.0") - add_message_mock = mock.AsyncMock() producer.client._wait_on_metadata = mock.AsyncMock() producer.client.bootstrap = mock.AsyncMock() - producer._message_accumulator.add_message = add_message_mock + producer._message_accumulator.add_message = mock.AsyncMock() producer._sender.start = mock.AsyncMock() producer._partition = mock.Mock(return_value=1) + await producer.start() + return producer def test_instrument_api(self) -> None: @@ -73,7 +87,7 @@ def test_instrument_api(self) -> None: isinstance(AIOKafkaProducer.send, BoundFunctionWrapper) ) self.assertTrue( - isinstance(AIOKafkaConsumer.__anext__, BoundFunctionWrapper) + isinstance(AIOKafkaConsumer.getone, BoundFunctionWrapper) ) instrumentation.uninstrument() @@ -81,16 +95,19 @@ def test_instrument_api(self) -> None: isinstance(AIOKafkaProducer.send, BoundFunctionWrapper) ) self.assertFalse( - isinstance(AIOKafkaConsumer.__anext__, BoundFunctionWrapper) + isinstance(AIOKafkaConsumer.getone, BoundFunctionWrapper) ) - async def test_anext(self) -> None: + async def test_getone(self) -> None: AIOKafkaInstrumentor().uninstrument() AIOKafkaInstrumentor().instrument(tracer_provider=self.tracer_provider) client_id = str(uuid.uuid4()) group_id = str(uuid.uuid4()) - consumer = AIOKafkaConsumer(client_id=client_id, group_id=group_id) + consumer = await self.consumer_factory( + client_id=client_id, group_id=group_id + ) + next_record_mock: mock.AsyncMock = consumer._fetcher.next_record expected_spans = [ { @@ -130,10 +147,7 @@ async def test_anext(self) -> None: ] self.memory_exporter.clear() - getone_mock = mock.AsyncMock() - consumer.getone = getone_mock - - getone_mock.side_effect = [ + next_record_mock.side_effect = [ self.consumer_record_factory( 1, headers=( @@ -146,8 +160,8 @@ async def test_anext(self) -> None: self.consumer_record_factory(2, headers=()), ] - await consumer.__anext__() - getone_mock.assert_awaited_with() + await consumer.getone() + next_record_mock.assert_awaited_with(()) first_span = self.memory_exporter.get_finished_spans()[0] self.assertEqual( @@ -155,13 +169,13 @@ async def test_anext(self) -> None: "03afa25236b8cd948fa853d67038ac79", ) - await consumer.__anext__() - getone_mock.assert_awaited_with() + await consumer.getone() + next_record_mock.assert_awaited_with(()) span_list = self.memory_exporter.get_finished_spans() self._compare_spans(span_list, expected_spans) - async def test_anext_baggage(self) -> None: + async def test_getone_baggage(self) -> None: received_baggage = None async def async_consume_hook(span, *_) -> None: @@ -174,14 +188,12 @@ async def async_consume_hook(span, *_) -> None: async_consume_hook=async_consume_hook, ) - consumer = AIOKafkaConsumer() + consumer = await self.consumer_factory() + next_record_mock: mock.AsyncMock = consumer._fetcher.next_record self.memory_exporter.clear() - getone_mock = mock.AsyncMock() - consumer.getone = getone_mock - - getone_mock.side_effect = [ + next_record_mock.side_effect = [ self.consumer_record_factory( 1, headers=( @@ -194,12 +206,12 @@ async def async_consume_hook(span, *_) -> None: ), ] - await consumer.__anext__() - getone_mock.assert_awaited_with() + await consumer.getone() + next_record_mock.assert_awaited_with(()) self.assertEqual(received_baggage, {"foo": "bar"}) - async def test_anext_consume_hook(self) -> None: + async def test_getone_consume_hook(self) -> None: async_consume_hook_mock = mock.AsyncMock() AIOKafkaInstrumentor().uninstrument() @@ -208,14 +220,14 @@ async def test_anext_consume_hook(self) -> None: async_consume_hook=async_consume_hook_mock, ) - consumer = AIOKafkaConsumer() - - getone_mock = mock.AsyncMock() - consumer.getone = getone_mock + consumer = await self.consumer_factory() + next_record_mock: mock.AsyncMock = consumer._fetcher.next_record - getone_mock.side_effect = [self.consumer_record_factory(1, headers=())] + next_record_mock.side_effect = [ + self.consumer_record_factory(1, headers=()) + ] - await consumer.__anext__() + await consumer.getone() async_consume_hook_mock.assert_awaited_once() @@ -223,13 +235,11 @@ async def test_send(self) -> None: AIOKafkaInstrumentor().uninstrument() AIOKafkaInstrumentor().instrument(tracer_provider=self.tracer_provider) - producer = self.producer_factory() + producer = await self.producer_factory() add_message_mock: mock.AsyncMock = ( producer._message_accumulator.add_message ) - await producer.start() - tracer = self.tracer_provider.get_tracer(__name__) with tracer.start_as_current_span("test_span") as span: await producer.send("topic_1", b"value_1") @@ -260,13 +270,11 @@ async def test_send_baggage(self) -> None: AIOKafkaInstrumentor().uninstrument() AIOKafkaInstrumentor().instrument(tracer_provider=self.tracer_provider) - producer = self.producer_factory() + producer = await self.producer_factory() add_message_mock: mock.AsyncMock = ( producer._message_accumulator.add_message ) - await producer.start() - tracer = self.tracer_provider.get_tracer(__name__) ctx = baggage.set_baggage("foo", "bar") context.attach(ctx) @@ -292,9 +300,7 @@ async def test_send_produce_hook(self) -> None: async_produce_hook=async_produce_hook_mock, ) - producer = self.producer_factory() - - await producer.start() + producer = await self.producer_factory() await producer.send("topic_1", b"value_1") diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py index b1b2792608..09a8655309 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py @@ -23,7 +23,7 @@ _create_consumer_span, _extract_send_partition, _get_span_name, - _wrap_anext, + _wrap_getone, _wrap_send, ) from opentelemetry.trace import SpanKind @@ -187,7 +187,7 @@ async def test_wrap_next( original_next_callback = mock.AsyncMock() kafka_consumer = mock.MagicMock() - wrapped_next = _wrap_anext(tracer, consume_hook) + wrapped_next = _wrap_getone(tracer, consume_hook) record = await wrapped_next( original_next_callback, kafka_consumer, self.args, self.kwargs ) From 0e8f8d153108d6b53eee307e54f395768762e699 Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Tue, 17 Sep 2024 15:28:28 +0500 Subject: [PATCH 5/7] split sync and async tests (fix review) --- .../tests/test_instrumentation.py | 44 ++++++++++--------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py index 3f8212f42f..8211566239 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py @@ -14,7 +14,7 @@ import uuid from typing import Any, List, Sequence, Tuple -from unittest import IsolatedAsyncioTestCase, mock +from unittest import IsolatedAsyncioTestCase, TestCase, mock from aiokafka import ( AIOKafkaConsumer, @@ -33,7 +33,28 @@ from opentelemetry.trace import SpanKind, format_trace_id, set_span_in_context -class TestAIOKafka(TestBase, IsolatedAsyncioTestCase): +class TestAIOKafkaInstrumentor(TestCase): + def test_instrument_api(self) -> None: + instrumentation = AIOKafkaInstrumentor() + + instrumentation.instrument() + self.assertTrue( + isinstance(AIOKafkaProducer.send, BoundFunctionWrapper) + ) + self.assertTrue( + isinstance(AIOKafkaConsumer.getone, BoundFunctionWrapper) + ) + + instrumentation.uninstrument() + self.assertFalse( + isinstance(AIOKafkaProducer.send, BoundFunctionWrapper) + ) + self.assertFalse( + isinstance(AIOKafkaConsumer.getone, BoundFunctionWrapper) + ) + + +class TestAIOKafkaInstrumentation(TestBase, IsolatedAsyncioTestCase): @staticmethod def consumer_record_factory( number: int, headers: Tuple[Tuple[str, bytes], ...] @@ -79,25 +100,6 @@ async def producer_factory() -> AIOKafkaProducer: return producer - def test_instrument_api(self) -> None: - instrumentation = AIOKafkaInstrumentor() - - instrumentation.instrument() - self.assertTrue( - isinstance(AIOKafkaProducer.send, BoundFunctionWrapper) - ) - self.assertTrue( - isinstance(AIOKafkaConsumer.getone, BoundFunctionWrapper) - ) - - instrumentation.uninstrument() - self.assertFalse( - isinstance(AIOKafkaProducer.send, BoundFunctionWrapper) - ) - self.assertFalse( - isinstance(AIOKafkaConsumer.getone, BoundFunctionWrapper) - ) - async def test_getone(self) -> None: AIOKafkaInstrumentor().uninstrument() AIOKafkaInstrumentor().instrument(tracer_provider=self.tracer_provider) From 8d2dabbb525944b4667280949bb2a6a66e9abb7b Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Wed, 18 Sep 2024 22:22:18 +0500 Subject: [PATCH 6/7] add dimastbk to component_owners.yml for aiokafka --- .github/component_owners.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/component_owners.yml b/.github/component_owners.yml index e3ca06b450..dcc1013476 100644 --- a/.github/component_owners.yml +++ b/.github/component_owners.yml @@ -61,5 +61,8 @@ components: instrumentation/opentelemetry-instrumentation-psycopg: - federicobond + instrumentation/opentelemetry-instrumentation-aiokafka: + - dimastbk + processor/opentelemetry-processor-baggage: - codeboten From b6159d98d8dc9c56c94573f77708844afc7d757a Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Wed, 25 Sep 2024 10:33:55 +0500 Subject: [PATCH 7/7] Update CHANGELOG.md --- CHANGELOG.md | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bfe8415bdf..96c1bcc404 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,7 +12,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `opentelemetry-instrumentation-fastapi` Add autoinstrumentation mechanism tests. ([#2860](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2860)) - `opentelemetry-instrumentation-aiokafka` Add instrumentor and auto instrumentation support for aiokafka - ([#2082](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2082), [#2874](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2874)) + ([#2082](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2082)) + + +### Fixed + +- `opentelemetry-instrumentation-aiokafka` Wrap `AIOKafkaConsumer.getone()` instead of `AIOKafkaConsumer.__anext__` + ([#2874](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2874)) ## Version 1.27.0/0.48b0 ()