Fix regressions in #4651 #4719

Merged (5 commits) on Apr 21, 2021
2 changes: 1 addition & 1 deletion .github/workflows/cancel.yml
@@ -2,7 +2,7 @@ name: Cancel

on:
workflow_run:
workflows: ["Tests"]
workflows: [Tests]
types:
- requested

11 changes: 9 additions & 2 deletions distributed/scheduler.py
@@ -623,7 +623,9 @@ def metrics(self):
@property
def memory(self) -> MemoryState:
return MemoryState(
process=self._metrics["memory"],
# metrics["memory"] is None if the worker sent a heartbeat before its
# SystemMonitor ever had a chance to run
process=self._metrics["memory"] or 0,
managed=self._nbytes,
managed_spilled=self._metrics["spilled_nbytes"],
unmanaged_old=self._memory_unmanaged_old,
@@ -3833,7 +3835,12 @@ def heartbeat_worker(
if size == memory_unmanaged_old:
memory_unmanaged_old = 0 # recalculate min()

size = max(0, metrics["memory"] - ws._nbytes + ws._metrics["spilled_nbytes"])
# metrics["memory"] is None if the worker sent a heartbeat before its
# SystemMonitor ever had a chance to run.
# ws._nbytes is updated at a different time and sizeof() may not be accurate,
# so size may be (temporarily) negative; floor it to zero.
size = max(0, (metrics["memory"] or 0) - ws._nbytes + metrics["spilled_nbytes"])
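The two guards added above (the `or 0` fallback for a not-yet-populated SystemMonitor reading and the `max(0, ...)` floor for transiently negative sizes) can be illustrated in isolation. A minimal sketch; the helper name `unmanaged_size` is hypothetical and not part of the diff:

```python
def unmanaged_size(metrics: dict, nbytes: int) -> int:
    # metrics["memory"] may be None if the worker heartbeats before its
    # SystemMonitor ever ran; treat that as 0 bytes of process memory.
    # The subtraction can be transiently negative because nbytes is updated
    # at a different time than the memory reading, so floor it at zero.
    return max(0, (metrics["memory"] or 0) - nbytes + metrics["spilled_nbytes"])


# Heartbeat arrived before the first SystemMonitor.update -> memory is None
print(unmanaged_size({"memory": None, "spilled_nbytes": 0}, 100))  # 0
# Stale nbytes larger than the process reading -> floored to zero
print(unmanaged_size({"memory": 500, "spilled_nbytes": 50}, 600))  # 0
# Normal case
print(unmanaged_size({"memory": 500, "spilled_nbytes": 50}, 100))  # 450
```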

crusaderky (Collaborator, Author) commented on Apr 21, 2021:
I can't see a way to write a unit test for this short of monkey-patching SystemMonitor?

Member commented:
It seems like it's going to be the only way, another test does exactly that:

@pytest.mark.asyncio
@pytest.mark.parametrize("reconnect", [True, False])
async def test_heartbeat_comm_closed(cleanup, monkeypatch, reconnect):
    with captured_logger("distributed.worker", level=logging.WARNING) as logger:
        async with await Scheduler() as s:

            def bad_heartbeat_worker(*args, **kwargs):
                raise CommClosedError()

            async with await Worker(s.address, reconnect=reconnect) as w:
                # Trigger CommClosedError during worker heartbeat
                monkeypatch.setattr(
                    w.scheduler, "heartbeat_worker", bad_heartbeat_worker
                )
                await w.heartbeat()
                if reconnect:
                    assert w.status == Status.running
                else:
                    assert w.status == Status.closed
        assert "Heartbeat to scheduler failed" in logger.getvalue()
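For illustration, the monkeypatching approach discussed here can be sketched with a stand-in class. `DummySystemMonitor` is hypothetical, not the real distributed SystemMonitor; the point is only that swapping the class attribute makes every instance report as if the monitor never ran, which is the state the heartbeat fix has to tolerate:

```python
class DummySystemMonitor:
    # Hypothetical stand-in for the real SystemMonitor, for illustration only
    def update(self):
        return {"memory": 123}


# Mimic monkeypatch.setattr(DummySystemMonitor, "update", ...): the stub
# reports the "heartbeat arrived before the first update()" state
DummySystemMonitor.update = lambda self: {"memory": None}

metrics = DummySystemMonitor().update()
assert metrics["memory"] is None
# The scheduler-side guard then coerces the missing reading to zero
assert (metrics["memory"] or 0) == 0
```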

Member commented:
There is distributed.admin.system-monitor.interval, which controls how often the monitor runs. You could set it to an incredibly high value such that it is never executed during the test runtime.

Alternatively, using monkeypatch, you could remove the PC before it is even started. Something like:

@pytest.mark.asyncio
async def test_foo():
    s = Scheduler()
    s.periodic_callbacks["monitor"] = None
    w = Worker(s)
    w.periodic_callbacks["monitor"] = None
    await s
    await w
    async with Client(s):
        ....

crusaderky (Collaborator, Author) commented on Apr 22, 2021:

Setting distributed.admin.system-monitor.interval to a very high value before I create the Scheduler has no effect (I can still see data from SystemMonitor.update arriving in the heartbeat).

Setting

s.periodic_callbacks["monitor"] = None
w.periodic_callbacks["monitor"] = None

fails with

        for pc in self.periodic_callbacks.values():
>           pc.stop()
E           AttributeError: 'NoneType' object has no attribute 'stop'

this has no effect:

del s.periodic_callbacks["monitor"]
del w.periodic_callbacks["monitor"]

this has no effect:

pc = PeriodicCallback(lambda: None, 999999999)
s.periodic_callbacks["monitor"] = pc
w.periodic_callbacks["monitor"] = pc
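The AttributeError reported above follows directly from the cleanup loop iterating over a dict that now maps "monitor" to None. A minimal reproduction (the `PC` class is a hypothetical stand-in for the real periodic callback):

```python
class PC:
    # Hypothetical stand-in for a periodic callback object
    def stop(self):
        pass


# Assigning None leaves a dead entry that the shutdown loop trips over
periodic_callbacks = {"monitor": None}
failed = False
try:
    for pc in periodic_callbacks.values():
        pc.stop()
except AttributeError as e:
    # 'NoneType' object has no attribute 'stop'
    failed = True
assert failed

# del removes the entry entirely, so the shutdown loop succeeds; the
# callback may already have been scheduled, though, which would explain
# why deleting it "has no effect" on the heartbeat contents
periodic_callbacks = {"monitor": PC()}
del periodic_callbacks["monitor"]
for pc in periodic_callbacks.values():
    pc.stop()
```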

ws._memory_other_history.append((local_now, size))
if not memory_unmanaged_old:
# The worker has just been started or the previous minimum has been expunged
31 changes: 23 additions & 8 deletions distributed/tests/test_scheduler.py
@@ -2330,6 +2330,10 @@ def assert_memory(scheduler_or_workerstate, attr: str, min_, max_, timeout=10):
sleep(0.1)
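The assert_memory helper whose tail is shown above is a poll-until-in-range pattern. A self-contained, simplified sketch (hypothetical name `wait_for_range`; the real helper reads a MemoryState attribute off a scheduler or worker state instead of a callable):

```python
import time


def wait_for_range(get_mib, min_, max_, timeout=10.0):
    # Poll a MiB-valued measurement until it falls within [min_, max_],
    # failing if the deadline passes first
    deadline = time.monotonic() + timeout
    while True:
        value = get_mib()
        if min_ <= value <= max_:
            return value
        if time.monotonic() > deadline:
            raise AssertionError(f"{value} MiB not in [{min_}, {max_}]")
        time.sleep(0.1)


# In range on the first poll -> returns immediately
assert wait_for_range(lambda: 150, 100, 200) == 150
```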


# This test is heavily influenced by hard-to-control factors such as memory management
# by the Python interpreter and the OS, so it occasionally glitches
@pytest.mark.flaky(reruns=3, reruns_delay=5)
# ~33s runtime, or distributed.memory.recent_to_old_time + 3s
@pytest.mark.slow
def test_memory():
pytest.importorskip("zict")
@@ -2379,34 +2383,45 @@ def test_memory():
]
sleep(2)
assert_memory(s, "managed_spilled", 1, 999)
# Wait for the spilling to finish. Note that this does not make the test take
# longer as we're waiting for recent_to_old_time anyway.
sleep(10)
Member commented:
Is there something more direct we can probe here instead of sleeping for 10 seconds?

crusaderky (Collaborator, Author) commented:

Not really, because the unmanaged memory is very volatile, so we don't know exactly how many keys are going to be spilled. Also, as noted, it doesn't slow the test down.


# Delete spilled keys
prev = s.memory
del f1
del f2
assert_memory(s, "managed_spilled", 0, prev.managed_spilled / 2 ** 20 - 1)
assert s.memory.managed_in_memory == prev.managed_in_memory
assert_memory(s, "managed_spilled", 0, prev.managed_spilled / 2 ** 20 - 19)

# Empty the cluster, with the exception of leaked memory
del more_futs
assert_memory(s, "managed", 0, 0)

orig_unmanaged = s_m0.unmanaged / 2 ** 20
orig_old = s_m0.unmanaged_old / 2 ** 20

# Wait until 30s have passed since the spill to observe unmanaged_recent
# transition into unmanaged_old
c.run(gc.collect)
orig_unmanaged = s_m0.unmanaged / 2 ** 20
orig_old = s_m0.unmanaged_old / 2 ** 20
assert_memory(s, "unmanaged_old", orig_old + 90, orig_old + 190, timeout=40)
assert_memory(s, "unmanaged_recent", 0, 90, timeout=40)
assert_memory(
s,
"unmanaged_old",
orig_old + 90,
# On MacOS, the process memory of the Python interpreter does not shrink as
# fast as on Linux/Windows
9999 if MACOS else orig_old + 190,
timeout=40,
)

# When the leaked memory is cleared, unmanaged and unmanaged_old drop
# This doesn't happen on MacOS, where the process memory of the Python
# interpreter does not shrink (or takes much longer to shrink)
# On MacOS, the process memory of the Python interpreter does not shrink as fast
# as on Linux/Windows
if not MACOS:
c.run(clear_leak)
assert_memory(s, "unmanaged", 0, orig_unmanaged + 95)
assert_memory(s, "unmanaged_old", 0, orig_old + 95)
assert_memory(s, "unmanaged_recent", 0, 95)
assert_memory(s, "unmanaged_recent", 0, 90)


@gen_cluster(client=True, worker_kwargs={"memory_limit": 0})