Update black to 22.3.0 #889

Merged · 1 commit · Apr 4, 2022
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -4,7 +4,7 @@ repos:
     hooks:
       - id: isort
   - repo: https://github.com/ambv/black
-    rev: 19.10b0
+    rev: 22.3.0
     hooks:
       - id: black
   - repo: https://gitlab.com/pycqa/flake8
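Note: the `rev` bump above is the only functional change in this PR; every hunk below is mechanical restyling produced by running the newer black over the repository: wrapping an over-long assignment right-hand side in parentheses, hugging the power operator, honoring pre-existing trailing commas, and normalizing docstring whitespace. As a minimal sketch (not part of this PR, and assuming black 22.3.0 is installed), the new output can be reproduced with black's public format_str API:

    import black

    # black.Mode() carries the defaults this repo relies on (line length 88).
    mode = black.Mode()
    print(black.format_str("x = 2 ** 32 - 1\n", mode=mode))
    # Under black 22.3.0 this prints: x = 2**32 - 1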
6 changes: 4 additions & 2 deletions dask_cuda/__init__.py
@@ -19,8 +19,10 @@


 # Monkey patching Dask to make use of explicit-comms when `DASK_EXPLICIT_COMMS=True`
-dask.dataframe.shuffle.rearrange_by_column_tasks = get_rearrange_by_column_tasks_wrapper(
-    dask.dataframe.shuffle.rearrange_by_column_tasks
+dask.dataframe.shuffle.rearrange_by_column_tasks = (
+    get_rearrange_by_column_tasks_wrapper(
+        dask.dataframe.shuffle.rearrange_by_column_tasks
+    )
 )


34 changes: 27 additions & 7 deletions dask_cuda/benchmarks/local_cudf_merge.py
@@ -36,7 +36,7 @@ def generate_chunk(i_chunk, local_size, num_chunks, chunk_type, frac_match, gpu)
     import numpy as xp
     import pandas as xdf

-    xp.random.seed(2 ** 32 - 1)
+    xp.random.seed(2**32 - 1)

     chunk_type = chunk_type or "build"
     frac_match = frac_match or 1.0
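This hunk is black 22.x's power-operator rule: `**` is hugged when both operands are simple (names, numeric literals, or attribute access), while more complex operands keep the surrounding spaces. An illustrative sketch, not taken from this diff:

    x = 2**32 - 1       # simple literal operands are hugged
    y = base**exponent  # simple names as well
    z = f(n) ** 2       # a call is not simple, so spaces remain
    w = (a + b) ** 0.5  # parenthesized expressions also keep spaces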
@@ -258,7 +258,10 @@ def main(args):
         for (w1, w2), v in bandwidths.items()
     }
     total_nbytes = {
-        (scheduler_workers[w1].name, scheduler_workers[w2].name,): format_bytes(sum(nb))
+        (
+            scheduler_workers[w1].name,
+            scheduler_workers[w2].name,
+        ): format_bytes(sum(nb))
         for (w1, w2), nb in total_nbytes.items()
     }
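The dictionary-key rewrite above is black's "magic trailing comma", stabilized in the 22.x series: a trailing comma that already exists inside brackets tells black to keep the collection exploded, one element per line. The 19.10b0 output left a trailing comma inside the key tuple, so 22.3.0 now expands it. A sketch of the rule on a hypothetical tuple:

    pair = (first, second)  # no trailing comma: collapsed when it fits

    pair = (
        first,  # trailing comma below: black keeps one element per line
        second,
    )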

@@ -379,21 +382,30 @@ def main(args):
 def parse_args():
     special_args = [
         {
-            "name": ["-b", "--backend",],
+            "name": [
+                "-b",
+                "--backend",
+            ],
             "choices": ["dask", "explicit-comms"],
             "default": "dask",
             "type": str,
             "help": "The backend to use.",
         },
         {
-            "name": ["-t", "--type",],
+            "name": [
+                "-t",
+                "--type",
+            ],
             "choices": ["cpu", "gpu"],
             "default": "gpu",
             "type": str,
             "help": "Do merge with GPU or CPU dataframes",
         },
         {
-            "name": ["-c", "--chunk-size",],
+            "name": [
+                "-c",
+                "--chunk-size",
+            ],
             "default": 1_000_000,
             "metavar": "n",
             "type": int,
@@ -444,9 +456,17 @@ def parse_args():
             "action": "store_true",
             "help": "Write output as markdown",
         },
-        {"name": "--runs", "default": 3, "type": int, "help": "Number of runs",},
         {
-            "name": ["-s", "--set-index",],
+            "name": "--runs",
+            "default": 3,
+            "type": int,
+            "help": "Number of runs",
+        },
+        {
+            "name": [
+                "-s",
+                "--set-index",
+            ],
             "action": "store_true",
             "help": "Call set_index on the key column to sort the joined dataframe.",
         },
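Where the one-line form of such a dict is preferred, dropping the trailing comma before reformatting lets black collapse it again, and black 22.3.0 also accepts -C/--skip-magic-trailing-comma to ignore magic trailing commas globally. An illustrative one-liner, not from this diff:

    {"name": "--runs", "default": 3, "type": int, "help": "Number of runs"}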
22 changes: 18 additions & 4 deletions dask_cuda/benchmarks/local_cudf_shuffle.py
@@ -138,7 +138,10 @@ def main(args):
         for (w1, w2), v in bandwidths.items()
     }
     total_nbytes = {
-        (scheduler_workers[w1].name, scheduler_workers[w2].name,): format_bytes(sum(nb))
+        (
+            scheduler_workers[w1].name,
+            scheduler_workers[w2].name,
+        ): format_bytes(sum(nb))
         for (w1, w2), nb in total_nbytes.items()
     }

@@ -251,14 +254,20 @@ def parse_args():
             "help": "Number of input partitions (default '100')",
         },
         {
-            "name": ["-b", "--backend",],
+            "name": [
+                "-b",
+                "--backend",
+            ],
             "choices": ["dask", "explicit-comms"],
             "default": "dask",
             "type": str,
             "help": "The backend to use.",
         },
         {
-            "name": ["-t", "--type",],
+            "name": [
+                "-t",
+                "--type",
+            ],
             "choices": ["cpu", "gpu"],
             "default": "gpu",
             "type": str,
@@ -276,7 +285,12 @@ def parse_args():
             "action": "store_true",
             "help": "Write output as markdown",
         },
-        {"name": "--runs", "default": 3, "type": int, "help": "Number of runs",},
+        {
+            "name": "--runs",
+            "default": 3,
+            "type": int,
+            "help": "Number of runs",
+        },
     ]

     return parse_benchmark_args(
32 changes: 24 additions & 8 deletions dask_cuda/benchmarks/local_cupy.py
@@ -229,9 +229,10 @@ async def run(args):
         for (w1, w2), v in bandwidths.items()
     }
     total_nbytes = {
-        (scheduler_workers[w1].name, scheduler_workers[w2].name,): format_bytes(
-            sum(nb)
-        )
+        (
+            scheduler_workers[w1].name,
+            scheduler_workers[w2].name,
+        ): format_bytes(sum(nb))
         for (w1, w2), nb in total_nbytes.items()
     }

@@ -318,35 +319,50 @@ async def run(args):
 def parse_args():
     special_args = [
         {
-            "name": ["-s", "--size",],
+            "name": [
+                "-s",
+                "--size",
+            ],
             "default": "10000",
             "metavar": "n",
             "type": int,
             "help": "The array size n in n^2 (default 10000). For 'svd' operation "
             "the second dimension is given by --second-size.",
         },
         {
-            "name": ["-2", "--second-size",],
+            "name": [
+                "-2",
+                "--second-size",
+            ],
             "default": "1000",
             "type": int,
             "help": "The second dimension size for 'svd' operation (default 1000).",
         },
         {
-            "name": ["-t", "--type",],
+            "name": [
+                "-t",
+                "--type",
+            ],
             "choices": ["cpu", "gpu"],
             "default": "gpu",
             "type": str,
             "help": "Do merge with GPU or CPU dataframes.",
         },
         {
-            "name": ["-o", "--operation",],
+            "name": [
+                "-o",
+                "--operation",
+            ],
             "default": "transpose_sum",
             "type": str,
             "help": "The operation to run, valid options are: "
             "'transpose_sum' (default), 'dot', 'fft', 'svd', 'sum', 'mean', 'slice'.",
         },
         {
-            "name": ["-c", "--chunk-size",],
+            "name": [
+                "-c",
+                "--chunk-size",
+            ],
             "default": "2500",
             "type": int,
             "help": "Chunk size (default 2500).",
34 changes: 26 additions & 8 deletions dask_cuda/benchmarks/local_cupy_map_overlap.py
@@ -118,9 +118,10 @@ async def run(args):
         for (w1, w2), v in bandwidths.items()
     }
     total_nbytes = {
-        (scheduler_workers[w1].name, scheduler_workers[w2].name,): format_bytes(
-            sum(nb)
-        )
+        (
+            scheduler_workers[w1].name,
+            scheduler_workers[w2].name,
+        ): format_bytes(sum(nb))
         for (w1, w2), nb in total_nbytes.items()
     }

@@ -198,28 +199,40 @@ async def run(args):
 def parse_args():
     special_args = [
         {
-            "name": ["-s", "--size",],
+            "name": [
+                "-s",
+                "--size",
+            ],
             "default": "10000",
             "metavar": "n",
             "type": int,
             "help": "The size n in n^2 (default 10000)",
         },
         {
-            "name": ["-t", "--type",],
+            "name": [
+                "-t",
+                "--type",
+            ],
             "choices": ["cpu", "gpu"],
             "default": "gpu",
             "type": str,
             "help": "Use GPU or CPU arrays",
         },
         {
-            "name": ["-c", "--chunk-size",],
+            "name": [
+                "-c",
+                "--chunk-size",
+            ],
             "default": "128 MiB",
             "metavar": "nbytes",
             "type": str,
             "help": "Chunk size (default '128 MiB')",
         },
         {
-            "name": ["-k", "--kernel-size",],
+            "name": [
+                "-k",
+                "--kernel-size",
+            ],
             "default": "1",
             "metavar": "k",
             "type": int,
@@ -232,7 +245,12 @@ def parse_args():
             "type": parse_bytes,
             "help": "Ignore messages smaller than this (default '1 MB')",
         },
-        {"name": "--runs", "default": 3, "type": int, "help": "Number of runs",},
+        {
+            "name": "--runs",
+            "default": 3,
+            "type": int,
+            "help": "Number of runs",
+        },
     ]

     return parse_benchmark_args(
9 changes: 7 additions & 2 deletions dask_cuda/benchmarks/utils.py
@@ -62,7 +62,9 @@ def parse_benchmark_args(description="Generic dask-cuda Benchmark", args_list=[]
         "Logging is only enabled if RMM memory pool is enabled.",
     )
     parser.add_argument(
-        "--all-to-all", action="store_true", help="Run all-to-all before computation",
+        "--all-to-all",
+        action="store_true",
+        help="Run all-to-all before computation",
     )
     parser.add_argument(
         "--enable-tcp-over-ucx",
@@ -241,7 +243,10 @@ def get_scheduler_workers(dask_scheduler=None):


 def setup_memory_pool(
-    dask_worker=None, pool_size=None, disable_pool=False, log_directory=None,
+    dask_worker=None,
+    pool_size=None,
+    disable_pool=False,
+    log_directory=None,
 ):
     import cupy

9 changes: 7 additions & 2 deletions dask_cuda/cli/dask_cuda_worker.py
@@ -126,7 +126,10 @@
     allows querying the amount of memory allocated by RMM.""",
 )
 @click.option(
-    "--pid-file", type=str, default="", help="File to write the process PID.",
+    "--pid-file",
+    type=str,
+    default="",
+    help="File to write the process PID.",
 )
 @click.option(
     "--resources",
@@ -314,7 +317,9 @@ def main(
 ):
     if tls_ca_file and tls_cert and tls_key:
         security = Security(
-            tls_ca_file=tls_ca_file, tls_worker_cert=tls_cert, tls_worker_key=tls_key,
+            tls_ca_file=tls_ca_file,
+            tls_worker_cert=tls_cert,
+            tls_worker_key=tls_key,
         )
     else:
         security = None
2 changes: 1 addition & 1 deletion dask_cuda/explicit_comms/comms.py
@@ -123,7 +123,7 @@ async def server_handler(ep):


 async def _create_endpoints(session_state, peers):
-    """ Each worker creates a UCX endpoint to all workers with greater rank"""
+    """Each worker creates a UCX endpoint to all workers with greater rank"""
     assert session_state["loop"] is asyncio.get_event_loop()

     myrank = session_state["rank"]
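The one-character change above is black's docstring normalization, introduced in 20.8b0 and part of the stable 22.x style: leading and trailing whitespace on a docstring's first line is stripped and its indentation made uniform. An illustrative sketch, not from this diff:

    def f():
        """ padded docstring """  # left unchanged by black 19.10b0

    def g():
        """padded docstring"""  # black >= 20.8b0 strips the padding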
4 changes: 3 additions & 1 deletion dask_cuda/local_cuda_cluster.py
@@ -378,7 +378,9 @@ def new_worker_spec(self):
         visible_devices = cuda_visible_devices(worker_count, self.cuda_visible_devices)
         spec["options"].update(
             {
-                "env": {"CUDA_VISIBLE_DEVICES": visible_devices,},
+                "env": {
+                    "CUDA_VISIBLE_DEVICES": visible_devices,
+                },
                 "plugins": {
                     CPUAffinity(
                         get_cpu_affinity(nvml_device_index(0, visible_devices))
4 changes: 3 additions & 1 deletion dask_cuda/proxy_object.py
@@ -378,7 +378,9 @@ def __del__(self):
             pxy.manager.remove(self)

     def _pxy_serialize(
-        self, serializers: Iterable[str], proxy_detail: ProxyDetail = None,
+        self,
+        serializers: Iterable[str],
+        proxy_detail: ProxyDetail = None,
     ) -> None:
         """Inplace serialization of the proxied object using the `serializers`
7 changes: 6 additions & 1 deletion dask_cuda/tests/test_dask_cuda_worker.py
@@ -206,7 +206,12 @@ def test_pre_import(loop):  # noqa: F811

     with popen(["dask-scheduler", "--port", "9369", "--no-dashboard"]):
         with popen(
-            ["dask-cuda-worker", "127.0.0.1:9369", "--pre-import", module,]
+            [
+                "dask-cuda-worker",
+                "127.0.0.1:9369",
+                "--pre-import",
+                module,
+            ]
         ):
             with Client("127.0.0.1:9369", loop=loop) as client:
                 assert wait_workers(client, n_gpus=get_n_gpus())