From 642199570c57fd37b856c3808e08fe2923001b55 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Wed, 19 Sep 2018 17:28:18 +0100 Subject: [PATCH] Improve the logging when handling a federation transaction (#3904) Let's try to rationalise the logging that happens when we are processing an incoming transaction, to make it easier to figure out what is going wrong when they take ages. In particular: - make everything start with a [room_id event_id] prefix - make sure we log a warning when catching exceptions rather than just turning them into other, more cryptic, exceptions. --- changelog.d/3904.misc | 1 + synapse/handlers/federation.py | 164 ++++++++++++++++++++++----------- synapse/util/retryutils.py | 2 +- 3 files changed, 111 insertions(+), 56 deletions(-) create mode 100644 changelog.d/3904.misc diff --git a/changelog.d/3904.misc b/changelog.d/3904.misc new file mode 100644 index 000000000000..1e3c8e1706ca --- /dev/null +++ b/changelog.d/3904.misc @@ -0,0 +1 @@ +Improve the logging when handling a federation transaction \ No newline at end of file diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index f10b46414bf6..8d6bd7976d74 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -69,6 +69,27 @@ logger = logging.getLogger(__name__) +def shortstr(iterable, maxitems=5): + """If iterable has maxitems or fewer, return the stringification of a list + containing those items. + + Otherwise, return the stringification of a a list with the first maxitems items, + followed by "...". + + Args: + iterable (Iterable): iterable to truncate + maxitems (int): number of items to return before truncating + + Returns: + unicode + """ + + items = list(itertools.islice(iterable, maxitems + 1)) + if len(items) <= maxitems: + return str(items) + return u"[" + u", ".join(repr(r) for r in items[:maxitems]) + u", ...]" + + class FederationHandler(BaseHandler): """Handles events that originated from federation. Responsible for: @@ -114,7 +135,6 @@ def __init__(self, hs): self._room_pdu_linearizer = Linearizer("fed_room_pdu") @defer.inlineCallbacks - @log_function def on_receive_pdu( self, origin, pdu, get_missing=True, sent_to_us_directly=False, ): @@ -130,9 +150,17 @@ def on_receive_pdu( Returns (Deferred): completes with None """ + room_id = pdu.room_id + event_id = pdu.event_id + + logger.info( + "[%s %s] handling received PDU: %s", + room_id, event_id, pdu, + ) + # We reprocess pdus when we have seen them only as outliers existing = yield self.store.get_event( - pdu.event_id, + event_id, allow_none=True, allow_rejected=True, ) @@ -147,7 +175,7 @@ def on_receive_pdu( ) ) if already_seen: - logger.debug("Already seen pdu %s", pdu.event_id) + logger.debug("[%s %s]: Already seen pdu", room_id, event_id) return # do some initial sanity-checking of the event. In particular, make @@ -156,6 +184,7 @@ def on_receive_pdu( try: self._sanity_check_event(pdu) except SynapseError as err: + logger.warn("[%s %s] Received event failed sanity checks", room_id, event_id) raise FederationError( "ERROR", err.code, @@ -165,10 +194,12 @@ def on_receive_pdu( # If we are currently in the process of joining this room, then we # queue up events for later processing. - if pdu.room_id in self.room_queues: - logger.info("Ignoring PDU %s for room %s from %s for now; join " - "in progress", pdu.event_id, pdu.room_id, origin) - self.room_queues[pdu.room_id].append((pdu, origin)) + if room_id in self.room_queues: + logger.info( + "[%s %s] Queuing PDU from %s for now: join in progress", + room_id, event_id, origin, + ) + self.room_queues[room_id].append((pdu, origin)) return # If we're no longer in the room just ditch the event entirely. This @@ -179,7 +210,7 @@ def on_receive_pdu( # we should check if we *are* in fact in the room. If we are then we # can magically rejoin the room. is_in_room = yield self.auth.check_host_in_room( - pdu.room_id, + room_id, self.server_name ) if not is_in_room: @@ -188,8 +219,8 @@ def on_receive_pdu( ) if was_in_room: logger.info( - "Ignoring PDU %s for room %s from %s as we've left the room!", - pdu.event_id, pdu.room_id, origin, + "[%s %s] Ignoring PDU from %s as we've left the room", + room_id, event_id, origin, ) defer.returnValue(None) @@ -204,8 +235,8 @@ def on_receive_pdu( ) logger.debug( - "_handle_new_pdu min_depth for %s: %d", - pdu.room_id, min_depth + "[%s %s] min_depth: %d", + room_id, event_id, min_depth, ) prevs = {e_id for e_id, _ in pdu.prev_events} @@ -218,17 +249,18 @@ def on_receive_pdu( # send to the clients. pdu.internal_metadata.outlier = True elif min_depth and pdu.depth > min_depth: - if get_missing and prevs - seen: + missing_prevs = prevs - seen + if get_missing and missing_prevs: # If we're missing stuff, ensure we only fetch stuff one # at a time. logger.info( - "Acquiring lock for room %r to fetch %d missing events: %r...", - pdu.room_id, len(prevs - seen), list(prevs - seen)[:5], + "[%s %s] Acquiring room lock to fetch %d missing prev_events: %s", + room_id, event_id, len(missing_prevs), shortstr(missing_prevs), ) with (yield self._room_pdu_linearizer.queue(pdu.room_id)): logger.info( - "Acquired lock for room %r to fetch %d missing events", - pdu.room_id, len(prevs - seen), + "[%s %s] Acquired room lock to fetch %d missing prev_events", + room_id, event_id, len(missing_prevs), ) yield self._get_missing_events_for_pdu( @@ -241,19 +273,23 @@ def on_receive_pdu( if not prevs - seen: logger.info( - "Found all missing prev events for %s", pdu.event_id + "[%s %s] Found all missing prev_events", + room_id, event_id, ) - elif prevs - seen: + elif missing_prevs: logger.info( - "Not fetching %d missing events for room %r,event %s: %r...", - len(prevs - seen), pdu.room_id, pdu.event_id, - list(prevs - seen)[:5], + "[%s %s] Not recursively fetching %d missing prev_events: %s", + room_id, event_id, len(missing_prevs), shortstr(missing_prevs), ) if sent_to_us_directly and prevs - seen: # If they have sent it to us directly, and the server # isn't telling us about the auth events that it's # made a message referencing, we explode + logger.warn( + "[%s %s] Failed to fetch %d prev events: rejecting", + room_id, event_id, len(prevs - seen), + ) raise FederationError( "ERROR", 403, @@ -270,15 +306,19 @@ def on_receive_pdu( auth_chains = set() try: # Get the state of the events we know about - ours = yield self.store.get_state_groups(pdu.room_id, list(seen)) + ours = yield self.store.get_state_groups(room_id, list(seen)) state_groups.append(ours) # Ask the remote server for the states we don't # know about for p in prevs - seen: + logger.info( + "[%s %s] Requesting state at missing prev_event %s", + room_id, event_id, p, + ) state, got_auth_chain = ( yield self.federation_client.get_state_for_room( - origin, pdu.room_id, p + origin, room_id, p, ) ) auth_chains.update(got_auth_chain) @@ -291,19 +331,24 @@ def fetch(ev_ids): ev_ids, get_prev_content=False, check_redacted=False ) - room_version = yield self.store.get_room_version(pdu.room_id) + room_version = yield self.store.get_room_version(room_id) state_map = yield resolve_events_with_factory( - room_version, state_groups, {pdu.event_id: pdu}, fetch + room_version, state_groups, {event_id: pdu}, fetch ) state = (yield self.store.get_events(state_map.values())).values() auth_chain = list(auth_chains) except Exception: + logger.warn( + "[%s %s] Error attempting to resolve state at missing " + "prev_events", + room_id, event_id, exc_info=True, + ) raise FederationError( "ERROR", 403, "We can't get valid state history.", - affected=pdu.event_id, + affected=event_id, ) yield self._process_received_pdu( @@ -322,15 +367,16 @@ def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth): prevs (set(str)): List of event ids which we are missing min_depth (int): Minimum depth of events to return. """ - # We recalculate seen, since it may have changed. + + room_id = pdu.room_id + event_id = pdu.event_id + seen = yield self.store.have_seen_events(prevs) if not prevs - seen: return - latest = yield self.store.get_latest_event_ids_in_room( - pdu.room_id - ) + latest = yield self.store.get_latest_event_ids_in_room(room_id) # We add the prev events that we have seen to the latest # list to ensure the remote server doesn't give them to us @@ -338,8 +384,8 @@ def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth): latest |= seen logger.info( - "Missing %d events for room %r pdu %s: %r...", - len(prevs - seen), pdu.room_id, pdu.event_id, list(prevs - seen)[:5] + "[%s %s]: Requesting %d prev_events: %s", + room_id, event_id, len(prevs - seen), shortstr(prevs - seen) ) # XXX: we set timeout to 10s to help workaround @@ -392,7 +438,7 @@ def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth): missing_events = yield self.federation_client.get_missing_events( origin, - pdu.room_id, + room_id, earliest_events_ids=list(latest), latest_events=[pdu], limit=10, @@ -401,37 +447,46 @@ def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth): ) logger.info( - "Got %d events: %r...", - len(missing_events), [e.event_id for e in missing_events[:5]] + "[%s %s]: Got %d prev_events: %s", + room_id, event_id, len(missing_events), shortstr(missing_events), ) # We want to sort these by depth so we process them and # tell clients about them in order. missing_events.sort(key=lambda x: x.depth) - for e in missing_events: - logger.info("Handling found event %s", e.event_id) + for ev in missing_events: + logger.info( + "[%s %s] Handling received prev_event %s", + room_id, event_id, ev.event_id, + ) try: yield self.on_receive_pdu( origin, - e, + ev, get_missing=False ) except FederationError as e: if e.code == 403: - logger.warn("Event %s failed history check.") + logger.warn( + "[%s %s] Received prev_event %s failed history check.", + room_id, event_id, ev.event_id, + ) else: raise - @log_function @defer.inlineCallbacks - def _process_received_pdu(self, origin, pdu, state, auth_chain): + def _process_received_pdu(self, origin, event, state, auth_chain): """ Called when we have a new pdu. We need to do auth checks and put it through the StateHandler. """ - event = pdu + room_id = event.room_id + event_id = event.event_id - logger.debug("Processing event: %s", event) + logger.debug( + "[%s %s] Processing event: %s", + room_id, event_id, event, + ) # FIXME (erikj): Awful hack to make the case where we are not currently # in the room work @@ -440,15 +495,16 @@ def _process_received_pdu(self, origin, pdu, state, auth_chain): # event. if state and auth_chain and not event.internal_metadata.is_outlier(): is_in_room = yield self.auth.check_host_in_room( - event.room_id, + room_id, self.server_name ) else: is_in_room = True + if not is_in_room: logger.info( - "Got event for room we're not in: %r %r", - event.room_id, event.event_id + "[%s %s] Got event for room we're not in", + room_id, event_id, ) try: @@ -460,7 +516,7 @@ def _process_received_pdu(self, origin, pdu, state, auth_chain): "ERROR", e.code, e.msg, - affected=event.event_id, + affected=event_id, ) else: @@ -509,12 +565,12 @@ def _process_received_pdu(self, origin, pdu, state, auth_chain): affected=event.event_id, ) - room = yield self.store.get_room(event.room_id) + room = yield self.store.get_room(room_id) if not room: try: yield self.store.store_room( - room_id=event.room_id, + room_id=room_id, room_creator_user_id="", is_public=False, ) @@ -542,7 +598,7 @@ def _process_received_pdu(self, origin, pdu, state, auth_chain): if newly_joined: user = UserID.from_string(event.state_key) - yield self.user_joined_room(user, event.room_id) + yield self.user_joined_room(user, room_id) @log_function @defer.inlineCallbacks @@ -1459,12 +1515,10 @@ def get_persisted_pdu(self, origin, event_id): else: defer.returnValue(None) - @log_function def get_min_depth_for_context(self, context): return self.store.get_min_depth(context) @defer.inlineCallbacks - @log_function def _handle_new_event(self, origin, event, state=None, auth_events=None, backfilled=False): context = yield self._prep_event( @@ -1664,8 +1718,8 @@ def _prep_event(self, origin, event, state=None, auth_events=None): ) except AuthError as e: logger.warn( - "Rejecting %s because %s", - event.event_id, e.msg + "[%s %s] Rejecting: %s", + event.room_id, event.event_id, e.msg ) context.rejected = RejectedReason.AUTH_ERROR diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index 8a3a06fd743e..26cce7d197d6 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -188,7 +188,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): else: self.retry_interval = self.min_retry_interval - logger.debug( + logger.info( "Connection to %s was unsuccessful (%s(%s)); backoff now %i", self.destination, exc_type, exc_val, self.retry_interval )