Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Use a set for WheelTimer to better handle duplicates.
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed May 6, 2022
1 parent 4ffe7a9 commit bae9665
Showing 1 changed file with 9 additions and 6 deletions.
15 changes: 9 additions & 6 deletions synapse/util/wheel_timer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,20 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Generic, List, TypeVar
from typing import Generic, Hashable, List, Set, TypeVar

T = TypeVar("T")
T = TypeVar("T", bound=Hashable)


class _Entry(Generic[T]):
__slots__ = ["end_key", "queue"]

def __init__(self, end_key: int) -> None:
self.end_key: int = end_key
self.queue: List[T] = []

# We use a set here as otherwise we can end up with a lot of duplicate
# entries.
self.queue: Set[T] = set()


class WheelTimer(Generic[T]):
Expand Down Expand Up @@ -55,7 +58,7 @@ def insert(self, now: int, obj: T, then: int) -> None:

if then_key <= max_key:
# The max here is to protect against inserts for times in the past
self.entries[max(min_key, then_key) - min_key].queue.append(obj)
self.entries[max(min_key, then_key) - min_key].queue.add(obj)
return

next_key = int(now / self.bucket_size) + 1
Expand All @@ -71,7 +74,7 @@ def insert(self, now: int, obj: T, then: int) -> None:
# to insert. This ensures there are no gaps.
self.entries.extend(_Entry(key) for key in range(last_key, then_key + 1))

self.entries[-1].queue.append(obj)
self.entries[-1].queue.add(obj)

def fetch(self, now: int) -> List[T]:
"""Fetch any objects that have timed out
Expand All @@ -84,7 +87,7 @@ def fetch(self, now: int) -> List[T]:
"""
now_key = int(now / self.bucket_size)

ret = []
ret: List[T] = []
while self.entries and self.entries[0].end_key <= now_key:
ret.extend(self.entries.pop(0).queue)

Expand Down

0 comments on commit bae9665

Please sign in to comment.