Skip to content

Commit

Permalink
Add nack span
Browse files Browse the repository at this point in the history
  • Loading branch information
mukund-ananthu committed Sep 23, 2024
1 parent 54a88ba commit 65aae0f
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 17 deletions.
24 changes: 24 additions & 0 deletions google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,3 +253,27 @@ def start_ack_span(
end_on_exit=False,
) as ack_span:
return ack_span


def start_nack_span(
subscription_id: str,
message_count: int,
project_id: str,
links: List[trace.Link],
) -> trace.Span:
tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
with tracer.start_as_current_span(
name=f"{subscription_id} nack",
attributes={
"messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM,
"messaging.batch.message_count": message_count,
"messaging.operation": "nack",
"gcp.project_id": project_id,
"messaging.destination.name": subscription_id,
"code.function": "modify_ack_deadline",
},
kind=trace.SpanKind.CLIENT,
links=links,
end_on_exit=False,
) as nack_span:
return nack_span
36 changes: 35 additions & 1 deletion google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@
from google.cloud.pubsub_v1.subscriber.exceptions import (
AcknowledgeStatus,
)
from google.cloud.pubsub_v1.open_telemetry.subscribe_opentelemetry import start_ack_span
from google.cloud.pubsub_v1.open_telemetry.subscribe_opentelemetry import (
start_ack_span,
start_nack_span,
)

if typing.TYPE_CHECKING: # pragma: NO COVER
import queue
Expand Down Expand Up @@ -416,17 +419,46 @@ def modify_ack_deadline(
ack_ids_gen = (item.ack_id for item in items)
deadline_seconds_gen = (item.seconds for item in items)
total_chunks = int(math.ceil(len(items) / _ACK_IDS_BATCH_SIZE))

subscription_id: Optional[str] = None
project_id: Optional[str] = None

for item in items:
if item.opentelemetry_data:
if math.isclose(item.seconds, 0):
item.opentelemetry_data.add_subscribe_span_event("nack start")
if subscription_id is None:
subscription_id = item.opentelemetry_data.subscription_id
if project_id is None:
project_id = item.opentelemetry_data.project_id
else:
item.opentelemetry_data.add_subscribe_span_event("modack start")
for _ in range(total_chunks):
ack_reqs_dict = {
req.ack_id: req
for req in itertools.islice(items_gen, _ACK_IDS_BATCH_SIZE)
}
subscribe_links: List[trace.Link] = []
for ack_req in ack_reqs_dict.values():
if ack_req.opentelemetry_data:
subscribe_span: Optional[
trace.Span
] = ack_req.opentelemetry_data.subscribe_span
if (
subscribe_span
and subscribe_span.get_span_context().trace_flags.sampled
):
subscribe_links.append(
trace.Link(subscribe_span.get_span_context())
)
nack_span: Optional[trace.Span] = None
if subscription_id and project_id:
nack_span = start_nack_span(
subscription_id,
len(ack_reqs_dict),
project_id,
subscribe_links,
)
requests_to_retry: List[requests.ModAckRequest]
requests_completed: Optional[List[requests.ModAckRequest]] = None
if default_deadline is None:
Expand All @@ -450,6 +482,8 @@ def modify_ack_deadline(
ack_reqs_dict=ack_reqs_dict,
default_deadline=default_deadline,
)
if nack_span:
nack_span.end()
assert (
len(requests_to_retry) <= _ACK_IDS_BATCH_SIZE
), "Too many requests to be retried."
Expand Down
70 changes: 54 additions & 16 deletions tests/unit/pubsub_v1/subscriber/test_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -895,43 +895,81 @@ def test_opentelemetry_nack(span_exporter):
)
dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue)

opentelemetry_data = SubscribeOpenTelemetry(message=PubsubMessage(data=b"foo"))
opentelemetry_data.start_subscribe_span(
data1 = SubscribeOpenTelemetry(message=PubsubMessage(data=b"foo"))
data1.start_subscribe_span(
subscription="projects/projectID/subscriptions/subscriptionID",
exactly_once_enabled=True,
ack_id="ack_id",
delivery_attempt=5,
)
data2 = SubscribeOpenTelemetry(message=PubsubMessage(data=b"foo"))
data2.start_subscribe_span(
subscription="projects/projectID/subscriptions/subscriptionID",
exactly_once_enabled=True,
ack_id="ack_id2",
delivery_attempt=5,
)

items = [
requests.NackRequest(
ack_id="ack_id_string",
ack_id="ack_id",
byte_size=10,
ordering_key="",
future=None,
opentelemetry_data=opentelemetry_data,
)
opentelemetry_data=data1,
),
requests.NackRequest(
ack_id="ack_id2",
byte_size=10,
ordering_key="",
future=None,
opentelemetry_data=data2,
),
]
response_items = [
requests.ModAckRequest(
ack_id="ack_id_string",
ack_id="ack_id",
seconds=0,
future=None,
opentelemetry_data=opentelemetry_data,
)
opentelemetry_data=data1,
),
requests.ModAckRequest(
ack_id="ack_id2",
seconds=0,
future=None,
opentelemetry_data=data2,
),
]
manager.send_unary_modack.return_value = (response_items, [])
dispatcher_.nack(items)

mock_span_context = mock.Mock(spec=trace.SpanContext)
mock_span_context.trace_flags.sampled = False
with mock.patch.object(
data2._subscribe_span, "get_span_context", return_value=mock_span_context
):
dispatcher_.nack(items)

spans = span_exporter.get_finished_spans()

assert len(spans) == 1
subscribe_span = spans[0]
assert "messaging.gcp_pubsub.result" in subscribe_span.attributes
assert subscribe_span.attributes["messaging.gcp_pubsub.result"] == "nacked"
assert len(subscribe_span.events) == 2
assert subscribe_span.events[0].name == "nack start"
assert subscribe_span.events[1].name == "nack end"
assert len(spans) == 3
nack_span = spans[0]
for subscribe_span in spans[1:]:
assert "messaging.gcp_pubsub.result" in subscribe_span.attributes
assert subscribe_span.attributes["messaging.gcp_pubsub.result"] == "nacked"
assert len(subscribe_span.events) == 2
assert subscribe_span.events[0].name == "nack start"
assert subscribe_span.events[1].name == "nack end"

assert nack_span.name == "subscriptionID nack"
assert nack_span.kind == trace.SpanKind.CLIENT
assert nack_span.parent is None
assert len(nack_span.links) == 1
assert nack_span.attributes["messaging.system"] == "gcp_pubsub"
assert nack_span.attributes["messaging.batch.message_count"] == 2
assert nack_span.attributes["messaging.operation"] == "nack"
assert nack_span.attributes["gcp.project_id"] == "projectID"
assert nack_span.attributes["messaging.destination.name"] == "subscriptionID"
assert nack_span.attributes["code.function"] == "modify_ack_deadline"


def test_nack():
Expand Down

0 comments on commit 65aae0f

Please sign in to comment.