Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Replace HTTP replication with TCP replication (Server side part) #2082

Merged
merged 17 commits into from
Apr 4, 2017
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 177 additions & 0 deletions docs/tcp_replication.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
TCP Replication
===============

This describes the TCP replication protocol that replaces the HTTP protocol.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get rid of this line, it's too vague to be useful


Motivation
----------

The HTTP API used long poll from the workers to the master, this has the problem
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This paragraph is going to look out of date real soon. I would go straight for:

Previously the workers used an HTTP long poll mechanism to get updates from the master, which had the problem of causing a lot of duplicate work on the server. This TCP protocol replaces those APIs with the aim of increased efficiency.

[or something]

of causing a lot of duplicate work on the server. This TCP protocol aims to
solve.

Overview
--------

The protocol is based on fire and forget, line based commands. An example flow
would be (where '>' indicates master->worker and '<' worker->master flows)::
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you find some other way of writing "master->worker" so that github doesn't put a linebreak in the middle of "->"


> SERVER example.com
< REPLICATE events 53
> RDATA events 54 ["$foo1:bar.com", ...]
> RDATA events 55 ["$foo4:bar.com", ...]

The example shows the server accepting a new connection and sending its identity
with the ``SERVER`` command, followed by the client asking to subscribe to the
``events`` stream from the token ``53``. The server then periodically sends ``RDATA``
commands which have the format ``RDATA <stream_name> <token> <row>```, where the
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

excess `

format of ``<row>`` is defined by the individual streams.

Error reporting happens by either the client or server sending an `ERROR`
command, and usually the connection will be closed.


Since the protocol is a simple line based, its possible to manually connect to
the server using a tool like netcat. A few things should be noted when manually
using the protocol:

* When subscribing to a stream using ``REPLICATE``, the special token ``NOW`` can
be used to get all future updates. The special stream name ``ALL`` can be used
with ``NOW`` to subscribe to all available streams.
* The federation stream is only available if federation sending has been
disabled on the main process.
* The server will only time connections out that have sent a ``PING`` command.
If a ping is sent then the connection will be closed if no further commands
are receieved within 15s. Both the client and server protocol implementations
will send an initial PING on connection and ensure at least one command every
5s is sent (not necessarily ``PING``).
* ``RDATA`` commands *usually* include a numeric token, however if the stream
has multiple rows to replicate per token the server will send multiple
``RDATA`` commands, with all but the last having a token of ``batch``. See
the documentation on ``commands.RdataCommand`` for further details.


Architecture
------------

The basic structure of the protocol is line based, where the initial word of
each line specifies the command. The rest of the line is parsed based on the
command. For example, the `RDATA` command is defined as::

RDATA <stream_name> <token> <row_json>

(Note that `<row_json>` may contains spaces, but cannot contain newlines.)

Blank lines are ignored.


Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would be nice to give a complete list of the commands here, with the command syntax, the direction of transmission, a quick summary and a reference to the section where it is explained in more detail.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Keep alives
~~~~~~~~~~~

Both sides are expected to send at least one command every 5s or so, and
should send a ``PING`` command if necessary. If either side do not receive a
command within e.g. 15s then the connection should be closed.

Because the server may be connected to manually using e.g. netcat, the timeouts
aren't enabled until an initial ``PING`` command is seen. Both the client and
server implementations below send a ``PING`` command immediately on connection to
ensure the timeouts are enabled.

This ensures that both sides can quickly realize if the tcp connection has gone
and handle the situation appropriately.


Start up
~~~~~~~~

When a new connection is made, the server:

* Sends a ``SERVER`` command, which includes the identity of the server, allowing
the client to detect if its connected to the expected server
* Sends a ``PING`` command as above, to enable the client to time out connections
promptly.

The client:

* Sends a ``NAME`` command, allowing the server to associate a human friendly
name with the connection. This is optional.
* Sends a ``PING`` as above
* For each stream the client wishes to subscribe to it sends a ``REPLICATE``
with the stream_name and token it wants to subscribe from.
* On receipt of a ``SERVER`` command, checks that the server name matches the
expected server name.


Error handling
~~~~~~~~~~~~~~

If either side detects an error it can send an ``ERROR`` command and close the
connection.

If the client side loses the connection to the server it should reconnect,
following the steps above.


Congestion
~~~~~~~~~~

If the server sends messages faster than the client can consume them the server
will first buffer a (fairly large) number of commands and then disconnect the
client. This ensures that we don't queue up an unbounded number of commands in
memory and gives us a potential oppurtunity to squawk loudly. When/if the client
recovers it can reconnect to the server and ask for missed messages.


