# heap_priority_queue.py
'''
Based on heap_priority_queue.py from:
Data Structures and Algorithms in Python
Michael T. Goodrich, Roberto Tamassia, and Michael H. Goldwasser
John Wiley & Sons, 2013
'''
from priority_queue_base import PriorityQueueBase
from exceptions import Empty
class HeapPriorityQueue(PriorityQueueBase):  # base class defines _Item
    """A min-oriented priority queue implemented with a binary heap.

    Entries are wrapped in ``self._Item`` objects (supplied by the base class)
    and kept in a Python list laid out in level order: the children of index
    ``j`` live at ``2*j + 1`` and ``2*j + 2``, and its parent at
    ``(j - 1) // 2``.  Items are ordered using only their ``<`` comparison.
    """

    # ------------------------------ nonpublic behaviors ------------------------------
    def _parent(self, j):
        """Return the index of the parent of index j."""
        return (j - 1) // 2

    def _left(self, j):
        """Return the index of the left child of index j."""
        return 2 * j + 1

    def _right(self, j):
        """Return the index of the right child of index j."""
        return 2 * j + 2

    def _has_left(self, j):
        """Return True if index j has a left child inside the list."""
        return self._left(j) < len(self._data)  # index beyond end of list?

    def _has_right(self, j):
        """Return True if index j has a right child inside the list."""
        return self._right(j) < len(self._data)  # index beyond end of list?

    def _swap(self, i, j):
        """Swap the elements at indices i and j of array."""
        self._data[i], self._data[j] = self._data[j], self._data[i]

    def _upheap(self, j):
        """Move the item at index j up until its parent is no larger.

        Iterative form of the usual bubble-up: keep swapping with the parent
        while the item is strictly smaller than it, stopping at the root.
        """
        while j > 0:
            parent = self._parent(j)
            if not self._data[j] < self._data[parent]:
                break  # heap order holds; nothing above can be affected
            self._swap(j, parent)
            j = parent  # continue from the position of the parent

    def _downheap(self, j):
        """Move the item at index j down until no child is smaller.

        Iterative form of the usual bubble-down: at each step pick the smaller
        child (the left one on ties) and swap with it if it is strictly
        smaller than the current item.
        """
        while self._has_left(j):
            small_child = self._left(j)  # although right may be smaller
            if self._has_right(j):
                right = self._right(j)
                if self._data[right] < self._data[small_child]:
                    small_child = right
            if not self._data[small_child] < self._data[j]:
                break  # both children are at least as large; done
            self._swap(j, small_child)
            j = small_child  # continue from the position of the small child

    def _heapify(self):
        """Establish heap order over the whole list, bottom-up.

        Runs ``_downheap`` on every index that can have children, starting
        with the parent of the last item and finishing at the root.
        """
        start = self._parent(len(self._data) - 1)
        for j in range(start, -1, -1):
            self._downheap(j)

    # ------------------------------ public behaviors ------------------------------
    def __init__(self, contents=()):
        """Create a new Priority Queue.

        By default the queue is empty.  If ``contents`` is given it must be
        an iterable of ``(key, value)`` pairs; they become the initial
        entries and the heap is built bottom-up instead of by repeated
        ``add`` calls.
        """
        self._data = [self._Item(k, v) for k, v in contents]
        if len(self._data) > 1:  # zero or one item is trivially a heap
            self._heapify()

    def __len__(self):
        """Return the number of items in the priority queue."""
        return len(self._data)

    def add(self, key, value):
        """Add a key-value pair to the priority queue."""
        self._data.append(self._Item(key, value))
        self._upheap(len(self._data) - 1)  # upheap newly added position

    def min(self):
        """Return but do not remove (k,v) tuple with minimum key.

        Raise Empty exception if empty.
        """
        if self.is_empty():
            raise Empty('Priority queue is empty.')
        item = self._data[0]
        return (item._key, item._value)

    def remove_min(self):
        """Remove and return (k,v) tuple with minimum key.

        Raise Empty exception if empty.
        """
        if self.is_empty():
            raise Empty('Priority queue is empty.')
        self._swap(0, len(self._data) - 1)  # put minimum item at the end
        item = self._data.pop()  # and remove it from the list;
        self._downheap(0)  # then fix new root
        return (item._key, item._value)