Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Include eventid in log lines when processing incoming federation transactions
Browse files Browse the repository at this point in the history
…sactions (#3959)

when processing incoming transactions, it can be hard to see what's going on,
because we process a bunch of stuff in parallel, and because we may end up
recursively working our way through a chain of three or four events.

This commit creates a way to use logcontexts to add the relevant event ids to
the log lines.
  • Loading branch information
richvdh authored Sep 27, 2018
1 parent ae6ad4c commit 4a15a3e
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 57 deletions.
1 change: 1 addition & 0 deletions changelog.d/3959.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Include eventid in log lines when processing incoming federation transactions
32 changes: 17 additions & 15 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
from synapse.types import get_domain_from_id
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.logcontext import nested_logging_context
from synapse.util.logutils import log_function

# when processing incoming transactions, we try to handle multiple rooms in
Expand Down Expand Up @@ -187,21 +188,22 @@ def process_pdus_for_room(room_id):

for pdu in pdus_by_room[room_id]:
event_id = pdu.event_id
try:
yield self._handle_received_pdu(
origin, pdu
)
pdu_results[event_id] = {}
except FederationError as e:
logger.warn("Error handling PDU %s: %s", event_id, e)
pdu_results[event_id] = {"error": str(e)}
except Exception as e:
f = failure.Failure()
pdu_results[event_id] = {"error": str(e)}
logger.error(
"Failed to handle PDU %s: %s",
event_id, f.getTraceback().rstrip(),
)
with nested_logging_context(event_id):
try:
yield self._handle_received_pdu(
origin, pdu
)
pdu_results[event_id] = {}
except FederationError as e:
logger.warn("Error handling PDU %s: %s", event_id, e)
pdu_results[event_id] = {"error": str(e)}
except Exception as e:
f = failure.Failure()
pdu_results[event_id] = {"error": str(e)}
logger.error(
"Failed to handle PDU %s: %s",
event_id, f.getTraceback().rstrip(),
)

yield concurrently_execute(
process_pdus_for_room, pdus_by_room.keys(),
Expand Down
65 changes: 39 additions & 26 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,14 +339,18 @@ def on_receive_pdu(
"[%s %s] Requesting state at missing prev_event %s",
room_id, event_id, p,
)
state, got_auth_chain = (
yield self.federation_client.get_state_for_room(
origin, room_id, p,

with logcontext.nested_logging_context(p):
state, got_auth_chain = (
yield self.federation_client.get_state_for_room(
origin, room_id, p,
)
)
)
auth_chains.update(got_auth_chain)
state_group = {(x.type, x.state_key): x.event_id for x in state}
state_groups.append(state_group)
auth_chains.update(got_auth_chain)
state_group = {
(x.type, x.state_key): x.event_id for x in state
}
state_groups.append(state_group)

# Resolve any conflicting state
def fetch(ev_ids):
Expand Down Expand Up @@ -483,20 +487,21 @@ def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth):
"[%s %s] Handling received prev_event %s",
room_id, event_id, ev.event_id,
)
try:
yield self.on_receive_pdu(
origin,
ev,
sent_to_us_directly=False,
)
except FederationError as e:
if e.code == 403:
logger.warn(
"[%s %s] Received prev_event %s failed history check.",
room_id, event_id, ev.event_id,
with logcontext.nested_logging_context(ev.event_id):
try:
yield self.on_receive_pdu(
origin,
ev,
sent_to_us_directly=False,
)
else:
raise
except FederationError as e:
if e.code == 403:
logger.warn(
"[%s %s] Received prev_event %s failed history check.",
room_id, event_id, ev.event_id,
)
else:
raise

@defer.inlineCallbacks
def _process_received_pdu(self, origin, event, state, auth_chain):
Expand Down Expand Up @@ -1135,7 +1140,8 @@ def _handle_queued_pdus(self, room_queue):
try:
logger.info("Processing queued PDU %s which was received "
"while we were joining %s", p.event_id, p.room_id)
yield self.on_receive_pdu(origin, p, sent_to_us_directly=True)
with logcontext.nested_logging_context(p.event_id):
yield self.on_receive_pdu(origin, p, sent_to_us_directly=True)
except Exception as e:
logger.warn(
"Error handling queued PDU %s from %s: %s",
Expand Down Expand Up @@ -1581,15 +1587,22 @@ def _handle_new_events(self, origin, event_infos, backfilled=False):
Notifies about the events where appropriate.
"""
contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
logcontext.run_in_background(
self._prep_event,

@defer.inlineCallbacks
def prep(ev_info):
    """Compute the event context for a single received event.

    Calls self._prep_event inside a nested logging context tagged with the
    event's id, so that log lines emitted while preparing the event can be
    attributed to it.

    Args:
        ev_info (dict): contains the key 'event', and optionally 'state'
            and 'auth_events' (absent keys are passed through as None).

    Returns:
        Deferred: resolves to whatever self._prep_event returns.
    """
    event = ev_info["event"]
    # Tag all log lines emitted during _prep_event with this event's id.
    with logcontext.nested_logging_context(suffix=event.event_id):
        res = yield self._prep_event(
            origin,
            event,
            state=ev_info.get("state"),
            auth_events=ev_info.get("auth_events"),
        )
    # NOTE(review): assumes returnValue sits outside the `with`; indentation
    # was stripped in this view — confirm against the upstream file.
    defer.returnValue(res)

contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
logcontext.run_in_background(prep, ev_info)
for ev_info in event_infos
], consumeErrors=True,
))
Expand Down
41 changes: 37 additions & 4 deletions synapse/util/logcontext.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ def __nonzero__(self):

sentinel = Sentinel()

def __init__(self, name=None, parent_context=None):
def __init__(self, name=None, parent_context=None, request=None):
self.previous_context = LoggingContext.current_context()
self.name = name

Expand All @@ -218,6 +218,13 @@ def __init__(self, name=None, parent_context=None):

self.parent_context = parent_context

if self.parent_context is not None:
self.parent_context.copy_to(self)

if request is not None:
# the request param overrides the request from the parent context
self.request = request

def __str__(self):
return "%s@%x" % (self.name, id(self))

Expand Down Expand Up @@ -256,9 +263,6 @@ def __enter__(self):
)
self.alive = True

if self.parent_context is not None:
self.parent_context.copy_to(self)

return self

def __exit__(self, type, value, traceback):
Expand Down Expand Up @@ -439,6 +443,35 @@ def __exit__(self, type, value, traceback):
)


def nested_logging_context(suffix, parent_context=None):
    """Create a child logging context of another context.

    The child's 'request' is the parent's 'request' with "-" and the given
    suffix appended. CPU/db usage stats accumulated in the child are added
    to the parent's on exit.

    Normal usage looks like:

        with nested_logging_context(suffix):
            # ... do stuff

    Args:
        suffix (str): suffix to append to the parent context's 'request'.
        parent_context (LoggingContext|None): parent context. The current
            logging context is used when None.

    Returns:
        LoggingContext: the new child logging context.
    """
    parent = parent_context
    if parent is None:
        parent = LoggingContext.current_context()
    child_request = "%s-%s" % (parent.request, suffix)
    return LoggingContext(parent_context=parent, request=child_request)


def preserve_fn(f):
"""Function decorator which wraps the function with run_in_background"""
def g(*args, **kwargs):
Expand Down
28 changes: 16 additions & 12 deletions tests/test_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from synapse.events import FrozenEvent
from synapse.types import Requester, UserID
from synapse.util import Clock
from synapse.util.logcontext import LoggingContext

from tests import unittest
from tests.server import ThreadedMemoryReactorClock, setup_test_homeserver
Expand Down Expand Up @@ -117,9 +118,10 @@ def post_json(destination, path, data, headers=None, timeout=0):
}
)

d = self.handler.on_receive_pdu(
"test.serv", lying_event, sent_to_us_directly=True
)
with LoggingContext(request="lying_event"):
d = self.handler.on_receive_pdu(
"test.serv", lying_event, sent_to_us_directly=True
)

# Step the reactor, so the database fetches come back
self.reactor.advance(1)
Expand Down Expand Up @@ -209,11 +211,12 @@ def get_json(destination, path, args, headers=None):
}
)

d = self.handler.on_receive_pdu(
"test.serv", good_event, sent_to_us_directly=True
)
self.reactor.advance(1)
self.assertEqual(self.successResultOf(d), None)
with LoggingContext(request="good_event"):
d = self.handler.on_receive_pdu(
"test.serv", good_event, sent_to_us_directly=True
)
self.reactor.advance(1)
self.assertEqual(self.successResultOf(d), None)

bad_event = FrozenEvent(
{
Expand All @@ -230,10 +233,11 @@ def get_json(destination, path, args, headers=None):
}
)

d = self.handler.on_receive_pdu(
"test.serv", bad_event, sent_to_us_directly=True
)
self.reactor.advance(1)
with LoggingContext(request="bad_event"):
d = self.handler.on_receive_pdu(
"test.serv", bad_event, sent_to_us_directly=True
)
self.reactor.advance(1)

extrem = maybeDeferred(
self.homeserver.datastore.get_latest_event_ids_in_room, self.room_id
Expand Down
5 changes: 5 additions & 0 deletions tests/util/test_logcontext.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ def test_make_deferred_yieldable_on_non_deferred(self):
self.assertEqual(r, "bum")
self._check_test_key("one")

def test_nested_logging_context(self):
    # a nested context should get the parent's 'request' plus "-<suffix>"
    with LoggingContext(request="foo"):
        child = logcontext.nested_logging_context(suffix="bar")
        self.assertEqual(child.request, "foo-bar")


# a function which returns a deferred which has been "called", but
# which had a function which returned another incomplete deferred on
Expand Down

0 comments on commit 4a15a3e

Please sign in to comment.