scheduler.py / worker.py code cleanup #4626

Merged
merged 17 commits into from Mar 30, 2021
Changes from 15 commits

108 changes: 49 additions & 59 deletions distributed/scheduler.py
@@ -3604,29 +3604,28 @@ async def close_worker(self, comm=None, worker=None, safe=None):
def heartbeat_worker(
self,
comm=None,
address=None,
resolve_address=True,
now=None,
resources=None,
host_info=None,
metrics=None,
executing=None,
*,
address,
resolve_address: bool = True,
now: float = None,
resources: dict = None,
host_info: dict = None,
metrics: dict,
executing: dict = None,
Comment on lines +3607 to +3614
Member:

FWIW I find address and metrics dropping their default values here to be more confusing than what we had before. What does it mean for these parameters to not have a default but also come after the *?

Collaborator Author:

It means that they are mandatory keyword-only arguments.
Nothing in the code caters for address=None; if it arrives, it's not going to be dealt with correctly.
There was an explicit assert metrics which prevented you from omitting the parameter.
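
A minimal sketch of the distinction (hypothetical function, not from the diff):

def f(*, address, metrics):
    # Everything after the bare "*" is keyword-only; without a
    # default value it is also mandatory.
    return address, metrics

f(address="10.0.0.1", metrics={})  # OK
# f("10.0.0.1", {})      -> TypeError: f() takes 0 positional arguments but 2 were given
# f(address="10.0.0.1")  -> TypeError: f() missing 1 required keyword-only argument: 'metrics'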

Member:

Alright, thanks for clarifying -- this was a bit jarring to see for the first time

):
parent: SchedulerState = cast(SchedulerState, self)
address = self.coerce_address(address, resolve_address)
address = normalize_address(address)
if address not in parent._workers:
try:
ws: WorkerState = parent._workers[address]
Member:

Would actually suggest using .get and checking for None instead. This has worked well in other places where we have done this, and it avoids some overhead from the Cython code generated around exception handling.
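
For concreteness, the suggested shape of the lookup would be roughly (a sketch, not a commit):

ws = parent._workers.get(address)
if ws is None:
    return {"status": "missing"}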

Collaborator Author:

I ran a benchmark and I can't see the issue?

Python 3.8, Cython 0.29.21, Linux x64

N = 10000


def bench_get_miss():
    d = {}
    for i in range(N):
        x = d.get(i)
        if x is None:
            x = {}
            d[i] = x


def bench_get_hit():
    d = {1: {}}
    for _ in range(N):
        x = d.get(1)
        if x is None:
            x = {}
            d[1] = x


def bench_setdefault_miss():
    d = {}
    for i in range(N):
        x = d.setdefault(i, {})


def bench_setdefault_hit():
    d = {1: {}}
    for _ in range(N):
        x = d.setdefault(1, {})

Pure Python performance:

%timeit bench_get_miss()
766 µs ± 1.89 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
%timeit bench_get_hit()
384 µs ± 174 ns per loop (mean ± std. dev. of 7 runs, 1000 loops each)
%timeit bench_setdefault_miss()
534 µs ± 269 ns per loop (mean ± std. dev. of 7 runs, 1000 loops each)
%timeit bench_setdefault_hit()
431 µs ± 9.68 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

Cythonized:

%timeit bench_get_miss()
524 µs ± 4.38 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
%timeit bench_get_hit()
139 µs ± 791 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
%timeit bench_setdefault_miss()
366 µs ± 1.03 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
%timeit bench_setdefault_hit()
193 µs ± 1.2 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

Collaborator Author:

@jakirkham any feedback on this?

Member:

Those benchmarks don't seem to be comparing against try...except, which is what is used here and what is being suggested for change.

Member:

For more context, please see the following

In [1]: %load_ext Cython

In [2]: %%cython -3
   ...: 
   ...: from cython import ccall
   ...: 
   ...: 
   ...: @ccall
   ...: def get_safe1(d: dict, k):
   ...:     return d.get(k)
   ...: 
   ...: 
   ...: @ccall
   ...: def get_safe2(d: dict, k):
   ...:     try:
   ...:         return d[k]
   ...:     except KeyError:
   ...:         return None
   ...: 

In [3]: d = {"a": 1}

In [4]: %time get_safe1(d, "a")
CPU times: user 3 µs, sys: 0 ns, total: 3 µs
Wall time: 6.2 µs
Out[4]: 1

In [5]: %time get_safe2(d, "a")
CPU times: user 3 µs, sys: 0 ns, total: 3 µs
Wall time: 6.2 µs
Out[5]: 1

In [6]: %time get_safe1(d, "b")
CPU times: user 4 µs, sys: 0 ns, total: 4 µs
Wall time: 6.2 µs

In [7]: %time get_safe2(d, "b")
CPU times: user 6 µs, sys: 0 ns, total: 6 µs
Wall time: 9.06 µs

IOW, it is the try...except itself that adds overhead, particularly when the key is missed. This can be avoided by just using .get instead.

Additionally, the is None check that would follow is a simple identity check in Python and a pointer comparison in Cython, so it is very fast.

Collaborator Author:

Sorry, I misread your comment; I thought you were talking about line 3629 et al.

A new benchmark shows that try is indeed slower than in and .get on a miss, but it's not a Cython-specific issue:

N = 100000


def bench_in_miss():
    d = {}
    x = 1
    for _ in range(N):
        if x not in d:
            pass
        else:
            y = d[x]


def bench_in_hit():
    d = {1: 2}
    x = 1
    for _ in range(N):
        if x not in d:
            pass
        else:
            y = d[x]


def bench_try_miss():
    d = {}
    x = 1
    for _ in range(N):
        try:
            y = d[x]
        except KeyError:
            pass


def bench_try_hit():
    d = {1: 2}
    x = 1
    for _ in range(N):
        try:
            y = d[x]
        except KeyError:
            pass


def bench_get_miss():
    d = {}
    x = 1
    for _ in range(N):
        y = d.get(x)
        if y is None:
            pass


def bench_get_hit():
    d = {1: 2}
    x = 1
    for _ in range(N):
        y = d.get(x)
        if y is None:
            pass

Pure Python

In [3]: %time bench_in_miss()
CPU times: user 4.37 ms, sys: 14 µs, total: 4.38 ms
Wall time: 4.38 ms

In [4]: %time bench_in_hit()
CPU times: user 6.94 ms, sys: 0 ns, total: 6.94 ms
Wall time: 6.92 ms

In [5]: %time bench_try_miss()
CPU times: user 18.1 ms, sys: 0 ns, total: 18.1 ms
Wall time: 18.1 ms

In [6]: %time bench_try_hit()
CPU times: user 5.24 ms, sys: 0 ns, total: 5.24 ms
Wall time: 5.21 ms

In [7]: %time bench_get_miss()
CPU times: user 8.11 ms, sys: 0 ns, total: 8.11 ms
Wall time: 8.09 ms

In [8]: %time bench_get_hit()
CPU times: user 7.79 ms, sys: 0 ns, total: 7.79 ms
Wall time: 7.77 ms

Cython

In [2]: %time bench_in_miss()
CPU times: user 4.22 ms, sys: 22 µs, total: 4.24 ms
Wall time: 4.25 ms

In [3]: %time bench_in_hit()
CPU times: user 4.23 ms, sys: 0 ns, total: 4.23 ms
Wall time: 4.23 ms

In [4]: %time bench_try_miss()
CPU times: user 9.27 ms, sys: 0 ns, total: 9.27 ms
Wall time: 9.33 ms

In [5]: %time bench_try_hit()
CPU times: user 5.77 ms, sys: 0 ns, total: 5.77 ms
Wall time: 5.83 ms

In [6]: %time bench_get_miss()
CPU times: user 3.31 ms, sys: 0 ns, total: 3.31 ms
Wall time: 3.31 ms

In [7]: %time bench_get_hit()
CPU times: user 4.4 ms, sys: 25 µs, total: 4.42 ms
Wall time: 4.43 ms

I'm reverting to "in" as it is the fastest all around.
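
For reference, the membership-test shape this settles on looks roughly like (a sketch of the final pattern, not a verbatim excerpt):

if address not in parent._workers:
    return {"status": "missing"}
ws: WorkerState = parent._workers[address]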

except KeyError:
return {"status": "missing"}

host = get_address_host(address)
local_now = time()
now = now or time()
assert metrics
host_info = host_info or {}

dh: dict = parent._host_info.get(host)
if dh is None:
parent._host_info[host] = dh = dict()
dh: dict = parent._host_info.setdefault(host, {})
dh["last-seen"] = local_now

frac = 1 / len(parent._workers)
@@ -3650,26 +3649,20 @@ def heartbeat_worker(
1 - alpha
)

ws: WorkerState = parent._workers[address]

ws._last_seen = time()

ws._last_seen = local_now
if executing is not None:
ws._executing = {
parent._tasks[key]: duration for key, duration in executing.items()
}

if metrics:
ws._metrics = metrics
ws._metrics = metrics

if host_info:
dh: dict = parent._host_info.get(host)
if dh is None:
parent._host_info[host] = dh = dict()
dh: dict = parent._host_info.setdefault(host, {})
dh.update(host_info)

delay = time() - now
ws._time_delay = delay
if now:
ws._time_delay = local_now - now

if resources:
self.add_resources(worker=address, resources=resources)
@@ -3678,7 +3671,7 @@ def heartbeat_worker(

return {
"status": "OK",
"time": time(),
"time": local_now,
"heartbeat-interval": heartbeat_interval(len(parent._workers)),
}

@@ -3756,7 +3749,7 @@ async def add_worker(
parent._total_nthreads += nthreads
parent._aliases[name] = address

response = self.heartbeat_worker(
self.heartbeat_worker(
address=address,
resolve_address=resolve_address,
now=now,
@@ -5331,7 +5324,7 @@ async def rebalance(self, comm=None, keys=None, workers=None):
map(first, sorted(worker_bytes.items(), key=second, reverse=True))
)

recipients = iter(reversed(sorted_workers))
recipients = reversed(sorted_workers)
recipient = next(recipients)
msgs = [] # (sender, recipient, key)
for sender in sorted_workers[: len(workers) // 2]:
@@ -5343,19 +5336,16 @@ async def rebalance(self, comm=None, keys=None, workers=None):
)

try:
while worker_bytes[sender] > avg:
while (
worker_bytes[recipient] < avg
and worker_bytes[sender] > avg
):
while avg < worker_bytes[sender]:
while worker_bytes[recipient] < avg < worker_bytes[sender]:
ts, nb = next(sender_keys)
if ts not in tasks_by_worker[recipient]:
tasks_by_worker[recipient].add(ts)
# tasks_by_worker[sender].remove(ts)
msgs.append((sender, recipient, ts))
worker_bytes[sender] -= nb
worker_bytes[recipient] += nb
if worker_bytes[sender] > avg:
if avg < worker_bytes[sender]:
recipient = next(recipients)
except StopIteration:
break
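
(Aside: the condition above relies on Python's chained comparisons, where a < b < c means (a < b) and (b < c) with b evaluated once. A toy check with made-up byte counts:

avg = 100.0
worker_bytes = {"sender": 150, "recipient": 60}
assert worker_bytes["recipient"] < avg < worker_bytes["sender"]
# equivalent to:
assert worker_bytes["recipient"] < avg and avg < worker_bytes["sender"]

Note also that reversed() already returns an iterator, so the iter() wrapper dropped above was redundant; next(recipients) works on it directly.)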
@@ -5386,7 +5376,7 @@ async def rebalance(self, comm=None, keys=None, workers=None):
},
)

if not all(r["status"] == "OK" for r in result):
if any(r["status"] != "OK" for r in result):
return {
"status": "missing-data",
"keys": tuple(
@@ -5687,7 +5677,7 @@ async def retire_workers(
workers: list (optional)
List of worker addresses to retire.
If not provided we call ``workers_to_close`` which finds a good set
workers_names: list (optional)
names: list (optional)
Member:

Nice catch

List of worker names to retire.
remove: bool (defaults to True)
Whether or not to remove the worker metadata immediately or else
@@ -5715,30 +5705,31 @@ async def retire_workers(
with log_errors():
async with self._lock if lock else empty_context:
if names is not None:
if workers is not None:
raise TypeError("names and workers are mutually exclusive")
if names:
logger.info("Retire worker names %s", names)
names = set(map(str, names))
workers = [
workers = {
ws._address
for ws in parent._workers_dv.values()
if str(ws._name) in names
]
if workers is None:
}
elif workers is None:
while True:
try:
workers = self.workers_to_close(**kwargs)
if workers:
workers = await self.retire_workers(
workers=workers,
remove=remove,
close_workers=close_workers,
lock=False,
)
return workers
else:
if not workers:
return {}
return await self.retire_workers(
workers=workers,
remove=remove,
close_workers=close_workers,
lock=False,
)
except KeyError: # keys left during replicate
pass

workers = {
parent._workers_dv[w] for w in workers if w in parent._workers_dv
}
@@ -5750,22 +5741,21 @@ async def retire_workers(
keys = set.union(*[w.has_what for w in workers])
keys = {ts._key for ts in keys if ts._who_has.issubset(workers)}

other_workers = set(parent._workers_dv.values()) - workers
if keys:
if other_workers:
logger.info("Moving %d keys to other workers", len(keys))
await self.replicate(
keys=keys,
workers=[ws._address for ws in other_workers],
n=1,
delete=False,
lock=False,
)
else:
other_workers = set(parent._workers_dv.values()) - workers
if not other_workers:
return {}
logger.info("Moving %d keys to other workers", len(keys))
await self.replicate(
keys=keys,
workers=[ws._address for ws in other_workers],
n=1,
delete=False,
lock=False,
)

worker_keys = {ws._address: ws.identity() for ws in workers}
if close_workers and worker_keys:
if close_workers:
Member:

It looks like we dropped worker_keys here -- is this intentional?

Collaborator Author:

If worker_keys is empty, you'll end up with await asyncio.gather(*[]), which is a no-op.
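
Easy to verify with a standalone sketch:

import asyncio

async def main():
    # Zero awaitables: gather resolves immediately to an empty list.
    result = await asyncio.gather(*[])
    assert result == []

asyncio.run(main())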

Collaborator Author:

Which, by the way, is exactly what already happens in the immediately following if remove: ... block.

await asyncio.gather(
*[self.close_worker(worker=w, safe=True) for w in worker_keys]
)