Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #3556 from matrix-org/rav/background_processes
Browse files Browse the repository at this point in the history
Run things as background processes
  • Loading branch information
richvdh authored Jul 19, 2018
2 parents a97c845 + 8c69b73 commit c754e00
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 104 deletions.
1 change: 1 addition & 0 deletions changelog.d/3556.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add metrics to track resource usage by background processes.
15 changes: 6 additions & 9 deletions synapse/federation/transaction_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ def notify_new_events(self, current_id):

# fire off a processing loop in the background
run_as_background_process(
"process_transaction_queue",
"process_event_queue_for_federation",
self._process_event_queue_loop,
)

Expand Down Expand Up @@ -434,14 +434,11 @@ def _attempt_new_transaction(self, destination):

logger.debug("TX [%s] Starting transaction loop", destination)

# Drop the logcontext before starting the transaction. It doesn't
# really make sense to log all the outbound transactions against
# whatever path led us to this point: that's pretty arbitrary really.
#
# (this also means we can fire off _perform_transaction without
# yielding)
with logcontext.PreserveLoggingContext():
self._transaction_transmission_loop(destination)
run_as_background_process(
"federation_transaction_transmission_loop",
self._transaction_transmission_loop,
destination,
)

@defer.inlineCallbacks
def _transaction_transmission_loop(self, destination):
Expand Down
10 changes: 8 additions & 2 deletions synapse/storage/background_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

from twisted.internet import defer

from synapse.metrics.background_process_metrics import run_as_background_process

from . import engines
from ._base import SQLBaseStore

Expand Down Expand Up @@ -87,10 +89,14 @@ def __init__(self, db_conn, hs):
self._background_update_handlers = {}
self._all_done = False

@defer.inlineCallbacks
def start_doing_background_updates(self):
logger.info("Starting background schema updates")
run_as_background_process(
"background_updates", self._run_background_updates,
)

@defer.inlineCallbacks
def _run_background_updates(self):
logger.info("Starting background schema updates")
while True:
yield self.hs.get_clock().sleep(
self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.)
Expand Down
15 changes: 11 additions & 4 deletions synapse/storage/client_ips.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from twisted.internet import defer

from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.caches import CACHE_SIZE_FACTOR

from . import background_updates
Expand Down Expand Up @@ -93,10 +94,16 @@ def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id,
self._batch_row_update[key] = (user_agent, device_id, now)

def _update_client_ips_batch(self):
to_update = self._batch_row_update
self._batch_row_update = {}
return self.runInteraction(
"_update_client_ips_batch", self._update_client_ips_batch_txn, to_update
def update():
to_update = self._batch_row_update
self._batch_row_update = {}
return self.runInteraction(
"_update_client_ips_batch", self._update_client_ips_batch_txn,
to_update,
)

run_as_background_process(
"update_client_ips", update,
)

def _update_client_ips_batch_txn(self, txn, to_update):
Expand Down
10 changes: 4 additions & 6 deletions synapse/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@
# these are only included to make the type annotations work
from synapse.events import EventBase # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.events_worker import EventsWorkerStore
from synapse.types import RoomStreamToken, get_domain_from_id
from synapse.util.async import ObservableDeferred
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.logcontext import PreserveLoggingContext, make_deferred_yieldable
from synapse.util.logcontext import make_deferred_yieldable
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure

Expand Down Expand Up @@ -155,11 +156,8 @@ def handle_queue_loop():
self._event_persist_queues[room_id] = queue
self._currently_persisting_rooms.discard(room_id)

# set handle_queue_loop off on the background. We don't want to
# attribute work done in it to the current request, so we drop the
# logcontext altogether.
with PreserveLoggingContext():
handle_queue_loop()
# set handle_queue_loop off in the background
run_as_background_process("persist_events", handle_queue_loop)

def _get_drainining_queue(self, room_id):
queue = self._event_persist_queues.setdefault(room_id, deque())
Expand Down
10 changes: 6 additions & 4 deletions synapse/storage/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from synapse.events import FrozenEvent
from synapse.events.snapshot import EventContext # noqa: F401
from synapse.events.utils import prune_event
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.logcontext import (
LoggingContext,
PreserveLoggingContext,
Expand Down Expand Up @@ -322,10 +323,11 @@ def _enqueue_events(self, events, check_redacted=True, allow_rejected=False):
should_start = False

if should_start:
with PreserveLoggingContext():
self.runWithConnection(
self._do_fetch
)
run_as_background_process(
"fetch_events",
self.runWithConnection,
self._do_fetch,
)

logger.debug("Loading %d events", len(events))
with PreserveLoggingContext():
Expand Down
6 changes: 5 additions & 1 deletion synapse/util/caches/expiringcache.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import logging
from collections import OrderedDict

from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.caches import register_cache

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -63,7 +64,10 @@ def start(self):
return

def f():
self._prune_cache()
run_as_background_process(
"prune_cache_%s" % self._cache_name,
self._prune_cache,
)

self._clock.looping_call(f, self._expiry_ms / 2)

Expand Down
48 changes: 22 additions & 26 deletions synapse/util/distributor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,18 @@

from twisted.internet import defer

from synapse.util import unwrapFirstError
from synapse.util.logcontext import PreserveLoggingContext
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.logcontext import make_deferred_yieldable, run_in_background

logger = logging.getLogger(__name__)


def user_left_room(distributor, user, room_id):
with PreserveLoggingContext():
distributor.fire("user_left_room", user=user, room_id=room_id)
distributor.fire("user_left_room", user=user, room_id=room_id)


def user_joined_room(distributor, user, room_id):
with PreserveLoggingContext():
distributor.fire("user_joined_room", user=user, room_id=room_id)
distributor.fire("user_joined_room", user=user, room_id=room_id)


class Distributor(object):
Expand All @@ -44,9 +42,7 @@ class Distributor(object):
model will do for today.
"""

def __init__(self, suppress_failures=True):
self.suppress_failures = suppress_failures

def __init__(self):
self.signals = {}
self.pre_registration = {}

Expand All @@ -56,7 +52,6 @@ def declare(self, name):

self.signals[name] = Signal(
name,
suppress_failures=self.suppress_failures,
)

if name in self.pre_registration:
Expand All @@ -75,10 +70,18 @@ def observe(self, name, observer):
self.pre_registration[name].append(observer)

def fire(self, name, *args, **kwargs):
"""Dispatches the given signal to the registered observers.
Runs the observers as a background process. Does not return a deferred.
"""
if name not in self.signals:
raise KeyError("%r does not have a signal named %s" % (self, name))

return self.signals[name].fire(*args, **kwargs)
run_as_background_process(
name,
self.signals[name].fire,
*args, **kwargs
)


class Signal(object):
Expand All @@ -91,9 +94,8 @@ class Signal(object):
method into all of the observers.
"""

def __init__(self, name, suppress_failures):
def __init__(self, name):
self.name = name
self.suppress_failures = suppress_failures
self.observers = []

def observe(self, observer):
Expand All @@ -103,7 +105,6 @@ def observe(self, observer):
Each observer callable may return a Deferred."""
self.observers.append(observer)

@defer.inlineCallbacks
def fire(self, *args, **kwargs):
"""Invokes every callable in the observer list, passing in the args and
kwargs. Exceptions thrown by observers are logged but ignored. It is
Expand All @@ -121,22 +122,17 @@ def eb(failure):
failure.type,
failure.value,
failure.getTracebackObject()))
if not self.suppress_failures:
return failure

