This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add 'failure_ts' column to 'destinations' table #6016

Merged 3 commits on Sep 17, 2019
1 change: 1 addition & 0 deletions changelog.d/6016.misc
@@ -0,0 +1 @@
Add a 'failure_ts' column to the 'destinations' database table.
1 change: 1 addition & 0 deletions changelog.d/6025.bugfix
@@ -0,0 +1 @@
Fix bug in calculating the federation retry backoff period.
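
For context on this changelog entry, a minimal sketch of the jitter bug fixed in retryutils.py below (the constant value is an illustrative stand-in, not the real one): the old code truncated the random jitter factor with int(), so int(random.uniform(0.8, 1.4)) evaluated to 0 or 1, either collapsing the backoff to zero or dropping the jitter entirely.

import random

RETRY_MULTIPLIER = 5        # illustrative stand-in for the constant in synapse.util.retryutils
retry_interval = 600_000    # 10 minutes, in milliseconds

# Old behaviour: int() truncates the jitter factor to 0 or 1 before it is applied.
old_interval = retry_interval * RETRY_MULTIPLIER * int(random.uniform(0.8, 1.4))

# Fixed behaviour (this PR): apply the jitter first and truncate the result last.
new_interval = int(retry_interval * RETRY_MULTIPLIER * random.uniform(0.8, 1.4))

print(old_interval, new_interval)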
25 changes: 25 additions & 0 deletions synapse/storage/schema/delta/56/destinations_failure_ts.sql
@@ -0,0 +1,25 @@
/* Copyright 2019 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*
* Record the timestamp when a given server started failing
*/
ALTER TABLE destinations ADD failure_ts BIGINT;

/* as a rough approximation, we assume that the server started failing at
* retry_interval before the last retry
*/
UPDATE destinations SET failure_ts = retry_last_ts - retry_interval
WHERE retry_last_ts > 0;
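
A quick worked example of the backfill above, with made-up numbers: a destination last retried at retry_last_ts = 1,000,000 ms with retry_interval = 60,000 ms gets failure_ts = 940,000 ms, while rows that were not failing (retry_last_ts = 0) keep failure_ts as NULL.

# Made-up values mirroring the UPDATE statement above.
retry_last_ts = 1_000_000   # time of the last retry attempt (ms)
retry_interval = 60_000     # current backoff interval (ms)

failure_ts = retry_last_ts - retry_interval
assert failure_ts == 940_000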
23 changes: 16 additions & 7 deletions synapse/storage/transactions.py
@@ -165,7 +165,7 @@ def _get_destination_retry_timings(self, txn, destination):
txn,
table="destinations",
keyvalues={"destination": destination},
retcols=("destination", "retry_last_ts", "retry_interval"),
retcols=("destination", "failure_ts", "retry_last_ts", "retry_interval"),
allow_none=True,
)

@@ -174,12 +174,15 @@ def _get_destination_retry_timings(self, txn, destination):
else:
return None

def set_destination_retry_timings(self, destination, retry_last_ts, retry_interval):
def set_destination_retry_timings(
self, destination, failure_ts, retry_last_ts, retry_interval
):
"""Sets the current retry timings for a given destination.
Both timings should be zero if retrying is no longer occurring.
Args:
destination (str)
failure_ts (int|None) - when the server started failing (ms since epoch)
retry_last_ts (int) - time of last retry attempt in unix epoch ms
retry_interval (int) - how long until next retry in ms
"""
@@ -189,30 +192,34 @@ def set_destination_retry_timings(self, destination, retry_last_ts, retry_interv
"set_destination_retry_timings",
self._set_destination_retry_timings,
destination,
failure_ts,
retry_last_ts,
retry_interval,
)

def _set_destination_retry_timings(
self, txn, destination, retry_last_ts, retry_interval
self, txn, destination, failure_ts, retry_last_ts, retry_interval
):

if self.database_engine.can_native_upsert:
# Upsert retry time interval if retry_interval is zero (i.e. we're
# resetting it) or greater than the existing retry interval.

sql = """
INSERT INTO destinations (destination, retry_last_ts, retry_interval)
VALUES (?, ?, ?)
INSERT INTO destinations (
destination, failure_ts, retry_last_ts, retry_interval
)
VALUES (?, ?, ?, ?)
ON CONFLICT (destination) DO UPDATE SET
failure_ts = EXCLUDED.failure_ts,
retry_last_ts = EXCLUDED.retry_last_ts,
retry_interval = EXCLUDED.retry_interval
WHERE
EXCLUDED.retry_interval = 0
OR destinations.retry_interval < EXCLUDED.retry_interval
"""

txn.execute(sql, (destination, retry_last_ts, retry_interval))
txn.execute(sql, (destination, failure_ts, retry_last_ts, retry_interval))

return

@@ -225,7 +232,7 @@ def _set_destination_retry_timings(
txn,
table="destinations",
keyvalues={"destination": destination},
retcols=("retry_last_ts", "retry_interval"),
retcols=("failure_ts", "retry_last_ts", "retry_interval"),
allow_none=True,
)

@@ -235,6 +242,7 @@ def _set_destination_retry_timings(
table="destinations",
values={
"destination": destination,
"failure_ts": failure_ts,
"retry_last_ts": retry_last_ts,
"retry_interval": retry_interval,
},
@@ -245,6 +253,7 @@ def _set_destination_retry_timings(
"destinations",
keyvalues={"destination": destination},
updatevalues={
"failure_ts": failure_ts,
"retry_last_ts": retry_last_ts,
"retry_interval": retry_interval,
},
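
As a reading aid for the native-upsert path above, a minimal sketch (a hypothetical helper, not Synapse code) of the rule encoded by the ON CONFLICT ... WHERE clause: the stored row is only overwritten when the new retry_interval resets the backoff (zero) or is larger than the interval already stored.

def should_apply_update(existing_interval, new_interval):
    # Mirrors: EXCLUDED.retry_interval = 0 OR destinations.retry_interval < EXCLUDED.retry_interval
    return new_interval == 0 or existing_interval < new_interval

assert should_apply_update(existing_interval=600_000, new_interval=0)              # success resets the backoff
assert should_apply_update(existing_interval=600_000, new_interval=3_000_000)      # backoff has grown
assert not should_apply_update(existing_interval=600_000, new_interval=300_000)    # stale, shorter write is ignored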
21 changes: 18 additions & 3 deletions synapse/util/retryutils.py
@@ -80,11 +80,13 @@ def get_retry_limiter(destination, clock, store, ignore_backoff=False, **kwargs)
# We aren't ready to retry that destination.
raise
"""
failure_ts = None
retry_last_ts, retry_interval = (0, 0)

retry_timings = yield store.get_destination_retry_timings(destination)

if retry_timings:
failure_ts = retry_timings["failure_ts"]
retry_last_ts, retry_interval = (
retry_timings["retry_last_ts"],
retry_timings["retry_interval"],
@@ -108,6 +110,7 @@ def get_retry_limiter(destination, clock, store, ignore_backoff=False, **kwargs)
destination,
clock,
store,
failure_ts,
retry_interval,
backoff_on_failure=backoff_on_failure,
**kwargs
@@ -120,6 +123,7 @@ def __init__(
destination,
clock,
store,
failure_ts,
retry_interval,
backoff_on_404=False,
backoff_on_failure=True,
@@ -133,6 +137,8 @@ def __init__(
destination (str)
clock (Clock)
store (DataStore)
failure_ts (int|None): when this destination started failing (in ms since
the epoch), or None if the last request was successful
retry_interval (int): The next retry interval taken from the
database in milliseconds, or zero if the last request was
successful.
@@ -145,6 +151,7 @@ def __init__(
self.store = store
self.destination = destination

self.failure_ts = failure_ts
self.retry_interval = retry_interval
self.backoff_on_404 = backoff_on_404
self.backoff_on_failure = backoff_on_failure
@@ -186,15 +193,17 @@ def __exit__(self, exc_type, exc_val, exc_tb):
logger.debug(
"Connection to %s was successful; clearing backoff", self.destination
)
self.failure_ts = None
retry_last_ts = 0
self.retry_interval = 0
elif not self.backoff_on_failure:
return
else:
# We couldn't connect.
if self.retry_interval:
self.retry_interval *= RETRY_MULTIPLIER
self.retry_interval *= int(random.uniform(0.8, 1.4))
self.retry_interval = int(
self.retry_interval * RETRY_MULTIPLIER * random.uniform(0.8, 1.4)
)

if self.retry_interval >= MAX_RETRY_INTERVAL:
self.retry_interval = MAX_RETRY_INTERVAL
@@ -210,11 +219,17 @@ def __exit__(self, exc_type, exc_val, exc_tb):
)
retry_last_ts = int(self.clock.time_msec())

if self.failure_ts is None:
self.failure_ts = retry_last_ts

@defer.inlineCallbacks
def store_retry_timings():
try:
yield self.store.set_destination_retry_timings(
self.destination, retry_last_ts, self.retry_interval
self.destination,
self.failure_ts,
retry_last_ts,
self.retry_interval,
)
except Exception:
logger.exception("Failed to store destination_retry_timings")
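
For orientation, a rough sketch of how a caller drives this limiter (based on the usage docstring above and the new tests below; do_request, clock and store are assumed stand-ins for what the real federation client passes around):

from twisted.internet import defer

from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter


@defer.inlineCallbacks
def send_to(destination, clock, store, do_request):
    # do_request is a hypothetical callable returning a Deferred.
    try:
        limiter = yield get_retry_limiter(destination, clock, store)
    except NotRetryingDestination:
        # Still inside the backoff window recorded in the destinations table.
        return

    # Leaving the block stores the updated timings: on success failure_ts and
    # the backoff are cleared; on failure retry_interval grows and failure_ts
    # records when the destination first started failing.
    with limiter:
        yield do_request(destination)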
7 changes: 6 additions & 1 deletion tests/handlers/test_typing.py
@@ -99,7 +99,12 @@ def prepare(self, reactor, clock, hs):
self.event_source = hs.get_event_sources().sources["typing"]

self.datastore = hs.get_datastore()
retry_timings_res = {"destination": "", "retry_last_ts": 0, "retry_interval": 0}
retry_timings_res = {
"destination": "",
"retry_last_ts": 0,
"retry_interval": 0,
"failure_ts": None,
}
self.datastore.get_destination_retry_timings.return_value = defer.succeed(
retry_timings_res
)
8 changes: 5 additions & 3 deletions tests/storage/test_transactions.py
@@ -29,17 +29,19 @@ def test_get_set_transactions(self):
r = self.get_success(d)
self.assertIsNone(r)

d = self.store.set_destination_retry_timings("example.com", 50, 100)
d = self.store.set_destination_retry_timings("example.com", 1000, 50, 100)
self.get_success(d)

d = self.store.get_destination_retry_timings("example.com")
r = self.get_success(d)

self.assert_dict({"retry_last_ts": 50, "retry_interval": 100}, r)
self.assert_dict(
{"retry_last_ts": 50, "retry_interval": 100, "failure_ts": 1000}, r
)

def test_initial_set_transactions(self):
"""Tests that we can successfully set the destination retries (there
was a bug around invalidating the cache that broke this)
"""
d = self.store.set_destination_retry_timings("example.com", 50, 100)
d = self.store.set_destination_retry_timings("example.com", 1000, 50, 100)
self.get_success(d)
127 changes: 127 additions & 0 deletions tests/util/test_retryutils.py
@@ -0,0 +1,127 @@
# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.util.retryutils import (
MIN_RETRY_INTERVAL,
RETRY_MULTIPLIER,
NotRetryingDestination,
get_retry_limiter,
)

from tests.unittest import HomeserverTestCase


class RetryLimiterTestCase(HomeserverTestCase):
def test_new_destination(self):
"""A happy-path case with a new destination and a successful operation"""
store = self.hs.get_datastore()
d = get_retry_limiter("test_dest", self.clock, store)
self.pump()
limiter = self.successResultOf(d)

# advance the clock a bit before making the request
self.pump(1)

with limiter:
pass

d = store.get_destination_retry_timings("test_dest")
self.pump()
new_timings = self.successResultOf(d)
self.assertIsNone(new_timings)

def test_limiter(self):
"""General test case which walks through the process of a failing request"""
store = self.hs.get_datastore()

d = get_retry_limiter("test_dest", self.clock, store)
self.pump()
limiter = self.successResultOf(d)

self.pump(1)
try:
with limiter:
self.pump(1)
failure_ts = self.clock.time_msec()
raise AssertionError("argh")
except AssertionError:
pass

# wait for the update to land
self.pump()

d = store.get_destination_retry_timings("test_dest")
self.pump()
new_timings = self.successResultOf(d)
self.assertEqual(new_timings["failure_ts"], failure_ts)
self.assertEqual(new_timings["retry_last_ts"], failure_ts)
self.assertEqual(new_timings["retry_interval"], MIN_RETRY_INTERVAL)

# now if we try again we should get a failure
d = get_retry_limiter("test_dest", self.clock, store)
self.pump()
self.failureResultOf(d, NotRetryingDestination)

#
# advance the clock and try again
#

self.pump(MIN_RETRY_INTERVAL)
d = get_retry_limiter("test_dest", self.clock, store)
self.pump()
limiter = self.successResultOf(d)

self.pump(1)
try:
with limiter:
self.pump(1)
retry_ts = self.clock.time_msec()
raise AssertionError("argh")
except AssertionError:
pass

# wait for the update to land
self.pump()

d = store.get_destination_retry_timings("test_dest")
self.pump()
new_timings = self.successResultOf(d)
self.assertEqual(new_timings["failure_ts"], failure_ts)
self.assertEqual(new_timings["retry_last_ts"], retry_ts)
self.assertGreaterEqual(
new_timings["retry_interval"], MIN_RETRY_INTERVAL * RETRY_MULTIPLIER * 0.5
)
self.assertLessEqual(
new_timings["retry_interval"], MIN_RETRY_INTERVAL * RETRY_MULTIPLIER * 2.0
)

#
# one more go, with success
#
self.pump(MIN_RETRY_INTERVAL * RETRY_MULTIPLIER * 2.0)
d = get_retry_limiter("test_dest", self.clock, store)
self.pump()
limiter = self.successResultOf(d)

self.pump(1)
with limiter:
self.pump(1)

# wait for the update to land
self.pump()

d = store.get_destination_retry_timings("test_dest")
self.pump()
new_timings = self.successResultOf(d)
self.assertIsNone(new_timings)