Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #998 from matrix-org/erikj/pdu_fail_cache
Browse files Browse the repository at this point in the history
Various federation /event/ improvements
  • Loading branch information
erikjohnston authored Aug 10, 2016
2 parents d454894 + 487bc49 commit c9f724c
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 21 deletions.
47 changes: 36 additions & 11 deletions synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,34 @@
# Counter registered under the name "sent_queries", labelled by "type".
sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"])


# Window, in the same units as the clock's time_msec(), during which a
# destination already recorded for an event is skipped by get_pdu and kept
# in the pdu_destination_tried cache by _clear_tried_cache.
PDU_RETRY_TIME_MS = 1 * 60 * 1000


class FederationClient(FederationBase):
def __init__(self, hs):
    """Initialise the client and its record of tried PDU destinations.

    Args:
        hs: handed straight through to the ``FederationBase`` constructor.
    """
    super(FederationClient, self).__init__(hs)

    # event_id -> {destination -> time_msec() value stored by get_pdu}
    self.pdu_destination_tried = {}

    # Schedule _clear_tried_cache as a looping call so stale entries are
    # dropped; the interval is presumably in milliseconds (TODO confirm
    # against the clock's looping_call signature).
    prune_interval = 60 * 1000
    self._clock.looping_call(self._clear_tried_cache, prune_interval)

def _clear_tried_cache(self):
    """Drop expired entries from the pdu_destination_tried cache.

    The cache is replaced with a brand new dict. A (destination, timestamp)
    pair survives only while ``timestamp + PDU_RETRY_TIME_MS`` is still
    greater than the current clock reading, and an event id is carried over
    only if at least one of its destinations survives.
    """
    current_ms = self._clock.time_msec()

    # Swap in an empty cache first, then copy across whatever is still live.
    previous, self.pdu_destination_tried = self.pdu_destination_tried, {}

    for event_id, attempts in previous.items():
        still_fresh = {}
        for dest, attempted_at in attempts.items():
            if attempted_at + PDU_RETRY_TIME_MS > current_ms:
                still_fresh[dest] = attempted_at

        # Events with no surviving destinations are forgotten entirely.
        if still_fresh:
            self.pdu_destination_tried[event_id] = still_fresh

def start_get_pdu_cache(self):
self._get_pdu_cache = ExpiringCache(
cache_name="get_pdu_cache",
Expand Down Expand Up @@ -240,8 +264,15 @@ def get_pdu(self, destinations, event_id, outlier=False, timeout=None):
if ev:
defer.returnValue(ev)

pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {})

pdu = None
for destination in destinations:
now = self._clock.time_msec()
last_attempt = pdu_attempts.get(destination, 0)
if last_attempt + PDU_RETRY_TIME_MS > now:
continue

try:
limiter = yield get_retry_limiter(
destination,
Expand Down Expand Up @@ -269,25 +300,19 @@ def get_pdu(self, destinations, event_id, outlier=False, timeout=None):

break

except SynapseError as e:
logger.info(
"Failed to get PDU %s from %s because %s",
event_id, destination, e,
)
continue
except CodeMessageException as e:
if 400 <= e.code < 500:
raise
pdu_attempts[destination] = now

except SynapseError as e:
logger.info(
"Failed to get PDU %s from %s because %s",
event_id, destination, e,
)
continue
except NotRetryingDestination as e:
logger.info(e.message)
continue
except Exception as e:
pdu_attempts[destination] = now

logger.info(
"Failed to get PDU %s from %s because %s",
event_id, destination, e,
Expand Down Expand Up @@ -406,7 +431,7 @@ def get_events(self, destinations, room_id, event_ids, return_local=True):
events and the second is a list of event ids that we failed to fetch.
"""
if return_local:
seen_events = yield self.store.get_events(event_ids)
seen_events = yield self.store.get_events(event_ids, allow_rejected=True)
signed_events = seen_events.values()
else:
seen_events = yield self.store.have_events(event_ids)
Expand Down
27 changes: 17 additions & 10 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ def redact_disallowed(event, state):
if ev.type != EventTypes.Member:
continue
try:
domain = UserID.from_string(ev.state_key).domain
domain = get_domain_from_id(ev.state_key)
except:
continue

Expand Down Expand Up @@ -1093,16 +1093,17 @@ def get_persisted_pdu(self, origin, event_id, do_auth=True):
)

if event:
# FIXME: This is a temporary work around where we occasionally
# return events slightly differently than when they were
# originally signed
event.signatures.update(
compute_event_signature(
event,
self.hs.hostname,
self.hs.config.signing_key[0]
if self.hs.is_mine_id(event.event_id):
# FIXME: This is a temporary work around where we occasionally
# return events slightly differently than when they were
# originally signed
event.signatures.update(
compute_event_signature(
event,
self.hs.hostname,
self.hs.config.signing_key[0]
)
)
)

if do_auth:
in_room = yield self.auth.check_host_in_room(
Expand All @@ -1112,6 +1113,12 @@ def get_persisted_pdu(self, origin, event_id, do_auth=True):
if not in_room:
raise AuthError(403, "Host not in room.")

events = yield self._filter_events_for_server(
origin, event.room_id, [event]
)

event = events[0]

defer.returnValue(event)
else:
defer.returnValue(None)
Expand Down

0 comments on commit c9f724c

Please sign in to comment.