return defer.maybeDeferred(observer, *args, **kwargs).addErrback(eb)

with PreserveLoggingContext():
deferreds = [
do(observer)
for observer in self.observers
]

res = yield defer.gatherResults(
deferreds, consumeErrors=True
).addErrback(unwrapFirstError)
deferreds = [
run_in_background(do, o)
for o in self.observers
]

defer.returnValue(res)
return make_deferred_yieldable(defer.gatherResults(
deferreds, consumeErrors=True,
))

def __repr__(self):
return "<Signal name=%r>" % (self.name,)
56 changes: 4 additions & 52 deletions tests/test_distributor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -15,8 +16,6 @@

from mock import Mock, patch

from twisted.internet import defer

from synapse.util.distributor import Distributor

from . import unittest
Expand All @@ -27,38 +26,15 @@ class DistributorTestCase(unittest.TestCase):
def setUp(self):
self.dist = Distributor()

@defer.inlineCallbacks
def test_signal_dispatch(self):
self.dist.declare("alert")

observer = Mock()
self.dist.observe("alert", observer)

d = self.dist.fire("alert", 1, 2, 3)
yield d
self.assertTrue(d.called)
self.dist.fire("alert", 1, 2, 3)
observer.assert_called_with(1, 2, 3)

@defer.inlineCallbacks
def test_signal_dispatch_deferred(self):
self.dist.declare("whine")

d_inner = defer.Deferred()

def observer():
return d_inner

self.dist.observe("whine", observer)

d_outer = self.dist.fire("whine")

self.assertFalse(d_outer.called)

d_inner.callback(None)
yield d_outer
self.assertTrue(d_outer.called)

@defer.inlineCallbacks
def test_signal_catch(self):
self.dist.declare("alarm")

Expand All @@ -71,9 +47,7 @@ def test_signal_catch(self):
with patch(
"synapse.util.distributor.logger", spec=["warning"]
) as mock_logger:
d = self.dist.fire("alarm", "Go")
yield d
self.assertTrue(d.called)
self.dist.fire("alarm", "Go")

observers[0].assert_called_once_with("Go")
observers[1].assert_called_once_with("Go")
Expand All @@ -83,34 +57,12 @@ def test_signal_catch(self):
mock_logger.warning.call_args[0][0], str
)

@defer.inlineCallbacks
def test_signal_catch_no_suppress(self):
# Gut-wrenching
self.dist.suppress_failures = False

self.dist.declare("whail")

class MyException(Exception):
pass

@defer.inlineCallbacks
def observer():
raise MyException("Oopsie")

self.dist.observe("whail", observer)

d = self.dist.fire("whail")

yield self.assertFailure(d, MyException)
self.dist.suppress_failures = True

@defer.inlineCallbacks
def test_signal_prereg(self):
observer = Mock()
self.dist.observe("flare", observer)

self.dist.declare("flare")
yield self.dist.fire("flare", 4, 5)
self.dist.fire("flare", 4, 5)

observer.assert_called_with(4, 5)

Expand Down

0 comments on commit c754e00

Please sign in to comment.