Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(opentelemetry-instrumentation-aiokafka): wrap getone instead of anext, add tests #2874

Merged
merged 12 commits into from
Sep 26, 2024
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `opentelemetry-instrumentation-fastapi` Add autoinstrumentation mechanism tests.
([#2860](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2860))
- `opentelemetry-instrumentation-aiokafka` Add instrumentor and auto instrumentation support for aiokafka
([#2082](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2082))
([#2082](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2082), [#2874](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2874))
xrmx marked this conversation as resolved.
Show resolved Hide resolved

## Version 1.27.0/0.48b0 ()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ async def async_consume_hook(span, record, args, kwargs):
from opentelemetry import trace
from opentelemetry.instrumentation.aiokafka.package import _instruments
from opentelemetry.instrumentation.aiokafka.utils import (
_wrap_anext,
_wrap_getone,
_wrap_send,
)
from opentelemetry.instrumentation.aiokafka.version import __version__
Expand Down Expand Up @@ -126,10 +126,10 @@ def _instrument(self, **kwargs):
)
wrap_function_wrapper(
aiokafka.AIOKafkaConsumer,
"__anext__",
_wrap_anext(tracer, async_consume_hook),
"getone",
xrmx marked this conversation as resolved.
Show resolved Hide resolved
_wrap_getone(tracer, async_consume_hook),
)

def _uninstrument(self, **kwargs):
unwrap(aiokafka.AIOKafkaProducer, "send")
unwrap(aiokafka.AIOKafkaConsumer, "__anext__")
unwrap(aiokafka.AIOKafkaConsumer, "getone")
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ async def _create_consumer_span(
context.detach(token)


def _wrap_anext(
def _wrap_getone(
tracer: Tracer, async_consume_hook: ConsumeHookT
) -> Callable[..., Awaitable[aiokafka.ConsumerRecord]]:
async def _traced_next(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,74 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest import TestCase

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
import uuid
from typing import Any, List, Sequence, Tuple
from unittest import IsolatedAsyncioTestCase, mock

from aiokafka import (
AIOKafkaConsumer,
AIOKafkaProducer,
ConsumerRecord,
TopicPartition,
)
from wrapt import BoundFunctionWrapper

from opentelemetry import baggage, context
from opentelemetry.instrumentation.aiokafka import AIOKafkaInstrumentor
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.semconv._incubating.attributes import messaging_attributes
from opentelemetry.semconv.attributes import server_attributes
from opentelemetry.test.test_base import TestBase
from opentelemetry.trace import SpanKind, format_trace_id, set_span_in_context


class TestAIOKafka(TestBase, IsolatedAsyncioTestCase):
@staticmethod
def consumer_record_factory(
number: int, headers: Tuple[Tuple[str, bytes], ...]
) -> ConsumerRecord:
return ConsumerRecord(
f"topic_{number}",
number,
number,
number,
number,
f"key_{number}".encode(),
f"value_{number}".encode(),
None,
number,
number,
headers=headers,
)

@staticmethod
async def consumer_factory(**consumer_kwargs: Any) -> AIOKafkaConsumer:
consumer = AIOKafkaConsumer(**consumer_kwargs)

consumer._client.bootstrap = mock.AsyncMock()
consumer._client._wait_on_metadata = mock.AsyncMock()

await consumer.start()

consumer._fetcher.next_record = mock.AsyncMock()

return consumer

@staticmethod
async def producer_factory() -> AIOKafkaProducer:
producer = AIOKafkaProducer(api_version="1.0")

producer.client._wait_on_metadata = mock.AsyncMock()
producer.client.bootstrap = mock.AsyncMock()
producer._message_accumulator.add_message = mock.AsyncMock()
producer._sender.start = mock.AsyncMock()
producer._partition = mock.Mock(return_value=1)

await producer.start()

return producer

class TestAIOKafka(TestCase):
def test_instrument_api(self) -> None:
xrmx marked this conversation as resolved.
Show resolved Hide resolved
instrumentation = AIOKafkaInstrumentor()

Expand All @@ -28,13 +87,232 @@ def test_instrument_api(self) -> None:
isinstance(AIOKafkaProducer.send, BoundFunctionWrapper)
)
self.assertTrue(
isinstance(AIOKafkaConsumer.__anext__, BoundFunctionWrapper)
isinstance(AIOKafkaConsumer.getone, BoundFunctionWrapper)
)

instrumentation.uninstrument()
self.assertFalse(
isinstance(AIOKafkaProducer.send, BoundFunctionWrapper)
)
self.assertFalse(
isinstance(AIOKafkaConsumer.__anext__, BoundFunctionWrapper)
isinstance(AIOKafkaConsumer.getone, BoundFunctionWrapper)
)

async def test_getone(self) -> None:
AIOKafkaInstrumentor().uninstrument()
AIOKafkaInstrumentor().instrument(tracer_provider=self.tracer_provider)

client_id = str(uuid.uuid4())
group_id = str(uuid.uuid4())
consumer = await self.consumer_factory(
client_id=client_id, group_id=group_id
)
next_record_mock: mock.AsyncMock = consumer._fetcher.next_record

expected_spans = [
{
"name": "topic_1 receive",
"kind": SpanKind.CONSUMER,
"attributes": {
messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value,
server_attributes.SERVER_ADDRESS: '"localhost"',
messaging_attributes.MESSAGING_CLIENT_ID: client_id,
messaging_attributes.MESSAGING_DESTINATION_NAME: "topic_1",
messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID: "1",
messaging_attributes.MESSAGING_KAFKA_MESSAGE_KEY: "key_1",
messaging_attributes.MESSAGING_CONSUMER_GROUP_NAME: group_id,
messaging_attributes.MESSAGING_OPERATION_NAME: "receive",
messaging_attributes.MESSAGING_OPERATION_TYPE: messaging_attributes.MessagingOperationTypeValues.RECEIVE.value,
messaging_attributes.MESSAGING_KAFKA_MESSAGE_OFFSET: 1,
messaging_attributes.MESSAGING_MESSAGE_ID: "topic_1.1.1",
},
},
{
"name": "topic_2 receive",
"kind": SpanKind.CONSUMER,
"attributes": {
messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value,
server_attributes.SERVER_ADDRESS: '"localhost"',
messaging_attributes.MESSAGING_CLIENT_ID: client_id,
messaging_attributes.MESSAGING_DESTINATION_NAME: "topic_2",
messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID: "2",
messaging_attributes.MESSAGING_KAFKA_MESSAGE_KEY: "key_2",
messaging_attributes.MESSAGING_CONSUMER_GROUP_NAME: group_id,
messaging_attributes.MESSAGING_OPERATION_NAME: "receive",
messaging_attributes.MESSAGING_OPERATION_TYPE: messaging_attributes.MessagingOperationTypeValues.RECEIVE.value,
messaging_attributes.MESSAGING_KAFKA_MESSAGE_OFFSET: 2,
messaging_attributes.MESSAGING_MESSAGE_ID: "topic_2.2.2",
},
},
]
self.memory_exporter.clear()

next_record_mock.side_effect = [
self.consumer_record_factory(
1,
headers=(
(
"traceparent",
b"00-03afa25236b8cd948fa853d67038ac79-405ff022e8247c46-01",
),
),
),
self.consumer_record_factory(2, headers=()),
]

await consumer.getone()
next_record_mock.assert_awaited_with(())

first_span = self.memory_exporter.get_finished_spans()[0]
self.assertEqual(
format_trace_id(first_span.get_span_context().trace_id),
"03afa25236b8cd948fa853d67038ac79",
)

await consumer.getone()
next_record_mock.assert_awaited_with(())

span_list = self.memory_exporter.get_finished_spans()
self._compare_spans(span_list, expected_spans)

async def test_getone_baggage(self) -> None:
received_baggage = None

async def async_consume_hook(span, *_) -> None:
nonlocal received_baggage
received_baggage = baggage.get_all(set_span_in_context(span))

AIOKafkaInstrumentor().uninstrument()
AIOKafkaInstrumentor().instrument(
tracer_provider=self.tracer_provider,
async_consume_hook=async_consume_hook,
)

consumer = await self.consumer_factory()
next_record_mock: mock.AsyncMock = consumer._fetcher.next_record

self.memory_exporter.clear()

next_record_mock.side_effect = [
self.consumer_record_factory(
1,
headers=(
(
"traceparent",
b"00-03afa25236b8cd948fa853d67038ac79-405ff022e8247c46-01",
),
("baggage", b"foo=bar"),
),
),
]

await consumer.getone()
next_record_mock.assert_awaited_with(())

self.assertEqual(received_baggage, {"foo": "bar"})

async def test_getone_consume_hook(self) -> None:
async_consume_hook_mock = mock.AsyncMock()

AIOKafkaInstrumentor().uninstrument()
AIOKafkaInstrumentor().instrument(
tracer_provider=self.tracer_provider,
async_consume_hook=async_consume_hook_mock,
)

consumer = await self.consumer_factory()
next_record_mock: mock.AsyncMock = consumer._fetcher.next_record

next_record_mock.side_effect = [
self.consumer_record_factory(1, headers=())
]

await consumer.getone()

async_consume_hook_mock.assert_awaited_once()

async def test_send(self) -> None:
AIOKafkaInstrumentor().uninstrument()
AIOKafkaInstrumentor().instrument(tracer_provider=self.tracer_provider)

producer = await self.producer_factory()
add_message_mock: mock.AsyncMock = (
producer._message_accumulator.add_message
)

tracer = self.tracer_provider.get_tracer(__name__)
with tracer.start_as_current_span("test_span") as span:
await producer.send("topic_1", b"value_1")

add_message_mock.assert_awaited_with(
TopicPartition(topic="topic_1", partition=1),
None,
b"value_1",
40.0,
timestamp_ms=None,
headers=[("traceparent", mock.ANY)],
)
add_message_mock.call_args_list[0].kwargs["headers"][0][1].startswith(
f"00-{format_trace_id(span.get_span_context().trace_id)}-".encode()
)

await producer.send("topic_2", b"value_2")
add_message_mock.assert_awaited_with(
TopicPartition(topic="topic_2", partition=1),
None,
b"value_2",
40.0,
timestamp_ms=None,
headers=[("traceparent", mock.ANY)],
)

async def test_send_baggage(self) -> None:
AIOKafkaInstrumentor().uninstrument()
AIOKafkaInstrumentor().instrument(tracer_provider=self.tracer_provider)

producer = await self.producer_factory()
add_message_mock: mock.AsyncMock = (
producer._message_accumulator.add_message
)

tracer = self.tracer_provider.get_tracer(__name__)
ctx = baggage.set_baggage("foo", "bar")
context.attach(ctx)

with tracer.start_as_current_span("test_span", context=ctx):
await producer.send("topic_1", b"value_1")

add_message_mock.assert_awaited_with(
TopicPartition(topic="topic_1", partition=1),
None,
b"value_1",
40.0,
timestamp_ms=None,
headers=[("traceparent", mock.ANY), ("baggage", b"foo=bar")],
)

async def test_send_produce_hook(self) -> None:
async_produce_hook_mock = mock.AsyncMock()

AIOKafkaInstrumentor().uninstrument()
AIOKafkaInstrumentor().instrument(
tracer_provider=self.tracer_provider,
async_produce_hook=async_produce_hook_mock,
)

producer = await self.producer_factory()

await producer.send("topic_1", b"value_1")

async_produce_hook_mock.assert_awaited_once()

def _compare_spans(
self, spans: Sequence[ReadableSpan], expected_spans: List[dict]
) -> None:
self.assertEqual(len(spans), len(expected_spans))
for span, expected_span in zip(spans, expected_spans):
self.assertEqual(expected_span["name"], span.name)
self.assertEqual(expected_span["kind"], span.kind)
self.assertEqual(
expected_span["attributes"], dict(span.attributes)
)
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
_create_consumer_span,
_extract_send_partition,
_get_span_name,
_wrap_anext,
_wrap_getone,
_wrap_send,
)
from opentelemetry.trace import SpanKind
Expand Down Expand Up @@ -187,7 +187,7 @@ async def test_wrap_next(
original_next_callback = mock.AsyncMock()
kafka_consumer = mock.MagicMock()

wrapped_next = _wrap_anext(tracer, consume_hook)
wrapped_next = _wrap_getone(tracer, consume_hook)
record = await wrapped_next(
original_next_callback, kafka_consumer, self.args, self.kwargs
)
Expand Down
1 change: 1 addition & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ commands_pre =
aiokafka: pip install opentelemetry-api@{env:CORE_REPO}\#egg=opentelemetry-api&subdirectory=opentelemetry-api
aiokafka: pip install opentelemetry-semantic-conventions@{env:CORE_REPO}\#egg=opentelemetry-semantic-conventions&subdirectory=opentelemetry-semantic-conventions
aiokafka: pip install opentelemetry-sdk@{env:CORE_REPO}\#egg=opentelemetry-sdk&subdirectory=opentelemetry-sdk
aiokafka: pip install opentelemetry-test-utils@{env:CORE_REPO}\#egg=opentelemetry-test-utils&subdirectory=tests/opentelemetry-test-utils
aiokafka: pip install -r {toxinidir}/instrumentation/opentelemetry-instrumentation-aiokafka/test-requirements.txt

kafka-python: pip install opentelemetry-api@{env:CORE_REPO}\#egg=opentelemetry-api&subdirectory=opentelemetry-api
Expand Down
Loading