Reliability
~~~~~~~~~~~

In general the replication stream should be consisdered an unreliable transport
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consisdered

since e.g. commands are not resent if the connection disappears.

The exception to that are the replication streams, i.e. RDATA commands, since
these include tokens which can be used to restart the stream on connection
errors.

The client should keep track of the token in the last RDATA command received
for each stream so that on reconneciton it can start streaming from the correct
place. Note: not all RDATA have valid tokens due to batching. See
``RdataCommand`` for more details.


Example
~~~~~~~

An example iteraction is shown below. Each line is prefixed with '>' or '<' to
indicate which side is sending, these are *not* included on the wire::

* connection established *
> SERVER localhost:8823
> PING 1490197665618
< NAME synapse.app.appservice
< PING 1490197665618
< REPLICATE events 1
< REPLICATE backfill 1
< REPLICATE caches 1
> POSITION events 1
> POSITION backfill 1
> POSITION caches 1
> RDATA caches 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513]
> RDATA events 14 ["$149019767112vOHxz:localhost:8823",
"!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]
< PING 1490197675618
> ERROR server stopping
* connection closed by server *

The ``POSITION`` command sent by the server is used to set the clients position
without needing to send data with the ``RDATA`` command.


An example of a batched set of ``RDATA`` is::

> RDATA caches batch ["get_user_by_id",["@test:localhost:8823"],1490197670513]
> RDATA caches batch ["get_user_by_id",["@test2:localhost:8823"],1490197670513]
> RDATA caches batch ["get_user_by_id",["@test3:localhost:8823"],1490197670513]
> RDATA caches 54 ["get_user_by_id",["@test4:localhost:8823"],1490197670513]

In this case the client shouldn't advance their caches token until it sees the
the last ``RDATA``.
11 changes: 11 additions & 0 deletions synapse/app/homeserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
from synapse.metrics import register_memory_metrics, get_metrics_for
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
from synapse.replication.resource import ReplicationResource, REPLICATION_PREFIX
from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
from synapse.federation.transport.server import TransportLayerServer

from synapse.util.rlimit import change_resource_limit
Expand Down Expand Up @@ -222,6 +223,16 @@ def start_listening(self):
),
interface=address
)
elif listener["type"] == "replication":
bind_addresses = listener["bind_addresses"]
for address in bind_addresses:
factory = ReplicationStreamProtocolFactory(self)
server_listener = reactor.listenTCP(
listener["port"], factory, interface=address
)
reactor.addSystemEventTrigger(
"before", "shutdown", server_listener.stopListening,
)
else:
logger.warn("Unrecognized listener type: %s", listener["type"])

Expand Down
40 changes: 25 additions & 15 deletions synapse/federation/send_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,15 @@ def send_device_messages(self, destination):
def get_current_token(self):
return self.pos - 1

def get_replication_rows(self, token, limit, federation_ack=None):
"""
def federation_ack(self, token):
self._clear_queue_before_pos(token)

def get_replication_rows(self, from_token, to_token, limit, federation_ack=None):
"""Get rows to be sent over federation between the two tokens

Args:
token (int)
from_token (int)
to_token(int)
limit (int)
federation_ack (int): Optional. The position where the worker is
explicitly acknowledged it has handled. Allows us to drop
Expand All @@ -232,8 +237,8 @@ def get_replication_rows(self, token, limit, federation_ack=None):
# TODO: Handle limit.

# To handle restarts where we wrap around
if token > self.pos:
token = -1
if from_token > self.pos:
from_token = -1

rows = []

Expand All @@ -244,10 +249,11 @@ def get_replication_rows(self, token, limit, federation_ack=None):

# Fetch changed presence
keys = self.presence_changed.keys()
i = keys.bisect_right(token)
i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1
dest_user_ids = set(
(pos, dest_user_id)
for pos in keys[i:]
for pos in keys[i:j]
for dest_user_id in self.presence_changed[pos]
)

Expand All @@ -259,8 +265,9 @@ def get_replication_rows(self, token, limit, federation_ack=None):

# Fetch changes keyed edus
keys = self.keyed_edu_changed.keys()
i = keys.bisect_right(token)
keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:])
i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1
keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:j])

