diff --git a/synapse/util/wheel_timer.py b/synapse/util/wheel_timer.py index ab5d5a27de77..f36a2c9147c6 100644 --- a/synapse/util/wheel_timer.py +++ b/synapse/util/wheel_timer.py @@ -11,8 +11,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging from typing import Generic, Hashable, List, Set, TypeVar +logger = logging.getLogger(__name__) + T = TypeVar("T", bound=Hashable) @@ -51,17 +54,27 @@ def insert(self, now: int, obj: T, then: int) -> None: then: When to return the object strictly after. """ then_key = int(then / self.bucket_size) + 1 + now_key = int(now / self.bucket_size) if self.entries: min_key = self.entries[0].end_key max_key = self.entries[-1].end_key + if min_key < now_key - 10: + # If we have ten buckets that are due and still nothing has + # called `fetch()` then we likely have a bug that is causing a + # memory leak. + logger.warning( + "Inserting into a wheel timer that hasn't been read from recently. Item: %s", + obj, + ) + if then_key <= max_key: # The max here is to protect against inserts for times in the past self.entries[max(min_key, then_key) - min_key].queue.add(obj) return - next_key = int(now / self.bucket_size) + 1 + next_key = now_key + 1 if self.entries: last_key = self.entries[-1].end_key else: