diff --git a/synapse/storage/schema/delta/34/received_txn_purge.py b/synapse/storage/schema/delta/34/received_txn_purge.py new file mode 100644 index 000000000000..033144341ced --- /dev/null +++ b/synapse/storage/schema/delta/34/received_txn_purge.py @@ -0,0 +1,32 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.storage.engines import PostgresEngine + +import logging + +logger = logging.getLogger(__name__) + + +def run_create(cur, database_engine, *args, **kwargs): + if isinstance(database_engine, PostgresEngine): + cur.execute("TRUNCATE received_transactions") + else: + cur.execute("DELETE FROM received_transactions") + + cur.execute("CREATE INDEX received_transactions_ts ON received_transactions(ts)") + + +def run_upgrade(cur, database_engine, *args, **kwargs): + pass diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 6258ff1725e5..58d4de4f1d67 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -62,10 +62,9 @@ def __init__(self, hs): self.last_transaction = {} reactor.addSystemEventTrigger("before", "shutdown", self._persist_in_mem_txns) - hs.get_clock().looping_call( - self._persist_in_mem_txns, - 1000, - ) + self._clock.looping_call(self._persist_in_mem_txns, 1000) + + self._clock.looping_call(self._cleanup_transactions, 30 * 60 * 1000) def get_received_txn_response(self, transaction_id, origin): """For an incoming transaction from a given origin, check if we have @@ -127,6 +126,7 @@ def set_received_txn_response(self, transaction_id, origin, code, "origin": origin, "response_code": code, "response_json": buffer(encode_canonical_json(response_dict)), + "ts": self._clock.time_msec(), }, or_ignore=True, desc="set_received_txn_response", @@ -383,3 +383,12 @@ def f(txn): yield self.runInteraction("_persist_in_mem_txns", f) except: logger.exception("Failed to persist transactions!") + + def _cleanup_transactions(self): + now = self._clock.time_msec() + month_ago = now - 30 * 24 * 60 * 60 * 1000 + + def _cleanup_transactions_txn(txn): + txn.execute("DELETE FROM received_transactions WHERE ts < ?", (month_ago,)) + + return self.runInteraction("_persist_in_mem_txns", _cleanup_transactions_txn)