From edb9ccc46a3d097c823839ef16c140943dd53a04 Mon Sep 17 00:00:00 2001 From: Javier Fernandez Date: Sat, 6 May 2023 15:19:17 +0200 Subject: [PATCH 01/14] feat(confluent-kafka): Add instrumentation to consume method --- CHANGELOG.md | 2 +- .../confluent_kafka/__init__.py | 62 ++++++++++++++++--- .../instrumentation/confluent_kafka/utils.py | 24 +++++-- 3 files changed, 73 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ead8d5c134..54c194ef46 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,7 +25,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1407](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1407)) - `opentelemetry-instrumentation-logging` Add `otelTraceSampled` to instrumetation-logging ([#1773](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1773)) - +- `opentelemetry-instrumentation-kafka-python` Add instrumentation to `consume` method ### Fixed diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py index 12cb363219..a24313ca0e 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py @@ -113,6 +113,7 @@ def instrument_consumer(consumer: Consumer, tracer_provider=None) from .utils import ( KafkaPropertiesExtractor, _enrich_span, + _get_links_from_records, _get_span_name, _kafka_getter, _kafka_setter, @@ -136,6 +137,10 @@ def __init__(self, config): # This method is deliberately implemented in order to allow wrapt to wrap this function def poll(self, timeout=-1): # pylint: disable=useless-super-delegation return 
super().poll(timeout) + + # This method is deliberately implemented in order to allow wrapt to wrap this function + def consume(self, *args, **kwargs): # pylint: disable=useless-super-delegation + return super().consume(*args, **kwargs) class ProxiedProducer(Producer): @@ -178,9 +183,11 @@ def commit(self, *args, **kwargs): return self._consumer.commit(*args, **kwargs) def consume( - self, num_messages=1, *args, **kwargs + self, *args, **kwargs ): # pylint: disable=keyword-arg-before-vararg - return self._consumer.consume(num_messages, *args, **kwargs) + return ConfluentKafkaInstrumentor.wrap_consume( + self._consumer.consume, self, self._tracer, args, kwargs, + ) def get_watermark_offsets( self, partition, timeout=-1, *args, **kwargs @@ -274,6 +281,11 @@ def _inner_wrap_poll(func, instance, args, kwargs): return ConfluentKafkaInstrumentor.wrap_poll( func, instance, self._tracer, args, kwargs ) + + def _inner_wrap_consume(func, instance, args, kwargs): + return ConfluentKafkaInstrumentor.wrap_consume( + func, instance, self._tracer, args, kwargs + ) wrapt.wrap_function_wrapper( AutoInstrumentedProducer, @@ -286,6 +298,12 @@ def _inner_wrap_poll(func, instance, args, kwargs): "poll", _inner_wrap_poll, ) + + wrapt.wrap_function_wrapper( + AutoInstrumentedConsumer, + "consume", + _inner_wrap_consume, + ) def _uninstrument(self, **kwargs): confluent_kafka.Producer = self._original_kafka_producer @@ -336,13 +354,7 @@ def wrap_poll(func, instance, tracer, args, kwargs): ): record = func(*args, **kwargs) if record: - links = [] - ctx = propagate.extract(record.headers(), getter=_kafka_getter) - if ctx: - for item in ctx.values(): - if hasattr(item, "get_span_context"): - links.append(Link(context=item.get_span_context())) - + links = _get_links_from_records([record]) instance._current_consume_span = tracer.start_span( name=f"{record.topic()} process", links=links, @@ -361,3 +373,35 @@ def wrap_poll(func, instance, tracer, args, kwargs): ) return record + + @staticmethod + 
def wrap_consume(func, instance, tracer, args, kwargs): + if instance._current_consume_span: + context.detach(instance._current_context_token) + instance._current_context_token = None + instance._current_consume_span.end() + instance._current_consume_span = None + + with tracer.start_as_current_span( + "recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER + ): + records = func(*args, **kwargs) + if len(records) > 0: + links = _get_links_from_records(records) + instance._current_consume_span = tracer.start_span( + name=f"{records[0].topic()} process", + links=links, + kind=SpanKind.CONSUMER, + ) + + _enrich_span( + instance._current_consume_span, + records[0].topic(), + operation=MessagingOperationValues.PROCESS, + ) + + instance._current_context_token = context.attach( + trace.set_span_in_context(instance._current_consume_span) + ) + + return records diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py index 77fce03cd8..6ab0c9a4cd 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py @@ -1,6 +1,8 @@ from logging import getLogger from typing import List, Optional +from opentelemetry import propagate +from opentelemetry.trace import SpanKind, Link from opentelemetry.propagators import textmap from opentelemetry.semconv.trace import ( MessagingDestinationKindValues, @@ -82,11 +84,11 @@ def set(self, carrier: textmap.CarrierT, key: str, value: str) -> None: def _enrich_span( - span, - topic, - partition: Optional[int] = None, - offset: Optional[int] = None, - operation: Optional[MessagingOperationValues] = None, + span, + topic, + partition: Optional[int] = None, + 
offset: Optional[int] = None, + operation: Optional[MessagingOperationValues] = None, ): if not span.is_recording(): return @@ -116,6 +118,18 @@ def _enrich_span( ) +def _get_links_from_records(records): + links = [] + for record in records: + ctx = propagate.extract(record.headers(), getter=_kafka_getter) + if ctx: + for item in ctx.values(): + if hasattr(item, "get_span_context"): + links.append(Link(context=item.get_span_context())) + + return links + + _kafka_setter = KafkaContextSetter() From f6e5f9826e697662a520373304d9245af8c82e03 Mon Sep 17 00:00:00 2001 From: javferrod Date: Sat, 6 May 2023 15:19:17 +0200 Subject: [PATCH 02/14] feat(confluent-kafka): Add instrumentation to consume method --- CHANGELOG.md | 2 +- .../confluent_kafka/__init__.py | 62 ++++++++++++++++--- .../instrumentation/confluent_kafka/utils.py | 24 +++++-- 3 files changed, 73 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ead8d5c134..54c194ef46 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,7 +25,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1407](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1407)) - `opentelemetry-instrumentation-logging` Add `otelTraceSampled` to instrumetation-logging ([#1773](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1773)) - +- `opentelemetry-instrumentation-kafka-python` Add instrumentation to `consume` method ### Fixed diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py index 12cb363219..a24313ca0e 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py +++ 
b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py @@ -113,6 +113,7 @@ def instrument_consumer(consumer: Consumer, tracer_provider=None) from .utils import ( KafkaPropertiesExtractor, _enrich_span, + _get_links_from_records, _get_span_name, _kafka_getter, _kafka_setter, @@ -136,6 +137,10 @@ def __init__(self, config): # This method is deliberately implemented in order to allow wrapt to wrap this function def poll(self, timeout=-1): # pylint: disable=useless-super-delegation return super().poll(timeout) + + # This method is deliberately implemented in order to allow wrapt to wrap this function + def consume(self, *args, **kwargs): # pylint: disable=useless-super-delegation + return super().consume(*args, **kwargs) class ProxiedProducer(Producer): @@ -178,9 +183,11 @@ def commit(self, *args, **kwargs): return self._consumer.commit(*args, **kwargs) def consume( - self, num_messages=1, *args, **kwargs + self, *args, **kwargs ): # pylint: disable=keyword-arg-before-vararg - return self._consumer.consume(num_messages, *args, **kwargs) + return ConfluentKafkaInstrumentor.wrap_consume( + self._consumer.consume, self, self._tracer, args, kwargs, + ) def get_watermark_offsets( self, partition, timeout=-1, *args, **kwargs @@ -274,6 +281,11 @@ def _inner_wrap_poll(func, instance, args, kwargs): return ConfluentKafkaInstrumentor.wrap_poll( func, instance, self._tracer, args, kwargs ) + + def _inner_wrap_consume(func, instance, args, kwargs): + return ConfluentKafkaInstrumentor.wrap_consume( + func, instance, self._tracer, args, kwargs + ) wrapt.wrap_function_wrapper( AutoInstrumentedProducer, @@ -286,6 +298,12 @@ def _inner_wrap_poll(func, instance, args, kwargs): "poll", _inner_wrap_poll, ) + + wrapt.wrap_function_wrapper( + AutoInstrumentedConsumer, + "consume", + _inner_wrap_consume, + ) def _uninstrument(self, **kwargs): confluent_kafka.Producer = self._original_kafka_producer @@ -336,13 +354,7 @@ 
def wrap_poll(func, instance, tracer, args, kwargs): ): record = func(*args, **kwargs) if record: - links = [] - ctx = propagate.extract(record.headers(), getter=_kafka_getter) - if ctx: - for item in ctx.values(): - if hasattr(item, "get_span_context"): - links.append(Link(context=item.get_span_context())) - + links = _get_links_from_records([record]) instance._current_consume_span = tracer.start_span( name=f"{record.topic()} process", links=links, @@ -361,3 +373,35 @@ def wrap_poll(func, instance, tracer, args, kwargs): ) return record + + @staticmethod + def wrap_consume(func, instance, tracer, args, kwargs): + if instance._current_consume_span: + context.detach(instance._current_context_token) + instance._current_context_token = None + instance._current_consume_span.end() + instance._current_consume_span = None + + with tracer.start_as_current_span( + "recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER + ): + records = func(*args, **kwargs) + if len(records) > 0: + links = _get_links_from_records(records) + instance._current_consume_span = tracer.start_span( + name=f"{records[0].topic()} process", + links=links, + kind=SpanKind.CONSUMER, + ) + + _enrich_span( + instance._current_consume_span, + records[0].topic(), + operation=MessagingOperationValues.PROCESS, + ) + + instance._current_context_token = context.attach( + trace.set_span_in_context(instance._current_consume_span) + ) + + return records diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py index 77fce03cd8..6ab0c9a4cd 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py @@ -1,6 +1,8 @@ from 
logging import getLogger from typing import List, Optional +from opentelemetry import propagate +from opentelemetry.trace import SpanKind, Link from opentelemetry.propagators import textmap from opentelemetry.semconv.trace import ( MessagingDestinationKindValues, @@ -82,11 +84,11 @@ def set(self, carrier: textmap.CarrierT, key: str, value: str) -> None: def _enrich_span( - span, - topic, - partition: Optional[int] = None, - offset: Optional[int] = None, - operation: Optional[MessagingOperationValues] = None, + span, + topic, + partition: Optional[int] = None, + offset: Optional[int] = None, + operation: Optional[MessagingOperationValues] = None, ): if not span.is_recording(): return @@ -116,6 +118,18 @@ def _enrich_span( ) +def _get_links_from_records(records): + links = [] + for record in records: + ctx = propagate.extract(record.headers(), getter=_kafka_getter) + if ctx: + for item in ctx.values(): + if hasattr(item, "get_span_context"): + links.append(Link(context=item.get_span_context())) + + return links + + _kafka_setter = KafkaContextSetter() From fbfc055cf2d17ad7981f1fe62e137e741b40f5b1 Mon Sep 17 00:00:00 2001 From: javferrod Date: Sat, 6 May 2023 16:27:27 +0200 Subject: [PATCH 03/14] chore(changelog): Added pull request link --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 54c194ef46..77abf63292 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `opentelemetry-instrumentation-logging` Add `otelTraceSampled` to instrumetation-logging ([#1773](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1773)) - `opentelemetry-instrumentation-kafka-python` Add instrumentation to `consume` method + ([#1786](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1786)) ### Fixed From f042deaf340762a837829f29619536329864613a Mon Sep 17 00:00:00 2001 From: javferrod Date: Sat, 20 
May 2023 13:31:56 +0200 Subject: [PATCH 04/14] fix(confluent-kafka): Fix wrong partition and offset if --- .../opentelemetry/instrumentation/confluent_kafka/utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py index 6ab0c9a4cd..e39721d6bd 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py @@ -96,7 +96,7 @@ def _enrich_span( span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "kafka") span.set_attribute(SpanAttributes.MESSAGING_DESTINATION, topic) - if partition: + if partition is not None: span.set_attribute(SpanAttributes.MESSAGING_KAFKA_PARTITION, partition) span.set_attribute( @@ -111,7 +111,7 @@ def _enrich_span( # https://stackoverflow.com/questions/65935155/identify-and-find-specific-message-in-kafka-topic # A message within Kafka is uniquely defined by its topic name, topic partition and offset. 
- if partition and offset and topic: + if partition is not None and offset is not None and topic: span.set_attribute( SpanAttributes.MESSAGING_MESSAGE_ID, f"{topic}.{partition}.{offset}", From 7e25294ff88d8698b674c2d9384b79c00e63c874 Mon Sep 17 00:00:00 2001 From: javferrod Date: Sat, 20 May 2023 13:32:31 +0200 Subject: [PATCH 05/14] chore(confluent-kafka): Add test for poll and consume --- .../tests/test_instrumentation.py | 165 +++++++++++++++++- .../tests/utils.py | 39 +++++ 2 files changed, 202 insertions(+), 2 deletions(-) create mode 100644 instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py index 1e3f304188..2c9ebc1abd 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py @@ -14,7 +14,11 @@ # pylint: disable=no-name-in-module -from unittest import TestCase +from unittest.mock import patch + +from opentelemetry.semconv.trace import SpanAttributes, MessagingDestinationKindValues +from opentelemetry.test.test_base import TestBase +from .utils import MockConsumer, MockedMessage from confluent_kafka import Consumer, Producer @@ -29,7 +33,7 @@ ) -class TestConfluentKafka(TestCase): +class TestConfluentKafka(TestBase): def test_instrument_api(self) -> None: instrumentation = ConfluentKafkaInstrumentor() @@ -104,3 +108,160 @@ def test_context_getter(self) -> None: context_setter.set(carrier_list, "key1", "val1") self.assertEqual(context_getter.get(carrier_list, "key1"), ["val1"]) self.assertEqual(["key1"], context_getter.keys(carrier_list)) + + def test_poll(self) -> None: + instrumentation = ConfluentKafkaInstrumentor() + mocked_messages = [ + MockedMessage("topic-10", 0, 0, []), + 
MockedMessage("topic-20", 2, 4, []), + MockedMessage("topic-30", 1, 3, []), + ] + expected_spans= [ + { + "name": "recv", + "attributes": {} + }, + { + "name": "topic-10 process", + "attributes": { + SpanAttributes.MESSAGING_OPERATION: "process", + SpanAttributes.MESSAGING_KAFKA_PARTITION: 0, + SpanAttributes.MESSAGING_SYSTEM: "kafka", + SpanAttributes.MESSAGING_DESTINATION: "topic-10", + SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, + SpanAttributes.MESSAGING_MESSAGE_ID: "topic-10.0.0", + } + }, + { + "name": "recv", + "attributes": {} + }, + { + "name": "topic-20 process", + "attributes": { + SpanAttributes.MESSAGING_OPERATION: "process", + SpanAttributes.MESSAGING_KAFKA_PARTITION: 2, + SpanAttributes.MESSAGING_SYSTEM: "kafka", + SpanAttributes.MESSAGING_DESTINATION: "topic-20", + SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, + SpanAttributes.MESSAGING_MESSAGE_ID: "topic-20.2.4", + } + }, + { + "name": "recv", + "attributes": {} + }, + { + "name": "topic-30 process", + "attributes": { + SpanAttributes.MESSAGING_OPERATION: "process", + SpanAttributes.MESSAGING_KAFKA_PARTITION: 1, + SpanAttributes.MESSAGING_SYSTEM: "kafka", + SpanAttributes.MESSAGING_DESTINATION: "topic-30", + SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, + SpanAttributes.MESSAGING_MESSAGE_ID: "topic-30.1.3", + } + }, + { + "name": "recv", + "attributes": {} + }, + ] + + consumer = MockConsumer( + mocked_messages, + { + "bootstrap.servers": "localhost:29092", + "group.id": "mygroup", + "auto.offset.reset": "earliest", + } + ) + span_list = self.memory_exporter.clear() + consumer = instrumentation.instrument_consumer(consumer) + consumer.poll(1) + consumer.poll(1) + consumer.poll(1) + consumer.poll(1) + + span_list = self.memory_exporter.get_finished_spans() + self._compare_spans(span_list, expected_spans) + + def test_consume(self) -> None: + instrumentation = 
ConfluentKafkaInstrumentor() + mocked_messages = [ + MockedMessage("topic-1", 0, 0, []), + MockedMessage("topic-1", 2, 1, []), + MockedMessage("topic-1", 3, 2, []), + MockedMessage("topic-2", 0, 0, []), + MockedMessage("topic-3", 0, 3, []), + MockedMessage("topic-2", 0, 1, []), + ] + expected_spans= [ + { + "name": "recv", + "attributes": {} + }, + { + "name": "topic-1 process", + "attributes": { + SpanAttributes.MESSAGING_OPERATION: "process", + SpanAttributes.MESSAGING_SYSTEM: "kafka", + SpanAttributes.MESSAGING_DESTINATION: "topic-1", + SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, + } + }, + { + "name": "recv", + "attributes": {} + }, + { + "name": "topic-2 process", + "attributes": { + SpanAttributes.MESSAGING_OPERATION: "process", + SpanAttributes.MESSAGING_SYSTEM: "kafka", + SpanAttributes.MESSAGING_DESTINATION: "topic-2", + SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, + } + }, + { + "name": "recv", + "attributes": {} + }, + { + "name": "topic-3 process", + "attributes": { + SpanAttributes.MESSAGING_OPERATION: "process", + SpanAttributes.MESSAGING_SYSTEM: "kafka", + SpanAttributes.MESSAGING_DESTINATION: "topic-3", + SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, + } + }, + { + "name": "recv", + "attributes": {} + }, + ] + + consumer = MockConsumer( + mocked_messages, + { + "bootstrap.servers": "localhost:29092", + "group.id": "mygroup", + "auto.offset.reset": "earliest", + } + ) + + span_list = self.memory_exporter.clear() + consumer = instrumentation.instrument_consumer(consumer) + consumer.consume(3) + consumer.consume(1) + consumer.consume(2) + consumer.consume(1) + span_list = self.memory_exporter.get_finished_spans() + self._compare_spans(span_list, expected_spans) + + def _compare_spans(self, spans, expected_spans): + for (span, expected_span) in zip(spans, expected_spans): + self.assertEqual(expected_span['name'], span.name) + for 
attribute_key, expected_attribute_value in expected_span['attributes'].items(): + self.assertEqual(expected_attribute_value, span.attributes[attribute_key]) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py new file mode 100644 index 0000000000..ab2e6b48a9 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py @@ -0,0 +1,39 @@ +from confluent_kafka import Consumer + + +class MockConsumer(Consumer): + + def __init__(self, queue, config): + self._queue = queue + super().__init__(config) + + def consume(self, num_messages=1, *args, **kwargs): + messages = self._queue[:num_messages] + self._queue = self._queue[num_messages:] + return messages + + def poll(self, timeout=None): + if len(self._queue) > 0: + return self._queue.pop(0) + else: + return None + + +class MockedMessage: + def __init__(self, topic: str, partition: int, offset: int, headers): + self._topic = topic + self._partition = partition + self._offset = offset + self._headers = headers + + def topic(self): + return self._topic + + def partition(self): + return self._partition + + def offset(self): + return self._offset + + def headers(self): + return self._headers From 3a31f6b20a323571b0426824dac84a0d3f234473 Mon Sep 17 00:00:00 2001 From: javferrod Date: Sat, 20 May 2023 13:54:53 +0200 Subject: [PATCH 06/14] chore(confluent-kafka): Removed unused import --- .../tests/test_instrumentation.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py index 2c9ebc1abd..f1016e7460 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py +++ 
b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py @@ -14,8 +14,6 @@ # pylint: disable=no-name-in-module -from unittest.mock import patch - from opentelemetry.semconv.trace import SpanAttributes, MessagingDestinationKindValues from opentelemetry.test.test_base import TestBase from .utils import MockConsumer, MockedMessage From 49b65c53a04a0672a79a2b027dd2503674e25b50 Mon Sep 17 00:00:00 2001 From: javferrod Date: Sat, 20 May 2023 13:57:15 +0200 Subject: [PATCH 07/14] fix(confluent-kafka): Removed non relevant argument in poll --- .../tests/test_instrumentation.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py index f1016e7460..e47d43cdcb 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py @@ -176,10 +176,10 @@ def test_poll(self) -> None: ) span_list = self.memory_exporter.clear() consumer = instrumentation.instrument_consumer(consumer) - consumer.poll(1) - consumer.poll(1) - consumer.poll(1) - consumer.poll(1) + consumer.poll() + consumer.poll() + consumer.poll() + consumer.poll() span_list = self.memory_exporter.get_finished_spans() self._compare_spans(span_list, expected_spans) From e31e21c841ddd5cd86ddaecd5b52a7b05780a8e9 Mon Sep 17 00:00:00 2001 From: javferrod Date: Mon, 26 Jun 2023 18:21:31 +0200 Subject: [PATCH 08/14] fix(confluent-kafka): Fixed identation --- .../instrumentation/confluent_kafka/utils.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py 
b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py index e39721d6bd..20b97c99a7 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py @@ -84,11 +84,11 @@ def set(self, carrier: textmap.CarrierT, key: str, value: str) -> None: def _enrich_span( - span, - topic, - partition: Optional[int] = None, - offset: Optional[int] = None, - operation: Optional[MessagingOperationValues] = None, + span, + topic, + partition: Optional[int] = None, + offset: Optional[int] = None, + operation: Optional[MessagingOperationValues] = None, ): if not span.is_recording(): return From 7a865558b4f29ee642f0a454e91d4d2b60c81038 Mon Sep 17 00:00:00 2001 From: javferrod Date: Tue, 27 Jun 2023 19:21:20 +0200 Subject: [PATCH 09/14] fix(confluent-kafka): Removed unused pylint directive --- .../opentelemetry/instrumentation/confluent_kafka/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py index a24313ca0e..de99bd351e 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py @@ -184,7 +184,7 @@ def commit(self, *args, **kwargs): def consume( self, *args, **kwargs - ): # pylint: disable=keyword-arg-before-vararg + ): return ConfluentKafkaInstrumentor.wrap_consume( self._consumer.consume, self, self._tracer, args, kwargs, ) From 
4c9cce68be31f12d59992013ab84f631e893c896 Mon Sep 17 00:00:00 2001 From: javferrod Date: Tue, 27 Jun 2023 19:23:08 +0200 Subject: [PATCH 10/14] fix(confluent-kafka): Fixed some lint errors --- .../instrumentation/confluent_kafka/__init__.py | 14 ++++++-------- .../tests/utils.py | 5 ++--- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py index de99bd351e..37ac316c2d 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py @@ -137,7 +137,7 @@ def __init__(self, config): # This method is deliberately implemented in order to allow wrapt to wrap this function def poll(self, timeout=-1): # pylint: disable=useless-super-delegation return super().poll(timeout) - + # This method is deliberately implemented in order to allow wrapt to wrap this function def consume(self, *args, **kwargs): # pylint: disable=useless-super-delegation return super().consume(*args, **kwargs) @@ -182,9 +182,7 @@ def committed(self, partitions, timeout=-1): def commit(self, *args, **kwargs): return self._consumer.commit(*args, **kwargs) - def consume( - self, *args, **kwargs - ): + def consume(self, *args, **kwargs): return ConfluentKafkaInstrumentor.wrap_consume( self._consumer.consume, self, self._tracer, args, kwargs, ) @@ -281,7 +279,7 @@ def _inner_wrap_poll(func, instance, args, kwargs): return ConfluentKafkaInstrumentor.wrap_poll( func, instance, self._tracer, args, kwargs ) - + def _inner_wrap_consume(func, instance, args, kwargs): return ConfluentKafkaInstrumentor.wrap_consume( func, instance, self._tracer, args, kwargs @@ 
-298,7 +296,7 @@ def _inner_wrap_consume(func, instance, args, kwargs): "poll", _inner_wrap_poll, ) - + wrapt.wrap_function_wrapper( AutoInstrumentedConsumer, "consume", @@ -373,7 +371,7 @@ def wrap_poll(func, instance, tracer, args, kwargs): ) return record - + @staticmethod def wrap_consume(func, instance, tracer, args, kwargs): if instance._current_consume_span: @@ -399,7 +397,7 @@ def wrap_consume(func, instance, tracer, args, kwargs): records[0].topic(), operation=MessagingOperationValues.PROCESS, ) - + instance._current_context_token = context.attach( trace.set_span_in_context(instance._current_consume_span) ) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py index ab2e6b48a9..ffcbb66d61 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py @@ -7,7 +7,7 @@ def __init__(self, queue, config): self._queue = queue super().__init__(config) - def consume(self, num_messages=1, *args, **kwargs): + def consume(self, num_messages=1, *args, **kwargs): # pylint: disable=keyword-arg-before-vararg messages = self._queue[:num_messages] self._queue = self._queue[num_messages:] return messages @@ -15,8 +15,7 @@ def consume(self, num_messages=1, *args, **kwargs): def poll(self, timeout=None): if len(self._queue) > 0: return self._queue.pop(0) - else: - return None + return None class MockedMessage: From ac9ec26c57a83b4099daaad334aa0423eb0e0394 Mon Sep 17 00:00:00 2001 From: javferrod Date: Sun, 9 Jul 2023 17:42:53 +0200 Subject: [PATCH 11/14] fix(confluent-kafka): Fixed lint errors --- .../confluent_kafka/__init__.py | 10 ++- .../tests/test_instrumentation.py | 89 ++++++++----------- .../tests/utils.py | 7 +- 3 files changed, 48 insertions(+), 58 deletions(-) diff --git 
a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py index 37ac316c2d..0ed28912c6 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py @@ -139,7 +139,9 @@ def poll(self, timeout=-1): # pylint: disable=useless-super-delegation return super().poll(timeout) # This method is deliberately implemented in order to allow wrapt to wrap this function - def consume(self, *args, **kwargs): # pylint: disable=useless-super-delegation + def consume( + self, *args, **kwargs + ): # pylint: disable=useless-super-delegation return super().consume(*args, **kwargs) @@ -184,7 +186,11 @@ def commit(self, *args, **kwargs): def consume(self, *args, **kwargs): return ConfluentKafkaInstrumentor.wrap_consume( - self._consumer.consume, self, self._tracer, args, kwargs, + self._consumer.consume, + self, + self._tracer, + args, + kwargs, ) def get_watermark_offsets( diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py index e47d43cdcb..8160c852d7 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py @@ -14,7 +14,10 @@ # pylint: disable=no-name-in-module -from opentelemetry.semconv.trace import SpanAttributes, MessagingDestinationKindValues +from opentelemetry.semconv.trace import ( + SpanAttributes, + MessagingDestinationKindValues, +) from opentelemetry.test.test_base import TestBase from .utils 
import MockConsumer, MockedMessage @@ -106,7 +109,7 @@ def test_context_getter(self) -> None: context_setter.set(carrier_list, "key1", "val1") self.assertEqual(context_getter.get(carrier_list, "key1"), ["val1"]) self.assertEqual(["key1"], context_getter.keys(carrier_list)) - + def test_poll(self) -> None: instrumentation = ConfluentKafkaInstrumentor() mocked_messages = [ @@ -114,11 +117,8 @@ def test_poll(self) -> None: MockedMessage("topic-20", 2, 4, []), MockedMessage("topic-30", 1, 3, []), ] - expected_spans= [ - { - "name": "recv", - "attributes": {} - }, + expected_spans = [ + {"name": "recv", "attributes": {}}, { "name": "topic-10 process", "attributes": { @@ -128,12 +128,9 @@ def test_poll(self) -> None: SpanAttributes.MESSAGING_DESTINATION: "topic-10", SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, SpanAttributes.MESSAGING_MESSAGE_ID: "topic-10.0.0", - } - }, - { - "name": "recv", - "attributes": {} + }, }, + {"name": "recv", "attributes": {}}, { "name": "topic-20 process", "attributes": { @@ -143,12 +140,9 @@ def test_poll(self) -> None: SpanAttributes.MESSAGING_DESTINATION: "topic-20", SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, SpanAttributes.MESSAGING_MESSAGE_ID: "topic-20.2.4", - } - }, - { - "name": "recv", - "attributes": {} + }, }, + {"name": "recv", "attributes": {}}, { "name": "topic-30 process", "attributes": { @@ -158,21 +152,18 @@ def test_poll(self) -> None: SpanAttributes.MESSAGING_DESTINATION: "topic-30", SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, SpanAttributes.MESSAGING_MESSAGE_ID: "topic-30.1.3", - } - }, - { - "name": "recv", - "attributes": {} + }, }, + {"name": "recv", "attributes": {}}, ] - + consumer = MockConsumer( mocked_messages, { "bootstrap.servers": "localhost:29092", "group.id": "mygroup", "auto.offset.reset": "earliest", - } + }, ) span_list = self.memory_exporter.clear() consumer = 
instrumentation.instrument_consumer(consumer) @@ -180,10 +171,10 @@ def test_poll(self) -> None: consumer.poll() consumer.poll() consumer.poll() - + span_list = self.memory_exporter.get_finished_spans() self._compare_spans(span_list, expected_spans) - + def test_consume(self) -> None: instrumentation = ConfluentKafkaInstrumentor() mocked_messages = [ @@ -194,11 +185,8 @@ def test_consume(self) -> None: MockedMessage("topic-3", 0, 3, []), MockedMessage("topic-2", 0, 1, []), ] - expected_spans= [ - { - "name": "recv", - "attributes": {} - }, + expected_spans = [ + {"name": "recv", "attributes": {}}, { "name": "topic-1 process", "attributes": { @@ -206,12 +194,9 @@ def test_consume(self) -> None: SpanAttributes.MESSAGING_SYSTEM: "kafka", SpanAttributes.MESSAGING_DESTINATION: "topic-1", SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, - } - }, - { - "name": "recv", - "attributes": {} + }, }, + {"name": "recv", "attributes": {}}, { "name": "topic-2 process", "attributes": { @@ -219,12 +204,9 @@ def test_consume(self) -> None: SpanAttributes.MESSAGING_SYSTEM: "kafka", SpanAttributes.MESSAGING_DESTINATION: "topic-2", SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, - } - }, - { - "name": "recv", - "attributes": {} + }, }, + {"name": "recv", "attributes": {}}, { "name": "topic-3 process", "attributes": { @@ -232,23 +214,20 @@ def test_consume(self) -> None: SpanAttributes.MESSAGING_SYSTEM: "kafka", SpanAttributes.MESSAGING_DESTINATION: "topic-3", SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, - } - }, - { - "name": "recv", - "attributes": {} + }, }, + {"name": "recv", "attributes": {}}, ] - + consumer = MockConsumer( mocked_messages, { "bootstrap.servers": "localhost:29092", "group.id": "mygroup", "auto.offset.reset": "earliest", - } + }, ) - + span_list = self.memory_exporter.clear() consumer = instrumentation.instrument_consumer(consumer) 
consumer.consume(3) @@ -259,7 +238,11 @@ def test_consume(self) -> None: self._compare_spans(span_list, expected_spans) def _compare_spans(self, spans, expected_spans): - for (span, expected_span) in zip(spans, expected_spans): - self.assertEqual(expected_span['name'], span.name) - for attribute_key, expected_attribute_value in expected_span['attributes'].items(): - self.assertEqual(expected_attribute_value, span.attributes[attribute_key]) + for span, expected_span in zip(spans, expected_spans): + self.assertEqual(expected_span["name"], span.name) + for attribute_key, expected_attribute_value in expected_span[ + "attributes" + ].items(): + self.assertEqual( + expected_attribute_value, span.attributes[attribute_key] + ) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py index ffcbb66d61..798daaeff4 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py @@ -2,16 +2,17 @@ class MockConsumer(Consumer): - def __init__(self, queue, config): self._queue = queue super().__init__(config) - def consume(self, num_messages=1, *args, **kwargs): # pylint: disable=keyword-arg-before-vararg + def consume( + self, num_messages=1, *args, **kwargs + ): # pylint: disable=keyword-arg-before-vararg messages = self._queue[:num_messages] self._queue = self._queue[num_messages:] return messages - + def poll(self, timeout=None): if len(self._queue) > 0: return self._queue.pop(0) From a5527783db3fc982f8a743788d8d37fdaaa3b033 Mon Sep 17 00:00:00 2001 From: javferrod Date: Sun, 13 Aug 2023 15:34:55 +0200 Subject: [PATCH 12/14] fix(confluent-kafka): Removed unused span_list --- .../tests/test_instrumentation.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git
a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py index 8160c852d7..d7ac343dbf 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py @@ -165,7 +165,7 @@ def test_poll(self) -> None: "auto.offset.reset": "earliest", }, ) - span_list = self.memory_exporter.clear() + self.memory_exporter.clear() consumer = instrumentation.instrument_consumer(consumer) consumer.poll() consumer.poll() @@ -228,7 +228,7 @@ def test_consume(self) -> None: }, ) - span_list = self.memory_exporter.clear() + self.memory_exporter.clear() consumer = instrumentation.instrument_consumer(consumer) consumer.consume(3) consumer.consume(1) From 1815057cd3681ef6356db873fe93e182025c574c Mon Sep 17 00:00:00 2001 From: javferrod Date: Sun, 13 Aug 2023 16:23:36 +0200 Subject: [PATCH 13/14] fix(confluent-kafka): Extracted some shared logic between poll and consume methods. 
--- .../confluent_kafka/__init__.py | 29 ++++--------------- .../instrumentation/confluent_kafka/utils.py | 18 +++++++++++- 2 files changed, 23 insertions(+), 24 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py index 0ed28912c6..c4e68b33b4 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py @@ -112,8 +112,9 @@ def instrument_consumer(consumer: Consumer, tracer_provider=None) from .package import _instruments from .utils import ( KafkaPropertiesExtractor, + _end_current_consume_span, + _create_new_consume_span, _enrich_span, - _get_links_from_records, _get_span_name, _kafka_getter, _kafka_setter, @@ -348,23 +349,14 @@ def wrap_produce(func, instance, tracer, args, kwargs): @staticmethod def wrap_poll(func, instance, tracer, args, kwargs): if instance._current_consume_span: - context.detach(instance._current_context_token) - instance._current_context_token = None - instance._current_consume_span.end() - instance._current_consume_span = None + _end_current_consume_span(instance) with tracer.start_as_current_span( "recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER ): record = func(*args, **kwargs) if record: - links = _get_links_from_records([record]) - instance._current_consume_span = tracer.start_span( - name=f"{record.topic()} process", - links=links, - kind=SpanKind.CONSUMER, - ) - + _create_new_consume_span(instance, tracer, [record]) _enrich_span( instance._current_consume_span, record.topic(), @@ -381,23 +373,14 @@ def wrap_poll(func, instance, tracer, args, kwargs): @staticmethod def wrap_consume(func, instance, tracer, 
args, kwargs): if instance._current_consume_span: - context.detach(instance._current_context_token) - instance._current_context_token = None - instance._current_consume_span.end() - instance._current_consume_span = None + _end_current_consume_span(instance) with tracer.start_as_current_span( "recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER ): records = func(*args, **kwargs) if len(records) > 0: - links = _get_links_from_records(records) - instance._current_consume_span = tracer.start_span( - name=f"{records[0].topic()} process", - links=links, - kind=SpanKind.CONSUMER, - ) - + _create_new_consume_span(instance, tracer, records) _enrich_span( instance._current_consume_span, records[0].topic(), diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py index 20b97c99a7..a53b5108c6 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py @@ -1,7 +1,7 @@ from logging import getLogger from typing import List, Optional -from opentelemetry import propagate +from opentelemetry import context, propagate from opentelemetry.trace import SpanKind, Link from opentelemetry.propagators import textmap from opentelemetry.semconv.trace import ( @@ -83,6 +83,22 @@ def set(self, carrier: textmap.CarrierT, key: str, value: str) -> None: _kafka_getter = KafkaContextGetter() +def _end_current_consume_span(instance): + context.detach(instance._current_context_token) + instance._current_context_token = None + instance._current_consume_span.end() + instance._current_consume_span = None + + +def _create_new_consume_span(instance, tracer, records): + links = _get_links_from_records(records) + 
instance._current_consume_span = tracer.start_span( + name=f"{records[0].topic()} process", + links=links, + kind=SpanKind.CONSUMER, + ) + + def _enrich_span( span, topic, From 51167dd617a1c4f2876c629e19e23f9b2b015ca9 Mon Sep 17 00:00:00 2001 From: javferrod Date: Sat, 19 Aug 2023 20:56:05 +0200 Subject: [PATCH 14/14] fix(confluent-kafka): changed function order --- .../instrumentation/confluent_kafka/utils.py | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py index a53b5108c6..2029960703 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py @@ -99,6 +99,18 @@ def _create_new_consume_span(instance, tracer, records): ) +def _get_links_from_records(records): + links = [] + for record in records: + ctx = propagate.extract(record.headers(), getter=_kafka_getter) + if ctx: + for item in ctx.values(): + if hasattr(item, "get_span_context"): + links.append(Link(context=item.get_span_context())) + + return links + + def _enrich_span( span, topic, @@ -134,18 +146,6 @@ def _enrich_span( ) -def _get_links_from_records(records): - links = [] - for record in records: - ctx = propagate.extract(record.headers(), getter=_kafka_getter) - if ctx: - for item in ctx.values(): - if hasattr(item, "get_span_context"): - links.append(Link(context=item.get_span_context())) - - return links - - _kafka_setter = KafkaContextSetter()