Skip to content

Commit

Permalink
Fix flaky Executor test
Browse files Browse the repository at this point in the history
The underlying thread was not stopped, so whether the test passed was dependent
on whether the test finished before the thread was scheduled.  Fix by turning
off the thread and manually doing what queue item management.
  • Loading branch information
khk-globus committed Nov 4, 2024
1 parent 86a1002 commit a0da695
Showing 1 changed file with 21 additions and 11 deletions.
32 changes: 21 additions & 11 deletions compute_sdk/tests/unit/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,27 +319,37 @@ def test_executor_shutdown_cancel_futures(cancel_futures: bool, gce: Executor):

gcc.register_function.return_value = str(uuid.uuid4())

# For this test, shutdown the thread that the fixture has already started (because
# we are about to mock it and manually effect it below; see comment in
# clear_queue())
gce._tasks_to_send.put((None, None))
try_assert(lambda: not gce._task_submitter.is_alive(), "Test setup")

gce._task_submitter = mock.Mock(spec=threading.Thread)
gce._task_submitter.join.side_effect = lambda: None
gce._task_submitter.is_alive.return_value = True

futures: list[ComputeFuture] = []
num_submits = random.randint(1, 20)
for _ in range(num_submits):
future = gce.submit(noop)
futures.append(future)
futures = [gce.submit(noop) for _ in range(num_submits)]

assert gce._tasks_to_send.qsize() == num_submits, "Verify test setup"

gce.shutdown(cancel_futures=cancel_futures)
def clear_queue():
# simulate a _task_submitter thread pulling items off the queue; necessary
# because we *stopped* the thread above (which itself is necessary to avoid
# a race condition -- add a sleep() call after the `futures = [...]` line
# to see it)
while gce._tasks_to_send.qsize():
gce._tasks_to_send.get()
return True

with mock.patch.object(gce._tasks_to_send, "empty") as mock_fn_empty:
mock_fn_empty.side_effect = clear_queue # only called if `not cancel_futures`
gce.shutdown(cancel_futures=cancel_futures)
gce._task_submitter.is_alive.return_value = False # Allow test cleanup

assert gce._tasks_to_send.qsize() == 1 # Only poison pill remains
if cancel_futures:
assert all(fut.cancelled() for fut in futures)
else:
assert all(not fut.cancelled() for fut in futures)

gce._task_submitter.is_alive.return_value = False # Allow test cleanup
assert all(f.cancelled() is cancel_futures for f in futures)


@pytest.mark.parametrize("num_submits", (random.randint(1, 1000),))
Expand Down

0 comments on commit a0da695

Please sign in to comment.