
Ensure adaptive scaling is properly awaited and closed #4720

Merged: 5 commits into dask:main on May 28, 2021

Conversation

fjetter (Member)

@fjetter fjetter commented Apr 20, 2021

  • Primarily, this ensures that scale_up/scale_down is properly awaited when the sync method is used, which is more common than loop.add_callback.
  • It also ensures the adaptive PeriodicCallback is stopped properly once a cluster shuts down.
  • A bit of cleanup: there was a test which raised error logs, and I had trouble figuring out what it was actually doing. It had also been refactored a few times since its original inception.
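The first bullet can be sketched in isolation. This is a minimal illustration, not the actual dask API: `_call_maybe_async`, `scale_sync`, and `scale_async` are hypothetical names showing how a scale method can be awaited correctly whether it is a plain function or a coroutine function.

```python
import asyncio
import inspect

async def _call_maybe_async(fn, *args):
    # Call fn and, if the result is awaitable (a coroutine), await it.
    # This way both sync and async scale implementations finish before
    # the caller proceeds, instead of leaving a coroutine un-awaited.
    result = fn(*args)
    if inspect.isawaitable(result):
        result = await result
    return result

def scale_sync(n):
    # A plain synchronous scale method.
    return n

async def scale_async(n):
    # An asynchronous scale method.
    await asyncio.sleep(0)
    return n

async def main():
    a = await _call_maybe_async(scale_sync, 2)
    b = await _call_maybe_async(scale_async, 3)
    return a, b

print(asyncio.run(main()))  # (2, 3)
```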

@@ -19,43 +20,6 @@
)


@pytest.mark.asyncio
async def test_simultaneous_scale_up_and_down(cleanup):
fjetter (Member Author):

This test was added in #1608 to disallow simultaneous up-/down-scaling, but the current implementation doesn't work at all since the API changed. Instead, it only raises error logs.

fjetter (Member Author) commented Apr 21, 2021:

Test failure is #4651 / #4719

jrbourbeau (Member) left a comment:

Thanks @fjetter! I left a few small comments, but overall the changes here look good to me

distributed/deploy/tests/test_adaptive.py (outdated, resolved)
distributed/tests/test_scheduler.py (outdated, resolved)


@pytest.mark.asyncio
async def test_adaptive_stopped():
jrbourbeau (Member):

Given that this uses an async cluster, do we need the async_wait_for + timeout or can we just assert the various attributes directly? For example, when I make the following changes locally this test still passes:

diff --git a/distributed/deploy/tests/test_adaptive.py b/distributed/deploy/tests/test_adaptive.py
index 39284302..8cf4f381 100644
--- a/distributed/deploy/tests/test_adaptive.py
+++ b/distributed/deploy/tests/test_adaptive.py
@@ -466,16 +466,8 @@ async def test_adaptive_stopped():
     """
     async with LocalCluster(n_workers=0, asynchronous=True) as cluster:
         async with Client(cluster, asynchronous=True) as client:
-            instance = cluster.adapt(interval="10ms")
+            pc = cluster.adapt(interval="10ms").periodic_callback
+            assert pc is not None
+            assert pc.is_running() is not None

-            await async_wait_for(
-                lambda: instance.periodic_callback is not None, timeout=5
-            )
-
-            await async_wait_for(
-                lambda: instance.periodic_callback.is_running() is not None, timeout=5
-            )
-
-            pc = instance.periodic_callback
-
-    await async_wait_for(lambda: pc.is_running() is not None, timeout=5)
+    assert pc.is_running() is not None

fjetter (Member Author):

Well, at the very least we'll need one wait in between which actually waits for the whole thing to start. If the PC was never started, the conditions are trivially true. I'll add a comment and see what I can remove for it to still work.
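The kind of wait described here can be sketched with a minimal polling helper. `async_wait_for` below is a hypothetical stand-in for the test utility of the same name, not its actual implementation in distributed.

```python
import asyncio
import time

async def async_wait_for(predicate, timeout):
    # Poll predicate until it returns True or the timeout elapses,
    # yielding to the event loop between checks so background tasks
    # (like a starting PeriodicCallback) can make progress.
    deadline = time.monotonic() + timeout
    while not predicate():
        if time.monotonic() > deadline:
            raise TimeoutError("condition not met in time")
        await asyncio.sleep(0.01)

started = {"flag": False}

async def demo():
    async def start_later():
        # Simulates something that only becomes true after startup.
        await asyncio.sleep(0.05)
        started["flag"] = True

    asyncio.get_running_loop().create_task(start_later())
    # Without this wait, asserting the flag immediately would be racy.
    await async_wait_for(lambda: started["flag"], timeout=5)
    return started["flag"]

print(asyncio.run(demo()))  # True
```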

fjetter (Member Author):

All of the is not None checks were useless, which is why the test did not fail. I properly assert on the bool now, and now I do need to wait.
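Why those checks were trivially true: `is_running()` returns a bool, and a bool is never `None`, so `pc.is_running() is not None` passes whether or not the callback ever started. A minimal sketch with a hypothetical `FakePeriodicCallback` stand-in (mimicking the shape of Tornado's PeriodicCallback, not its implementation):

```python
class FakePeriodicCallback:
    # Minimal stand-in: is_running() always returns a bool, never None.
    def __init__(self):
        self._running = False

    def start(self):
        self._running = True

    def stop(self):
        self._running = False

    def is_running(self):
        return self._running

pc = FakePeriodicCallback()
# Trivially true whether started or not, so this assertion can never fail:
assert pc.is_running() is not None
# The meaningful assertions check the boolean value itself:
assert pc.is_running() is False
pc.start()
assert pc.is_running() is True
pc.stop()
assert pc.is_running() is False
```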

Comment on lines +418 to +421
# Need to call stop here before we close all servers to avoid having
# dangling tasks in the ioloop
with suppress(AttributeError):
self._adaptive.stop()
fjetter (Member Author):

This is a bit unfortunate since I need to call it up here and not down in Cluster. If I do this only in Cluster, the event loop seems to close too soon and we still have pending tasks from AdaptiveCore.adapt.

I'm wondering if we ever considered adding PYTHONASYNCIODEBUG=1 to our test suite, which would surface these instances. Not sure how much would break, or if this is a bad idea in general.

@fjetter fjetter force-pushed the cleanup/adaptive branch from bc0b646 to 0ac3f5c Compare May 26, 2021 12:15
@fjetter fjetter force-pushed the cleanup/adaptive branch from 9c6137c to 733fe63 Compare May 26, 2021 15:15
@fjetter fjetter force-pushed the cleanup/adaptive branch from 974bc3b to edfa024 Compare May 27, 2021 16:35
@fjetter fjetter merged commit 93e2869 into dask:main May 28, 2021
@fjetter fjetter deleted the cleanup/adaptive branch May 28, 2021 07:50
douglasdavis pushed a commit to douglasdavis/distributed that referenced this pull request May 28, 2021
* Ensure adaptive scaling is properly awaited and closed

* review comments

* Ensure no tasks are pending when closing adaptive cluster

* remove assert in stop

* break cyclic ref in adaptive core