diff --git a/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py b/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py index aae1ea9a4..18d8d0a5b 100644 --- a/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py +++ b/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py @@ -253,3 +253,27 @@ def start_ack_span( end_on_exit=False, ) as ack_span: return ack_span + + +def start_nack_span( + subscription_id: str, + message_count: int, + project_id: str, + links: List[trace.Link], +) -> trace.Span: + tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME) + with tracer.start_as_current_span( + name=f"{subscription_id} nack", + attributes={ + "messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM, + "messaging.batch.message_count": message_count, + "messaging.operation": "nack", + "gcp.project_id": project_id, + "messaging.destination.name": subscription_id, + "code.function": "modify_ack_deadline", + }, + kind=trace.SpanKind.CLIENT, + links=links, + end_on_exit=False, + ) as nack_span: + return nack_span diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py index 949e757d6..091915380 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py @@ -33,7 +33,10 @@ from google.cloud.pubsub_v1.subscriber.exceptions import ( AcknowledgeStatus, ) -from google.cloud.pubsub_v1.open_telemetry.subscribe_opentelemetry import start_ack_span +from google.cloud.pubsub_v1.open_telemetry.subscribe_opentelemetry import ( + start_ack_span, + start_nack_span, +) if typing.TYPE_CHECKING: # pragma: NO COVER import queue @@ -416,10 +419,18 @@ def modify_ack_deadline( ack_ids_gen = (item.ack_id for item in items) deadline_seconds_gen = (item.seconds for item in items) total_chunks = int(math.ceil(len(items) / _ACK_IDS_BATCH_SIZE)) + + subscription_id: Optional[str] = None + project_id: Optional[str] = None + for item in items: if item.opentelemetry_data: if math.isclose(item.seconds, 0): item.opentelemetry_data.add_subscribe_span_event("nack start") + if subscription_id is None: + subscription_id = item.opentelemetry_data.subscription_id + if project_id is None: + project_id = item.opentelemetry_data.project_id else: item.opentelemetry_data.add_subscribe_span_event("modack start") for _ in range(total_chunks): @@ -427,6 +438,27 @@ def modify_ack_deadline( req.ack_id: req for req in itertools.islice(items_gen, _ACK_IDS_BATCH_SIZE) } + subscribe_links: List[trace.Link] = [] + for ack_req in ack_reqs_dict.values(): + if ack_req.opentelemetry_data and math.isclose(ack_req.seconds, 0): + subscribe_span: Optional[ + trace.Span + ] = ack_req.opentelemetry_data.subscribe_span + if ( + subscribe_span + and subscribe_span.get_span_context().trace_flags.sampled + ): + subscribe_links.append( + trace.Link(subscribe_span.get_span_context()) + ) + nack_span: Optional[trace.Span] = None + if subscription_id and project_id and len(subscribe_links) > 0: + nack_span = start_nack_span( + subscription_id, + len(ack_reqs_dict), + project_id, + subscribe_links, + ) requests_to_retry: List[requests.ModAckRequest] requests_completed: Optional[List[requests.ModAckRequest]] = None if default_deadline is None: @@ -450,6 +482,8 @@ def modify_ack_deadline( ack_reqs_dict=ack_reqs_dict, default_deadline=default_deadline, ) + if nack_span: + nack_span.end() assert ( len(requests_to_retry) <= _ACK_IDS_BATCH_SIZE ), "Too many requests to be retried." diff --git a/tests/unit/pubsub_v1/subscriber/test_dispatcher.py b/tests/unit/pubsub_v1/subscriber/test_dispatcher.py index fd1f916ea..812ad0fb2 100644 --- a/tests/unit/pubsub_v1/subscriber/test_dispatcher.py +++ b/tests/unit/pubsub_v1/subscriber/test_dispatcher.py @@ -895,43 +895,81 @@ def test_opentelemetry_nack(span_exporter): ) dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue) - opentelemetry_data = SubscribeOpenTelemetry(message=PubsubMessage(data=b"foo")) - opentelemetry_data.start_subscribe_span( + data1 = SubscribeOpenTelemetry(message=PubsubMessage(data=b"foo")) + data1.start_subscribe_span( subscription="projects/projectID/subscriptions/subscriptionID", exactly_once_enabled=True, ack_id="ack_id", delivery_attempt=5, ) + data2 = SubscribeOpenTelemetry(message=PubsubMessage(data=b"foo")) + data2.start_subscribe_span( + subscription="projects/projectID/subscriptions/subscriptionID", + exactly_once_enabled=True, + ack_id="ack_id2", + delivery_attempt=5, + ) items = [ requests.NackRequest( - ack_id="ack_id_string", + ack_id="ack_id", byte_size=10, ordering_key="", future=None, - opentelemetry_data=opentelemetry_data, - ) + opentelemetry_data=data1, + ), + requests.NackRequest( + ack_id="ack_id2", + byte_size=10, + ordering_key="", + future=None, + opentelemetry_data=data2, + ), ] response_items = [ requests.ModAckRequest( - ack_id="ack_id_string", + ack_id="ack_id", seconds=0, future=None, - opentelemetry_data=opentelemetry_data, - ) + opentelemetry_data=data1, + ), + requests.ModAckRequest( + ack_id="ack_id2", + seconds=0, + future=None, + opentelemetry_data=data2, + ), ] manager.send_unary_modack.return_value = (response_items, []) - dispatcher_.nack(items) + + mock_span_context = mock.Mock(spec=trace.SpanContext) + mock_span_context.trace_flags.sampled = False + with mock.patch.object( + data2._subscribe_span, "get_span_context", return_value=mock_span_context + ): + dispatcher_.nack(items) spans = span_exporter.get_finished_spans() - assert len(spans) == 1 - subscribe_span = spans[0] - assert "messaging.gcp_pubsub.result" in subscribe_span.attributes - assert subscribe_span.attributes["messaging.gcp_pubsub.result"] == "nacked" - assert len(subscribe_span.events) == 2 - assert subscribe_span.events[0].name == "nack start" - assert subscribe_span.events[1].name == "nack end" + assert len(spans) == 3 + nack_span = spans[0] + for subscribe_span in spans[1:]: + assert "messaging.gcp_pubsub.result" in subscribe_span.attributes + assert subscribe_span.attributes["messaging.gcp_pubsub.result"] == "nacked" + assert len(subscribe_span.events) == 2 + assert subscribe_span.events[0].name == "nack start" + assert subscribe_span.events[1].name == "nack end" + + assert nack_span.name == "subscriptionID nack" + assert nack_span.kind == trace.SpanKind.CLIENT + assert nack_span.parent is None + assert len(nack_span.links) == 1 + assert nack_span.attributes["messaging.system"] == "gcp_pubsub" + assert nack_span.attributes["messaging.batch.message_count"] == 2 + assert nack_span.attributes["messaging.operation"] == "nack" + assert nack_span.attributes["gcp.project_id"] == "projectID" + assert nack_span.attributes["messaging.destination.name"] == "subscriptionID" + assert nack_span.attributes["code.function"] == "modify_ack_deadline" def test_nack():