Skip to content

Commit

Permalink
fix test_memory and scheduler crash
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Apr 20, 2021
1 parent d5fc324 commit 28852b4
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 3 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/cancel.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: Cancel

on:
workflow_run:
workflows: ["Tests"]
workflows: [Tests, test_memory_stress_test]
types:
- requested

Expand Down
93 changes: 93 additions & 0 deletions .github/workflows/test_memory.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
name: test_memory_stress_test

on: [push, pull_request]

jobs:
test:
runs-on: ${{ matrix.os }}

strategy:
fail-fast: false
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]
python-version: ["3.7", "3.8", "3.9"]

# Uncomment to stress-test the test suite for random failures
# This will take a LONG time and delay all PRs across the whole github.com/dask!
run: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

steps:
- name: Checkout source
uses: actions/checkout@v2
with:
fetch-depth: 0

- name: Setup Conda Environment
uses: conda-incubator/setup-miniconda@v2
with:
miniforge-variant: Mambaforge
miniforge-version: latest
use-mamba: true
channels: conda-forge,defaults
channel-priority: true
python-version: ${{ matrix.python-version }}
environment-file: continuous_integration/environment-${{ matrix.python-version }}.yaml
activate-environment: dask-distributed
auto-activate-base: false

- name: Install stacktrace
shell: bash -l {0}
# stacktrace for Python 3.8 has not been released at the moment of writing
if: ${{ matrix.os == 'ubuntu-latest' && matrix.python-version < '3.8' }}
run: mamba install -c conda-forge -c defaults -c numba libunwind stacktrace

- name: Hack around https://github.com/ipython/ipython/issues/12197
# This upstream issue causes an interpreter crash when running
# distributed/protocol/tests/test_serialize.py::test_profile_nested_sizeof
shell: bash -l {0}
if: ${{ matrix.os == 'windows-latest' && matrix.python-version == '3.9' }}
run: mamba uninstall ipython

- name: Cythonize
shell: bash -l {0}
if: ${{ matrix.python-version == '3.7' }}
run: python setup.py build_ext --with-cython

- name: Install
shell: bash -l {0}
run: python -m pip install --no-deps -e .

- name: mamba list
shell: bash -l {0}
run: mamba list

- name: mamba env export
shell: bash -l {0}
run: |
echo -e "--\n--Conda Environment (re-create this with \`mamba env create --name <name> -f <output_file>\`)\n--"
mamba env export | grep -E -v '^prefix:.*$'
- name: Setup SSH
shell: bash -l {0}
# FIXME no SSH available on Windows
# https://github.com/dask/distributed/issues/4509
if: ${{ matrix.os != 'windows-latest' }}
run: bash continuous_integration/scripts/setup_ssh.sh

- name: Test
shell: bash -l {0}
env:
PYTHONFAULTHANDLER: 1
run: |
if [[ "${{ matrix.os }}" = "ubuntu-latest" ]]; then
# FIXME ipv6-related failures on Ubuntu github actions CI
# https://github.com/dask/distributed/issues/4514
export DISABLE_IPV6=1
fi
source continuous_integration/scripts/set_ulimit.sh
pytest --runslow distributed/tests/test_scheduler.py::test_memory
# - name: Debug with tmate on failure
# if: ${{ failure() }}
# uses: mxschmitt/action-tmate@v3
11 changes: 9 additions & 2 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,9 @@ def metrics(self):
@property
def memory(self) -> MemoryState:
return MemoryState(
process=self._metrics["memory"],
# metrics["memory"] is None if the worker sent a heartbeat before its
# SystemMonitor ever had a chance to run
process=self._metrics["memory"] or 0,
managed=self._nbytes,
managed_spilled=self._metrics["spilled_nbytes"],
unmanaged_old=self._memory_unmanaged_old,
Expand Down Expand Up @@ -3833,7 +3835,12 @@ def heartbeat_worker(
if size == memory_unmanaged_old:
memory_unmanaged_old = 0 # recalculate min()

size = max(0, metrics["memory"] - ws._nbytes + ws._metrics["spilled_nbytes"])
# metrics["memory"] is None if the worker sent a heartbeat before its
# SystemMonitor ever had a chance to run.
# ws._nbytes is updated at a different time and sizeof() may not be accurate,
# so size may be (temporarily) negative; floor it to zero.
size = max(0, (metrics["memory"] or 0) - ws._nbytes + metrics["spilled_nbytes"])

ws._memory_other_history.append((local_now, size))
if not memory_unmanaged_old:
# The worker has just been started or the previous minimum has been expunged
Expand Down
5 changes: 5 additions & 0 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2380,6 +2380,11 @@ def test_memory():
sleep(2)
assert_memory(s, "managed_spilled", 1, 999)

# Wait for more_futs to finish without moving them out of the spill file
while any(not f.done() for f in more_futs):
sleep(0.1)
sleep(5)

# Delete spilled keys
prev = s.memory
del f1
Expand Down

0 comments on commit 28852b4

Please sign in to comment.