diff --git a/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py b/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py index 6347b3963..f6d1ccf4d 100644 --- a/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py +++ b/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py @@ -77,3 +77,7 @@ def add_subscribe_span_event(self, event: str) -> None: "timestamp": str(datetime.now()), }, ) + + def end_subscribe_span(self) -> None: + assert self._subscribe_span is not None + self._subscribe_span.end() diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py index 15ad4abb3..1242e8a24 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py @@ -243,6 +243,11 @@ def ack(self, items: Sequence[requests.AckRequest]) -> None: ack_reqs_dict=ack_reqs_dict, ) + for completed_ack in requests_completed: + if completed_ack.opentelemetry_data: + completed_ack.opentelemetry_data.add_subscribe_span_event("ack end") + completed_ack.opentelemetry_data.end_subscribe_span() + # Remove the completed messages from lease management. self.drop(requests_completed) @@ -286,6 +291,12 @@ def _retry_acks(self, requests_to_retry): ack_ids=[req.ack_id for req in requests_to_retry], ack_reqs_dict=ack_reqs_dict, ) + + for completed_ack in requests_completed: + if completed_ack.opentelemetry_data: + completed_ack.opentelemetry_data.add_subscribe_span_event("ack end") + completed_ack.opentelemetry_data.end_subscribe_span() + assert ( len(requests_to_retry) <= _ACK_IDS_BATCH_SIZE ), "Too many requests to be retried."