Skip to content

Commit

Permalink
Merge branch 'main' into nanny-close-proc-on-start-failure
Browse files Browse the repository at this point in the history
  • Loading branch information
hendrikmakait committed Aug 3, 2022
2 parents 28c91a5 + 4f6960a commit 88abaf2
Show file tree
Hide file tree
Showing 53 changed files with 537 additions and 208 deletions.
4 changes: 2 additions & 2 deletions distributed/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def versions_from_parentdir(parentdir_prefix, root, verbose):
"""
rootdirs = []

for i in range(3):
for _ in range(3):
dirname = os.path.basename(root)
if dirname.startswith(parentdir_prefix):
return {
Expand Down Expand Up @@ -520,7 +520,7 @@ def get_versions():
# versionfile_source is the relative path from the top of the source
# tree (where the .git directory might live) to this file. Invert
# this to find the root from __file__.
for i in cfg.versionfile_source.split("/"):
for _ in cfg.versionfile_source.split("/"):
root = os.path.dirname(root)
except NameError:
return {
Expand Down
4 changes: 2 additions & 2 deletions distributed/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ def __reduce__(self):
@property
def _io_loop(self):
if self._worker:
return self._worker.io_loop
return self._worker.loop
else:
return self._client.io_loop
return self._client.loop

@property
def _scheduler_rpc(self):
Expand Down
11 changes: 11 additions & 0 deletions distributed/cli/dask_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,15 @@
required=False,
help="Deprecated. See --dashboard/--no-dashboard.",
)
@click.option(
"--jupyter/--no-jupyter",
"jupyter",
default=False,
required=False,
help="Start a Jupyter Server in the same process. Warning: This will make"
"it possible for anyone with access to your dashboard address to run"
"Python code",
)
@click.option("--show/--no-show", default=False, help="Show web UI [default: --show]")
@click.option(
"--dashboard-prefix", type=str, default="", help="Prefix for the dashboard app"
Expand Down Expand Up @@ -132,6 +141,7 @@ def main(
tls_cert,
tls_key,
dashboard_address,
jupyter,
**kwargs,
):
g0, g1, g2 = gc.get_threshold() # https://github.com/dask/distributed/issues/1653
Expand Down Expand Up @@ -195,6 +205,7 @@ async def run():
dashboard=dashboard,
dashboard_address=dashboard_address,
http_prefix=dashboard_prefix,
jupyter=jupyter,
**kwargs,
)
logger.info("-" * 47)
Expand Down
40 changes: 36 additions & 4 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
except ImportError:
single_key = first
from tornado import gen
from tornado.ioloop import PeriodicCallback
from tornado.ioloop import IOLoop, PeriodicCallback

import distributed.utils
from distributed import cluster_dump, preloading
Expand Down Expand Up @@ -763,6 +763,7 @@ class Client(SyncMethodMixin):
_default_event_handlers = {"print": _handle_print, "warn": _handle_warn}

preloads: list[preloading.Preload]
__loop: IOLoop | None = None

def __init__(
self,
Expand Down Expand Up @@ -875,7 +876,6 @@ def __init__(

self._asynchronous = asynchronous
self._loop_runner = LoopRunner(loop=loop, asynchronous=asynchronous)
self.io_loop = self.loop = self._loop_runner.loop
self._connecting_to_scheduler = False

self._gather_keys = None
Expand Down Expand Up @@ -947,6 +947,38 @@ def __init__(

ReplayTaskClient(self)

@property
def io_loop(self) -> IOLoop | None:
warnings.warn(
"The io_loop property is deprecated", DeprecationWarning, stacklevel=2
)
return self.loop

@io_loop.setter
def io_loop(self, value: IOLoop) -> None:
warnings.warn(
"The io_loop property is deprecated", DeprecationWarning, stacklevel=2
)
self.loop = value

@property
def loop(self) -> IOLoop | None:
loop = self.__loop
if loop is None:
# If the loop is not running when this is called, the LoopRunner.loop
# property will raise a DeprecationWarning
# However subsequent calls might occur - eg atexit, where a stopped
# loop is still acceptable - so we cache access to the loop.
self.__loop = loop = self._loop_runner.loop
return loop

@loop.setter
def loop(self, value: IOLoop) -> None:
warnings.warn(
"setting the loop property is deprecated", DeprecationWarning, stacklevel=2
)
self.__loop = value

@contextmanager
def as_current(self):
"""Thread-local, Task-local context manager that causes the Client.current
Expand Down Expand Up @@ -1153,7 +1185,7 @@ async def _start(self, timeout=no_default, **kwargs):
elif self.scheduler_file is not None:
while not os.path.exists(self.scheduler_file):
await asyncio.sleep(0.01)
for i in range(10):
for _ in range(10):
try:
with open(self.scheduler_file) as f:
cfg = json.load(f)
Expand Down Expand Up @@ -1359,7 +1391,7 @@ def __exit__(self, exc_type, exc_value, traceback):
def __del__(self):
# If the loop never got assigned, we failed early in the constructor,
# nothing to do
if hasattr(self, "loop"):
if self.__loop is not None:
self.close()

