This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Wrap a number of things that run in the background #3604

Merged · 3 commits · Jul 25, 2018
1 change: 1 addition & 0 deletions changelog.d/3604.feature
@@ -0,0 +1 @@
+Add metrics to track resource usage by background processes
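Note: the changes below all follow one pattern, so it is worth spelling out once. Judging from the call sites in this diff, run_as_background_process(desc, func, *args) runs func(*args) as a named background process, attributing its resource usage to the metric label desc. A minimal sketch of the pattern — the do_cleanup task and the timer wiring are illustrative, not taken from the diff:

from twisted.internet import defer

from synapse.metrics.background_process_metrics import run_as_background_process


@defer.inlineCallbacks
def do_cleanup():
    # the actual periodic work, returning a Deferred
    yield defer.succeed(None)


def start_do_cleanup():
    # Fire-and-forget starter: the Deferred from the wrapped work is
    # deliberately not returned, so whatever timer calls this never
    # waits on the work or dies with its failures.
    run_as_background_process("do_cleanup", do_cleanup)

# scheduled in the same shape as the call sites below (period in ms):
#   clock.looping_call(start_do_cleanup, 30 * 60 * 1000)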
13 changes: 10 additions & 3 deletions synapse/app/homeserver.py
@@ -49,6 +49,7 @@
 from synapse.http.server import RootRedirect
 from synapse.http.site import SynapseSite
 from synapse.metrics import RegistryProxy
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.module_api import ModuleApi
 from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, check_requirements
@@ -427,6 +428,9 @@ def profiled(*args, **kargs):
     # currently either 0 or 1
     stats_process = []

+    def start_phone_stats_home():
+        run_as_background_process("phone_stats_home", phone_stats_home)
+
     @defer.inlineCallbacks
     def phone_stats_home():
         logger.info("Gathering stats for reporting")
@@ -498,7 +502,10 @@ def performance_stats_init():
             )

     def generate_user_daily_visit_stats():
-        hs.get_datastore().generate_user_daily_visits()
+        run_as_background_process(
+            "generate_user_daily_visits",
+            hs.get_datastore().generate_user_daily_visits,
+        )

     # Rather than update on per session basis, batch up the requests.
     # If you increase the loop period, the accuracy of user_daily_visits
@@ -507,15 +514,15 @@ def generate_user_daily_visit_stats():

     if hs.config.report_stats:
         logger.info("Scheduling stats reporting for 3 hour intervals")
-        clock.looping_call(phone_stats_home, 3 * 60 * 60 * 1000)
+        clock.looping_call(start_phone_stats_home, 3 * 60 * 60 * 1000)

         # We need to defer this init for the cases that we daemonize
         # otherwise the process ID we get is that of the non-daemon process
         clock.call_later(0, performance_stats_init)

         # We wait 5 minutes to send the first set of stats as the server can
         # be quite busy the first few minutes
-        clock.call_later(5 * 60, phone_stats_home)
+        clock.call_later(5 * 60, start_phone_stats_home)

     if hs.config.daemonize and hs.config.print_pidfile:
         print (hs.config.pid_file)
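Why each looping_call target is now a plain starter function rather than the @defer.inlineCallbacks coroutine itself: Twisted's LoopingCall (which clock.looping_call wraps) waits for any Deferred returned by its callback before rescheduling, and stops looping for good if that Deferred fails. A standalone sketch of the distinction, with illustrative names:

from twisted.internet import defer, task

from synapse.metrics.background_process_metrics import run_as_background_process


@defer.inlineCallbacks
def flaky_task():
    yield defer.succeed(None)
    raise RuntimeError("transient failure")


def start_flaky_task():
    # Returns None, so the LoopingCall reschedules unconditionally; the
    # failure is handled by the background-process wrapper instead of
    # killing the timer.
    run_as_background_process("flaky_task", flaky_task)

# risky:  task.LoopingCall(flaky_task).start(60)        # one failure stops the loop
# robust: task.LoopingCall(start_flaky_task).start(60)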
6 changes: 5 additions & 1 deletion synapse/groups/attestations.py
@@ -43,6 +43,7 @@
 from twisted.internet import defer

 from synapse.api.errors import SynapseError
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.types import get_domain_from_id
 from synapse.util.logcontext import run_in_background

@@ -129,7 +130,7 @@ def __init__(self, hs):
         self.attestations = hs.get_groups_attestation_signing()

         self._renew_attestations_loop = self.clock.looping_call(
-            self._renew_attestations, 30 * 60 * 1000,
+            self._start_renew_attestations, 30 * 60 * 1000,
         )

     @defer.inlineCallbacks
@@ -151,6 +152,9 @@ def on_renew_attestation(self, group_id, user_id, content):

         defer.returnValue({})

+    def _start_renew_attestations(self):
+        run_as_background_process("renew_attestations", self._renew_attestations)
+
     @defer.inlineCallbacks
     def _renew_attestations(self):
         """Called periodically to check if we need to update any of our attestations
2 changes: 1 addition & 1 deletion synapse/replication/tcp/client.py
@@ -192,7 +192,7 @@ def await_sync(self, data):
         """Returns a deferred that is resolved when we receive a SYNC command
         with given data.

-        Used by tests.
+        [Not currently] used by tests.
         """
         return self.awaiting_syncs.setdefault(data, defer.Deferred())

14 changes: 8 additions & 6 deletions synapse/replication/tcp/resource.py
@@ -25,6 +25,7 @@
 from twisted.internet.protocol import Factory

 from synapse.metrics import LaterGauge
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.metrics import Measure, measure_func

 from .protocol import ServerReplicationStreamProtocol
@@ -117,7 +118,6 @@ def on_shutdown(self):
         for conn in self.connections:
             conn.send_error("server shutting down")

-    @defer.inlineCallbacks
     def on_notifier_poke(self):
         """Checks if there is actually any new data and sends it to the
         connections if there are.
@@ -132,14 +132,16 @@ def on_notifier_poke(self):
                 stream.discard_updates_and_advance()
             return

-        # If we're in the process of checking for new updates, mark that fact
-        # and return
+        self.pending_updates = True
+
         if self.is_looping:
-            logger.debug("Noitifier poke loop already running")
-            self.pending_updates = True
+            logger.debug("Notifier poke loop already running")
             return

-        self.pending_updates = True
+        run_as_background_process("replication_notifier", self._run_notifier_loop)
+
+    @defer.inlineCallbacks
+    def _run_notifier_loop(self):
         self.is_looping = True

         try:
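The on_notifier_poke change keeps the stream's coalescing behaviour: every poke sets pending_updates, but only a poke that arrives while no loop is running starts one, and the loop drains until no further poke has arrived mid-iteration. A simplified sketch of the loop that run_as_background_process kicks off — the update-sending itself is elided, and _check_and_send_updates is an illustrative stand-in:

from twisted.internet import defer


class StreamerSketch(object):
    def __init__(self):
        self.pending_updates = False
        self.is_looping = False

    @defer.inlineCallbacks
    def _run_notifier_loop(self):
        self.is_looping = True
        try:
            # keep draining while pokes arrived during the last iteration
            while self.pending_updates:
                self.pending_updates = False
                yield self._check_and_send_updates()
        finally:
            self.pending_updates = False
            self.is_looping = False

    @defer.inlineCallbacks
    def _check_and_send_updates(self):
        # stand-in for the real update fan-out to connections
        yield defer.succeed(None)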
8 changes: 7 additions & 1 deletion synapse/rest/media/v1/media_repository.py
@@ -35,6 +35,7 @@
     SynapseError,
 )
 from synapse.http.matrixfederationclient import MatrixFederationHttpClient
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.async import Linearizer
 from synapse.util.logcontext import make_deferred_yieldable
 from synapse.util.retryutils import NotRetryingDestination
@@ -100,10 +101,15 @@ def __init__(self, hs):
         )

         self.clock.looping_call(
-            self._update_recently_accessed,
+            self._start_update_recently_accessed,
             UPDATE_RECENTLY_ACCESSED_TS,
         )

+    def _start_update_recently_accessed(self):
+        run_as_background_process(
+            "update_recently_accessed_media", self._update_recently_accessed,
+        )
+
     @defer.inlineCallbacks
     def _update_recently_accessed(self):
         remote_media = self.recently_accessed_remotes
8 changes: 7 additions & 1 deletion synapse/rest/media/v1/preview_url_resource.py
@@ -41,6 +41,7 @@
     wrap_json_request_handler,
 )
 from synapse.http.servlet import parse_integer, parse_string
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.async import ObservableDeferred
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.logcontext import make_deferred_yieldable, run_in_background
@@ -81,7 +82,7 @@ def __init__(self, hs, media_repo, media_storage):
         self._cache.start()

         self._cleaner_loop = self.clock.looping_call(
-            self._expire_url_cache_data, 10 * 1000
+            self._start_expire_url_cache_data, 10 * 1000,
         )

     def render_OPTIONS(self, request):
@@ -371,6 +372,11 @@ def _download_url(self, url, user):
             "etag": headers["ETag"][0] if "ETag" in headers else None,
         })

+    def _start_expire_url_cache_data(self):
+        run_as_background_process(
+            "expire_url_cache_data", self._expire_url_cache_data,
+        )
+
     @defer.inlineCallbacks
     def _expire_url_cache_data(self):
         """Clean up expired url cache content, media and thumbnails.
8 changes: 6 additions & 2 deletions synapse/storage/devices.py
@@ -21,6 +21,7 @@
 from twisted.internet import defer

 from synapse.api.errors import StoreError
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList

 from ._base import Cache, SQLBaseStore
@@ -711,6 +712,9 @@ def _prune_txn(txn):

             logger.info("Pruned %d device list outbound pokes", txn.rowcount)

-        return self.runInteraction(
-            "_prune_old_outbound_device_pokes", _prune_txn
+        run_as_background_process(
+            "prune_old_outbound_device_pokes",
+            self.runInteraction,
+            "_prune_old_outbound_device_pokes",
+            _prune_txn,
         )
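Two things are implicit in these storage-layer rewrites. First, judging by the call sites, run_as_background_process forwards its extra positional arguments to the wrapped function, so self.runInteraction(name, txn_fn) can be wrapped without a lambda. Second, the method no longer returns the Deferred from runInteraction, so callers can no longer wait for the prune to complete. A sketch of the before/after, with a hypothetical store:

from synapse.metrics.background_process_metrics import run_as_background_process


def start_prune(store, prune_txn):
    # before (ran in the caller's logcontext and returned a Deferred):
    #     return store.runInteraction("_prune_old_outbound_device_pokes", prune_txn)
    # after: detached from the caller and tracked under its own metric
    # label; the resulting Deferred is deliberately dropped.
    run_as_background_process(
        "prune_old_outbound_device_pokes",
        store.runInteraction,
        "_prune_old_outbound_device_pokes",
        prune_txn,
    )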
9 changes: 6 additions & 3 deletions synapse/storage/event_federation.py
@@ -23,6 +23,7 @@
 from twisted.internet import defer

 from synapse.api.errors import StoreError
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.events import EventsWorkerStore
 from synapse.storage.signatures import SignatureWorkerStore
@@ -446,7 +447,7 @@ def __init__(self, db_conn, hs):
         )

         hs.get_clock().looping_call(
-            self._delete_old_forward_extrem_cache, 60 * 60 * 1000
+            self._delete_old_forward_extrem_cache, 60 * 60 * 1000,
         )

     def _update_min_depth_for_room_txn(self, txn, room_id, depth):
@@ -548,9 +549,11 @@ def _delete_old_forward_extrem_cache_txn(txn):
                 sql,
                 (self.stream_ordering_month_ago, self.stream_ordering_month_ago,)
             )
-        return self.runInteraction(
+        run_as_background_process(
+            "delete_old_forward_extrem_cache",
+            self.runInteraction,
             "_delete_old_forward_extrem_cache",
-            _delete_old_forward_extrem_cache_txn
+            _delete_old_forward_extrem_cache_txn,
         )

     def clean_room_for_join(self, room_id):
13 changes: 9 additions & 4 deletions synapse/storage/event_push_actions.py
@@ -22,6 +22,7 @@

 from twisted.internet import defer

+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage._base import LoggingTransaction, SQLBaseStore
 from synapse.util.caches.descriptors import cachedInlineCallbacks

@@ -458,11 +459,12 @@ def remove_push_actions_from_staging(self, event_id):
                 "Error removing push actions after event persistence failure",
             )

-    @defer.inlineCallbacks
     def _find_stream_orderings_for_times(self):
-        yield self.runInteraction(
+        run_as_background_process(
+            "event_push_action_stream_orderings",
+            self.runInteraction,
             "_find_stream_orderings_for_times",
-            self._find_stream_orderings_for_times_txn
+            self._find_stream_orderings_for_times_txn,
         )

     def _find_stream_orderings_for_times_txn(self, txn):
@@ -604,7 +606,7 @@ def __init__(self, db_conn, hs):

         self._doing_notif_rotation = False
         self._rotate_notif_loop = self._clock.looping_call(
-            self._rotate_notifs, 30 * 60 * 1000
+            self._start_rotate_notifs, 30 * 60 * 1000,
         )

     def _set_push_actions_for_event_and_users_txn(self, txn, events_and_contexts,
@@ -787,6 +789,9 @@ def _remove_old_push_actions_before_txn(self, txn, room_id, user_id,
             WHERE room_id = ? AND user_id = ? AND stream_ordering <= ?
         """, (room_id, user_id, stream_ordering))

+    def _start_rotate_notifs(self):
+        run_as_background_process("rotate_notifs", self._rotate_notifs)
+
     @defer.inlineCallbacks
     def _rotate_notifs(self):
         if self._doing_notif_rotation or self.stream_ordering_day_ago is None:
6 changes: 5 additions & 1 deletion synapse/storage/transactions.py
@@ -22,6 +22,7 @@

 from twisted.internet import defer

+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.caches.descriptors import cached

 from ._base import SQLBaseStore
@@ -57,7 +58,7 @@ class TransactionStore(SQLBaseStore):
     def __init__(self, db_conn, hs):
         super(TransactionStore, self).__init__(db_conn, hs)

-        self._clock.looping_call(self._cleanup_transactions, 30 * 60 * 1000)
+        self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000)

     def get_received_txn_response(self, transaction_id, origin):
         """For an incoming transaction from a given origin, check if we have
@@ -271,6 +272,9 @@ def _get_destinations_needing_retry(self, txn):
         txn.execute(query, (self._clock.time_msec(),))
         return self.cursor_to_dict(txn)

+    def _start_cleanup_transactions(self):
+        run_as_background_process("cleanup_transactions", self._cleanup_transactions)
+
     def _cleanup_transactions(self):
         now = self._clock.time_msec()
         month_ago = now - 30 * 24 * 60 * 60 * 1000
37 changes: 30 additions & 7 deletions tests/replication/slave/storage/_base.py
@@ -11,23 +11,44 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.

 import tempfile

 from mock import Mock, NonCallableMock

 from twisted.internet import defer, reactor
+from twisted.internet.defer import Deferred

 from synapse.replication.tcp.client import (
     ReplicationClientFactory,
     ReplicationClientHandler,
 )
 from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
+from synapse.util.logcontext import PreserveLoggingContext, make_deferred_yieldable

 from tests import unittest
 from tests.utils import setup_test_homeserver


+class TestReplicationClientHandler(ReplicationClientHandler):
+    """Overrides on_rdata so that we can wait for it to happen"""
+    def __init__(self, store):
+        super(TestReplicationClientHandler, self).__init__(store)
+        self._rdata_awaiters = []
+
+    def await_replication(self):
+        d = Deferred()
+        self._rdata_awaiters.append(d)
+        return make_deferred_yieldable(d)
+
+    def on_rdata(self, stream_name, token, rows):
+        awaiters = self._rdata_awaiters
+        self._rdata_awaiters = []
+        super(TestReplicationClientHandler, self).on_rdata(stream_name, token, rows)
+        with PreserveLoggingContext():
+            for a in awaiters:
+                a.callback(None)
+
+
 class BaseSlavedStoreTestCase(unittest.TestCase):
     @defer.inlineCallbacks
     def setUp(self):
@@ -52,20 +73,22 @@ def setUp(self):
         self.addCleanup(listener.stopListening)
         self.streamer = server_factory.streamer

-        self.replication_handler = ReplicationClientHandler(self.slaved_store)
+        self.replication_handler = TestReplicationClientHandler(self.slaved_store)
         client_factory = ReplicationClientFactory(
             self.hs, "client_name", self.replication_handler
         )
         client_connector = reactor.connectUNIX(path, client_factory)
         self.addCleanup(client_factory.stopTrying)
         self.addCleanup(client_connector.disconnect)

-    @defer.inlineCallbacks
     def replicate(self):
-        yield self.streamer.on_notifier_poke()
-        d = self.replication_handler.await_sync("replication_test")
-        self.streamer.send_sync_to_all_connections("replication_test")
-        yield d
+        """Tell the master side of replication that something has happened, and then
+        wait for the replication to occur.
+        """
+        # xxx: should we be more specific in what we wait for?
+        d = self.replication_handler.await_replication()
+        self.streamer.on_notifier_poke()
+        return d

     @defer.inlineCallbacks
     def check(self, method, args, expected_result=None):
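For orientation, a sketch of how a test case built on these helpers would run: replicate() queues an awaiter before poking the streamer, and the overridden on_rdata releases the whole batch once the replicated rows arrive (firing the callbacks inside PreserveLoggingContext because they complete in the reactor's logcontext, not the test's). The store methods here are hypothetical:

from twisted.internet import defer


class ExampleSlavedStoreTestCase(BaseSlavedStoreTestCase):
    @defer.inlineCallbacks
    def test_write_is_replicated(self):
        # write on the master side (hypothetical store method)
        yield self.master_store.set_flag("some_key", True)

        # poke the streamer, then wait for the slave's on_rdata to fire
        yield self.replicate()

        # the slaved store should now see the write (hypothetical method)
        flag = yield self.slaved_store.get_flag("some_key")
        self.assertEqual(flag, True)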