From 4e17352e47b996f809e0aede200e8dd577551820 Mon Sep 17 00:00:00 2001 From: mukund-ananthu Date: Tue, 24 Sep 2024 14:50:29 +0000 Subject: [PATCH] Add links to subscribe span of the sampled ack_spans in dispatcher.ack --- .../pubsub_v1/subscriber/_protocol/dispatcher.py | 7 +++++++ tests/unit/pubsub_v1/subscriber/test_dispatcher.py | 12 ++++++++++++ 2 files changed, 19 insertions(+) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py index 3b2f7918a..f0a61f25f 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py @@ -255,6 +255,7 @@ def ack(self, items: Sequence[requests.AckRequest]) -> None: } subscribe_links: List[trace.Link] = [] + subscribe_spans: List[trace.Span] = [] for ack_req in ack_reqs_dict.values(): if ack_req.opentelemetry_data: subscribe_span: Optional[ @@ -267,6 +268,7 @@ def ack(self, items: Sequence[requests.AckRequest]) -> None: subscribe_links.append( trace.Link(subscribe_span.get_span_context()) ) + subscribe_spans.append(subscribe_span) ack_span: Optional[trace.Span] = None if subscription_id and project_id: ack_span = start_ack_span( @@ -275,6 +277,11 @@ def ack(self, items: Sequence[requests.AckRequest]) -> None: project_id, subscribe_links, ) + if ack_span and ack_span.get_span_context().trace_flags.sampled: + ack_span_context: trace.SpanContext = ack_span.get_span_context() + for subscribe_span in subscribe_spans: + subscribe_span.add_link(ack_span_context) + requests_completed, requests_to_retry = self._manager.send_unary_ack( ack_ids=list(itertools.islice(ack_ids_gen, _ACK_IDS_BATCH_SIZE)), ack_reqs_dict=ack_reqs_dict, diff --git a/tests/unit/pubsub_v1/subscriber/test_dispatcher.py b/tests/unit/pubsub_v1/subscriber/test_dispatcher.py index bf557ea2f..6e124f499 100644 --- a/tests/unit/pubsub_v1/subscriber/test_dispatcher.py +++ b/tests/unit/pubsub_v1/subscriber/test_dispatcher.py @@ -407,6 +407,10 @@ def test_opentelemetry_modify_ack_deadline(span_exporter): assert subscribe_span.events[1].name == "modack end" +@pytest.mark.skipif( + sys.version_info < (3, 8), + reason="Open Telemetry not supported below Python version 3.8", +) def test_opentelemetry_ack(span_exporter): manager = mock.create_autospec( streaming_pull_manager.StreamingPullManager, instance=True @@ -464,6 +468,14 @@ def test_opentelemetry_ack(span_exporter): assert subscribe_span.events[0].name == "ack start" assert subscribe_span.events[1].name == "ack end" + # This subscribe span is sampled, so we expect it to be linked to the ack + # span. + assert len(spans[1].links) == 1 + assert spans[1].links[0].context == ack_span.context + # This subscribe span is not sampled, so we expect it to not be linked to + # the ack span + assert len(spans[2].links) == 0 + assert ack_span.name == "subscriptionID ack" assert ack_span.kind == trace.SpanKind.CLIENT assert ack_span.parent is None