Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
push federation retry limiter down to matrixfederationclient
Browse files Browse the repository at this point in the history
Rather than having to instrument every place where we make a federation call,
make the MatrixFederationHttpClient manage the retry limiter itself.
  • Loading branch information
richvdh committed Mar 23, 2017
1 parent ad8a26e commit 4bd597d
Show file tree
Hide file tree
Showing 8 changed files with 280 additions and 287 deletions.
39 changes: 16 additions & 23 deletions synapse/crypto/keyring.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

from synapse.crypto.keyclient import fetch_server_key
from synapse.api.errors import SynapseError, Codes
from synapse.util.retryutils import get_retry_limiter
from synapse.util import unwrapFirstError
from synapse.util.async import ObservableDeferred
from synapse.util.logcontext import (
Expand Down Expand Up @@ -363,30 +362,24 @@ def get_key(perspective_name, perspective_keys):
def get_keys_from_server(self, server_name_and_key_ids):
@defer.inlineCallbacks
def get_key(server_name, key_ids):
limiter = yield get_retry_limiter(
server_name,
self.clock,
self.store,
)
with limiter:
keys = None
try:
keys = yield self.get_server_verify_key_v2_direct(
server_name, key_ids
)
except Exception as e:
logger.info(
"Unable to get key %r for %r directly: %s %s",
key_ids, server_name,
type(e).__name__, str(e.message),
)
keys = None
try:
keys = yield self.get_server_verify_key_v2_direct(
server_name, key_ids
)
except Exception as e:
logger.info(
"Unable to get key %r for %r directly: %s %s",
key_ids, server_name,
type(e).__name__, str(e.message),
)

if not keys:
keys = yield self.get_server_verify_key_v1_direct(
server_name, key_ids
)
if not keys:
keys = yield self.get_server_verify_key_v1_direct(
server_name, key_ids
)

keys = {server_name: keys}
keys = {server_name: keys}

defer.returnValue(keys)

Expand Down
33 changes: 13 additions & 20 deletions synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from synapse.events import FrozenEvent, builder
import synapse.metrics

from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination
from synapse.util.retryutils import NotRetryingDestination

import copy
import itertools
Expand Down Expand Up @@ -234,31 +234,24 @@ def get_pdu(self, destinations, event_id, outlier=False, timeout=None):
continue

try:
limiter = yield get_retry_limiter(
destination,
self._clock,
self.store,
transaction_data = yield self.transport_layer.get_event(
destination, event_id, timeout=timeout,
)

with limiter:
transaction_data = yield self.transport_layer.get_event(
destination, event_id, timeout=timeout,
)

logger.debug("transaction_data %r", transaction_data)
logger.debug("transaction_data %r", transaction_data)

pdu_list = [
self.event_from_pdu_json(p, outlier=outlier)
for p in transaction_data["pdus"]
]
pdu_list = [
self.event_from_pdu_json(p, outlier=outlier)
for p in transaction_data["pdus"]
]

if pdu_list and pdu_list[0]:
pdu = pdu_list[0]
if pdu_list and pdu_list[0]:
pdu = pdu_list[0]

# Check signatures are correct.
signed_pdu = yield self._check_sigs_and_hashes([pdu])[0]
# Check signatures are correct.
signed_pdu = yield self._check_sigs_and_hashes([pdu])[0]

break
break

pdu_attempts[destination] = now

Expand Down
216 changes: 95 additions & 121 deletions synapse/federation/transaction_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime

from twisted.internet import defer

Expand All @@ -22,9 +22,7 @@
from synapse.api.errors import HttpResponseException
from synapse.util.async import run_on_reactor
from synapse.util.logcontext import preserve_context_over_fn
from synapse.util.retryutils import (
get_retry_limiter, NotRetryingDestination,
)
from synapse.util.retryutils import NotRetryingDestination
from synapse.util.metrics import measure_func
from synapse.types import get_domain_from_id
from synapse.handlers.presence import format_user_presence_state
Expand Down Expand Up @@ -312,13 +310,6 @@ def _attempt_new_transaction(self, destination):
yield run_on_reactor()

while True:
limiter = yield get_retry_limiter(
destination,
self.clock,
self.store,
backoff_on_404=True, # If we get a 404 the other side has gone
)

device_message_edus, device_stream_id, dev_list_id = (
yield self._get_new_device_messages(destination)
)
Expand Down Expand Up @@ -374,7 +365,6 @@ def _attempt_new_transaction(self, destination):

success = yield self._send_new_transaction(
destination, pending_pdus, pending_edus, pending_failures,
limiter=limiter,
)
if success:
# Remove the acknowledged device messages from the database
Expand All @@ -392,12 +382,24 @@ def _attempt_new_transaction(self, destination):
self.last_device_list_stream_id_by_dest[destination] = dev_list_id
else:
break
except NotRetryingDestination:
except NotRetryingDestination as e:
logger.debug(
"TX [%s] not ready for retry yet - "
"TX [%s] not ready for retry yet (next retry at %s) - "
"dropping transaction for now",
destination,
datetime.datetime.fromtimestamp(
(e.retry_last_ts + e.retry_interval) / 1000.0
),
)
except Exception as e:
logger.warn(
"TX [%s] Failed to send transaction: %s",
destination,
e,
)
for p in pending_pdus:
logger.info("Failed to send event %s to %s", p.event_id,
destination)
finally:
# We want to be *very* sure we delete this after we stop processing
self.pending_transactions.pop(destination, None)
Expand Down Expand Up @@ -437,7 +439,7 @@ def _get_new_device_messages(self, destination):
@measure_func("_send_new_transaction")
@defer.inlineCallbacks
def _send_new_transaction(self, destination, pending_pdus, pending_edus,
pending_failures, limiter):
pending_failures):

# Sort based on the order field
pending_pdus.sort(key=lambda t: t[1])
Expand All @@ -447,132 +449,104 @@ def _send_new_transaction(self, destination, pending_pdus, pending_edus,

success = True

try:
logger.debug("TX [%s] _attempt_new_transaction", destination)
logger.debug("TX [%s] _attempt_new_transaction", destination)

txn_id = str(self._next_txn_id)
txn_id = str(self._next_txn_id)

logger.debug(
"TX [%s] {%s} Attempting new transaction"
" (pdus: %d, edus: %d, failures: %d)",
destination, txn_id,
len(pdus),
len(edus),
len(failures)
)
logger.debug(
"TX [%s] {%s} Attempting new transaction"
" (pdus: %d, edus: %d, failures: %d)",
destination, txn_id,
len(pdus),
len(edus),
len(failures)
)

logger.debug("TX [%s] Persisting transaction...", destination)
logger.debug("TX [%s] Persisting transaction...", destination)

transaction = Transaction.create_new(
origin_server_ts=int(self.clock.time_msec()),
transaction_id=txn_id,
origin=self.server_name,
destination=destination,
pdus=pdus,
edus=edus,
pdu_failures=failures,
)
transaction = Transaction.create_new(
origin_server_ts=int(self.clock.time_msec()),
transaction_id=txn_id,
origin=self.server_name,
destination=destination,
pdus=pdus,
edus=edus,
pdu_failures=failures,
)

self._next_txn_id += 1
self._next_txn_id += 1

yield self.transaction_actions.prepare_to_send(transaction)
yield self.transaction_actions.prepare_to_send(transaction)

logger.debug("TX [%s] Persisted transaction", destination)
logger.info(
"TX [%s] {%s} Sending transaction [%s],"
" (PDUs: %d, EDUs: %d, failures: %d)",
destination, txn_id,
transaction.transaction_id,
len(pdus),
len(edus),
len(failures),
)
logger.debug("TX [%s] Persisted transaction", destination)
logger.info(
"TX [%s] {%s} Sending transaction [%s],"
" (PDUs: %d, EDUs: %d, failures: %d)",
destination, txn_id,
transaction.transaction_id,
len(pdus),
len(edus),
len(failures),
)

with limiter:
# Actually send the transaction

# FIXME (erikj): This is a bit of a hack to make the Pdu age
# keys work
def json_data_cb():
data = transaction.get_dict()
now = int(self.clock.time_msec())
if "pdus" in data:
for p in data["pdus"]:
if "age_ts" in p:
unsigned = p.setdefault("unsigned", {})
unsigned["age"] = now - int(p["age_ts"])
del p["age_ts"]
return data

try:
response = yield self.transport_layer.send_transaction(
transaction, json_data_cb
)
code = 200

if response:
for e_id, r in response.get("pdus", {}).items():
if "error" in r:
logger.warn(
"Transaction returned error for %s: %s",
e_id, r,
)
except HttpResponseException as e:
code = e.code
response = e.response

if e.code in (401, 404, 429) or 500 <= e.code:
logger.info(
"TX [%s] {%s} got %d response",
destination, txn_id, code
# Actually send the transaction

# FIXME (erikj): This is a bit of a hack to make the Pdu age
# keys work
def json_data_cb():
data = transaction.get_dict()
now = int(self.clock.time_msec())
if "pdus" in data:
for p in data["pdus"]:
if "age_ts" in p:
unsigned = p.setdefault("unsigned", {})
unsigned["age"] = now - int(p["age_ts"])
del p["age_ts"]
return data

try:
response = yield self.transport_layer.send_transaction(
transaction, json_data_cb
)
code = 200

if response:
for e_id, r in response.get("pdus", {}).items():
if "error" in r:
logger.warn(
"Transaction returned error for %s: %s",
e_id, r,
)
raise e
except HttpResponseException as e:
code = e.code
response = e.response

if e.code in (401, 404, 429) or 500 <= e.code:
logger.info(
"TX [%s] {%s} got %d response",
destination, txn_id, code
)
raise e

logger.debug("TX [%s] Sent transaction", destination)
logger.debug("TX [%s] Marking as delivered...", destination)

yield self.transaction_actions.delivered(
transaction, code, response
)
logger.info(
"TX [%s] {%s} got %d response",
destination, txn_id, code
)

logger.debug("TX [%s] Marked as delivered", destination)
logger.debug("TX [%s] Sent transaction", destination)
logger.debug("TX [%s] Marking as delivered...", destination)

if code != 200:
for p in pdus:
logger.info(
"Failed to send event %s to %s", p.event_id, destination
)
success = False
except RuntimeError as e:
# We capture this here as there is nothing actually listening
# for this function's deferred to finish.
logger.warn(
"TX [%s] Problem in _attempt_transaction: %s",
destination,
e,
)
yield self.transaction_actions.delivered(
transaction, code, response
)

success = False
logger.debug("TX [%s] Marked as delivered", destination)

if code != 200:
for p in pdus:
logger.info("Failed to send event %s to %s", p.event_id, destination)
except Exception as e:
# We capture this here as there is nothing actually listening
# for this function's deferred to finish.
logger.warn(
"TX [%s] Problem in _attempt_transaction: %s",
destination,
e,
)

logger.info(
"Failed to send event %s to %s", p.event_id, destination
)
success = False

for p in pdus:
logger.info("Failed to send event %s to %s", p.event_id, destination)

defer.returnValue(success)
Loading

0 comments on commit 4bd597d

Please sign in to comment.