def _inc_ref(self, key):
Expand Down
2 changes: 1 addition & 1 deletion distributed/comm/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ async def start(self):
self.tcp_server = TCPServer(max_buffer_size=MAX_BUFFER_SIZE, **self.server_args)
self.tcp_server.handle_stream = self._handle_stream
backlog = int(dask.config.get("distributed.comm.socket-backlog"))
for i in range(5):
for _ in range(5):
try:
# When shuffling data between workers, there can
# really be O(cluster size) connection requests
Expand Down
27 changes: 18 additions & 9 deletions distributed/comm/tests/test_comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ def check_tls_extra(info):
)


async def get_comm_pair(listen_addr, listen_args={}, connect_args={}, **kwargs):
async def get_comm_pair(listen_addr, listen_args=None, connect_args=None, **kwargs):
listen_args = listen_args or {}
connect_args = connect_args or {}
q = asyncio.Queue()

async def handle_comm(comm):
Expand Down Expand Up @@ -337,7 +339,7 @@ async def test_comm_failure_threading(tcp):

async def sleep_for_60ms():
max_thread_count = 0
for x in range(60):
for _ in range(60):
await asyncio.sleep(0.001)
thread_count = threading.active_count()
if thread_count > max_thread_count:
Expand Down Expand Up @@ -379,7 +381,7 @@ async def handle_comm(comm):
try:
assert comm.peer_address.startswith("inproc://" + addr_head)
client_addresses.add(comm.peer_address)
for i in range(N_MSGS):
for _ in range(N_MSGS):
msg = await comm.read()
msg["op"] = "pong"
await comm.write(msg)
Expand All @@ -399,7 +401,7 @@ async def client_communicate(key, delay=0):
comm = await connect(listener.contact_address)
try:
assert comm.peer_address == "inproc://" + listener_addr
for i in range(N_MSGS):
for _ in range(N_MSGS):
await comm.write({"op": "ping", "data": key})
if delay:
await asyncio.sleep(delay)
Expand Down Expand Up @@ -489,8 +491,8 @@ async def check_client_server(
addr,
check_listen_addr=None,
check_contact_addr=None,
listen_args={},
connect_args={},
listen_args=None,
connect_args=None,
):
"""
Abstract client / server test.
Expand Down Expand Up @@ -740,7 +742,12 @@ async def handle_comm(comm):
#


async def check_comm_closed_implicit(addr, delay=None, listen_args={}, connect_args={}):
async def check_comm_closed_implicit(
addr, delay=None, listen_args=None, connect_args=None
):
listen_args = listen_args or {}
connect_args = connect_args or {}

async def handle_comm(comm):
await comm.close()

Expand Down Expand Up @@ -771,7 +778,9 @@ async def test_inproc_comm_closed_implicit():
await check_comm_closed_implicit(inproc.new_address())


async def check_comm_closed_explicit(addr, listen_args={}, connect_args={}):
async def check_comm_closed_explicit(addr, listen_args=None, connect_args=None):
listen_args = listen_args or {}
connect_args = connect_args or {}
a, b = await get_comm_pair(addr, listen_args=listen_args, connect_args=connect_args)
a_read = a.read()
b_read = b.read()
Expand Down Expand Up @@ -1020,7 +1029,7 @@ async def handle_comm(comm):
listeners = []
N = 100

for i in range(N):
for _ in range(N):
listener = await listen(addr, handle_comm)
listeners.append(listener)

Expand Down
6 changes: 4 additions & 2 deletions distributed/comm/tests/test_ucx.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ def test_registered(ucx_loop):


async def get_comm_pair(
listen_addr="ucx://" + HOST, listen_args={}, connect_args={}, **kwargs
listen_addr="ucx://" + HOST, listen_args=None, connect_args=None, **kwargs
):
listen_args = listen_args or {}
connect_args = connect_args or {}
q = asyncio.queues.Queue()

async def handle_comm(comm):
Expand Down Expand Up @@ -300,7 +302,7 @@ async def test_stress(
x = x.persist()
await wait(x)

for i in range(10):
for _ in range(10):
x = x.rechunk((chunksize, -1))
x = x.rechunk((-1, chunksize))
x = x.persist()
Expand Down
2 changes: 1 addition & 1 deletion distributed/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1400,7 +1400,7 @@ def collect(self):
self.active,
len(self._connecting),
)
for addr, comms in self.available.items():
for comms in self.available.values():
for comm in comms:
IOLoop.current().add_callback(comm.close)
self.semaphore.release()
Expand Down
8 changes: 4 additions & 4 deletions distributed/dashboard/components/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1440,7 +1440,7 @@ def __init__(self, scheduler, **kwargs):
def update(self):
agg_times = defaultdict(float)

for key, ts in self.scheduler.task_prefixes.items():
for ts in self.scheduler.task_prefixes.values():
for action, t in ts.all_durations.items():
agg_times[action] += t

Expand Down Expand Up @@ -2539,7 +2539,7 @@ def update(self):

durations = set()
nbytes = set()
for key, tg in self.scheduler.task_groups.items():
for tg in self.scheduler.task_groups.values():

if tg.duration and tg.nbytes_total:
durations.add(tg.duration)
Expand Down Expand Up @@ -3495,8 +3495,8 @@ def __init__(self, scheduler, width=800, **kwargs):
@without_property_validation
def update(self):
data = {name: [] for name in self.names + self.extra_names}
for i, (addr, ws) in enumerate(
sorted(self.scheduler.workers.items(), key=lambda kv: str(kv[1].name))
for i, ws in enumerate(
sorted(self.scheduler.workers.values(), key=lambda ws: str(ws.name))
):
minfo = ws.memory

Expand Down
24 changes: 22 additions & 2 deletions distributed/deploy/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from inspect import isawaitable
from typing import Any

from tornado.ioloop import PeriodicCallback
from tornado.ioloop import IOLoop, PeriodicCallback

import dask.config
from dask.utils import _deprecated, format_bytes, parse_timedelta, typename
Expand Down Expand Up @@ -55,6 +55,7 @@ class Cluster(SyncMethodMixin):
"""

