Skip to content

Commit

Permalink
Add logging in data plane code when processing is stuck waiting on da…
Browse files Browse the repository at this point in the history
…ta for an instruction. (#29399)
  • Loading branch information
tvalentyn authored Nov 15, 2023
1 parent 975a816 commit 427faae
Showing 1 changed file with 17 additions and 0 deletions.
17 changes: 17 additions & 0 deletions sdks/python/apache_beam/runners/worker/data_plane.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,10 @@ def input_elements(
raise RuntimeError('Instruction cleaned up already %s' % instruction_id)
done_inputs = set() # type: Set[Union[str, Tuple[str, str]]]
abort_callback = abort_callback or (lambda: False)
log_interval_sec = 5 * 60
try:
start_time = time.time()
next_waiting_log_time = start_time + log_interval_sec
while len(done_inputs) < len(expected_inputs):
try:
element = received.get(timeout=1)
Expand All @@ -510,7 +513,21 @@ def input_elements(
return
if self._exception:
raise self._exception from None
current_time = time.time()
if next_waiting_log_time <= current_time:
# If at the same time another instruction is waiting on input queue
# to become available, it is a sign of inefficiency in data plane.
_LOGGER.info(
'Detected input queue delay longer than %s seconds. '
'Waiting to receive elements in input queue '
'for instruction: %s for %.2f seconds.',
log_interval_sec,
instruction_id,
current_time - start_time)
next_waiting_log_time = current_time + log_interval_sec
else:
start_time = time.time()
next_waiting_log_time = start_time + log_interval_sec
if isinstance(element, beam_fn_api_pb2.Elements.Timers):
if element.is_last:
done_inputs.add((element.transform_id, element.timer_family_id))
Expand Down

0 comments on commit 427faae

Please sign in to comment.