Skip to content

Commit

Permalink
Add links to subscribe span of the sampled ack_spans in dispatcher.ack
Browse files Browse the repository at this point in the history
  • Loading branch information
mukund-ananthu committed Sep 24, 2024
1 parent 7936911 commit 5874a96
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 0 deletions.
9 changes: 9 additions & 0 deletions google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ def ack(self, items: Sequence[requests.AckRequest]) -> None:
}

subscribe_links: List[trace.Link] = []
subscribe_spans: List[trace.Span] = []
for ack_req in ack_reqs_dict.values():
if ack_req.opentelemetry_data:
subscribe_span: Optional[
Expand All @@ -267,6 +268,7 @@ def ack(self, items: Sequence[requests.AckRequest]) -> None:
subscribe_links.append(
trace.Link(subscribe_span.get_span_context())
)
subscribe_spans.append(subscribe_span)
ack_span: Optional[trace.Span] = None
if subscription_id and project_id:
ack_span = start_ack_span(
Expand All @@ -275,6 +277,13 @@ def ack(self, items: Sequence[requests.AckRequest]) -> None:
project_id,
subscribe_links,
)
if (
ack_span and ack_span.get_span_context().trace_flags.sampled
): # pragma: NO COVER
ack_span_context: trace.SpanContext = ack_span.get_span_context()
for subscribe_span in subscribe_spans:
subscribe_span.add_link(ack_span_context)

requests_completed, requests_to_retry = self._manager.send_unary_ack(
ack_ids=list(itertools.islice(ack_ids_gen, _ACK_IDS_BATCH_SIZE)),
ack_reqs_dict=ack_reqs_dict,
Expand Down
12 changes: 12 additions & 0 deletions tests/unit/pubsub_v1/subscriber/test_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,10 @@ def test_opentelemetry_modify_ack_deadline(span_exporter):
assert subscribe_span.events[1].name == "modack end"


@pytest.mark.skipif(
sys.version_info < (3, 8),
reason="Open Telemetry not supported below Python version 3.8",
)
def test_opentelemetry_ack(span_exporter):
manager = mock.create_autospec(
streaming_pull_manager.StreamingPullManager, instance=True
Expand Down Expand Up @@ -464,6 +468,14 @@ def test_opentelemetry_ack(span_exporter):
assert subscribe_span.events[0].name == "ack start"
assert subscribe_span.events[1].name == "ack end"

# This subscribe span is sampled, so we expect it to be linked to the ack
# span.
assert len(spans[1].links) == 1
assert spans[1].links[0].context == ack_span.context
# This subscribe span is not sampled, so we expect it to not be linked to
# the ack span
assert len(spans[2].links) == 0

assert ack_span.name == "subscriptionID ack"
assert ack_span.kind == trace.SpanKind.CLIENT
assert ack_span.parent is None
Expand Down

0 comments on commit 5874a96

Please sign in to comment.