_supports_scaling = True
__loop: IOLoop | None = None

def __init__(
self,
Expand All @@ -65,7 +66,6 @@ def __init__(
scheduler_sync_interval=1,
):
self._loop_runner = LoopRunner(loop=loop, asynchronous=asynchronous)
self.loop = self._loop_runner.loop

self.scheduler_info = {"workers": {}}
self.periodic_callbacks = {}
Expand All @@ -89,6 +89,26 @@ def __init__(
}
self.status = Status.created

@property
def loop(self) -> IOLoop | None:
loop = self.__loop
if loop is None:
# If the loop is not running when this is called, the LoopRunner.loop
# property will raise a DeprecationWarning
# However subsequent calls might occur - eg atexit, where a stopped
# loop is still acceptable - so we cache access to the loop.
self.__loop = loop = self._loop_runner.loop
return loop

@loop.setter
def loop(self, value: IOLoop) -> None:
warnings.warn(
"setting the loop property is deprecated", DeprecationWarning, stacklevel=2
)
if value is None:
raise ValueError("expected an IOLoop, got None")
self.__loop = value

@property
def name(self):
return self._cluster_info["name"]
Expand Down
2 changes: 1 addition & 1 deletion distributed/deploy/old_ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ def __init__(

# Start worker nodes
self.workers = []
for i, addr in enumerate(worker_addrs):
for addr in worker_addrs:
self.add_worker(addr)

@gen.coroutine
Expand Down
Loading

0 comments on commit 88abaf2

Please sign in to comment.