for (pos, (destination, edu_key)) in keyed_edus:
rows.append(
Expand All @@ -272,16 +279,18 @@ def get_replication_rows(self, token, limit, federation_ack=None):

# Fetch changed edus
keys = self.edus.keys()
i = keys.bisect_right(token)
edus = set((k, self.edus[k]) for k in keys[i:])
i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1
edus = set((k, self.edus[k]) for k in keys[i:j])

for (pos, edu) in edus:
rows.append((pos, EDU_TYPE, ujson.dumps(edu.get_internal_dict())))

# Fetch changed failures
keys = self.failures.keys()
i = keys.bisect_right(token)
failures = set((k, self.failures[k]) for k in keys[i:])
i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1
failures = set((k, self.failures[k]) for k in keys[i:j])

for (pos, (destination, failure)) in failures:
rows.append((pos, FAILURE_TYPE, ujson.dumps({
Expand All @@ -291,8 +300,9 @@ def get_replication_rows(self, token, limit, federation_ack=None):

# Fetch changed device messages
keys = self.device_messages.keys()
i = keys.bisect_right(token)
device_messages = set((k, self.device_messages[k]) for k in keys[i:])
i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1
device_messages = set((k, self.device_messages[k]) for k in keys[i:j])

for (pos, destination) in device_messages:
rows.append((pos, DEVICE_MESSAGE_TYPE, ujson.dumps({
Expand Down
67 changes: 67 additions & 0 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from synapse.storage.presence import UserPresenceState

from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.util.async import Linearizer
from synapse.util.logcontext import preserve_fn
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
Expand Down Expand Up @@ -187,6 +188,7 @@ def __init__(self, hs):
# process_id to millisecond timestamp last updated.
self.external_process_to_current_syncs = {}
self.external_process_last_updated_ms = {}
self.external_sync_linearizer = Linearizer(name="external_sync_linearizer")

# Start a LoopingCall in 30s that fires every 5s.
# The initial delay is to allow disconnected clients a chance to
Expand Down Expand Up @@ -508,6 +510,71 @@ def update_external_syncs(self, process_id, syncing_user_ids):
self.external_process_last_updated_ms[process_id] = self.clock.time_msec()
self.external_process_to_current_syncs[process_id] = syncing_user_ids

@defer.inlineCallbacks
def update_external_syncs_row(self, process_id, user_id, is_syncing):
"""Update the syncing users for an external process as a delta.

Args:
process_id (str): An identifier for the process the users are
syncing against. This allows synapse to process updates
as user start and stop syncing against a given process.
user_id (str): The user who has started or stopped syncing
is_syncing (bool): Whether or not the user is now syncing
"""
with (yield self.external_sync_linearizer.queue(process_id)):
prev_state = yield self.current_state_for_user(user_id)

process_presence = self.external_process_to_current_syncs.setdefault(
process_id, set()
)
time_now_ms = self.clock.time_msec()

updates = []
if is_syncing and user_id not in process_presence:
if prev_state.state == PresenceState.OFFLINE:
updates.append(prev_state.copy_and_replace(
state=PresenceState.ONLINE,
last_active_ts=time_now_ms,
last_user_sync_ts=time_now_ms,
))
else:
updates.append(prev_state.copy_and_replace(
last_user_sync_ts=time_now_ms,
))
process_presence.add(user_id)
elif user_id in process_presence:
updates.append(prev_state.copy_and_replace(
last_user_sync_ts=time_now_ms,
))
process_presence.discard(user_id)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we'll hit this if is_syncing and user_id in process_presence. I think it's incorrect.


if updates:
yield self._update_states(updates)

self.external_process_last_updated_ms[process_id] = self.clock.time_msec()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just use time_now_ms ?


@defer.inlineCallbacks
def update_external_syncs_clear(self, process_id):
"""Marks all users that had been marked as syncing by a given process
as offline.

Used when the process has stopped/disappeared.
"""
with (yield self.external_sync_linearizer.queue(process_id)):
process_presence = self.external_process_to_current_syncs.pop(
process_id, set()
)
prev_states = yield self.current_state_for_users(process_presence)
time_now_ms = self.clock.time_msec()

yield self._update_states([
prev_state.copy_and_replace(
last_user_sync_ts=time_now_ms,
)
for prev_state in prev_states.itervalues()
])
self.external_process_last_updated_ms.pop(process_id, None)

@defer.inlineCallbacks
def current_state_for_user(self, user_id):
"""Get the current presence state for a user.
Expand Down
3 changes: 3 additions & 0 deletions synapse/handlers/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,9 @@ def get_all_typing_updates(self, last_id, current_id):
rows.sort()
return rows

def get_current_token(self):
return self._latest_room_serial


class TypingNotificationEventSource(object):
def __init__(self, hs):
Expand Down
Loading