Merge pull request #3604 from matrix-org/rav/background_process_fixes
Wrap a number of things that run in the background
richvdh authored Jul 25, 2018
2 parents 1e5dbdc + f59be4e commit 1bfb5be
Showing 12 changed files with 95 additions and 30 deletions.
1 change: 1 addition & 0 deletions changelog.d/3604.feature
@@ -0,0 +1 @@
+Add metrics to track resource usage by background processes
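Every file below applies the same shape of change. For orientation, here is a minimal, hypothetical sketch of the pattern (the MyStore class and the "prune_old_data" name are illustrative, not part of this commit): the timer is pointed at a plain synchronous starter, and the starter hands the decorated coroutine to run_as_background_process, which runs it in its own logcontext and records its resource usage under the given name.

from twisted.internet import defer

from synapse.metrics.background_process_metrics import run_as_background_process


class MyStore(object):
    """Hypothetical example of the pattern applied throughout this commit."""

    def __init__(self, hs):
        self._clock = hs.get_clock()
        # Point the timer at a plain wrapper rather than at the decorated
        # coroutine itself, so each run is wrapped and measured.
        self._clock.looping_call(self._start_prune_old_data, 60 * 60 * 1000)

    def _start_prune_old_data(self):
        # Names the background process for metrics, then runs it.
        run_as_background_process("prune_old_data", self._prune_old_data)

    @defer.inlineCallbacks
    def _prune_old_data(self):
        # placeholder for the real periodic work
        yield defer.succeed(None)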
13 changes: 10 additions & 3 deletions synapse/app/homeserver.py
@@ -49,6 +49,7 @@
 from synapse.http.server import RootRedirect
 from synapse.http.site import SynapseSite
 from synapse.metrics import RegistryProxy
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.module_api import ModuleApi
 from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, check_requirements
@@ -427,6 +428,9 @@ def profiled(*args, **kargs):
     # currently either 0 or 1
     stats_process = []
 
+    def start_phone_stats_home():
+        run_as_background_process("phone_stats_home", phone_stats_home)
+
     @defer.inlineCallbacks
     def phone_stats_home():
         logger.info("Gathering stats for reporting")
@@ -498,7 +502,10 @@ def performance_stats_init():
         )
 
     def generate_user_daily_visit_stats():
-        hs.get_datastore().generate_user_daily_visits()
+        run_as_background_process(
+            "generate_user_daily_visits",
+            hs.get_datastore().generate_user_daily_visits,
+        )
 
     # Rather than update on per session basis, batch up the requests.
     # If you increase the loop period, the accuracy of user_daily_visits
@@ -507,15 +514,15 @@ def generate_user_daily_visit_stats():
 
     if hs.config.report_stats:
         logger.info("Scheduling stats reporting for 3 hour intervals")
-        clock.looping_call(phone_stats_home, 3 * 60 * 60 * 1000)
+        clock.looping_call(start_phone_stats_home, 3 * 60 * 60 * 1000)
 
         # We need to defer this init for the cases that we daemonize
         # otherwise the process ID we get is that of the non-daemon process
        clock.call_later(0, performance_stats_init)
 
         # We wait 5 minutes to send the first set of stats as the server can
         # be quite busy the first few minutes
-        clock.call_later(5 * 60, phone_stats_home)
+        clock.call_later(5 * 60, start_phone_stats_home)
 
     if hs.config.daemonize and hs.config.print_pidfile:
         print (hs.config.pid_file)
6 changes: 5 additions & 1 deletion synapse/groups/attestations.py
@@ -43,6 +43,7 @@
 from twisted.internet import defer
 
 from synapse.api.errors import SynapseError
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.types import get_domain_from_id
 from synapse.util.logcontext import run_in_background
 
@@ -129,7 +130,7 @@ def __init__(self, hs):
         self.attestations = hs.get_groups_attestation_signing()
 
         self._renew_attestations_loop = self.clock.looping_call(
-            self._renew_attestations, 30 * 60 * 1000,
+            self._start_renew_attestations, 30 * 60 * 1000,
         )
 
     @defer.inlineCallbacks
@@ -151,6 +152,9 @@ def on_renew_attestation(self, group_id, user_id, content):
 
         defer.returnValue({})
 
+    def _start_renew_attestations(self):
+        run_as_background_process("renew_attestations", self._renew_attestations)
+
     @defer.inlineCallbacks
     def _renew_attestations(self):
         """Called periodically to check if we need to update any of our attestations
2 changes: 1 addition & 1 deletion synapse/replication/tcp/client.py
@@ -192,7 +192,7 @@ def await_sync(self, data):
         """Returns a deferred that is resolved when we receive a SYNC command
         with given data.
 
-        Used by tests.
+        [Not currently] used by tests.
         """
         return self.awaiting_syncs.setdefault(data, defer.Deferred())
 
14 changes: 8 additions & 6 deletions synapse/replication/tcp/resource.py
@@ -25,6 +25,7 @@
 from twisted.internet.protocol import Factory
 
 from synapse.metrics import LaterGauge
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.metrics import Measure, measure_func
 
 from .protocol import ServerReplicationStreamProtocol
@@ -117,7 +118,6 @@ def on_shutdown(self):
         for conn in self.connections:
             conn.send_error("server shutting down")
 
-    @defer.inlineCallbacks
     def on_notifier_poke(self):
         """Checks if there is actually any new data and sends it to the
         connections if there are.
@@ -132,14 +132,16 @@ def on_notifier_poke(self):
                 stream.discard_updates_and_advance()
             return
 
-        # If we're in the process of checking for new updates, mark that fact
-        # and return
+        self.pending_updates = True
+
         if self.is_looping:
-            logger.debug("Noitifier poke loop already running")
-            self.pending_updates = True
+            logger.debug("Notifier poke loop already running")
             return
 
-        self.pending_updates = True
+        run_as_background_process("replication_notifier", self._run_notifier_loop)
+
+    @defer.inlineCallbacks
+    def _run_notifier_loop(self):
         self.is_looping = True
 
         try:
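The loop that now runs as a background process follows the usual poke-coalescing shape: pokes arriving while the loop is running set pending_updates, and the loop keeps draining until no further pokes have landed. Roughly this, as a sketch of the method above with the update-streaming body elided and _send_updates_to_connections a hypothetical stand-in:

@defer.inlineCallbacks
def _run_notifier_loop(self):
    self.is_looping = True
    try:
        # Keep looping while pokes have arrived since the last pass, so a
        # poke that lands mid-iteration is never lost.
        while self.pending_updates:
            self.pending_updates = False
            yield self._send_updates_to_connections()  # hypothetical helper
    finally:
        self.is_looping = False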
8 changes: 7 additions & 1 deletion synapse/rest/media/v1/media_repository.py
@@ -35,6 +35,7 @@
     SynapseError,
 )
 from synapse.http.matrixfederationclient import MatrixFederationHttpClient
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.async import Linearizer
 from synapse.util.logcontext import make_deferred_yieldable
 from synapse.util.retryutils import NotRetryingDestination
@@ -100,10 +101,15 @@ def __init__(self, hs):
         )
 
         self.clock.looping_call(
-            self._update_recently_accessed,
+            self._start_update_recently_accessed,
             UPDATE_RECENTLY_ACCESSED_TS,
         )
 
+    def _start_update_recently_accessed(self):
+        run_as_background_process(
+            "update_recently_accessed_media", self._update_recently_accessed,
+        )
+
     @defer.inlineCallbacks
     def _update_recently_accessed(self):
         remote_media = self.recently_accessed_remotes
8 changes: 7 additions & 1 deletion synapse/rest/media/v1/preview_url_resource.py
@@ -41,6 +41,7 @@
     wrap_json_request_handler,
 )
 from synapse.http.servlet import parse_integer, parse_string
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.async import ObservableDeferred
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.logcontext import make_deferred_yieldable, run_in_background
@@ -81,7 +82,7 @@ def __init__(self, hs, media_repo, media_storage):
         self._cache.start()
 
         self._cleaner_loop = self.clock.looping_call(
-            self._expire_url_cache_data, 10 * 1000
+            self._start_expire_url_cache_data, 10 * 1000,
         )
 
     def render_OPTIONS(self, request):
@@ -371,6 +372,11 @@ def _download_url(self, url, user):
             "etag": headers["ETag"][0] if "ETag" in headers else None,
         })
 
+    def _start_expire_url_cache_data(self):
+        run_as_background_process(
+            "expire_url_cache_data", self._expire_url_cache_data,
+        )
+
     @defer.inlineCallbacks
     def _expire_url_cache_data(self):
         """Clean up expired url cache content, media and thumbnails.
8 changes: 6 additions & 2 deletions synapse/storage/devices.py
@@ -21,6 +21,7 @@
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
 
 from ._base import Cache, SQLBaseStore
@@ -711,6 +712,9 @@ def _prune_txn(txn):
 
             logger.info("Pruned %d device list outbound pokes", txn.rowcount)
 
-        return self.runInteraction(
-            "_prune_old_outbound_device_pokes", _prune_txn
+        run_as_background_process(
+            "prune_old_outbound_device_pokes",
+            self.runInteraction,
+            "_prune_old_outbound_device_pokes",
+            _prune_txn,
         )
9 changes: 6 additions & 3 deletions synapse/storage/event_federation.py
@@ -23,6 +23,7 @@
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.events import EventsWorkerStore
 from synapse.storage.signatures import SignatureWorkerStore
@@ -446,7 +447,7 @@ def __init__(self, db_conn, hs):
         )
 
         hs.get_clock().looping_call(
-            self._delete_old_forward_extrem_cache, 60 * 60 * 1000
+            self._delete_old_forward_extrem_cache, 60 * 60 * 1000,
         )
 
     def _update_min_depth_for_room_txn(self, txn, room_id, depth):
@@ -548,9 +549,11 @@ def _delete_old_forward_extrem_cache_txn(txn):
                 sql,
                 (self.stream_ordering_month_ago, self.stream_ordering_month_ago,)
             )
-        return self.runInteraction(
+        run_as_background_process(
+            "delete_old_forward_extrem_cache",
+            self.runInteraction,
             "_delete_old_forward_extrem_cache",
-            _delete_old_forward_extrem_cache_txn
+            _delete_old_forward_extrem_cache_txn,
         )
 
     def clean_room_for_join(self, room_id):
13 changes: 9 additions & 4 deletions synapse/storage/event_push_actions.py
@@ -22,6 +22,7 @@
 
 from twisted.internet import defer
 
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage._base import LoggingTransaction, SQLBaseStore
 from synapse.util.caches.descriptors import cachedInlineCallbacks
 
@@ -458,11 +459,12 @@ def remove_push_actions_from_staging(self, event_id):
             "Error removing push actions after event persistence failure",
         )
 
-    @defer.inlineCallbacks
     def _find_stream_orderings_for_times(self):
-        yield self.runInteraction(
+        run_as_background_process(
+            "event_push_action_stream_orderings",
+            self.runInteraction,
             "_find_stream_orderings_for_times",
-            self._find_stream_orderings_for_times_txn
+            self._find_stream_orderings_for_times_txn,
         )
 
     def _find_stream_orderings_for_times_txn(self, txn):
@@ -604,7 +606,7 @@ def __init__(self, db_conn, hs):
 
         self._doing_notif_rotation = False
         self._rotate_notif_loop = self._clock.looping_call(
-            self._rotate_notifs, 30 * 60 * 1000
+            self._start_rotate_notifs, 30 * 60 * 1000,
        )
 
     def _set_push_actions_for_event_and_users_txn(self, txn, events_and_contexts,
@@ -787,6 +789,9 @@ def _remove_old_push_actions_before_txn(self, txn, room_id, user_id,
             WHERE room_id = ? AND user_id = ? AND stream_ordering <= ?
         """, (room_id, user_id, stream_ordering))
 
+    def _start_rotate_notifs(self):
+        run_as_background_process("rotate_notifs", self._rotate_notifs)
+
     @defer.inlineCallbacks
     def _rotate_notifs(self):
         if self._doing_notif_rotation or self.stream_ordering_day_ago is None:
6 changes: 5 additions & 1 deletion synapse/storage/transactions.py
@@ -22,6 +22,7 @@
 
 from twisted.internet import defer
 
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.caches.descriptors import cached
 
 from ._base import SQLBaseStore
@@ -57,7 +58,7 @@ class TransactionStore(SQLBaseStore):
     def __init__(self, db_conn, hs):
         super(TransactionStore, self).__init__(db_conn, hs)
 
-        self._clock.looping_call(self._cleanup_transactions, 30 * 60 * 1000)
+        self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000)
 
     def get_received_txn_response(self, transaction_id, origin):
         """For an incoming transaction from a given origin, check if we have
@@ -271,6 +272,9 @@ def _get_destinations_needing_retry(self, txn):
         txn.execute(query, (self._clock.time_msec(),))
         return self.cursor_to_dict(txn)
 
+    def _start_cleanup_transactions(self):
+        run_as_background_process("cleanup_transactions", self._cleanup_transactions)
+
     def _cleanup_transactions(self):
         now = self._clock.time_msec()
         month_ago = now - 30 * 24 * 60 * 60 * 1000
37 changes: 30 additions & 7 deletions tests/replication/slave/storage/_base.py
@@ -11,23 +11,44 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
 import tempfile
 
 from mock import Mock, NonCallableMock
 
 from twisted.internet import defer, reactor
+from twisted.internet.defer import Deferred
 
 from synapse.replication.tcp.client import (
     ReplicationClientFactory,
     ReplicationClientHandler,
 )
 from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
+from synapse.util.logcontext import PreserveLoggingContext, make_deferred_yieldable
 
 from tests import unittest
 from tests.utils import setup_test_homeserver
 
+class TestReplicationClientHandler(ReplicationClientHandler):
+    """Overrides on_rdata so that we can wait for it to happen"""
+    def __init__(self, store):
+        super(TestReplicationClientHandler, self).__init__(store)
+        self._rdata_awaiters = []
+
+    def await_replication(self):
+        d = Deferred()
+        self._rdata_awaiters.append(d)
+        return make_deferred_yieldable(d)
+
+    def on_rdata(self, stream_name, token, rows):
+        awaiters = self._rdata_awaiters
+        self._rdata_awaiters = []
+        super(TestReplicationClientHandler, self).on_rdata(stream_name, token, rows)
+        with PreserveLoggingContext():
+            for a in awaiters:
+                a.callback(None)
+
 
 class BaseSlavedStoreTestCase(unittest.TestCase):
     @defer.inlineCallbacks
     def setUp(self):
@@ -52,20 +73,22 @@ def setUp(self):
         self.addCleanup(listener.stopListening)
         self.streamer = server_factory.streamer
 
-        self.replication_handler = ReplicationClientHandler(self.slaved_store)
+        self.replication_handler = TestReplicationClientHandler(self.slaved_store)
         client_factory = ReplicationClientFactory(
             self.hs, "client_name", self.replication_handler
         )
         client_connector = reactor.connectUNIX(path, client_factory)
         self.addCleanup(client_factory.stopTrying)
         self.addCleanup(client_connector.disconnect)
 
-    @defer.inlineCallbacks
     def replicate(self):
-        yield self.streamer.on_notifier_poke()
-        d = self.replication_handler.await_sync("replication_test")
-        self.streamer.send_sync_to_all_connections("replication_test")
-        yield d
+        """Tell the master side of replication that something has happened, and then
+        wait for the replication to occur.
+        """
+        # xxx: should we be more specific in what we wait for?
+        d = self.replication_handler.await_replication()
+        self.streamer.on_notifier_poke()
+        return d
 
     @defer.inlineCallbacks
     def check(self, method, args, expected_result=None):
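A subclass would typically drive these helpers roughly like this — a hypothetical test method, where master_store, do_some_write, and get_thing are illustrative stand-ins for whatever slaved-store method is under test:

@defer.inlineCallbacks
def test_replication_of_a_write(self):
    # write on the master, wait for it to replicate to the slaved store,
    # then compare the slaved store's answer with the expected value
    yield self.master_store.do_some_write()  # hypothetical write
    yield self.replicate()
    yield self.check("get_thing", ["some_key"], expected_result="some_value")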
