Fix regressions in #4651 #4719
Conversation
* code cleanup
* code cleanup
* Early exit pattern
* refactor
* cleanup
* fix regression
* annotations
* revert
* backend prototype
* slightly faster both in CPython and Cython
* slightly faster both in CPython and Cython
* polish
* polish
* early exit
* polish
* polish
* backend review
* nonfunctional GUI prototype
* GUI prototype (unpolished)
* tooltip
* refactor
* GUI
* GUI
* GUI
* refactor
* polish
* simpler tooltip
* Reduce spilled size on delitem
* tweak cluster-wide nbytes gauge
* workers tab
* Self-review
* bokeh unit tests
* test SpillBuffer
* Code review
* cython optimizations
* test MemoryState
* test backend
* Remove unnecessary casts uint->sint
* Self-review
* Test edge cases
* fix test failure
* redesign test
* relax maximums
* fix test
* lint
* fix test
* fix test
* fix bar on small screens
* height in em
* larger
* fix flaky test
I confirmed locally this fixes the problem for me too, thanks so much for the quick fix @crusaderky!
Stress test outcome:
# ws._nbytes is updated at a different time and sizeof() may not be accurate,
# so size may be (temporarily) negative; floor it to zero.
size = max(0, (metrics["memory"] or 0) - ws._nbytes + metrics["spilled_nbytes"])
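As a standalone illustration of the flooring above, with hypothetical numbers standing in for the worker's metrics and `ws._nbytes` (which are sampled at different times):

```python
# Hypothetical values: the system monitor reports 100 bytes of process memory,
# while ws._nbytes (updated at a different time) already says 150 bytes of
# managed data. The naive subtraction would be -50; it is floored to zero.
metrics = {"memory": 100, "spilled_nbytes": 0}
nbytes = 150  # stand-in for ws._nbytes

size = max(0, (metrics["memory"] or 0) - nbytes + metrics["spilled_nbytes"])
print(size)  # 0 rather than -50
```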
I can't see a way to write a unit test for this short of monkey-patching SystemMonitor?
It seems like that's going to be the only way; another test already does exactly that:
distributed/distributed/tests/test_worker.py
Lines 1697 to 1717 in e4b534a
@pytest.mark.asyncio
@pytest.mark.parametrize("reconnect", [True, False])
async def test_heartbeat_comm_closed(cleanup, monkeypatch, reconnect):
    with captured_logger("distributed.worker", level=logging.WARNING) as logger:
        async with await Scheduler() as s:

            def bad_heartbeat_worker(*args, **kwargs):
                raise CommClosedError()

            async with await Worker(s.address, reconnect=reconnect) as w:
                # Trigger CommClosedError during worker heartbeat
                monkeypatch.setattr(
                    w.scheduler, "heartbeat_worker", bad_heartbeat_worker
                )
                await w.heartbeat()
                if reconnect:
                    assert w.status == Status.running
                else:
                    assert w.status == Status.closed
    assert "Heartbeat to scheduler failed" in logger.getvalue()
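For readers unfamiliar with the pattern, the patching in the test above boils down to swapping an attribute and restoring it afterwards. A minimal stdlib-only sketch, with a hypothetical `Scheduler` stand-in (pytest's `monkeypatch` fixture does the same thing with automatic teardown):

```python
class Scheduler:
    # Hypothetical stand-in for the worker's scheduler comm object.
    def heartbeat_worker(self, *args, **kwargs):
        return "ok"


def bad_heartbeat_worker(*args, **kwargs):
    # Stand-in for raising CommClosedError in the real test.
    raise ConnectionError("comm closed")


s = Scheduler()
# Equivalent of monkeypatch.setattr(s, "heartbeat_worker", bad_heartbeat_worker)
s.heartbeat_worker = bad_heartbeat_worker
try:
    try:
        s.heartbeat_worker()
    except ConnectionError:
        print("heartbeat failed as expected")
finally:
    # Remove the instance attribute so the class method is visible again,
    # which is what monkeypatch's teardown achieves.
    del s.heartbeat_worker

print(s.heartbeat_worker())  # ok
```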
There is distributed.admin.system-monitor.interval
which controls how often the monitor runs. You could set it to incredibly high values such that it is never executed during the test runtime
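For reference, assuming the standard dask config layout, the dotted key maps onto distributed's YAML configuration like this (the interval value here is an arbitrary, very large placeholder):

```yaml
distributed:
  admin:
    system-monitor:
      interval: 1000000s
```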
Alternatively, with monkeypatch (or plain attribute assignment) you could remove the PC before it is even started. Something like:
@pytest.mark.asyncio
async def test_foo():
    s = Scheduler()
    s.periodic_callbacks["monitor"] = None
    w = Worker(s)
    w.periodic_callbacks["monitor"] = None
    await s
    await w
    async with Client(s):
        ...
Setting distributed.admin.system-monitor.interval to a very high value before I create the Scheduler has no effect (I can see data arriving in the heartbeat from SystemMonitor.update).
Setting
s.periodic_callbacks["monitor"] = None
w.periodic_callbacks["monitor"] = None
fails with
for pc in self.periodic_callbacks.values():
> pc.stop()
E AttributeError: 'NoneType' object has no attribute 'stop'
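The failure is independent of distributed: any dict of callbacks iterated with `.stop()` will break on a `None` value. A minimal repro, with a hypothetical `PC` class standing in for the real periodic callback:

```python
class PC:
    # Hypothetical stand-in for tornado's PeriodicCallback.
    def stop(self):
        pass


periodic_callbacks = {"monitor": PC()}
periodic_callbacks["monitor"] = None  # the suggested override

try:
    # Mirrors the loop in Server.close() that stops every callback.
    for pc in periodic_callbacks.values():
        pc.stop()
except AttributeError as exc:
    print(exc)  # 'NoneType' object has no attribute 'stop'
```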
this has no effect:
del s.periodic_callbacks["monitor"]
del w.periodic_callbacks["monitor"]
this has no effect:
pc = PeriodicCallback(lambda: None, 999999999)
s.periodic_callbacks["monitor"] = pc
w.periodic_callbacks["monitor"] = pc
Would people be ok with merging this even without a test and leaving a test for a different PR? This would unblock CI in Dask-CUDA and UCX-Py (potentially other RAPIDS projects too).
I'm fine with postponing tests since our CI is also failing hard. @crusaderky your call
I need an extra 2-3 hours to cook a unit test. I'm ok merging without it.
Thanks for your work here @crusaderky. I'm going to merge this as is to unblock CI, but I left a comment below.
@@ -2379,34 +2383,45 @@ def test_memory():
     ]
     sleep(2)
     assert_memory(s, "managed_spilled", 1, 999)
+    # Wait for the spilling to finish. Note that this does not make the test take
+    # longer as we're waiting for recent_to_old_time anyway.
+    sleep(10)
Is there something more direct we can probe here instead of sleeping for 10 seconds?
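For comparison, a "more direct" probe would usually mean polling the condition instead of a fixed sleep. A generic sketch of such a helper (hypothetical, not distributed's API):

```python
import time


def wait_for(predicate, timeout=10.0, interval=0.1):
    """Poll `predicate` until it returns True or `timeout` seconds elapse.

    Returns as soon as the condition holds, so the happy path is fast,
    unlike a fixed sleep.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(interval)
    return False


# Example usage: wait for some condition instead of sleep(10).
done = wait_for(lambda: True, timeout=1.0)
print(done)  # True
```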
Not really, because the unmanaged memory is very volatile, so we don't know exactly how many keys are going to be spilled. Also, as noted, it doesn't slow the test down.
Fix regressions introduced in #4651: