Skip to content

Commit

Permalink
Better names to reflect them applying to all reqs not just ones with …
Browse files Browse the repository at this point in the history
…futures
  • Loading branch information
pradn committed Feb 28, 2022
1 parent bd1491c commit 7370116
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 95 deletions.
16 changes: 8 additions & 8 deletions google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,13 +183,13 @@ def ack(self, items: Sequence[requests.AckRequest]) -> None:
total_chunks = int(math.ceil(len(items) / _ACK_IDS_BATCH_SIZE))

for _ in range(total_chunks):
future_reqs_dict = {
ack_reqs_dict = {
req.ack_id: req
for req in itertools.islice(items_gen, _ACK_IDS_BATCH_SIZE)
}
requests_completed, requests_to_retry = self._manager.send_unary_ack(
ack_ids=list(itertools.islice(ack_ids_gen, _ACK_IDS_BATCH_SIZE)),
future_reqs_dict=future_reqs_dict,
ack_reqs_dict=ack_reqs_dict,
)

# Remove the completed messages from lease management.
Expand Down Expand Up @@ -228,10 +228,10 @@ def _retry_acks(self, requests_to_retry):
)
time.sleep(time_to_wait)

future_reqs_dict = {req.ack_id: req for req in requests_to_retry}
ack_reqs_dict = {req.ack_id: req for req in requests_to_retry}
requests_completed, requests_to_retry = self._manager.send_unary_ack(
ack_ids=[req.ack_id for req in requests_to_retry],
future_reqs_dict=future_reqs_dict,
ack_reqs_dict=ack_reqs_dict,
)
assert (
len(requests_to_retry) <= _ACK_IDS_BATCH_SIZE
Expand Down Expand Up @@ -280,7 +280,7 @@ def modify_ack_deadline(self, items: Sequence[requests.ModAckRequest]) -> None:
total_chunks = int(math.ceil(len(items) / _ACK_IDS_BATCH_SIZE))

for _ in range(total_chunks):
future_reqs_dict = {
ack_reqs_dict = {
req.ack_id: req
for req in itertools.islice(items_gen, _ACK_IDS_BATCH_SIZE)
}
Expand All @@ -292,7 +292,7 @@ def modify_ack_deadline(self, items: Sequence[requests.ModAckRequest]) -> None:
modify_deadline_seconds=list(
itertools.islice(deadline_seconds_gen, _ACK_IDS_BATCH_SIZE)
),
future_reqs_dict=future_reqs_dict,
ack_reqs_dict=ack_reqs_dict,
)
assert (
len(requests_to_retry) <= _ACK_IDS_BATCH_SIZE
Expand Down Expand Up @@ -320,11 +320,11 @@ def _retry_modacks(self, requests_to_retry):
)
time.sleep(time_to_wait)

future_reqs_dict = {req.ack_id: req for req in requests_to_retry}
ack_reqs_dict = {req.ack_id: req for req in requests_to_retry}
requests_completed, requests_to_retry = self._manager.send_unary_modack(
modify_deadline_ack_ids=[req.ack_id for req in requests_to_retry],
modify_deadline_seconds=[req.seconds for req in requests_to_retry],
future_reqs_dict=future_reqs_dict,
ack_reqs_dict=ack_reqs_dict,
)

def nack(self, items: Sequence[requests.NackRequest]) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,33 +149,33 @@ def _get_ack_errors(
return None


def _process_futures(
def _process_requests(
error_status: Optional["status_pb2.Status"],
future_reqs_dict: "containers.ScalarMap",
ack_reqs_dict: "containers.ScalarMap",
errors_dict: Optional["containers.ScalarMap"],
):
"""Process futures by referring to errors_dict.
The errors returned by the server in `errors_dict` are used to complete
the request futures in `future_reqs_dict` (with a success or exception) or
the request futures in `ack_reqs_dict` (with a success or exception) or
to return requests for further retries.
"""
requests_completed = []
requests_to_retry = []
for ack_id in future_reqs_dict:
for ack_id in ack_reqs_dict:
if errors_dict and ack_id in errors_dict:
exactly_once_error = errors_dict[ack_id]
if exactly_once_error.startswith("TRANSIENT_"):
requests_to_retry.append(future_reqs_dict[ack_id])
requests_to_retry.append(ack_reqs_dict[ack_id])
else:
if exactly_once_error == "PERMANENT_FAILURE_INVALID_ACK_ID":
exc = AcknowledgeError(AcknowledgeStatus.INVALID_ACK_ID, info=None)
else:
exc = AcknowledgeError(AcknowledgeStatus.OTHER, exactly_once_error)

future = future_reqs_dict[ack_id].future
future = ack_reqs_dict[ack_id].future
future.set_exception(exc)
requests_completed.append(future_reqs_dict[ack_id])
requests_completed.append(ack_reqs_dict[ack_id])
elif error_status:
# Only permanent errors are expected here b/c retriable errors are
# retried at the lower, GRPC level.
Expand All @@ -185,16 +185,16 @@ def _process_futures(
exc = AcknowledgeError(AcknowledgeStatus.FAILED_PRECONDITION, info=None)
else:
exc = AcknowledgeError(AcknowledgeStatus.OTHER, str(error_status))
future = future_reqs_dict[ack_id].future
future = ack_reqs_dict[ack_id].future
future.set_exception(exc)
requests_completed.append(future_reqs_dict[ack_id])
elif future_reqs_dict[ack_id].future:
future = future_reqs_dict[ack_id].future
requests_completed.append(ack_reqs_dict[ack_id])
elif ack_reqs_dict[ack_id].future:
future = ack_reqs_dict[ack_id].future
# success
future.set_result(AcknowledgeStatus.SUCCESS)
requests_completed.append(future_reqs_dict[ack_id])
requests_completed.append(ack_reqs_dict[ack_id])
else:
requests_completed.append(future_reqs_dict[ack_id])
requests_completed.append(ack_reqs_dict[ack_id])

return requests_completed, requests_to_retry

Expand Down Expand Up @@ -556,7 +556,7 @@ def _schedule_message_on_hold(
self._scheduler.schedule(self._callback, msg)

def send_unary_ack(
self, ack_ids, future_reqs_dict
self, ack_ids, ack_reqs_dict
) -> Tuple[List[requests.AckRequest], List[requests.AckRequest]]:
"""Send a request using a separate unary request instead of over the stream.
Expand All @@ -581,7 +581,7 @@ def send_unary_ack(
status = status_pb2.Status()
status.code = code_pb2.DEADLINE_EXCEEDED
# Makes sure to complete futures so they don't block forever.
_process_futures(status, future_reqs_dict, None)
_process_requests(status, ack_reqs_dict, None)
_LOGGER.debug(
"RetryError while sending unary RPC. Waiting on a transient "
"error resolution for too long, will now trigger shutdown.",
Expand All @@ -592,13 +592,13 @@ def send_unary_ack(
self._on_rpc_done(exc)
raise

requests_completed, requests_to_retry = _process_futures(
error_status, future_reqs_dict, ack_errors_dict
requests_completed, requests_to_retry = _process_requests(
error_status, ack_reqs_dict, ack_errors_dict
)
return requests_completed, requests_to_retry

def send_unary_modack(
self, modify_deadline_ack_ids, modify_deadline_seconds, future_reqs_dict
self, modify_deadline_ack_ids, modify_deadline_seconds, ack_reqs_dict
) -> Tuple[List[requests.ModAckRequest], List[requests.ModAckRequest]]:
"""Send a request using a separate unary request instead of over the stream.
Expand Down Expand Up @@ -635,7 +635,7 @@ def send_unary_modack(
status = status_pb2.Status()
status.code = code_pb2.DEADLINE_EXCEEDED
# Makes sure to complete futures so they don't block forever.
_process_futures(status, future_reqs_dict, None)
_process_requests(status, ack_reqs_dict, None)
_LOGGER.debug(
"RetryError while sending unary RPC. Waiting on a transient "
"error resolution for too long, will now trigger shutdown.",
Expand All @@ -646,8 +646,8 @@ def send_unary_modack(
self._on_rpc_done(exc)
raise

requests_completed, requests_to_retry = _process_futures(
error_status, future_reqs_dict, modack_errors_dict
requests_completed, requests_to_retry = _process_requests(
error_status, ack_reqs_dict, modack_errors_dict
)
return requests_completed, requests_to_retry

Expand Down
20 changes: 10 additions & 10 deletions tests/unit/pubsub_v1/subscriber/test_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def test_ack():
dispatcher_.ack(items)

manager.send_unary_ack.assert_called_once_with(
ack_ids=["ack_id_string"], future_reqs_dict={"ack_id_string": items[0]}
ack_ids=["ack_id_string"], ack_reqs_dict={"ack_id_string": items[0]}
)

manager.leaser.remove.assert_called_once_with(items)
Expand All @@ -132,7 +132,7 @@ def test_ack_no_time():
dispatcher_.ack(items)

manager.send_unary_ack.assert_called_once_with(
ack_ids=["ack_id_string"], future_reqs_dict={"ack_id_string": items[0]}
ack_ids=["ack_id_string"], ack_reqs_dict={"ack_id_string": items[0]}
)

manager.ack_histogram.add.assert_not_called()
Expand Down Expand Up @@ -226,13 +226,13 @@ def test_retry_acks():
manager.send_unary_ack.assert_has_calls(
[
mock.call(
ack_ids=["ack_id_string"], future_reqs_dict={"ack_id_string": items[0]}
ack_ids=["ack_id_string"], ack_reqs_dict={"ack_id_string": items[0]}
),
mock.call(
ack_ids=["ack_id_string"], future_reqs_dict={"ack_id_string": items[0]}
ack_ids=["ack_id_string"], ack_reqs_dict={"ack_id_string": items[0]}
),
mock.call(
ack_ids=["ack_id_string"], future_reqs_dict={"ack_id_string": items[0]}
ack_ids=["ack_id_string"], ack_reqs_dict={"ack_id_string": items[0]}
),
]
)
Expand Down Expand Up @@ -277,17 +277,17 @@ def test_retry_modacks():
mock.call(
modify_deadline_ack_ids=["ack_id_string"],
modify_deadline_seconds=[20],
future_reqs_dict={"ack_id_string": items[0]},
ack_reqs_dict={"ack_id_string": items[0]},
),
mock.call(
modify_deadline_ack_ids=["ack_id_string"],
modify_deadline_seconds=[20],
future_reqs_dict={"ack_id_string": items[0]},
ack_reqs_dict={"ack_id_string": items[0]},
),
mock.call(
modify_deadline_ack_ids=["ack_id_string"],
modify_deadline_seconds=[20],
future_reqs_dict={"ack_id_string": items[0]},
ack_reqs_dict={"ack_id_string": items[0]},
),
]
)
Expand Down Expand Up @@ -359,7 +359,7 @@ def test_nack():
manager.send_unary_modack.assert_called_once_with(
modify_deadline_ack_ids=["ack_id_string"],
modify_deadline_seconds=[0],
future_reqs_dict={
ack_reqs_dict={
"ack_id_string": requests.ModAckRequest(
ack_id="ack_id_string", seconds=0, future=None
)
Expand All @@ -380,7 +380,7 @@ def test_modify_ack_deadline():
manager.send_unary_modack.assert_called_once_with(
modify_deadline_ack_ids=["ack_id_string"],
modify_deadline_seconds=[60],
future_reqs_dict={"ack_id_string": items[0]},
ack_reqs_dict={"ack_id_string": items[0]},
)


Expand Down
Loading

0 comments on commit 7370116

Please sign in to comment.