From 748d8fdc7bcdb43719e99a48cc74bf078f22396f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 23 Sep 2016 15:31:47 +0100 Subject: [PATCH 1/2] Reduce DB hits for replication Some streams will occaisonally advance their positions without actually having any new rows to send over federation. Currently this means that the token will not advance on the workers, leading to them repeatedly sending a slightly out of date token. This in turns requires the master to hit the DB to check if there are any new rows, rather than hitting the no op logic where we check if the given token matches the current token. This commit changes the API to always return an entry if the position for a stream has changed, allowing workers to advance their tokens correctly. --- .gitignore | 8 +- synapse/app/pusher.py | 4 +- synapse/replication/resource.py | 139 +++++++++++++++++------ synapse/storage/room.py | 3 + tests/replication/slave/storage/_base.py | 3 +- tests/replication/test_resource.py | 3 +- 6 files changed, 115 insertions(+), 45 deletions(-) diff --git a/.gitignore b/.gitignore index f8c400013426..491047c3521c 100644 --- a/.gitignore +++ b/.gitignore @@ -24,10 +24,10 @@ homeserver*.yaml .coverage htmlcov -demo/*.db -demo/*.log -demo/*.log.* -demo/*.pid +demo/*/*.db +demo/*/*.log +demo/*/*.log.* +demo/*/*.pid demo/media_store.* demo/etc diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index d59f4a571ca8..1a6f5507a988 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -197,7 +197,7 @@ def poke_pushers(results): yield start_pusher(user_id, app_id, pushkey) stream = results.get("events") - if stream: + if stream and stream["rows"]: min_stream_id = stream["rows"][0][0] max_stream_id = stream["position"] preserve_fn(pusher_pool.on_new_notifications)( @@ -205,7 +205,7 @@ def poke_pushers(results): ) stream = results.get("receipts") - if stream: + if stream and stream["rows"]: rows = stream["rows"] affected_room_ids = set(row[1] for row in rows) min_stream_id = rows[0][0] diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index 9aab3ce23c5e..585bd1c4adb4 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -17,6 +17,7 @@ from synapse.http.server import request_handler, finish_request from synapse.replication.pusher_resource import PusherResource from synapse.replication.presence_resource import PresenceResource +from synapse.api.errors import SynapseError from twisted.web.resource import Resource from twisted.web.server import NOT_DONE_YET @@ -166,7 +167,8 @@ def _async_render_GET(self, request): def replicate(): return self.replicate(request_streams, limit) - result = yield self.notifier.wait_for_replication(replicate, timeout) + writer = yield self.notifier.wait_for_replication(replicate, timeout) + result = writer.finish() for stream_name, stream_content in result.items(): logger.info( @@ -186,6 +188,9 @@ def replicate(self, request_streams, limit): current_token = yield self.current_replication_token() logger.debug("Replicating up to %r", current_token) + if limit == 0: + raise SynapseError(400, "Limit cannot be 0") + yield self.account_data(writer, current_token, limit, request_streams) yield self.events(writer, current_token, limit, request_streams) # TODO: implement limit @@ -200,7 +205,7 @@ def replicate(self, request_streams, limit): self.streams(writer, current_token, request_streams) logger.debug("Replicated %d rows", writer.total) - defer.returnValue(writer.finish()) + defer.returnValue(writer) def streams(self, writer, current_token, request_streams): request_token = request_streams.get("streams") @@ -233,31 +238,52 @@ def events(self, writer, current_token, limit, request_streams): request_backfill = request_streams.get("backfill") if request_events is not None or request_backfill is not None: - if request_events is None: + if request_backfill is None: request_events = current_token.events if request_backfill is None: request_backfill = current_token.backfill + + no_new_tokens = ( + request_events == current_token.events + and request_backfill == current_token.backfill + ) + if no_new_tokens: + return + res = yield self.store.get_all_new_events( request_backfill, request_events, current_token.backfill, current_token.events, limit ) - writer.write_header_and_rows("events", res.new_forward_events, ( - "position", "internal", "json", "state_group" - )) - writer.write_header_and_rows("backfill", res.new_backfill_events, ( - "position", "internal", "json", "state_group" - )) + + upto_events_token = _position_from_rows( + res.new_forward_events, current_token.events + ) + + upto_backfill_token = _position_from_rows( + res.new_backfill_events, current_token.backfill + ) + + if request_events != upto_events_token: + writer.write_header_and_rows("events", res.new_forward_events, ( + "position", "internal", "json", "state_group" + ), position=upto_events_token) + + if request_backfill != upto_backfill_token: + writer.write_header_and_rows("backfill", res.new_backfill_events, ( + "position", "internal", "json", "state_group", + ), position=upto_backfill_token) + writer.write_header_and_rows( "forward_ex_outliers", res.forward_ex_outliers, - ("position", "event_id", "state_group") + ("position", "event_id", "state_group"), ) writer.write_header_and_rows( "backward_ex_outliers", res.backward_ex_outliers, - ("position", "event_id", "state_group") + ("position", "event_id", "state_group"), ) writer.write_header_and_rows( - "state_resets", res.state_resets, ("position",) + "state_resets", res.state_resets, ("position",), ) @defer.inlineCallbacks @@ -266,15 +292,16 @@ def presence(self, writer, current_token, request_streams): request_presence = request_streams.get("presence") - if request_presence is not None: + if request_presence is not None and request_presence != current_position: presence_rows = yield self.presence_handler.get_all_presence_updates( request_presence, current_position ) + upto_token = _position_from_rows(presence_rows, current_position) writer.write_header_and_rows("presence", presence_rows, ( "position", "user_id", "state", "last_active_ts", "last_federation_update_ts", "last_user_sync_ts", "status_msg", "currently_active", - )) + ), position=upto_token) @defer.inlineCallbacks def typing(self, writer, current_token, request_streams): @@ -282,7 +309,7 @@ def typing(self, writer, current_token, request_streams): request_typing = request_streams.get("typing") - if request_typing is not None: + if request_typing is not None and request_typing != current_position: # If they have a higher token than current max, we can assume that # they had been talking to a previous instance of the master. Since # we reset the token on restart, the best (but hacky) thing we can @@ -293,9 +320,10 @@ def typing(self, writer, current_token, request_streams): typing_rows = yield self.typing_handler.get_all_typing_updates( request_typing, current_position ) + upto_token = _position_from_rows(typing_rows, current_position) writer.write_header_and_rows("typing", typing_rows, ( "position", "room_id", "typing" - )) + ), position=upto_token) @defer.inlineCallbacks def receipts(self, writer, current_token, limit, request_streams): @@ -303,13 +331,14 @@ def receipts(self, writer, current_token, limit, request_streams): request_receipts = request_streams.get("receipts") - if request_receipts is not None: + if request_receipts is not None and request_receipts != current_position: receipts_rows = yield self.store.get_all_updated_receipts( request_receipts, current_position, limit ) + upto_token = _position_from_rows(receipts_rows, current_position) writer.write_header_and_rows("receipts", receipts_rows, ( "position", "room_id", "receipt_type", "user_id", "event_id", "data" - )) + ), position=upto_token) @defer.inlineCallbacks def account_data(self, writer, current_token, limit, request_streams): @@ -324,23 +353,36 @@ def account_data(self, writer, current_token, limit, request_streams): user_account_data = current_position if room_account_data is None: room_account_data = current_position + + no_new_tokens = ( + user_account_data == current_position + and room_account_data == current_position + ) + if no_new_tokens: + return + user_rows, room_rows = yield self.store.get_all_updated_account_data( user_account_data, room_account_data, current_position, limit ) + + upto_users_token = _position_from_rows(user_rows, current_position) + upto_rooms_token = _position_from_rows(room_rows, current_position) + writer.write_header_and_rows("user_account_data", user_rows, ( "position", "user_id", "type", "content" - )) + ), position=upto_users_token) writer.write_header_and_rows("room_account_data", room_rows, ( "position", "user_id", "room_id", "type", "content" - )) + ), position=upto_rooms_token) if tag_account_data is not None: tag_rows = yield self.store.get_all_updated_tags( tag_account_data, current_position, limit ) + upto_tag_token = _position_from_rows(tag_rows, current_position) writer.write_header_and_rows("tag_account_data", tag_rows, ( "position", "user_id", "room_id", "tags" - )) + ), position=upto_tag_token) @defer.inlineCallbacks def push_rules(self, writer, current_token, limit, request_streams): @@ -348,14 +390,15 @@ def push_rules(self, writer, current_token, limit, request_streams): push_rules = request_streams.get("push_rules") - if push_rules is not None: + if push_rules is not None and push_rules != current_position: rows = yield self.store.get_all_push_rule_updates( push_rules, current_position, limit ) + upto_token = _position_from_rows(rows, current_position) writer.write_header_and_rows("push_rules", rows, ( "position", "event_stream_ordering", "user_id", "rule_id", "op", "priority_class", "priority", "conditions", "actions" - )) + ), position=upto_token) @defer.inlineCallbacks def pushers(self, writer, current_token, limit, request_streams): @@ -363,18 +406,19 @@ def pushers(self, writer, current_token, limit, request_streams): pushers = request_streams.get("pushers") - if pushers is not None: + if pushers is not None and pushers != current_position: updated, deleted = yield self.store.get_all_updated_pushers( pushers, current_position, limit ) + upto_token = _position_from_rows(updated, current_position) writer.write_header_and_rows("pushers", updated, ( "position", "user_id", "access_token", "profile_tag", "kind", "app_id", "app_display_name", "device_display_name", "pushkey", "ts", "lang", "data" - )) + ), position=upto_token) writer.write_header_and_rows("deleted_pushers", deleted, ( "position", "user_id", "app_id", "pushkey" - )) + ), position=upto_token) @defer.inlineCallbacks def caches(self, writer, current_token, limit, request_streams): @@ -382,13 +426,14 @@ def caches(self, writer, current_token, limit, request_streams): caches = request_streams.get("caches") - if caches is not None: + if caches is not None and caches != current_position: updated_caches = yield self.store.get_all_updated_caches( caches, current_position, limit ) + upto_token = _position_from_rows(updated_caches, current_position) writer.write_header_and_rows("caches", updated_caches, ( "position", "cache_func", "keys", "invalidation_ts" - )) + ), position=upto_token) @defer.inlineCallbacks def to_device(self, writer, current_token, limit, request_streams): @@ -396,13 +441,14 @@ def to_device(self, writer, current_token, limit, request_streams): to_device = request_streams.get("to_device") - if to_device is not None: + if to_device is not None and to_device != current_position: to_device_rows = yield self.store.get_all_new_device_messages( to_device, current_position, limit ) + upto_token = _position_from_rows(to_device_rows, current_position) writer.write_header_and_rows("to_device", to_device_rows, ( "position", "user_id", "device_id", "message_json" - )) + ), position=upto_token) @defer.inlineCallbacks def public_rooms(self, writer, current_token, limit, request_streams): @@ -410,13 +456,14 @@ def public_rooms(self, writer, current_token, limit, request_streams): public_rooms = request_streams.get("public_rooms") - if public_rooms is not None: + if public_rooms is not None and public_rooms != current_position: public_rooms_rows = yield self.store.get_all_new_public_rooms( public_rooms, current_position, limit ) + upto_token = _position_from_rows(public_rooms_rows, current_position) writer.write_header_and_rows("public_rooms", public_rooms_rows, ( "position", "room_id", "visibility" - )) + ), position=upto_token) class _Writer(object): @@ -426,11 +473,11 @@ def __init__(self): self.total = 0 def write_header_and_rows(self, name, rows, fields, position=None): - if not rows: - return - if position is None: - position = rows[-1][0] + if rows: + position = rows[-1][0] + else: + return self.streams[name] = { "position": position if type(position) is int else str(position), @@ -440,6 +487,9 @@ def write_header_and_rows(self, name, rows, fields, position=None): self.total += len(rows) + def __nonzero__(self): + return bool(self.total) + def finish(self): return self.streams @@ -461,3 +511,20 @@ def __new__(cls, *args): def __str__(self): return "_".join(str(value) for value in self) + + +def _position_from_rows(rows, current_position): + """Calculates a position to return for a stream. Ideally we want to return the + position of the last row, as that will be the most correct. However, if there + are no rows we fall back to using the current position to stop us from + repeatedly hitting the storage layer unncessarily thinking there are updates. + (Not all advances of the token correspond to an actual update) + + We can't just always return the current position, as we often limit the + number of rows we replicate, and so the stream may lag. The assumption is + that if the storage layer returns no new rows then we are not lagging and + we are at the `current_position`. + """ + if rows: + return rows[-1][0] + return current_position diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 2ef13d740328..11813b44f60c 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -320,6 +320,9 @@ def get_all_new_public_rooms(txn): txn.execute(sql, (prev_id, current_id, limit,)) return txn.fetchall() + if prev_id == current_id: + return defer.succeed([]) + return self.runInteraction( "get_all_new_public_rooms", get_all_new_public_rooms ) diff --git a/tests/replication/slave/storage/_base.py b/tests/replication/slave/storage/_base.py index 1f13cd0bc069..b82868054d37 100644 --- a/tests/replication/slave/storage/_base.py +++ b/tests/replication/slave/storage/_base.py @@ -42,7 +42,8 @@ def setUp(self): @defer.inlineCallbacks def replicate(self): streams = self.slaved_store.stream_positions() - result = yield self.replication.replicate(streams, 100) + writer = yield self.replication.replicate(streams, 100) + result = writer.finish() yield self.slaved_store.process_replication(result) @defer.inlineCallbacks diff --git a/tests/replication/test_resource.py b/tests/replication/test_resource.py index b69832cc1b59..f406934a62e4 100644 --- a/tests/replication/test_resource.py +++ b/tests/replication/test_resource.py @@ -120,7 +120,7 @@ def test_timeout(self): self.hs.clock.advance_time_msec(1) code, body = yield get self.assertEquals(code, 200) - self.assertEquals(body, {}) + self.assertEquals(body.get("rows", []), []) test_timeout.__name__ = "test_timeout_%s" % (stream) return test_timeout @@ -195,7 +195,6 @@ def check_response(self, response_body): self.assertIn("field_names", stream) field_names = stream["field_names"] self.assertIn("rows", stream) - self.assertTrue(stream["rows"]) for row in stream["rows"]: self.assertEquals( len(row), len(field_names), From 668f91d70787dbc007b8221f0e6d7fda8affdacc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Oct 2016 13:57:22 +0100 Subject: [PATCH 2/2] Fix check of wrong variable --- synapse/replication/resource.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index 585bd1c4adb4..5a14c51d235a 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -238,7 +238,7 @@ def events(self, writer, current_token, limit, request_streams): request_backfill = request_streams.get("backfill") if request_events is not None or request_backfill is not None: - if request_backfill is None: + if request_events is None: request_events = current_token.events if request_backfill is None: request_backfill = current_token.backfill