Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make CI happy again #8560

Merged
merged 47 commits into from
Mar 8, 2024
Merged
Changes from 1 commit
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
31366e1
Fix test_basic_merge use of hash_join (no longer public)
milesgranger Mar 7, 2024
f4153fc
await computation in shuffle/tests/test_merge.py::test_merge
milesgranger Mar 7, 2024
c7b39e7
xfail p2p shuffle RuntimeError in shuffle/tests/test_merge.py::test_m…
milesgranger Mar 7, 2024
f527f49
Check UserWarning in test_client.py::test_recreate_error_collection
milesgranger Mar 7, 2024
286903d
Check UserWarning in test_client.py::test_recreate_task_collection
milesgranger Mar 7, 2024
f71ea01
ignore UserWarning: dask-expr does not support DataFrameIOFunction pr…
milesgranger Mar 7, 2024
6fb111b
xfail test_steal.py::test_blocklist_shuffle_split blocked is empty
milesgranger Mar 7, 2024
e5f70f9
xfail test_scheduler.py::test_default_task_duration_splits - split_pr…
milesgranger Mar 7, 2024
5c712a0
test_merge_unknown_to_unknown: skip layer checks when dask-expr enabled
milesgranger Mar 7, 2024
6cc06a9
xfail test_dataframe_annotations when dask-expr enabled
milesgranger Mar 7, 2024
815422c
test_raise_on_complex_numbers - dask-expr branch
milesgranger Mar 7, 2024
9f25e6b
test_raise_on_sparse_data - dask-expr branch
milesgranger Mar 7, 2024
4daa85c
test_raise_on_non_string_column_name - dask-expr branch
milesgranger Mar 7, 2024
7ed678e
Don't use dd.shuffle.shuffle
hendrikmakait Mar 7, 2024
f6f5930
Don't test minimal version for dask-expr in P2P
hendrikmakait Mar 7, 2024
eb53213
No worker restrictions
hendrikmakait Mar 7, 2024
68d7edf
Adjust unpack prefix
hendrikmakait Mar 7, 2024
8c908a2
Restrictions are not implemented
hendrikmakait Mar 7, 2024
6025a64
Remove conditional skip
hendrikmakait Mar 7, 2024
967696c
Skip test_basic
hendrikmakait Mar 7, 2024
a1f5826
Fix test_metrics prefix name
hendrikmakait Mar 7, 2024
df07359
Remove xfail on UserWarnings
milesgranger Mar 7, 2024
d9dde2d
Avoid object to string conversion
hendrikmakait Mar 7, 2024
e748ed6
fix
hendrikmakait Mar 7, 2024
3c4e9e8
dask-contrib/dask-expr#951
hendrikmakait Mar 7, 2024
e4dfd00
Fix mindeps
hendrikmakait Mar 7, 2024
a64dfdb
xfail test_futures_of_sorted until dask-contrib/dask-expr#952
milesgranger Mar 7, 2024
302b8bb
Check UserWarning in test_highlevelgraph.py::test_dataframe_annotations
milesgranger Mar 7, 2024
a67ac1d
Check HashJoinP2P for test_basic_merge
milesgranger Mar 7, 2024
a165f16
Conditional await in test_merge.py::test_merge
milesgranger Mar 7, 2024
3f93b93
xfail test_dataframes
milesgranger Mar 8, 2024
3d6471f
xfail test_dataframe_set_index_sync - passes fine locally...
milesgranger Mar 8, 2024
cadfe01
Fix test_dataframes
hendrikmakait Mar 8, 2024
aff2a8c
test_recreate_error_collection on mindeps-pandas
crusaderky Mar 8, 2024
8dbb385
test_recreate_task_collection on mindeps-pandas
crusaderky Mar 8, 2024
32062fa
Review test_merge.py
crusaderky Mar 8, 2024
78a773f
Review test_futures_of_sorted
crusaderky Mar 8, 2024
3e9545d
Revert test_dataframe_set_index_sync
crusaderky Mar 8, 2024
9d90f90
Cosmetic
crusaderky Mar 8, 2024
8ea35f0
Use dask.bag for test_futures_of_sorted
crusaderky Mar 8, 2024
3240987
Revert test_broken_comm
crusaderky Mar 8, 2024
f5abd39
Review test_crashed_worker_after_shuffle
crusaderky Mar 8, 2024
9ab8ca6
test_crashed_worker_after_shuffle without Nanny
crusaderky Mar 8, 2024
e974509
Merge branch 'main' into milesgranger/fix-ci
hendrikmakait Mar 8, 2024
4e5c4d4
Revert changes about default durations
crusaderky Mar 8, 2024
a2bdc78
Merge remote-tracking branch 'milesgranger/milesgranger/fix-ci' into …
crusaderky Mar 8, 2024
1ccfad6
Revert nanny changes in test_crashed_worker_after_shuffle
crusaderky Mar 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 30 additions & 36 deletions distributed/shuffle/tests/test_shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -1340,50 +1340,44 @@ async def test_repeat_shuffle_operation(c, s, a, b, wait_until_forgotten):
await check_scheduler_cleanup(s)


@gen_cluster(client=True, nthreads=[("", 1)])
async def test_crashed_worker_after_shuffle(c, s, a):
@gen_cluster(client=True)
async def test_crashed_worker_after_shuffle(c, s, a, b):
def block(df, in_event, block_event):
in_event.set()
block_event.wait()
return df

async with Nanny(s.address, nthreads=1) as n:
df = df = dask.datasets.timeseries(
start="2000-01-01",
end="2000-03-01",
dtypes={"x": float, "y": float},
freq="100 s",
seed=42,
)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
out = df.shuffle("x")
out = c.compute(out)

in_event = Event()
block_event = Event()

out = c.submit(
block,
out,
in_event,
block_event,
workers=[n.worker_address],
allow_other_workers=True,
)
df = dask.datasets.timeseries(
start="2000-01-01",
end="2000-03-01",
dtypes={"x": float, "y": float},
freq="100 s",
seed=42,
)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
out = df.shuffle("x")
out = c.compute(out)

await wait_until_worker_has_tasks(UNPACK_PREFIX, n.worker_address, 1, s)
await in_event.wait()
await n.process.process.kill()
await block_event.set()
in_event = Event()
block_event = Event()

out = await out
result = out.x.size
expected = await c.compute(df.x.size)
assert result == expected
out = c.submit(
block, out, in_event, block_event, workers=[b.address], allow_other_workers=True
)

await c.close()
await check_worker_cleanup(a)
await check_scheduler_cleanup(s)
await wait_until_worker_has_tasks(UNPACK_PREFIX, b.address, 1, s)
await in_event.wait()
await s.remove_worker(b.address, stimulus_id="remove_b")
hendrikmakait marked this conversation as resolved.
Show resolved Hide resolved
await block_event.set()

out = await out
result = out.x.size
expected = await c.compute(df.x.size)
assert result == expected

await c.close()
await check_worker_cleanup(a)
await check_scheduler_cleanup(s)


@gen_cluster(client=True, nthreads=[("", 1)])
Expand Down
Loading