Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix broken cache for getting retry times. #343

Merged
merged 2 commits into from
Nov 3, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions synapse/federation/transaction_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ def log_failure(f):
@defer.inlineCallbacks
@log_function
def _attempt_new_transaction(self, destination):
# list of (pending_pdu, deferred, order)
if destination in self.pending_transactions:
# XXX: pending_transactions can get stuck on by a never-ending
# request at which point pending_pdus_by_dest just keeps growing.
Expand All @@ -213,9 +214,6 @@ def _attempt_new_transaction(self, destination):
)
return

logger.debug("TX [%s] _attempt_new_transaction", destination)

# list of (pending_pdu, deferred, order)
pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
pending_edus = self.pending_edus_by_dest.pop(destination, [])
pending_failures = self.pending_failures_by_dest.pop(destination, [])
Expand All @@ -228,20 +226,22 @@ def _attempt_new_transaction(self, destination):
logger.debug("TX [%s] Nothing to send", destination)
return

# Sort based on the order field
pending_pdus.sort(key=lambda t: t[2])

pdus = [x[0] for x in pending_pdus]
edus = [x[0] for x in pending_edus]
failures = [x[0].get_dict() for x in pending_failures]
deferreds = [
x[1]
for x in pending_pdus + pending_edus + pending_failures
]

try:
self.pending_transactions[destination] = 1

logger.debug("TX [%s] _attempt_new_transaction", destination)

# Sort based on the order field
pending_pdus.sort(key=lambda t: t[2])

pdus = [x[0] for x in pending_pdus]
edus = [x[0] for x in pending_edus]
failures = [x[0].get_dict() for x in pending_failures]
deferreds = [
x[1]
for x in pending_pdus + pending_edus + pending_failures
]

txn_id = str(self._next_txn_id)

limiter = yield get_retry_limiter(
Expand Down
48 changes: 16 additions & 32 deletions synapse/storage/transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,16 +253,6 @@ def set_destination_retry_timings(self, destination,
retry_interval (int) - how long until next retry in ms
"""

# As this is the new value, we might as well prefill the cache
self.get_destination_retry_timings.prefill(
destination,
{
"destination": destination,
"retry_last_ts": retry_last_ts,
"retry_interval": retry_interval
},
)

# XXX: we could chose to not bother persisting this if our cache thinks
# this is a NOOP
return self.runInteraction(
Expand All @@ -275,31 +265,25 @@ def set_destination_retry_timings(self, destination,

def _set_destination_retry_timings(self, txn, destination,
                                   retry_last_ts, retry_interval):
    """Transaction half of set_destination_retry_timings: persist the retry
    timings for a destination homeserver.

    Args:
        txn: the database transaction/cursor to run queries on.
        destination (str): the remote homeserver name.
        retry_last_ts (int): timestamp (ms) of the last retry attempt,
            or 0 to reset the backoff.
        retry_interval (int): how long until the next retry, in ms.
    """
    # Invalidate (rather than prefill) the cache, and only once the
    # transaction actually commits — prefilling before the write landed is
    # what broke the cache (see PR title: "Fix broken cache for getting
    # retry times").
    txn.call_after(self.get_destination_retry_timings.invalidate, (destination,))

    # Single upsert replaces the old UPDATE-then-INSERT-on-rowcount-0
    # pattern: insertion_values supplies the key column only when a new
    # row has to be created.
    self._simple_upsert_txn(
        txn,
        "destinations",
        keyvalues={
            "destination": destination,
        },
        values={
            "retry_last_ts": retry_last_ts,
            "retry_interval": retry_interval,
        },
        insertion_values={
            "destination": destination,
            "retry_last_ts": retry_last_ts,
            "retry_interval": retry_interval,
        }
    )

def get_destinations_needing_retry(self):
"""Get all destinations which are due a retry for sending a transaction.

Expand Down