diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index d4bfec6dd..cc398cf5c 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -1032,6 +1032,7 @@ def _send_lease_modacks( modack_span: Optional[trace.Span] = None if self._client.open_telemetry_enabled: subscribe_span_links: List[trace.Link] = [] + subscribe_spans: List[trace.Span] = [] assert len(self._subscription.split("/")) == 4 subscription_id: str = self._subscription.split("/")[3] project_id: str = self._subscription.split("/")[1] @@ -1044,6 +1045,7 @@ def _send_lease_modacks( subscribe_span_links.append( trace.Link(subscribe_span.get_span_context()) ) + subscribe_spans.append(subscribe_span) modack_span = start_modack_span( subscribe_span_links, subscription_id, @@ -1053,6 +1055,17 @@ def _send_lease_modacks( "_send_lease_modacks", receipt_modack, ) + if ( + modack_span and modack_span.get_span_context().trace_flags.sampled + ): # pragma: NO COVER + modack_span_context: trace.SpanContext = modack_span.get_span_context() + for subscribe_span in subscribe_spans: + subscribe_span.add_link( + context=modack_span_context, + attributes={ + "messaging.operation.name": "modack", + }, + ) with self._exactly_once_enabled_lock: exactly_once_enabled = self._exactly_once_enabled diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 3219355cd..ff0e2ec1e 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -632,6 +632,10 @@ def test__maybe_release_messages_negative_on_hold_bytes_warning(caplog): assert manager._on_hold_bytes == 0 # should be auto-corrected +@pytest.mark.skipif( + sys.version_info < (3, 8), + reason="Open Telemetry not supported below Python version 3.8", +) @pytest.mark.parametrize( "receipt_modack", [ @@ -683,6 +687,11 @@ def test_opentelemetry__send_lease_modacks(span_exporter, receipt_modack): assert len(subscribe_span1.events) == 0 assert len(subscribe_span2.events) == 0 + assert len(subscribe_span1.links) == 0 + assert len(subscribe_span2.links) == 1 + assert subscribe_span2.links[0].context == modack_span.context + assert subscribe_span2.links[0].attributes["messaging.operation.name"] == "modack" + assert modack_span.name == "subscriptionID modack" assert modack_span.parent is None assert modack_span.kind == trace.SpanKind.CLIENT