Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler.py / worker.py code cleanup #4626

Merged
merged 17 commits into from
Mar 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 48 additions & 59 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3604,29 +3604,27 @@ async def close_worker(self, comm=None, worker=None, safe=None):
def heartbeat_worker(
self,
comm=None,
address=None,
resolve_address=True,
now=None,
resources=None,
host_info=None,
metrics=None,
executing=None,
*,
address,
resolve_address: bool = True,
now: float = None,
resources: dict = None,
host_info: dict = None,
metrics: dict,
executing: dict = None,
Comment on lines +3607 to +3614
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW I find that address and metrics dropping default values here to be more confusing than what we had before. What does it mean for these parameters to not have a default but also be after the *?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It means that they are mandatory keyword arguments.
Nothing in the code caters for address=None - if it arrives, it's not going to be dealt with correctly.
There was an explicit assert metrics which prevented you from omitting the parameter.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, thanks for clarifying -- this was a bit jarring to see for the first time

):
parent: SchedulerState = cast(SchedulerState, self)
address = self.coerce_address(address, resolve_address)
address = normalize_address(address)
if address not in parent._workers:
ws: WorkerState = parent._workers.get(address)
if ws is None:
return {"status": "missing"}

host = get_address_host(address)
local_now = time()
now = now or time()
assert metrics
host_info = host_info or {}

dh: dict = parent._host_info.get(host)
if dh is None:
parent._host_info[host] = dh = dict()
dh: dict = parent._host_info.setdefault(host, {})
dh["last-seen"] = local_now

frac = 1 / len(parent._workers)
Expand All @@ -3650,26 +3648,20 @@ def heartbeat_worker(
1 - alpha
)

ws: WorkerState = parent._workers[address]

ws._last_seen = time()

ws._last_seen = local_now
if executing is not None:
ws._executing = {
parent._tasks[key]: duration for key, duration in executing.items()
}

if metrics:
ws._metrics = metrics
ws._metrics = metrics

if host_info:
dh: dict = parent._host_info.get(host)
if dh is None:
parent._host_info[host] = dh = dict()
dh: dict = parent._host_info.setdefault(host, {})
dh.update(host_info)

delay = time() - now
ws._time_delay = delay
if now:
ws._time_delay = local_now - now

if resources:
self.add_resources(worker=address, resources=resources)
Expand All @@ -3678,7 +3670,7 @@ def heartbeat_worker(

return {
"status": "OK",
"time": time(),
"time": local_now,
"heartbeat-interval": heartbeat_interval(len(parent._workers)),
}

Expand Down Expand Up @@ -3756,7 +3748,7 @@ async def add_worker(
parent._total_nthreads += nthreads
parent._aliases[name] = address

response = self.heartbeat_worker(
self.heartbeat_worker(
address=address,
resolve_address=resolve_address,
now=now,
Expand Down Expand Up @@ -5331,7 +5323,7 @@ async def rebalance(self, comm=None, keys=None, workers=None):
map(first, sorted(worker_bytes.items(), key=second, reverse=True))
)

recipients = iter(reversed(sorted_workers))
recipients = reversed(sorted_workers)
recipient = next(recipients)
msgs = [] # (sender, recipient, key)
for sender in sorted_workers[: len(workers) // 2]:
Expand All @@ -5343,19 +5335,16 @@ async def rebalance(self, comm=None, keys=None, workers=None):
)

try:
while worker_bytes[sender] > avg:
while (
worker_bytes[recipient] < avg
and worker_bytes[sender] > avg
):
while avg < worker_bytes[sender]:
while worker_bytes[recipient] < avg < worker_bytes[sender]:
ts, nb = next(sender_keys)
if ts not in tasks_by_worker[recipient]:
tasks_by_worker[recipient].add(ts)
# tasks_by_worker[sender].remove(ts)
msgs.append((sender, recipient, ts))
worker_bytes[sender] -= nb
worker_bytes[recipient] += nb
if worker_bytes[sender] > avg:
if avg < worker_bytes[sender]:
recipient = next(recipients)
except StopIteration:
break
Expand Down Expand Up @@ -5386,7 +5375,7 @@ async def rebalance(self, comm=None, keys=None, workers=None):
},
)

if not all(r["status"] == "OK" for r in result):
if any(r["status"] != "OK" for r in result):
return {
"status": "missing-data",
"keys": tuple(
Expand Down Expand Up @@ -5687,7 +5676,7 @@ async def retire_workers(
workers: list (optional)
List of worker addresses to retire.
If not provided we call ``workers_to_close`` which finds a good set
workers_names: list (optional)
names: list (optional)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch

List of worker names to retire.
remove: bool (defaults to True)
Whether or not to remove the worker metadata immediately or else
Expand Down Expand Up @@ -5715,30 +5704,31 @@ async def retire_workers(
with log_errors():
async with self._lock if lock else empty_context:
if names is not None:
if workers is not None:
raise TypeError("names and workers are mutually exclusive")
if names:
logger.info("Retire worker names %s", names)
names = set(map(str, names))
workers = [
workers = {
ws._address
for ws in parent._workers_dv.values()
if str(ws._name) in names
]
if workers is None:
}
elif workers is None:
while True:
try:
workers = self.workers_to_close(**kwargs)
if workers:
workers = await self.retire_workers(
workers=workers,
remove=remove,
close_workers=close_workers,
lock=False,
)
return workers
else:
if not workers:
return {}
return await self.retire_workers(
workers=workers,
remove=remove,
close_workers=close_workers,
lock=False,
)
except KeyError: # keys left during replicate
pass

workers = {
parent._workers_dv[w] for w in workers if w in parent._workers_dv
}
Expand All @@ -5750,22 +5740,21 @@ async def retire_workers(
keys = set.union(*[w.has_what for w in workers])
keys = {ts._key for ts in keys if ts._who_has.issubset(workers)}

other_workers = set(parent._workers_dv.values()) - workers
if keys:
if other_workers:
logger.info("Moving %d keys to other workers", len(keys))
await self.replicate(
keys=keys,
workers=[ws._address for ws in other_workers],
n=1,
delete=False,
lock=False,
)
else:
other_workers = set(parent._workers_dv.values()) - workers
if not other_workers:
return {}
logger.info("Moving %d keys to other workers", len(keys))
await self.replicate(
keys=keys,
workers=[ws._address for ws in other_workers],
n=1,
delete=False,
lock=False,
)

worker_keys = {ws._address: ws.identity() for ws in workers}
if close_workers and worker_keys:
if close_workers:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we dropped worker_keys here -- is this intentional?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if worker_keys is empty, you'll end up with await asyncio.gather(*[]) which is a noop.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which, by the way, is exactly what already happens in the immediately following block, if remove: ...

await asyncio.gather(
*[self.close_worker(worker=w, safe=True) for w in worker_keys]
)
Expand Down
Loading