From 086c229125cba599fcdb9d8c875e3e9ce1b29431 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Tue, 29 Oct 2024 14:30:34 +0100 Subject: [PATCH] Remove update from queue --- .../labs/ucx/assessment/sequencing.py | 25 +++++++------------ 1 file changed, 9 insertions(+), 16 deletions(-) diff --git a/src/databricks/labs/ucx/assessment/sequencing.py b/src/databricks/labs/ucx/assessment/sequencing.py index a219bf6970..eb96217503 100644 --- a/src/databricks/labs/ucx/assessment/sequencing.py +++ b/src/databricks/labs/ucx/assessment/sequencing.py @@ -1,6 +1,7 @@ from __future__ import annotations import heapq +import itertools from collections import defaultdict from collections.abc import Iterable from dataclasses import dataclass, field @@ -94,12 +95,11 @@ class PriorityQueue: """ _REMOVED = "" # Mark removed items - _UPDATED = "" # Mark updated items def __init__(self): self._entries: list[QueueEntry] = [] self._entry_finder: dict[MigrationNode, QueueEntry] = {} - self._counter = 0 # Tiebreaker with equal priorities, then "first in, first out" + self._counter = itertools.count() # Tiebreaker with equal priorities, then "first in, first out" def put(self, priority: int, task: MigrationNode) -> None: """Put or update task in the queue. @@ -107,17 +107,17 @@ def put(self, priority: int, task: MigrationNode) -> None: The lowest priority is retrieved from the queue first. """ if task in self._entry_finder: - raise KeyError(f"Use `:meth:update` to update existing task: {task}") - entry: QueueEntry = [priority, self._counter, task] + self._remove(task) + count = next(self._counter) + entry = [priority, count, task] self._entry_finder[task] = entry heapq.heappush(self._entries, entry) - self._counter += 1 def get(self) -> MigrationNode | None: """Gets the tasks with lowest priority.""" while self._entries: _, _, task = heapq.heappop(self._entries) - if task in (self._REMOVED, self._UPDATED): + if task == self._REMOVED: continue assert isinstance(task, MigrationNode) self._remove(task) @@ -130,15 +130,6 @@ def _remove(self, task: MigrationNode) -> None: entry = self._entry_finder.pop(task) entry[2] = self._REMOVED - def update(self, priority: int, task: MigrationNode) -> None: - """Update a task in the queue.""" - entry = self._entry_finder.pop(task) - if entry is None: - raise KeyError(f"Cannot update unknown task: {task}") - if entry[2] != self._REMOVED: # Do not update REMOVED tasks - entry[2] = self._UPDATED - self.put(priority, task) - class MigrationSequencer: """Sequence the migration dependencies in order to execute the migration. @@ -244,8 +235,10 @@ def generate_steps(self) -> Iterable[MigrationStep]: seen.add(node) # Update the queue priority as if the migration step was completed for dependency in self._outgoing[node.key]: + if dependency in seen: + continue priority = len(incoming[dependency.key] - seen) - queue.update(priority, dependency) + queue.put(priority, dependency) node = queue.get() return ordered_steps