Skip to content

Commit

Permalink
Rewrite update_who_has
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed May 11, 2022
1 parent 2aef3ce commit 0a088bf
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 31 deletions.
2 changes: 1 addition & 1 deletion distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4903,7 +4903,7 @@ async def register_scheduler_plugin(self, plugin, name=None, idempotent=None):

self.add_plugin(plugin, name=name, idempotent=idempotent)

-def worker_send(self, worker, msg):
+def worker_send(self, worker: str, msg: dict[str, Any]) -> None:
"""Send message to worker
This also handles connection failures by adding a callback to remove
Expand Down
72 changes: 42 additions & 30 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1921,7 +1921,7 @@ def handle_acquire_replicas(
if ts.state != "memory":
recommendations[ts] = "fetch"

-self.update_who_has(who_has)
+self.update_who_has(who_has, stimulus_id=stimulus_id)
self.transitions(recommendations, stimulus_id=stimulus_id)

def ensure_task_exists(
Expand All @@ -1939,6 +1939,8 @@ def ensure_task_exists(
self.log.append((key, "ensure-task-exists", ts.state, stimulus_id, time()))
return ts

+@fail_hard
+@log_errors
def handle_compute_task(
self,
*,
Expand Down Expand Up @@ -2022,7 +2024,7 @@ def handle_compute_task(
raise RuntimeError(f"Unexpected task state encountered {ts} {stimulus_id}")

self._handle_instructions(instructions)
-self.update_who_has(who_has)
+self.update_who_has(who_has, stimulus_id=stimulus_id)
self.transitions(recommendations, stimulus_id=stimulus_id)

def _add_to_data_needed(self, ts: TaskState, stimulus_id: str) -> RecsInstrs:
Expand Down Expand Up @@ -3395,7 +3397,7 @@ def done_event():
who_has = await retry_operation(
self.scheduler.who_has, keys=refresh_who_has
)
-self.update_who_has(who_has)
+self.update_who_has(who_has, stimulus_id=stimulus_id)

@log_errors
def _readd_busy_worker(self, worker: str) -> None:
Expand All @@ -3419,7 +3421,7 @@ async def find_missing(self) -> None:
keys=[ts.key for ts in self._missing_dep_flight],
)
who_has = {k: v for k, v in who_has.items() if v}
-self.update_who_has(who_has)
+self.update_who_has(who_has, stimulus_id=stimulus_id)
recommendations: Recs = {}
for ts in self._missing_dep_flight:
if ts.who_has:
Expand All @@ -3432,34 +3434,44 @@ async def find_missing(self) -> None:
"find-missing"
].callback_time = self.periodic_callbacks["heartbeat"].callback_time

-    def update_who_has(self, who_has: dict[str, Collection[str]]) -> None:
-        try:
-            for dep, workers in who_has.items():
-                if not workers:
-                    continue
+    @log_errors
+    def update_who_has(
+        self, who_has: dict[str, Collection[str]], *, stimulus_id: str
+    ) -> None:
+        for key, workers in who_has.items():
+            ts = self.tasks.get(key)
+            if not ts:
+                continue
+            workers = set(workers)

-                if dep in self.tasks:
-                    dep_ts = self.tasks[dep]
-                    if self.address in workers and self.tasks[dep].state != "memory":
-                        logger.debug(
-                            "Scheduler claims worker %s holds data for task %s which is not true.",
-                            self.name,
-                            dep,
-                        )
-                        # Do not mutate the input dict. That's rude
-                        workers = set(workers) - {self.address}
-                    dep_ts.who_has.update(workers)
-
-                    for worker in workers:
-                        self.has_what[worker].add(dep)
-                        self.data_needed_per_worker[worker].push(dep_ts)
-        except Exception as e: # pragma: no cover
-            logger.exception(e)
-            if LOG_PDB:
-                import pdb
+            if self.address in workers and ts.state != "memory":
+                logger.debug(
+                    "Scheduler claims worker %s holds data for task %s which is not true.",
+                    self.name,
+                    ts,
+                )
+                workers.remove(self.address)

-                pdb.set_trace()
-            raise
+            if ts.who_has == workers:
+                continue
+
+            self.log.append(
+                (
+                    key,
+                    "update-who-has",
+                    list(ts.who_has),
+                    list(workers),
+                    stimulus_id,
+                    time(),
+                )
+            )
+
+            for worker in ts.who_has - workers:
+                self.has_what[worker].discard(key)
+
+            ts.who_has = workers
+            for worker in workers:
+                self.has_what[worker].add(key)

def handle_steal_request(self, key: str, stimulus_id: str) -> None:
# There may be a race condition between stealing and releasing a task.
Expand Down

0 comments on commit 0a088bf

Please sign in to comment.