Skip to content

Commit

Permalink
Remove update from queue
Browse files: browse the repository at this point in the history
  • Loading branch information
JCZuurmond committed Oct 29, 2024
1 parent cc571c4 commit 086c229
Showing 1 changed file with 9 additions and 16 deletions.
25 changes: 9 additions & 16 deletions src/databricks/labs/ucx/assessment/sequencing.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import heapq
import itertools
from collections import defaultdict
from collections.abc import Iterable
from dataclasses import dataclass, field
Expand Down Expand Up @@ -94,30 +95,29 @@ class PriorityQueue:
"""

_REMOVED = "<removed>" # Mark removed items
_UPDATED = "<updated>" # Mark updated items

def __init__(self):
self._entries: list[QueueEntry] = []
self._entry_finder: dict[MigrationNode, QueueEntry] = {}
self._counter = 0 # Tiebreaker with equal priorities, then "first in, first out"
self._counter = itertools.count() # Tiebreaker with equal priorities, then "first in, first out"

def put(self, priority: int, task: MigrationNode) -> None:
"""Put or update task in the queue.
The lowest priority is retrieved from the queue first.
"""
if task in self._entry_finder:
raise KeyError(f"Use `:meth:update` to update existing task: {task}")
entry: QueueEntry = [priority, self._counter, task]
self._remove(task)
count = next(self._counter)
entry = [priority, count, task]
self._entry_finder[task] = entry
heapq.heappush(self._entries, entry)
self._counter += 1

def get(self) -> MigrationNode | None:
"""Gets the tasks with lowest priority."""
while self._entries:
_, _, task = heapq.heappop(self._entries)
if task in (self._REMOVED, self._UPDATED):
if task == self._REMOVED:
continue
assert isinstance(task, MigrationNode)
self._remove(task)
Expand All @@ -130,15 +130,6 @@ def _remove(self, task: MigrationNode) -> None:
entry = self._entry_finder.pop(task)
entry[2] = self._REMOVED

def update(self, priority: int, task: MigrationNode) -> None:
"""Update a task in the queue."""
entry = self._entry_finder.pop(task)
if entry is None:
raise KeyError(f"Cannot update unknown task: {task}")
if entry[2] != self._REMOVED: # Do not update REMOVED tasks
entry[2] = self._UPDATED
self.put(priority, task)


class MigrationSequencer:
"""Sequence the migration dependencies in order to execute the migration.
Expand Down Expand Up @@ -244,8 +235,10 @@ def generate_steps(self) -> Iterable[MigrationStep]:
seen.add(node)
# Update the queue priority as if the migration step was completed
for dependency in self._outgoing[node.key]:
if dependency in seen:
continue
priority = len(incoming[dependency.key] - seen)
queue.update(priority, dependency)
queue.put(priority, dependency)
node = queue.get()
return ordered_steps

Expand Down

0 comments on commit 086c229

Please sign in to comment.