From 0e830d377065b35f7430415288c54e13d9a6f0c6 Mon Sep 17 00:00:00 2001
From: Erik Johnston <erik@matrix.org>
Date: Wed, 16 Nov 2016 11:32:16 +0000
Subject: [PATCH 01/21] Rename transaction queue functions to send_*

---
 synapse/federation/federation_client.py | 14 +++++++-------
 synapse/federation/transaction_queue.py | 10 +++++-----
 2 files changed, 12 insertions(+), 12 deletions(-)

diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 94e76b1978a6..783ccf12f627 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -111,19 +111,19 @@ def send_pdu(self, pdu, destinations):
 
         sent_pdus_destination_dist.inc_by(len(destinations))
 
-        logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.event_id)
+        logger.debug("[%s] transaction_layer.send_pdu... ", pdu.event_id)
 
         # TODO, add errback, etc.
-        self._transaction_queue.enqueue_pdu(pdu, destinations, order)
+        self._transaction_queue.send_pdu(pdu, destinations, order)
 
         logger.debug(
-            "[%s] transaction_layer.enqueue_pdu... done",
+            "[%s] transaction_layer.send_pdu... done",
             pdu.event_id
         )
 
     def send_presence(self, destination, states):
         if destination != self.server_name:
-            self._transaction_queue.enqueue_presence(destination, states)
+            self._transaction_queue.send_presence(destination, states)
 
     @log_function
     def send_edu(self, destination, edu_type, content, key=None):
@@ -136,17 +136,17 @@ def send_edu(self, destination, edu_type, content, key=None):
 
         sent_edus_counter.inc()
 
-        self._transaction_queue.enqueue_edu(edu, key=key)
+        self._transaction_queue.send_edu(edu, key=key)
 
     @log_function
     def send_device_messages(self, destination):
         """Sends the device messages in the local database to the remote
         destination"""
-        self._transaction_queue.enqueue_device_messages(destination)
+        self._transaction_queue.send_device_messages(destination)
 
     @log_function
     def send_failure(self, failure, destination):
-        self._transaction_queue.enqueue_failure(failure, destination)
+        self._transaction_queue.send_failure(failure, destination)
         return defer.succeed(None)
 
     @log_function
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index f8ca93e4c37e..e0abe4b40be6 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -115,7 +115,7 @@ def can_send_to(self, destination):
         else:
             return not destination.startswith("localhost")
 
-    def enqueue_pdu(self, pdu, destinations, order):
+    def send_pdu(self, pdu, destinations, order):
         # We loop through all destinations to see whether we already have
         # a transaction in progress. If we do, stick it in the pending_pdus
         # table and we'll get back to it later.
@@ -139,7 +139,7 @@ def enqueue_pdu(self, pdu, destinations, order):
                 self._attempt_new_transaction, destination
             )
 
-    def enqueue_presence(self, destination, states):
+    def send_presence(self, destination, states):
         self.pending_presence_by_dest.setdefault(destination, {}).update({
             state.user_id: state for state in states
         })
@@ -148,7 +148,7 @@ def enqueue_presence(self, destination, states):
             self._attempt_new_transaction, destination
         )
 
-    def enqueue_edu(self, edu, key=None):
+    def send_edu(self, edu, key=None):
         destination = edu.destination
 
         if not self.can_send_to(destination):
@@ -165,7 +165,7 @@ def enqueue_edu(self, edu, key=None):
             self._attempt_new_transaction, destination
         )
 
-    def enqueue_failure(self, failure, destination):
+    def send_failure(self, failure, destination):
         if destination == self.server_name or destination == "localhost":
             return
 
@@ -180,7 +180,7 @@ def enqueue_failure(self, failure, destination):
             self._attempt_new_transaction, destination
         )
 
-    def enqueue_device_messages(self, destination):
+    def send_device_messages(self, destination):
         if destination == self.server_name or destination == "localhost":
             return
 

From daec6fc355517b70c159526e20e739fa09c8e443 Mon Sep 17 00:00:00 2001
From: Erik Johnston <erik@matrix.org>
Date: Wed, 16 Nov 2016 13:41:21 +0000
Subject: [PATCH 02/21] Move logic into transaction_queue

---
 synapse/federation/federation_client.py | 16 ++--------------
 synapse/federation/replication.py       |  2 --
 synapse/federation/transaction_queue.py | 19 ++++++++++++++++---
 3 files changed, 18 insertions(+), 19 deletions(-)

diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 783ccf12f627..9c69fe511c5f 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -106,15 +106,12 @@ def send_pdu(self, pdu, destinations):
             Deferred: Completes when we have successfully processed the PDU
             and replicated it to any interested remote home servers.
         """
-        order = self._order
-        self._order += 1
-
         sent_pdus_destination_dist.inc_by(len(destinations))
 
         logger.debug("[%s] transaction_layer.send_pdu... ", pdu.event_id)
 
         # TODO, add errback, etc.
-        self._transaction_queue.send_pdu(pdu, destinations, order)
+        self._transaction_queue.send_pdu(pdu, destinations)
 
         logger.debug(
             "[%s] transaction_layer.send_pdu... done",
@@ -127,16 +124,7 @@ def send_presence(self, destination, states):
 
     @log_function
     def send_edu(self, destination, edu_type, content, key=None):
-        edu = Edu(
-            origin=self.server_name,
-            destination=destination,
-            edu_type=edu_type,
-            content=content,
-        )
-
-        sent_edus_counter.inc()
-
-        self._transaction_queue.send_edu(edu, key=key)
+        self._transaction_queue.send_edu(destination, edu_type, content, key=key)
 
     @log_function
     def send_device_messages(self, destination):
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index ea66a5dcbc50..043baef13f9d 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -68,8 +68,6 @@ def __init__(self, hs, transport_layer):
         self.transaction_actions = TransactionActions(self.store)
         self._transaction_queue = TransactionQueue(hs, transport_layer)
 
-        self._order = 0
-
         self.hs = hs
 
         super(ReplicationLayer, self).__init__(hs)
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index e0abe4b40be6..69e01d652180 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -95,6 +95,8 @@ def __init__(self, hs, transport_layer):
         # HACK to get unique tx id
         self._next_txn_id = int(self.clock.time_msec())
 
+        self._order = 1
+
     def can_send_to(self, destination):
         """Can we send messages to the given server?
 
@@ -115,11 +117,14 @@ def can_send_to(self, destination):
         else:
             return not destination.startswith("localhost")
 
-    def send_pdu(self, pdu, destinations, order):
+    def send_pdu(self, pdu, destinations):
         # We loop through all destinations to see whether we already have
         # a transaction in progress. If we do, stick it in the pending_pdus
         # table and we'll get back to it later.
 
+        order = self._order
+        self._order += 1
+
         destinations = set(destinations)
         destinations = set(
             dest for dest in destinations if self.can_send_to(dest)
@@ -140,6 +145,9 @@ def send_pdu(self, pdu, destinations, order):
             )
 
     def send_presence(self, destination, states):
+        if not self.can_send_to(destination):
+            return
+
         self.pending_presence_by_dest.setdefault(destination, {}).update({
             state.user_id: state for state in states
         })
@@ -148,8 +156,13 @@ def send_presence(self, destination, states):
             self._attempt_new_transaction, destination
         )
 
-    def send_edu(self, edu, key=None):
-        destination = edu.destination
+    def send_edu(self, destination, edu_type, content, key=None):
+        edu = Edu(
+            origin=self.server_name,
+            destination=destination,
+            edu_type=edu_type,
+            content=content,
+        )
 
         if not self.can_send_to(destination):
             return

From 847d5db1d1aa30fd6a8166e36fe04e6d94533521 Mon Sep 17 00:00:00 2001
From: Erik Johnston <erik@matrix.org>
Date: Wed, 16 Nov 2016 14:15:50 +0000
Subject: [PATCH 03/21] Add transaction queue and transport layer to DI

---
 synapse/federation/__init__.py          |  7 +++----
 synapse/federation/federation_client.py |  1 -
 synapse/federation/replication.py       |  4 +---
 synapse/federation/transaction_queue.py |  4 ++--
 synapse/server.py                       | 10 ++++++++++
 5 files changed, 16 insertions(+), 10 deletions(-)

diff --git a/synapse/federation/__init__.py b/synapse/federation/__init__.py
index 979fdf2431fb..2e32d245ba51 100644
--- a/synapse/federation/__init__.py
+++ b/synapse/federation/__init__.py
@@ -17,10 +17,9 @@
 """
 
 from .replication import ReplicationLayer
-from .transport.client import TransportLayerClient
 
 
-def initialize_http_replication(homeserver):
-    transport = TransportLayerClient(homeserver)
+def initialize_http_replication(hs):
+    transport = hs.get_federation_transport_client()
 
-    return ReplicationLayer(homeserver, transport)
+    return ReplicationLayer(hs, transport)
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 9c69fe511c5f..0fe21ac8d7df 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -18,7 +18,6 @@
 
 from .federation_base import FederationBase
 from synapse.api.constants import Membership
-from .units import Edu
 
 from synapse.api.errors import (
     CodeMessageException, HttpResponseException, SynapseError,
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index 043baef13f9d..797c4bedbfe6 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -20,8 +20,6 @@
 from .federation_client import FederationClient
 from .federation_server import FederationServer
 
-from .transaction_queue import TransactionQueue
-
 from .persistence import TransactionActions
 
 import logging
@@ -66,7 +64,7 @@ def __init__(self, hs, transport_layer):
         self._clock = hs.get_clock()
 
         self.transaction_actions = TransactionActions(self.store)
-        self._transaction_queue = TransactionQueue(hs, transport_layer)
+        self._transaction_queue = hs.get_federation_sender()
 
         self.hs = hs
 
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 69e01d652180..eb504055f84d 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -44,13 +44,13 @@ class TransactionQueue(object):
     It batches pending PDUs into single transactions.
     """
 
-    def __init__(self, hs, transport_layer):
+    def __init__(self, hs):
         self.server_name = hs.hostname
 
         self.store = hs.get_datastore()
         self.transaction_actions = TransactionActions(self.store)
 
-        self.transport_layer = transport_layer
+        self.transport_layer = hs.get_federation_transport_client()
 
         self.clock = hs.get_clock()
 
diff --git a/synapse/server.py b/synapse/server.py
index 374124a147fd..faab617b4fe6 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -32,6 +32,8 @@
 from synapse.crypto.keyring import Keyring
 from synapse.events.builder import EventBuilderFactory
 from synapse.federation import initialize_http_replication
+from synapse.federation.transport.client import TransportLayerClient
+from synapse.federation.transaction_queue import TransactionQueue
 from synapse.handlers import Handlers
 from synapse.handlers.appservice import ApplicationServicesHandler
 from synapse.handlers.auth import AuthHandler
@@ -124,6 +126,8 @@ def build_DEPENDENCY(self)
         'http_client_context_factory',
         'simple_http_client',
         'media_repository',
+        'federation_transport_client',
+        'federation_sender',
     ]
 
     def __init__(self, hostname, **kwargs):
@@ -265,6 +269,12 @@ def build_db_pool(self):
     def build_media_repository(self):
         return MediaRepository(self)
 
+    def build_federation_transport_client(self):
+        return TransportLayerClient(self)
+
+    def build_federation_sender(self):
+        return TransactionQueue(self)
+
     def remove_pusher(self, app_id, push_key, user_id):
         return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
 

From 59ef517e6bc63b2613f18c9b85356a0f973f5698 Mon Sep 17 00:00:00 2001
From: Erik Johnston <erik@matrix.org>
Date: Wed, 16 Nov 2016 14:28:03 +0000
Subject: [PATCH 04/21] Use new federation_sender DI

---
 synapse/federation/federation_client.py | 49 -------------------------
 synapse/federation/transaction_queue.py | 10 +++++
 synapse/handlers/devicemessage.py       |  4 +-
 synapse/handlers/federation.py          |  7 ++--
 synapse/handlers/presence.py            | 11 +++---
 synapse/handlers/receipts.py            |  4 +-
 synapse/handlers/typing.py              |  4 +-
 7 files changed, 26 insertions(+), 63 deletions(-)

diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 0fe21ac8d7df..b255709165a7 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -44,10 +44,6 @@
 # synapse.federation.federation_client is a silly name
 metrics = synapse.metrics.get_metrics_for("synapse.federation.client")
 
-sent_pdus_destination_dist = metrics.register_distribution("sent_pdu_destinations")
-
-sent_edus_counter = metrics.register_counter("sent_edus")
-
 sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"])
 
 
@@ -91,51 +87,6 @@ def start_get_pdu_cache(self):
 
         self._get_pdu_cache.start()
 
-    @log_function
-    def send_pdu(self, pdu, destinations):
-        """Informs the replication layer about a new PDU generated within the
-        home server that should be transmitted to others.
-
-        TODO: Figure out when we should actually resolve the deferred.
-
-        Args:
-            pdu (Pdu): The new Pdu.
-
-        Returns:
-            Deferred: Completes when we have successfully processed the PDU
-            and replicated it to any interested remote home servers.
-        """
-        sent_pdus_destination_dist.inc_by(len(destinations))
-
-        logger.debug("[%s] transaction_layer.send_pdu... ", pdu.event_id)
-
-        # TODO, add errback, etc.
-        self._transaction_queue.send_pdu(pdu, destinations)
-
-        logger.debug(
-            "[%s] transaction_layer.send_pdu... done",
-            pdu.event_id
-        )
-
-    def send_presence(self, destination, states):
-        if destination != self.server_name:
-            self._transaction_queue.send_presence(destination, states)
-
-    @log_function
-    def send_edu(self, destination, edu_type, content, key=None):
-        self._transaction_queue.send_edu(destination, edu_type, content, key=key)
-
-    @log_function
-    def send_device_messages(self, destination):
-        """Sends the device messages in the local database to the remote
-        destination"""
-        self._transaction_queue.send_device_messages(destination)
-
-    @log_function
-    def send_failure(self, failure, destination):
-        self._transaction_queue.send_failure(failure, destination)
-        return defer.succeed(None)
-
     @log_function
     def make_query(self, destination, query_type, args,
                    retry_on_dns_fail=False):
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index eb504055f84d..5d4f24437784 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -36,6 +36,12 @@
 
 metrics = synapse.metrics.get_metrics_for(__name__)
 
+client_metrics = synapse.metrics.get_metrics_for("synapse.federation.client")
+sent_pdus_destination_dist = client_metrics.register_distribution(
+    "sent_pdu_destinations"
+)
+sent_edus_counter = client_metrics.register_counter("sent_edus")
+
 
 class TransactionQueue(object):
     """This class makes sure we only have one transaction in flight at
@@ -135,6 +141,8 @@ def send_pdu(self, pdu, destinations):
         if not destinations:
             return
 
+        sent_pdus_destination_dist.inc_by(len(destinations))
+
         for destination in destinations:
             self.pending_pdus_by_dest.setdefault(destination, []).append(
                 (pdu, order)
@@ -167,6 +175,8 @@ def send_edu(self, destination, edu_type, content, key=None):
         if not self.can_send_to(destination):
             return
 
+        sent_edus_counter.inc()
+
         if key:
             self.pending_edus_keyed_by_dest.setdefault(
                 destination, {}
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index c5368e5df266..f7fad15c620f 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -34,9 +34,9 @@ def __init__(self, hs):
         self.store = hs.get_datastore()
         self.notifier = hs.get_notifier()
         self.is_mine_id = hs.is_mine_id
-        self.federation = hs.get_replication_layer()
+        self.federation = hs.get_federation_sender()
 
-        self.federation.register_edu_handler(
+        hs.get_replication_layer().register_edu_handler(
             "m.direct_to_device", self.on_direct_to_device_edu
         )
 
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 2d801bad476e..38592d557706 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -71,6 +71,7 @@ def __init__(self, hs):
 
         self.store = hs.get_datastore()
         self.replication_layer = hs.get_replication_layer()
+        self.federation_sender = hs.get_federation_sender()
         self.state_handler = hs.get_state_handler()
         self.server_name = hs.hostname
         self.keyring = hs.get_keyring()
@@ -94,7 +95,7 @@ def handle_new_event(self, event, destinations):
             processing.
         """
 
-        return self.replication_layer.send_pdu(event, destinations)
+        return self.federation_sender.send_pdu(event, destinations)
 
     @log_function
     @defer.inlineCallbacks
@@ -847,7 +848,7 @@ def on_send_join_request(self, origin, pdu):
             event.signatures,
         )
 
-        self.replication_layer.send_pdu(new_pdu, destinations)
+        self.federation_sender.send_pdu(new_pdu, destinations)
 
         state_ids = context.prev_state_ids.values()
         auth_chain = yield self.store.get_auth_chain(set(
@@ -1071,7 +1072,7 @@ def on_send_leave_request(self, origin, pdu):
             event.signatures,
         )
 
-        self.replication_layer.send_pdu(new_pdu, destinations)
+        self.federation_sender.send_pdu(new_pdu, destinations)
 
         defer.returnValue(None)
 
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index b047ae2250da..1b89dc6274dd 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -91,28 +91,29 @@ def __init__(self, hs):
         self.store = hs.get_datastore()
         self.wheel_timer = WheelTimer()
         self.notifier = hs.get_notifier()
-        self.federation = hs.get_replication_layer()
+        self.replication = hs.get_replication_layer()
+        self.federation = hs.get_federation_sender()
 
         self.state = hs.get_state_handler()
 
-        self.federation.register_edu_handler(
+        self.replication.register_edu_handler(
             "m.presence", self.incoming_presence
         )
-        self.federation.register_edu_handler(
+        self.replication.register_edu_handler(
             "m.presence_invite",
             lambda origin, content: self.invite_presence(
                 observed_user=UserID.from_string(content["observed_user"]),
                 observer_user=UserID.from_string(content["observer_user"]),
             )
         )
-        self.federation.register_edu_handler(
+        self.replication.register_edu_handler(
             "m.presence_accept",
             lambda origin, content: self.accept_presence(
                 observed_user=UserID.from_string(content["observed_user"]),
                 observer_user=UserID.from_string(content["observer_user"]),
             )
         )
-        self.federation.register_edu_handler(
+        self.replication.register_edu_handler(
             "m.presence_deny",
             lambda origin, content: self.deny_presence(
                 observed_user=UserID.from_string(content["observed_user"]),
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index e536a909d01b..916e80a48e4a 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -33,8 +33,8 @@ def __init__(self, hs):
         self.server_name = hs.config.server_name
         self.store = hs.get_datastore()
         self.hs = hs
-        self.federation = hs.get_replication_layer()
-        self.federation.register_edu_handler(
+        self.federation = hs.get_federation_sender()
+        hs.get_replication_layer().register_edu_handler(
             "m.receipt", self._received_remote_receipt
         )
         self.clock = self.hs.get_clock()
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 27ee715ff013..0eea7f8f9c29 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -55,9 +55,9 @@ def __init__(self, hs):
         self.clock = hs.get_clock()
         self.wheel_timer = WheelTimer(bucket_size=5000)
 
-        self.federation = hs.get_replication_layer()
+        self.federation = hs.get_federation_sender()
 
-        self.federation.register_edu_handler("m.typing", self._recv_edu)
+        hs.get_replication_layer().register_edu_handler("m.typing", self._recv_edu)
 
         hs.get_distributor().observe("user_left_room", self.user_left_room)
 

From 1587b5a0339d485c8b078024269a5d888ac5e652 Mon Sep 17 00:00:00 2001
From: Erik Johnston <erik@matrix.org>
Date: Fri, 4 Nov 2016 15:35:25 +0000
Subject: [PATCH 05/21] Add initial cut of federation send queue

---
 synapse/federation/send_queue.py | 174 +++++++++++++++++++++++++++++++
 1 file changed, 174 insertions(+)
 create mode 100644 synapse/federation/send_queue.py

diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
new file mode 100644
index 000000000000..3d3c3d98ffd1
--- /dev/null
+++ b/synapse/federation/send_queue.py
@@ -0,0 +1,174 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014-2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from blist import sorteddict
+
+
+class FederationRemoteSendQueue(object):
+    def __init__(self, hs):
+        self.clock = hs.get_clock()
+
+        # TODO: Add metrics for size of lists below
+
+        self.presence_map = {}
+        self.presence_changed = sorteddict()
+
+        self.keyed_edu = {}
+        self.keyed_edu_changed = sorteddict()
+
+        self.edus = sorteddict()
+
+        self.failures = sorteddict()
+
+        self.pos = 1
+        self.pos_time = sorteddict()
+
+        self.clock.looping_call(self._clear_queue, 30 * 1000)
+
+    def _next_pos(self):
+        pos = self.pos
+        self.pos += 1
+        self.pos_time[self.clock.time_msec()] = pos
+        return pos
+
+    def _clear_queue(self):
+        # TODO measure this function time.
+
+        FIVE_MINUTES_AGO = 5 * 60 * 1000
+        now = self.clock.time_msec()
+
+        keys = self.pos_time.keys()
+        time = keys.bisect_left(now - FIVE_MINUTES_AGO)
+        if not keys[:time]:
+            return
+
+        position_to_delete = max(keys[:time])
+        for key in keys[:time]:
+            del self.pos_time[key]
+
+        self._clear_queue_before_pos(position_to_delete)
+
+    def _clear_queue_before_pos(self, position_to_delete):
+        # Delete things out of presence maps
+        keys = self.presence_changed.keys()
+        i = keys.bisect_left(position_to_delete)
+        for key in keys[:i]:
+            del self.presence_changed[key]
+
+        user_ids = set()
+        for _, states in self.presence_changed.values():
+            user_ids.update(s.user_id for s in user_ids)
+
+        to_del = [user_id for user_id in self.presence_map if user_id not in user_ids]
+        for user_id in self.to_del:
+            del self.presence_map[user_id]
+
+        # Delete things out of keyed edus
+        keys = self.keyed_edu_changed.keys()
+        i = keys.bisect_left(position_to_delete)
+        for key in keys[:i]:
+            del self.keyed_edu_changed[key]
+
+        live_keys = set()
+        for edu_key in self.keyed_edu_changed.values():
+            live_keys.add(edu_key)
+
+        to_del = [edu_key for edu_key in self.keyed_edu if edu_key not in live_keys]
+        for edu_key in to_del:
+            del self.keyed_edu[edu_key]
+
+        # Delete things out of edu map
+        keys = self.edus.keys()
+        i = keys.bisect_left(position_to_delete)
+        for key in keys[:i]:
+            del self.edus[key]
+
+        # Delete things out of failure map
+        keys = self.failures.keys()
+        i = keys.bisect_left(position_to_delete)
+        for key in keys[:i]:
+            del self.failures[key]
+
+    def send_edu(self, edu, key=None):
+        pos = self._next_pos()
+
+        if key:
+            self.keyed_edu[(edu.destination, key)] = edu
+            self.keyed_edu_changed[pos] = (edu.destination, key)
+        else:
+            self.edus[pos] = edu
+
+    def send_presence(self, destination, states):
+        pos = self._next_pos()
+
+        self.presence_map.presence_map.update({
+            state.user_id: state
+            for state in states
+        })
+
+        self.presence_changed[pos] = (destination, [
+            state.user_id for state in states
+        ])
+
+    def send_failure(self, failure, destination):
+        pos = self._next_pos()
+
+        self.failures[pos] = (destination, failure)
+
+    def notify_new_device_message(self, destination):
+        # TODO
+        pass
+
+    def get_replication_rows(self, token):
+        rows = []
+
+        # Fetch changed presence
+        keys = self.presence_changed.keys()
+        i = keys.bisect_right(token)
+        dest_user_ids = set((k, self.presence_changed[k]) for k in keys[i:])
+
+        for (key, (dest, user_ids)) in dest_user_ids:
+            for user_id in user_ids:
+                rows.append((key, dest, "p", self.presence_map[user_id]))
+
+        # Fetch changes keyed edus
+        keys = self.keyed_edu_changed.keys()
+        i = keys.bisect_right(token)
+        keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:])
+
+        for (pos, edu_key) in keyed_edus:
+            rows.append((pos, edu_key, "k", self.keyed_edu[edu_key]))
+
+        # Fetch changed edus
+        keys = self.edus.keys()
+        i = keys.bisect_right(token)
+        edus = set((k, self.edus[k]) for k in keys[i:])
+
+        for (pos, edu) in edus:
+            rows.append((pos, edu.destination, "e", edu))
+
+        # Fetch changed failures
+        keys = self.failures.keys()
+        i = keys.bisect_right(token)
+        failures = set((k, self.failures[k]) for k in keys[i:])
+
+        for (pos, (destination, failure)) in failures:
+            rows.append((pos, destination, "f", failure))
+
+        # Sort rows based on pos
+        rows.sort()
+
+        return rows

From ed787cf09edd77e39ad9da0b957359214de85287 Mon Sep 17 00:00:00 2001
From: Erik Johnston <erik@matrix.org>
Date: Wed, 16 Nov 2016 17:34:44 +0000
Subject: [PATCH 06/21] Hook up the send queue and create a federation sender
 worker

---
 synapse/app/federation_sender.py              | 302 ++++++++++++++++++
 synapse/config/server.py                      |   5 +
 synapse/federation/send_queue.py              |  89 ++++--
 synapse/replication/resource.py               |  24 ++
 .../replication/slave/storage/deviceinbox.py  |   5 +
 .../replication/slave/storage/transactions.py |   3 +
 synapse/server.py                             |   6 +-
 synapse/storage/presence.py                   |   7 +
 8 files changed, 419 insertions(+), 22 deletions(-)
 create mode 100644 synapse/app/federation_sender.py

diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
new file mode 100644
index 000000000000..7a4fec4a6685
--- /dev/null
+++ b/synapse/app/federation_sender.py
@@ -0,0 +1,302 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import synapse
+
+from synapse.server import HomeServer
+from synapse.config._base import ConfigError
+from synapse.config.logger import setup_logging
+from synapse.config.homeserver import HomeServerConfig
+from synapse.crypto import context_factory
+from synapse.http.site import SynapseSite
+from synapse.federation import send_queue
+from synapse.federation.units import Edu
+from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
+from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
+from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.storage.engines import create_engine
+from synapse.storage.presence import UserPresenceState
+from synapse.util.async import sleep
+from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.logcontext import LoggingContext
+from synapse.util.manhole import manhole
+from synapse.util.rlimit import change_resource_limit
+from synapse.util.versionstring import get_version_string
+
+from synapse import events
+
+from twisted.internet import reactor, defer
+from twisted.web.resource import Resource
+
+from daemonize import Daemonize
+
+import sys
+import logging
+import gc
+import ujson as json
+
+logger = logging.getLogger("synapse.app.appservice")
+
+
+class FederationSenderSlaveStore(
+    SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore,
+    SlavedRegistrationStore,
+):
+    pass
+
+
+class FederationSenderServer(HomeServer):
+    def get_db_conn(self, run_new_connection=True):
+        # Any param beginning with cp_ is a parameter for adbapi, and should
+        # not be passed to the database engine.
+        db_params = {
+            k: v for k, v in self.db_config.get("args", {}).items()
+            if not k.startswith("cp_")
+        }
+        db_conn = self.database_engine.module.connect(**db_params)
+
+        if run_new_connection:
+            self.database_engine.on_new_connection(db_conn)
+        return db_conn
+
+    def setup(self):
+        logger.info("Setting up.")
+        self.datastore = FederationSenderSlaveStore(self.get_db_conn(), self)
+        logger.info("Finished setting up.")
+
+    def _listen_http(self, listener_config):
+        port = listener_config["port"]
+        bind_address = listener_config.get("bind_address", "")
+        site_tag = listener_config.get("tag", port)
+        resources = {}
+        for res in listener_config["resources"]:
+            for name in res["names"]:
+                if name == "metrics":
+                    resources[METRICS_PREFIX] = MetricsResource(self)
+
+        root_resource = create_resource_tree(resources, Resource())
+        reactor.listenTCP(
+            port,
+            SynapseSite(
+                "synapse.access.http.%s" % (site_tag,),
+                site_tag,
+                listener_config,
+                root_resource,
+            ),
+            interface=bind_address
+        )
+        logger.info("Synapse federation_sender now listening on port %d", port)
+
+    def start_listening(self, listeners):
+        for listener in listeners:
+            if listener["type"] == "http":
+                self._listen_http(listener)
+            elif listener["type"] == "manhole":
+                reactor.listenTCP(
+                    listener["port"],
+                    manhole(
+                        username="matrix",
+                        password="rabbithole",
+                        globals={"hs": self},
+                    ),
+                    interface=listener.get("bind_address", '127.0.0.1')
+                )
+            else:
+                logger.warn("Unrecognized listener type: %s", listener["type"])
+
+    @defer.inlineCallbacks
+    def replicate(self):
+        http_client = self.get_simple_http_client()
+        store = self.get_datastore()
+        replication_url = self.config.worker_replication_url
+        send_handler = self._get_send_handler()
+
+        def replicate(results):
+            stream = results.get("events")
+            if stream:
+                # max_stream_id = stream["position"]
+                # TODO
+                pass
+
+        while True:
+            try:
+                args = store.stream_positions()
+                args.update(send_handler.stream_positions())
+                args["timeout"] = 30000
+                result = yield http_client.get_json(replication_url, args=args)
+                yield store.process_replication(result)
+                send_handler.process_replication(result)
+                replicate(result)
+            except:
+                logger.exception("Error replicating from %r", replication_url)
+                yield sleep(30)
+
+    def _get_send_handler(self):
+        try:
+            return self._send_handler
+        except AttributeError:
+            self._send_handler = FederationSenderHandler(self)
+            return self._send_handler
+
+
+def start(config_options):
+    try:
+        config = HomeServerConfig.load_config(
+            "Synapse federation sender", config_options
+        )
+    except ConfigError as e:
+        sys.stderr.write("\n" + e.message + "\n")
+        sys.exit(1)
+
+    assert config.worker_app == "synapse.app.federation_sender"
+
+    setup_logging(config.worker_log_config, config.worker_log_file)
+
+    events.USE_FROZEN_DICTS = config.use_frozen_dicts
+
+    database_engine = create_engine(config.database_config)
+
+    if config.send_federation:
+        sys.stderr.write(
+            "\nThe send_federation must be disabled in the main synapse process"
+            "\nbefore they can be run in a separate worker."
+            "\nPlease add ``send_federation: false`` to the main config"
+            "\n"
+        )
+        sys.exit(1)
+
+    # Force federation sending on, since it is disabled in the main config
+    config.send_federation = True
+
+    tls_server_context_factory = context_factory.ServerContextFactory(config)
+
+    ps = FederationSenderServer(
+        config.server_name,
+        db_config=config.database_config,
+        tls_server_context_factory=tls_server_context_factory,
+        config=config,
+        version_string="Synapse/" + get_version_string(synapse),
+        database_engine=database_engine,
+    )
+
+    ps.setup()
+    ps.start_listening(config.worker_listeners)
+
+    def run():
+        with LoggingContext("run"):
+            logger.info("Running")
+            change_resource_limit(config.soft_file_limit)
+            if config.gc_thresholds:
+                gc.set_threshold(*config.gc_thresholds)
+            reactor.run()
+
+    def start():
+        ps.replicate()
+        ps.get_datastore().start_profiling()
+        ps.get_state_handler().start_caching()
+
+    reactor.callWhenRunning(start)
+
+    if config.worker_daemonize:
+        daemon = Daemonize(
+            app="synapse-federation-sender",
+            pid=config.worker_pid_file,
+            action=run,
+            auto_close_fds=False,
+            verbose=True,
+            logger=logger,
+        )
+        daemon.start()
+    else:
+        run()
+
+
+class FederationSenderHandler(object):
+    def __init__(self, hs):
+        self.federation_sender = hs.get_federation_sender()
+
+        self._latest_room_serial = -1
+        self._room_serials = {}
+        self._room_typing = {}
+
+    def stream_positions(self):
+        # We must update this token from the response of the previous
+        # sync. In particular, the stream id may "reset" back to zero/a low
+        # value which we *must* use for the next replication request.
+        return {"federation": self._latest_room_serial}
+
+    def process_replication(self, result):
+        stream = result.get("federation")
+        if stream:
+            self._latest_room_serial = int(stream["position"])
+
+            presence_to_send = {}
+            keyed_edus = {}
+            edus = {}
+            failures = {}
+
+            for row in stream["rows"]:
+                position, typ, content_js = row
+                content = json.loads(content_js)
+
+                if typ == send_queue.PRESENCE_TYPE:
+                    destination = content["destination"]
+                    state = UserPresenceState.from_dict(content["state"])
+
+                    presence_to_send.setdefault(destination, []).append(state)
+                elif typ == send_queue.KEYED_EDU_TYPE:
+                    key = content["key"]
+                    edu = Edu(**content["edu"])
+
+                    keyed_edus.setdefault(edu.destination, {})[key] = edu
+                elif typ == send_queue.EDU_TYPE:
+                    edu = Edu(**content)
+
+                    edus.setdefault(edu.destination, []).append(edu)
+                elif typ == send_queue.FAILURE_TYPE:
+                    destination = content["destination"]
+                    failure = content["failure"]
+
+                    failures.setdefault(destination, []).append(failure)
+                else:
+                    raise Exception("Unrecognised federation type: %r", typ)
+
+            for destination, states in presence_to_send.items():
+                self.federation_sender.send_presence(destination, states)
+
+            for destination, edu_map in keyed_edus.items():
+                for key, edu in edu_map.items():
+                    self.federation_sender.send_edu(
+                        edu.destination, edu.edu_type, edu.content, key=key,
+                    )
+
+            for destination, edu_list in edus.items():
+                for edu in edu_list:
+                    self.federation_sender.send_edu(
+                        edu.destination, edu.edu_type, edu.content, key=None,
+                    )
+
+            for destination, failure_list in failures.items():
+                for failure in failure_list:
+                    self.federation_sender.send_failure(destination, failure)
+
+
+if __name__ == '__main__':
+    with LoggingContext("main"):
+        start(sys.argv[1:])
diff --git a/synapse/config/server.py b/synapse/config/server.py
index ed5417d0c35a..634d8e6fe5d3 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -30,6 +30,11 @@ def read_config(self, config):
         self.use_frozen_dicts = config.get("use_frozen_dicts", False)
         self.public_baseurl = config.get("public_baseurl")
 
+        # Whether to send federation traffic out in this process. This only
+        # applies to some federation traffic, and so shouldn't be used to
+        # "disable" federation
+        self.send_federation = config.get("send_federation", True)
+
         if self.public_baseurl is not None:
             if self.public_baseurl[-1] != '/':
                 self.public_baseurl += '/'
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 3d3c3d98ffd1..d439be050a5b 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -13,11 +13,20 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from .units import Edu
 
 from blist import sorteddict
+import ujson
+
+
+PRESENCE_TYPE = "p"
+KEYED_EDU_TYPE = "k"
+EDU_TYPE = "e"
+FAILURE_TYPE = "f"
 
 
 class FederationRemoteSendQueue(object):
+
     def __init__(self, hs):
         self.clock = hs.get_clock()
 
@@ -68,12 +77,12 @@ def _clear_queue_before_pos(self, position_to_delete):
         for key in keys[:i]:
             del self.presence_changed[key]
 
-        user_ids = set()
-        for _, states in self.presence_changed.values():
-            user_ids.update(s.user_id for s in user_ids)
+        user_ids = set(
+            user_id for uids in self.presence_changed.values() for _, user_id in uids
+        )
 
         to_del = [user_id for user_id in self.presence_map if user_id not in user_ids]
-        for user_id in self.to_del:
+        for user_id in to_del:
             del self.presence_map[user_id]
 
         # Delete things out of keyed edus
@@ -102,47 +111,77 @@ def _clear_queue_before_pos(self, position_to_delete):
         for key in keys[:i]:
             del self.failures[key]
 
-    def send_edu(self, edu, key=None):
+    def send_edu(self, destination, edu_type, content, key=None):
         pos = self._next_pos()
 
+        edu = Edu(
+            origin=self.server_name,
+            destination=destination,
+            edu_type=edu_type,
+            content=content,
+        )
+
         if key:
-            self.keyed_edu[(edu.destination, key)] = edu
-            self.keyed_edu_changed[pos] = (edu.destination, key)
+            self.keyed_edu[(destination, key)] = edu
+            self.keyed_edu_changed[pos] = (destination, key)
         else:
             self.edus[pos] = edu
 
     def send_presence(self, destination, states):
         pos = self._next_pos()
 
-        self.presence_map.presence_map.update({
+        self.presence_map.update({
             state.user_id: state
             for state in states
         })
 
-        self.presence_changed[pos] = (destination, [
-            state.user_id for state in states
-        ])
+        self.presence_changed[pos] = [
+            (destination, state.user_id) for state in states
+        ]
 
     def send_failure(self, failure, destination):
         pos = self._next_pos()
 
-        self.failures[pos] = (destination, failure)
+        self.failures[pos] = (destination, str(failure))
+
+    def send_pdu(self, pdu, destinations):
+        # This gets sent down a separate path
+        pass
 
     def notify_new_device_message(self, destination):
         # TODO
         pass
 
-    def get_replication_rows(self, token):
+    def get_current_token(self):
+        return self.pos - 1
+
+    def get_replication_rows(self, token, limit):
+        # TODO: Handle limit.
+
+        # To handle restarts where we wrap around
+        if token > self.pos:
+            token = -1
+
         rows = []
 
+        # There should be only one reader, so let's delete everything it has
+        # acknowledged seeing.
+        self._clear_queue_before_pos(token)
+
         # Fetch changed presence
         keys = self.presence_changed.keys()
         i = keys.bisect_right(token)
-        dest_user_ids = set((k, self.presence_changed[k]) for k in keys[i:])
-
-        for (key, (dest, user_ids)) in dest_user_ids:
-            for user_id in user_ids:
-                rows.append((key, dest, "p", self.presence_map[user_id]))
+        dest_user_ids = set(
+            (pos, dest_user_id)
+            for pos in keys[i:]
+            for dest_user_id in self.presence_changed[pos]
+        )
+
+        for (key, (dest, user_id)) in dest_user_ids:
+            rows.append((key, PRESENCE_TYPE, ujson.dumps({
+                "destination": dest,
+                "state": self.presence_map[user_id].as_dict(),
+            })))
 
         # Fetch changes keyed edus
         keys = self.keyed_edu_changed.keys()
@@ -150,7 +189,12 @@ def get_replication_rows(self, token):
         keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:])
 
         for (pos, edu_key) in keyed_edus:
-            rows.append((pos, edu_key, "k", self.keyed_edu[edu_key]))
+            rows.append(
+                (pos, KEYED_EDU_TYPE, ujson.dumps({
+                    "key": edu_key,
+                    "edu": self.keyed_edu[edu_key].get_dict(),
+                }))
+            )
 
         # Fetch changed edus
         keys = self.edus.keys()
@@ -158,7 +202,7 @@ def get_replication_rows(self, token):
         edus = set((k, self.edus[k]) for k in keys[i:])
 
         for (pos, edu) in edus:
-            rows.append((pos, edu.destination, "e", edu))
+            rows.append((pos, EDU_TYPE, ujson.dumps(edu.get_dict())))
 
         # Fetch changed failures
         keys = self.failures.keys()
@@ -166,7 +210,10 @@ def get_replication_rows(self, token):
         failures = set((k, self.failures[k]) for k in keys[i:])
 
         for (pos, (destination, failure)) in failures:
-            rows.append((pos, destination, "f", failure))
+            rows.append((pos, None, FAILURE_TYPE, ujson.dumps({
+                "destination": destination,
+                "failure": failure,
+            })))
 
         # Sort rows based on pos
         rows.sort()
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index 5a14c51d235a..a77312ae349e 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -44,6 +44,7 @@
     ("caches",),
     ("to_device",),
     ("public_rooms",),
+    ("federation",),
 )
 
 
@@ -116,8 +117,10 @@ def __init__(self, hs):
         self.sources = hs.get_event_sources()
         self.presence_handler = hs.get_presence_handler()
         self.typing_handler = hs.get_typing_handler()
+        self.federation_sender = hs.get_federation_sender()
         self.notifier = hs.notifier
         self.clock = hs.get_clock()
+        self.config = hs.get_config()
 
         self.putChild("remove_pushers", PusherResource(hs))
         self.putChild("syncing_users", PresenceResource(hs))
@@ -134,6 +137,7 @@ def current_replication_token(self):
         pushers_token = self.store.get_pushers_stream_token()
         caches_token = self.store.get_cache_stream_token()
         public_rooms_token = self.store.get_current_public_room_stream_id()
+        federation_token = self.federation_sender.get_current_token()
 
         defer.returnValue(_ReplicationToken(
             room_stream_token,
@@ -148,6 +152,7 @@ def current_replication_token(self):
             caches_token,
             int(stream_token.to_device_key),
             int(public_rooms_token),
+            int(federation_token),
         ))
 
     @request_handler()
@@ -202,6 +207,7 @@ def replicate(self, request_streams, limit):
         yield self.caches(writer, current_token, limit, request_streams)
         yield self.to_device(writer, current_token, limit, request_streams)
         yield self.public_rooms(writer, current_token, limit, request_streams)
+        self.federation(writer, current_token, limit, request_streams)
         self.streams(writer, current_token, request_streams)
 
         logger.debug("Replicated %d rows", writer.total)
@@ -465,6 +471,23 @@ def public_rooms(self, writer, current_token, limit, request_streams):
                 "position", "room_id", "visibility"
             ), position=upto_token)
 
+    def federation(self, writer, current_token, limit, request_streams):
+        if self.config.send_federation:
+            return
+
+        current_position = current_token.federation
+
+        federation = request_streams.get("federation")
+
+        if federation is not None and federation != current_position:
+            federation_rows = self.federation_sender.get_replication_rows(
+                federation, limit,
+            )
+            upto_token = _position_from_rows(federation_rows, current_position)
+            writer.write_header_and_rows("federation", federation_rows, (
+                "position", "type", "content",
+            ), position=upto_token)
+
 
 class _Writer(object):
     """Writes the streams as a JSON object as the response to the request"""
@@ -497,6 +520,7 @@ def finish(self):
 class _ReplicationToken(collections.namedtuple("_ReplicationToken", (
     "events", "presence", "typing", "receipts", "account_data", "backfill",
     "push_rules", "pushers", "state", "caches", "to_device", "public_rooms",
+    "federation",
 ))):
     __slots__ = []
 
diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py
index 3bfd5e8213c4..373212d42deb 100644
--- a/synapse/replication/slave/storage/deviceinbox.py
+++ b/synapse/replication/slave/storage/deviceinbox.py
@@ -29,9 +29,14 @@ def __init__(self, db_conn, hs):
             "DeviceInboxStreamChangeCache",
             self._device_inbox_id_gen.get_current_token()
         )
+        self._device_federation_outbox_stream_cache = StreamChangeCache(
+            "DeviceFederationOutboxStreamChangeCache",
+            self._device_inbox_id_gen.get_current_token()
+        )
 
     get_to_device_stream_token = DataStore.get_to_device_stream_token.__func__
     get_new_messages_for_device = DataStore.get_new_messages_for_device.__func__
+    get_new_device_msgs_for_remote = DataStore.get_new_device_msgs_for_remote.__func__
     delete_messages_for_device = DataStore.delete_messages_for_device.__func__
 
     def stream_positions(self):
diff --git a/synapse/replication/slave/storage/transactions.py b/synapse/replication/slave/storage/transactions.py
index 6f2ba98af59a..c459301b7665 100644
--- a/synapse/replication/slave/storage/transactions.py
+++ b/synapse/replication/slave/storage/transactions.py
@@ -25,6 +25,9 @@ class TransactionStore(BaseSlavedStore):
     ].orig
     _get_destination_retry_timings = DataStore._get_destination_retry_timings.__func__
 
+    def prep_send_transaction(self, transaction_id, destination, origin_server_ts):
+        return []
+
     # For now, don't record the destination rety timings
     def set_destination_retry_timings(*args, **kwargs):
         return defer.succeed(None)
diff --git a/synapse/server.py b/synapse/server.py
index faab617b4fe6..6c57ab3e18f0 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -32,6 +32,7 @@
 from synapse.crypto.keyring import Keyring
 from synapse.events.builder import EventBuilderFactory
 from synapse.federation import initialize_http_replication
+from synapse.federation.send_queue import FederationRemoteSendQueue
 from synapse.federation.transport.client import TransportLayerClient
 from synapse.federation.transaction_queue import TransactionQueue
 from synapse.handlers import Handlers
@@ -273,7 +274,10 @@ def build_federation_transport_client(self):
         return TransportLayerClient(self)
 
     def build_federation_sender(self):
-        return TransactionQueue(self)
+        if self.config.send_federation:
+            return TransactionQueue(self)
+        else:
+            return FederationRemoteSendQueue(self)
 
     def remove_pusher(self, app_id, push_key, user_id):
         return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 21d0696640a3..7460f98a1fec 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -37,6 +37,13 @@ class UserPresenceState(namedtuple("UserPresenceState",
     status_msg (str): User set status message.
     """
 
+    def as_dict(self):
+        return dict(self._asdict())
+
+    @staticmethod
+    def from_dict(d):
+        return UserPresenceState(**d)
+
     def copy_and_replace(self, **kwargs):
         return self._replace(**kwargs)
 

From f8ee66250a16cb9dd3af01fb1150ff18cfebbc39 Mon Sep 17 00:00:00 2001
From: Erik Johnston <erik@matrix.org>
Date: Thu, 17 Nov 2016 15:46:44 +0000
Subject: [PATCH 07/21] Handle sending events and device messages over
 federation

---
 synapse/app/federation_sender.py              | 31 ++++++++-------
 synapse/federation/send_queue.py              | 38 +++++++++++++++----
 synapse/federation/transaction_queue.py       | 32 ++++++++++++++++
 synapse/handlers/message.py                   | 13 +------
 synapse/notifier.py                           |  2 +
 synapse/replication/resource.py               |  2 +-
 .../replication/slave/storage/deviceinbox.py  | 15 ++++++--
 synapse/replication/slave/storage/events.py   | 11 ++++++
 .../replication/slave/storage/transactions.py |  4 +-
 synapse/storage/deviceinbox.py                | 26 +++++++------
 synapse/storage/prepare_database.py           |  2 +-
 .../delta/39/device_federation_stream_idx.sql | 16 ++++++++
 synapse/storage/stream.py                     | 31 +++++++++++++++
 synapse/util/jsonobject.py                    | 17 +++++++--
 14 files changed, 185 insertions(+), 55 deletions(-)
 create mode 100644 synapse/storage/schema/delta/39/device_federation_stream_idx.sql

diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 7a4fec4a6685..32113c175cc9 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -127,13 +127,6 @@ def replicate(self):
         replication_url = self.config.worker_replication_url
         send_handler = self._get_send_handler()
 
-        def replicate(results):
-            stream = results.get("events")
-            if stream:
-                # max_stream_id = stream["position"]
-                # TODO
-                pass
-
         while True:
             try:
                 args = store.stream_positions()
@@ -142,7 +135,6 @@ def replicate(results):
                 result = yield http_client.get_json(replication_url, args=args)
                 yield store.process_replication(result)
                 send_handler.process_replication(result)
-                replicate(result)
             except:
                 logger.exception("Error replicating from %r", replication_url)
                 yield sleep(30)
@@ -242,16 +234,17 @@ def stream_positions(self):
         return {"federation": self._latest_room_serial}
 
     def process_replication(self, result):
-        stream = result.get("federation")
-        if stream:
-            self._latest_room_serial = int(stream["position"])
+        fed_stream = result.get("federation")
+        if fed_stream:
+            self._latest_room_serial = int(fed_stream["position"])
 
             presence_to_send = {}
             keyed_edus = {}
             edus = {}
             failures = {}
+            device_destinations = set()
 
-            for row in stream["rows"]:
+            for row in fed_stream["rows"]:
                 position, typ, content_js = row
                 content = json.loads(content_js)
 
@@ -264,7 +257,9 @@ def process_replication(self, result):
                     key = content["key"]
                     edu = Edu(**content["edu"])
 
-                    keyed_edus.setdefault(edu.destination, {})[key] = edu
+                    keyed_edus.setdefault(
+                        edu.destination, {}
+                    )[(edu.destination, tuple(key))] = edu
                 elif typ == send_queue.EDU_TYPE:
                     edu = Edu(**content)
 
@@ -274,6 +269,8 @@ def process_replication(self, result):
                     failure = content["failure"]
 
                     failures.setdefault(destination, []).append(failure)
+                elif typ == send_queue.DEVICE_MESSAGE_TYPE:
+                    device_destinations.add(content["destination"])
                 else:
                     raise Exception("Unrecognised federation type: %r", typ)
 
@@ -296,6 +293,14 @@ def process_replication(self, result):
                 for failure in failure_list:
                     self.federation_sender.send_failure(destination, failure)
 
+            for destination in device_destinations:
+                self.federation_sender.send_device_messages(destination)
+
+        event_stream = result.get("events")
+        if event_stream:
+            latest_pos = event_stream["position"]
+            self.federation_sender.notify_new_events(latest_pos)
+
 
 if __name__ == '__main__':
     with LoggingContext("main"):
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index d439be050a5b..3fc625c4dde8 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -23,11 +23,13 @@
 KEYED_EDU_TYPE = "k"
 EDU_TYPE = "e"
 FAILURE_TYPE = "f"
+DEVICE_MESSAGE_TYPE = "d"
 
 
 class FederationRemoteSendQueue(object):
 
     def __init__(self, hs):
+        self.server_name = hs.hostname
         self.clock = hs.get_clock()
 
         # TODO: Add metrics for size of lists below
@@ -45,6 +47,8 @@ def __init__(self, hs):
         self.pos = 1
         self.pos_time = sorteddict()
 
+        self.device_messages = sorteddict()
+
         self.clock.looping_call(self._clear_queue, 30 * 1000)
 
     def _next_pos(self):
@@ -111,6 +115,15 @@ def _clear_queue_before_pos(self, position_to_delete):
         for key in keys[:i]:
             del self.failures[key]
 
+        # Delete things out of device map
+        keys = self.device_messages.keys()
+        i = keys.bisect_left(position_to_delete)
+        for key in keys[:i]:
+            del self.device_messages[key]
+
+    def notify_new_events(self, current_id):
+        pass
+
     def send_edu(self, destination, edu_type, content, key=None):
         pos = self._next_pos()
 
@@ -122,6 +135,7 @@ def send_edu(self, destination, edu_type, content, key=None):
         )
 
         if key:
+            assert isinstance(key, tuple)
             self.keyed_edu[(destination, key)] = edu
             self.keyed_edu_changed[pos] = (destination, key)
         else:
@@ -148,9 +162,9 @@ def send_pdu(self, pdu, destinations):
         # This gets sent down a separate path
         pass
 
-    def notify_new_device_message(self, destination):
-        # TODO
-        pass
+    def send_device_messages(self, destination):
+        pos = self._next_pos()
+        self.device_messages[pos] = destination
 
     def get_current_token(self):
         return self.pos - 1
@@ -188,11 +202,11 @@ def get_replication_rows(self, token, limit):
         i = keys.bisect_right(token)
         keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:])
 
-        for (pos, edu_key) in keyed_edus:
+        for (pos, (destination, edu_key)) in keyed_edus:
             rows.append(
                 (pos, KEYED_EDU_TYPE, ujson.dumps({
                     "key": edu_key,
-                    "edu": self.keyed_edu[edu_key].get_dict(),
+                    "edu": self.keyed_edu[(destination, edu_key)].get_internal_dict(),
                 }))
             )
 
@@ -202,7 +216,7 @@ def get_replication_rows(self, token, limit):
         edus = set((k, self.edus[k]) for k in keys[i:])
 
         for (pos, edu) in edus:
-            rows.append((pos, EDU_TYPE, ujson.dumps(edu.get_dict())))
+            rows.append((pos, EDU_TYPE, ujson.dumps(edu.get_internal_dict())))
 
         # Fetch changed failures
         keys = self.failures.keys()
@@ -210,11 +224,21 @@ def get_replication_rows(self, token, limit):
         failures = set((k, self.failures[k]) for k in keys[i:])
 
         for (pos, (destination, failure)) in failures:
-            rows.append((pos, None, FAILURE_TYPE, ujson.dumps({
+            rows.append((pos, FAILURE_TYPE, ujson.dumps({
                 "destination": destination,
                 "failure": failure,
             })))
 
+        # Fetch changed device messages
+        keys = self.device_messages.keys()
+        i = keys.bisect_right(token)
+        device_messages = set((k, self.device_messages[k]) for k in keys[i:])
+
+        for (pos, destination) in device_messages:
+            rows.append((pos, DEVICE_MESSAGE_TYPE, ujson.dumps({
+                "destination": destination,
+            })))
+
         # Sort rows based on pos
         rows.sort()
 
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 5d4f24437784..aa664beead1d 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -26,6 +26,7 @@
     get_retry_limiter, NotRetryingDestination,
 )
 from synapse.util.metrics import measure_func
+from synapse.types import get_domain_from_id
 from synapse.handlers.presence import format_user_presence_state
 import synapse.metrics
 
@@ -54,6 +55,7 @@ def __init__(self, hs):
         self.server_name = hs.hostname
 
         self.store = hs.get_datastore()
+        self.state = hs.get_state_handler()
         self.transaction_actions = TransactionActions(self.store)
 
         self.transport_layer = hs.get_federation_transport_client()
@@ -103,6 +105,9 @@ def __init__(self, hs):
 
         self._order = 1
 
+        self._is_processing = False
+        self._last_token = 0
+
     def can_send_to(self, destination):
         """Can we send messages to the given server?
 
@@ -123,6 +128,33 @@ def can_send_to(self, destination):
         else:
             return not destination.startswith("localhost")
 
+    @defer.inlineCallbacks
+    def notify_new_events(self, current_id):
+        if self._is_processing:
+            return
+
+        try:
+            self._is_processing = True
+            while True:
+                self._last_token, events = yield self.store.get_all_new_events_stream(
+                    self._last_token, current_id, limit=20,
+                )
+
+                if not events:
+                    break
+
+                for event in events:
+                    users_in_room = yield self.state.get_current_user_in_room(
+                        event.room_id, latest_event_ids=[event.event_id],
+                    )
+
+                    destinations = [
+                        get_domain_from_id(user_id) for user_id in users_in_room
+                    ]
+                    self.send_pdu(event, destinations)
+        finally:
+            self._is_processing = False
+
     def send_pdu(self, pdu, destinations):
         # We loop through all destinations to see whether we already have
         # a transaction in progress. If we do, stick it in the pending_pdus
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 81df45177a15..fd09397226f8 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -22,7 +22,7 @@
 from synapse.events.validator import EventValidator
 from synapse.push.action_generator import ActionGenerator
 from synapse.types import (
-    UserID, RoomAlias, RoomStreamToken, get_domain_from_id
+    UserID, RoomAlias, RoomStreamToken,
 )
 from synapse.util.async import run_on_reactor, ReadWriteLock
 from synapse.util.logcontext import preserve_fn
@@ -599,13 +599,6 @@ def is_inviter_member_event(e):
             event_stream_id, max_stream_id
         )
 
-        users_in_room = yield self.store.get_joined_users_from_context(event, context)
-
-        destinations = [
-            get_domain_from_id(user_id) for user_id in users_in_room
-            if not self.hs.is_mine_id(user_id)
-        ]
-
         @defer.inlineCallbacks
         def _notify():
             yield run_on_reactor()
@@ -618,7 +611,3 @@ def _notify():
 
         # If invite, remove room_state from unsigned before sending.
         event.unsigned.pop("invite_room_state", None)
-
-        preserve_fn(federation_handler.handle_new_event)(
-            event, destinations=destinations,
-        )
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 48653ae843c2..d528d1c1e0f0 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -143,6 +143,7 @@ def __init__(self, hs):
 
         self.clock = hs.get_clock()
         self.appservice_handler = hs.get_application_service_handler()
+        self.federation_sender = hs.get_federation_sender()
         self.state_handler = hs.get_state_handler()
 
         self.clock.looping_call(
@@ -219,6 +220,7 @@ def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
         """Notify any user streams that are interested in this room event"""
         # poke any interested application service.
         self.appservice_handler.notify_interested_services(room_stream_id)
+        self.federation_sender.notify_new_events(room_stream_id)
 
         if event.type == EventTypes.Member and event.membership == Membership.JOIN:
             self._user_joined_room(event.state_key, event.room_id)
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index a77312ae349e..e708811326d2 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -453,7 +453,7 @@ def to_device(self, writer, current_token, limit, request_streams):
             )
             upto_token = _position_from_rows(to_device_rows, current_position)
             writer.write_header_and_rows("to_device", to_device_rows, (
-                "position", "user_id", "device_id", "message_json"
+                "position", "entity",
             ), position=upto_token)
 
     @defer.inlineCallbacks
diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py
index 373212d42deb..cc860f9f9b04 100644
--- a/synapse/replication/slave/storage/deviceinbox.py
+++ b/synapse/replication/slave/storage/deviceinbox.py
@@ -38,6 +38,7 @@ def __init__(self, db_conn, hs):
     get_new_messages_for_device = DataStore.get_new_messages_for_device.__func__
     get_new_device_msgs_for_remote = DataStore.get_new_device_msgs_for_remote.__func__
     delete_messages_for_device = DataStore.delete_messages_for_device.__func__
+    delete_device_msgs_for_remote = DataStore.delete_device_msgs_for_remote.__func__
 
     def stream_positions(self):
         result = super(SlavedDeviceInboxStore, self).stream_positions()
@@ -50,9 +51,15 @@ def process_replication(self, result):
             self._device_inbox_id_gen.advance(int(stream["position"]))
             for row in stream["rows"]:
                 stream_id = row[0]
-                user_id = row[1]
-                self._device_inbox_stream_cache.entity_has_changed(
-                    user_id, stream_id
-                )
+                entity = row[1]
+
+                if entity.startswith("@"):
+                    self._device_inbox_stream_cache.entity_has_changed(
+                        entity, stream_id
+                    )
+                else:
+                    self._device_federation_outbox_stream_cache.entity_has_changed(
+                        entity, stream_id
+                    )
 
         return super(SlavedDeviceInboxStore, self).process_replication(result)
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 0c26e96e985a..ef8713b55da4 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -26,6 +26,11 @@
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 
 import ujson as json
+import logging
+
+
+logger = logging.getLogger(__name__)
+
 
 # So, um, we want to borrow a load of functions intended for reading from
 # a DataStore, but we don't want to take functions that either write to the
@@ -180,6 +185,8 @@ def __init__(self, db_conn, hs):
         EventFederationStore.__dict__["_get_forward_extremeties_for_room"]
     )
 
+    get_all_new_events_stream = DataStore.get_all_new_events_stream.__func__
+
     def stream_positions(self):
         result = super(SlavedEventStore, self).stream_positions()
         result["events"] = self._stream_id_gen.get_current_token()
@@ -194,6 +201,10 @@ def process_replication(self, result):
         stream = result.get("events")
         if stream:
             self._stream_id_gen.advance(int(stream["position"]))
+
+            if stream["rows"]:
+                logger.info("Got %d event rows", len(stream["rows"]))
+
             for row in stream["rows"]:
                 self._process_replication_row(
                     row, backfilled=False, state_resets=state_resets
diff --git a/synapse/replication/slave/storage/transactions.py b/synapse/replication/slave/storage/transactions.py
index c459301b7665..d92cea4ab1e0 100644
--- a/synapse/replication/slave/storage/transactions.py
+++ b/synapse/replication/slave/storage/transactions.py
@@ -25,8 +25,8 @@ class TransactionStore(BaseSlavedStore):
     ].orig
     _get_destination_retry_timings = DataStore._get_destination_retry_timings.__func__
 
-    def prep_send_transaction(self, transaction_id, destination, origin_server_ts):
-        return []
+    prep_send_transaction = DataStore.prep_send_transaction.__func__
+    delivered_txn = DataStore.delivered_txn.__func__
 
     # For now, don't record the destination rety timings
     def set_destination_retry_timings(*args, **kwargs):
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index f640e7371460..87398d60bc7b 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -269,27 +269,29 @@ def get_all_new_device_messages(self, last_pos, current_pos, limit):
             return defer.succeed([])
 
         def get_all_new_device_messages_txn(txn):
+            # We limit like this as we might have multiple rows per stream_id, and
+            # we want to make sure we always get all entries for any stream_id
+            # we return.
+            upper_pos = min(current_pos, last_pos + limit)
             sql = (
-                "SELECT stream_id FROM device_inbox"
+                "SELECT stream_id, user_id"
+                " FROM device_inbox"
                 " WHERE ? < stream_id AND stream_id <= ?"
-                " GROUP BY stream_id"
                 " ORDER BY stream_id ASC"
-                " LIMIT ?"
             )
-            txn.execute(sql, (last_pos, current_pos, limit))
-            stream_ids = txn.fetchall()
-            if not stream_ids:
-                return []
-            max_stream_id_in_limit = stream_ids[-1]
+            txn.execute(sql, (last_pos, upper_pos))
+            rows = txn.fetchall()
 
             sql = (
-                "SELECT stream_id, user_id, device_id, message_json"
-                " FROM device_inbox"
+                "SELECT stream_id, destination"
+                " FROM device_federation_outbox"
                 " WHERE ? < stream_id AND stream_id <= ?"
                 " ORDER BY stream_id ASC"
             )
-            txn.execute(sql, (last_pos, max_stream_id_in_limit))
-            return txn.fetchall()
+            txn.execute(sql, (last_pos, upper_pos))
+            rows.extend(txn.fetchall())
+
+            return rows
 
         return self.runInteraction(
             "get_all_new_device_messages", get_all_new_device_messages_txn
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 6576a3009888..e46ae6502ec8 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -25,7 +25,7 @@
 
 # Remember to update this number every time a change is made to database
 # schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 38
+SCHEMA_VERSION = 39
 
 dir_path = os.path.abspath(os.path.dirname(__file__))
 
diff --git a/synapse/storage/schema/delta/39/device_federation_stream_idx.sql b/synapse/storage/schema/delta/39/device_federation_stream_idx.sql
new file mode 100644
index 000000000000..00be801e901f
--- /dev/null
+++ b/synapse/storage/schema/delta/39/device_federation_stream_idx.sql
@@ -0,0 +1,16 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE INDEX device_federation_outbox_id ON device_federation_outbox(stream_id);
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 888b1cb35dc5..f34cb78f9af9 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -765,3 +765,34 @@ def _get_events_around_txn(self, txn, room_id, event_id, before_limit, after_lim
                 "token": end_token,
             },
         }
+
+    @defer.inlineCallbacks
+    def get_all_new_events_stream(self, from_id, current_id, limit):
+        """Get all new events"""
+
+        def get_all_new_events_stream_txn(txn):
+            sql = (
+                "SELECT e.stream_ordering, e.event_id"
+                " FROM events AS e"
+                " WHERE"
+                " ? < e.stream_ordering AND e.stream_ordering <= ?"
+                " ORDER BY e.stream_ordering ASC"
+                " LIMIT ?"
+            )
+
+            txn.execute(sql, (from_id, current_id, limit))
+            rows = txn.fetchall()
+
+            upper_bound = current_id
+            if len(rows) == limit:
+                upper_bound = rows[-1][0]
+
+            return upper_bound, [row[1] for row in rows]
+
+        upper_bound, event_ids = yield self.runInteraction(
+            "get_all_new_events_stream", get_all_new_events_stream_txn,
+        )
+
+        events = yield self._get_events(event_ids)
+
+        defer.returnValue((upper_bound, events))
diff --git a/synapse/util/jsonobject.py b/synapse/util/jsonobject.py
index 3fd5c3d9fd1b..d668e5a6b8cb 100644
--- a/synapse/util/jsonobject.py
+++ b/synapse/util/jsonobject.py
@@ -76,15 +76,26 @@ def get_dict(self):
         d.update(self.unrecognized_keys)
         return d
 
+    def get_internal_dict(self):
+        d = {
+            k: _encode(v, internal=True) for (k, v) in self.__dict__.items()
+            if k in self.valid_keys
+        }
+        d.update(self.unrecognized_keys)
+        return d
+
     def __str__(self):
         return "(%s, %s)" % (self.__class__.__name__, repr(self.__dict__))
 
 
-def _encode(obj):
+def _encode(obj, internal=False):
     if type(obj) is list:
-        return [_encode(o) for o in obj]
+        return [_encode(o, internal=internal) for o in obj]
 
     if isinstance(obj, JsonEncodedObject):
-        return obj.get_dict()
+        if internal:
+            return obj.get_internal_dict()
+        else:
+            return obj.get_dict()
 
     return obj

From 7c9cdb22453d1a442e5c280149aeeff4d46da215 Mon Sep 17 00:00:00 2001
From: Erik Johnston <erik@matrix.org>
Date: Mon, 21 Nov 2016 11:28:37 +0000
Subject: [PATCH 08/21] Store federation stream positions in the database

---
 synapse/app/federation_sender.py              | 38 +++++++++++--------
 synapse/federation/transaction_queue.py       | 21 ++++++++--
 synapse/replication/slave/storage/events.py   |  3 ++
 synapse/storage/_base.py                      | 18 +++++++--
 .../delta/39/federation_out_position.sql      | 22 +++++++++++
 synapse/storage/stream.py                     | 16 ++++++++
 6 files changed, 94 insertions(+), 24 deletions(-)
 create mode 100644 synapse/storage/schema/delta/39/federation_out_position.sql

diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 32113c175cc9..6678667c35e5 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -125,27 +125,22 @@ def replicate(self):
         http_client = self.get_simple_http_client()
         store = self.get_datastore()
         replication_url = self.config.worker_replication_url
-        send_handler = self._get_send_handler()
+        send_handler = FederationSenderHandler(self)
+
+        send_handler.on_start()
 
         while True:
             try:
                 args = store.stream_positions()
-                args.update(send_handler.stream_positions())
+                args.update((yield send_handler.stream_positions()))
                 args["timeout"] = 30000
                 result = yield http_client.get_json(replication_url, args=args)
                 yield store.process_replication(result)
-                send_handler.process_replication(result)
+                yield send_handler.process_replication(result)
             except:
                 logger.exception("Error replicating from %r", replication_url)
                 yield sleep(30)
 
-    def _get_send_handler(self):
-        try:
-            return self._send_handler
-        except AttributeError:
-            self._send_handler = FederationSenderHandler(self)
-            return self._send_handler
-
 
 def start(config_options):
     try:
@@ -221,22 +216,29 @@ def start():
 
 class FederationSenderHandler(object):
     def __init__(self, hs):
+        self.store = hs.get_datastore()
         self.federation_sender = hs.get_federation_sender()
 
-        self._latest_room_serial = -1
         self._room_serials = {}
         self._room_typing = {}
 
+    def on_start(self):
+        # There may be some events that are persisted but haven't been sent,
+        # so send them now.
+        self.federation_sender.notify_new_events(
+            self.store.get_room_max_stream_ordering()
+        )
+
+    @defer.inlineCallbacks
     def stream_positions(self):
-        # We must update this token from the response of the previous
-        # sync. In particular, the stream id may "reset" back to zero/a low
-        # value which we *must* use for the next replication request.
-        return {"federation": self._latest_room_serial}
+        stream_id = yield self.store.get_federation_out_pos("federation")
+        defer.returnValue({"federation": stream_id})
 
+    @defer.inlineCallbacks
     def process_replication(self, result):
         fed_stream = result.get("federation")
         if fed_stream:
-            self._latest_room_serial = int(fed_stream["position"])
+            latest_id = int(fed_stream["position"])
 
             presence_to_send = {}
             keyed_edus = {}
@@ -296,6 +298,10 @@ def process_replication(self, result):
             for destination in device_destinations:
                 self.federation_sender.send_device_messages(destination)
 
+            yield self.store.update_federation_out_pos(
+                "federation", latest_id
+            )
+
         event_stream = result.get("events")
         if event_stream:
             latest_pos = event_stream["position"]
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index aa664beead1d..1b0ea070c29f 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -106,7 +106,7 @@ def __init__(self, hs):
         self._order = 1
 
         self._is_processing = False
-        self._last_token = 0
+        self._last_poked_id = -1
 
     def can_send_to(self, destination):
         """Can we send messages to the given server?
@@ -130,17 +130,22 @@ def can_send_to(self, destination):
 
     @defer.inlineCallbacks
     def notify_new_events(self, current_id):
+        self._last_poked_id = max(current_id, self._last_poked_id)
+
         if self._is_processing:
             return
 
         try:
             self._is_processing = True
             while True:
-                self._last_token, events = yield self.store.get_all_new_events_stream(
-                    self._last_token, current_id, limit=20,
+                last_token = yield self.store.get_federation_out_pos("events")
+                next_token, events = yield self.store.get_all_new_events_stream(
+                    last_token, self._last_poked_id, limit=20,
                 )
 
-                if not events:
+                logger.debug("Handling %s -> %s", last_token, next_token)
+
+                if not events and next_token >= self._last_poked_id:
                     break
 
                 for event in events:
@@ -151,7 +156,15 @@ def notify_new_events(self, current_id):
                     destinations = [
                         get_domain_from_id(user_id) for user_id in users_in_room
                     ]
+
+                    logger.debug("Sending %s to %r", event, destinations)
+
                     self.send_pdu(event, destinations)
+
+                yield self.store.update_federation_out_pos(
+                    "events", next_token
+                )
+
         finally:
             self._is_processing = False
 
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index ef8713b55da4..64f18bbb3e2e 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -187,6 +187,9 @@ def __init__(self, db_conn, hs):
 
     get_all_new_events_stream = DataStore.get_all_new_events_stream.__func__
 
+    get_federation_out_pos = DataStore.get_federation_out_pos.__func__
+    update_federation_out_pos = DataStore.update_federation_out_pos.__func__
+
     def stream_positions(self):
         result = super(SlavedEventStore, self).stream_positions()
         result["events"] = self._stream_id_gen.get_current_token()
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index d828d6ee1d8f..d3686b9690d7 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -561,12 +561,17 @@ def _simple_select_one_onecol_txn(cls, txn, table, keyvalues, retcol,
 
     @staticmethod
     def _simple_select_onecol_txn(txn, table, keyvalues, retcol):
+        if keyvalues:
+            where = " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
+        else:
+            where = ""
+
         sql = (
-            "SELECT %(retcol)s FROM %(table)s WHERE %(where)s"
+            "SELECT %(retcol)s FROM %(table)s %(where)s"
         ) % {
             "retcol": retcol,
             "table": table,
-            "where": " AND ".join("%s = ?" % k for k in keyvalues.keys()),
+            "where": where,
         }
 
         txn.execute(sql, keyvalues.values())
@@ -744,10 +749,15 @@ def _simple_update_one(self, table, keyvalues, updatevalues,
 
     @staticmethod
     def _simple_update_one_txn(txn, table, keyvalues, updatevalues):
-        update_sql = "UPDATE %s SET %s WHERE %s" % (
+        if keyvalues:
+            where = " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
+        else:
+            where = ""
+
+        update_sql = "UPDATE %s SET %s %s" % (
             table,
             ", ".join("%s = ?" % (k,) for k in updatevalues),
-            " AND ".join("%s = ?" % (k,) for k in keyvalues)
+            where,
         )
 
         txn.execute(
diff --git a/synapse/storage/schema/delta/39/federation_out_position.sql b/synapse/storage/schema/delta/39/federation_out_position.sql
new file mode 100644
index 000000000000..edbd8e132ff4
--- /dev/null
+++ b/synapse/storage/schema/delta/39/federation_out_position.sql
@@ -0,0 +1,22 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ CREATE TABLE federation_stream_position(
+     type TEXT NOT NULL,
+     stream_id INTEGER NOT NULL
+ );
+
+ INSERT INTO federation_stream_position (type, stream_id) VALUES ('federation', -1);
+ INSERT INTO federation_stream_position (type, stream_id) VALUES ('events', -1);
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index f34cb78f9af9..7fa63b58a72d 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -796,3 +796,19 @@ def get_all_new_events_stream_txn(txn):
         events = yield self._get_events(event_ids)
 
         defer.returnValue((upper_bound, events))
+
+    def get_federation_out_pos(self, typ):
+        return self._simple_select_one_onecol(
+            table="federation_stream_position",
+            retcol="stream_id",
+            keyvalues={"type": typ},
+            desc="get_federation_out_pos"
+        )
+
+    def update_federation_out_pos(self, typ, stream_id):
+        return self._simple_update_one(
+            table="federation_stream_position",
+            keyvalues={"type": typ},
+            updatevalues={"stream_id": stream_id},
+            desc="update_federation_out_pos",
+        )

From 524d61bf7ef293a56201852aa64a16d5c50abd93 Mon Sep 17 00:00:00 2001
From: Erik Johnston <erik@matrix.org>
Date: Mon, 21 Nov 2016 11:53:02 +0000
Subject: [PATCH 09/21] Fix tests

---
 synapse/federation/transaction_queue.py |  3 +++
 synapse/storage/_base.py                |  4 ++--
 tests/storage/test_appservice.py        | 22 +++++++++++++++++-----
 tests/utils.py                          |  2 ++
 4 files changed, 24 insertions(+), 7 deletions(-)

diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 1b0ea070c29f..c864e1228770 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -259,6 +259,9 @@ def send_device_messages(self, destination):
             self._attempt_new_transaction, destination
         )
 
+    def get_current_token(self):
+        return 0
+
     @defer.inlineCallbacks
     def _attempt_new_transaction(self, destination):
         # list of (pending_pdu, deferred, order)
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index d3686b9690d7..b62c459d8b03 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -562,7 +562,7 @@ def _simple_select_one_onecol_txn(cls, txn, table, keyvalues, retcol,
     @staticmethod
     def _simple_select_onecol_txn(txn, table, keyvalues, retcol):
         if keyvalues:
-            where = " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
+            where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
         else:
             where = ""
 
@@ -750,7 +750,7 @@ def _simple_update_one(self, table, keyvalues, updatevalues,
     @staticmethod
     def _simple_update_one_txn(txn, table, keyvalues, updatevalues):
         if keyvalues:
-            where = " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
+            where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
         else:
             where = ""
 
diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py
index 02a67b733d17..9ff1abcd8006 100644
--- a/tests/storage/test_appservice.py
+++ b/tests/storage/test_appservice.py
@@ -39,7 +39,7 @@ def setUp(self):
             event_cache_size=1,
             password_providers=[],
         )
-        hs = yield setup_test_homeserver(config=config)
+        hs = yield setup_test_homeserver(config=config, federation_sender=Mock())
 
         self.as_token = "token1"
         self.as_url = "some_url"
@@ -112,7 +112,7 @@ def setUp(self):
             event_cache_size=1,
             password_providers=[],
         )
-        hs = yield setup_test_homeserver(config=config)
+        hs = yield setup_test_homeserver(config=config, federation_sender=Mock())
         self.db_pool = hs.get_db_pool()
 
         self.as_list = [
@@ -443,7 +443,11 @@ def test_unique_works(self):
             app_service_config_files=[f1, f2], event_cache_size=1,
             password_providers=[]
         )
-        hs = yield setup_test_homeserver(config=config, datastore=Mock())
+        hs = yield setup_test_homeserver(
+            config=config,
+            datastore=Mock(),
+            federation_sender=Mock()
+        )
 
         ApplicationServiceStore(hs)
 
@@ -456,7 +460,11 @@ def test_duplicate_ids(self):
             app_service_config_files=[f1, f2], event_cache_size=1,
             password_providers=[]
         )
-        hs = yield setup_test_homeserver(config=config, datastore=Mock())
+        hs = yield setup_test_homeserver(
+            config=config,
+            datastore=Mock(),
+            federation_sender=Mock()
+        )
 
         with self.assertRaises(ConfigError) as cm:
             ApplicationServiceStore(hs)
@@ -475,7 +483,11 @@ def test_duplicate_as_tokens(self):
             app_service_config_files=[f1, f2], event_cache_size=1,
             password_providers=[]
         )
-        hs = yield setup_test_homeserver(config=config, datastore=Mock())
+        hs = yield setup_test_homeserver(
+            config=config,
+            datastore=Mock(),
+            federation_sender=Mock()
+        )
 
         with self.assertRaises(ConfigError) as cm:
             ApplicationServiceStore(hs)
diff --git a/tests/utils.py b/tests/utils.py
index 5929f1c729b7..bf6449a0fc57 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -70,6 +70,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs):
             database_engine=create_engine(config.database_config),
             get_db_conn=db_pool.get_db_conn,
             room_list_handler=object(),
+            tls_server_context_factory=Mock(),
             **kargs
         )
         hs.setup()
@@ -79,6 +80,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs):
             version_string="Synapse/tests",
             database_engine=create_engine(config.database_config),
             room_list_handler=object(),
+            tls_server_context_factory=Mock(),
             **kargs
         )
 

From 9687e039e7cbbf64d52d6a530883f913e09ffcf7 Mon Sep 17 00:00:00 2001
From: Erik Johnston <erik@matrix.org>
Date: Mon, 21 Nov 2016 14:48:51 +0000
Subject: [PATCH 10/21] Remove explicit calls to send_pdu

---
 synapse/federation/send_queue.py        |  4 --
 synapse/federation/transaction_queue.py | 13 ++++--
 synapse/handlers/federation.py          | 53 -------------------------
 3 files changed, 9 insertions(+), 61 deletions(-)

diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 3fc625c4dde8..99b583578083 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -158,10 +158,6 @@ def send_failure(self, failure, destination):
 
         self.failures[pos] = (destination, str(failure))
 
-    def send_pdu(self, pdu, destinations):
-        # This gets sent down a separate path
-        pass
-
     def send_device_messages(self, destination):
         pos = self._next_pos()
         self.device_messages[pos] = destination
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index c864e1228770..0b3fdc10671f 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -19,6 +19,7 @@
 from .persistence import TransactionActions
 from .units import Transaction, Edu
 
+from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import HttpResponseException
 from synapse.util.async import run_on_reactor
 from synapse.util.logcontext import preserve_context_over_fn
@@ -153,13 +154,17 @@ def notify_new_events(self, current_id):
                         event.room_id, latest_event_ids=[event.event_id],
                     )
 
-                    destinations = [
+                    destinations = set(
                         get_domain_from_id(user_id) for user_id in users_in_room
-                    ]
+                    )
+
+                    if event.type == EventTypes.Member:
+                        if event.content["membership"] == Membership.JOIN:
+                            destinations.add(get_domain_from_id(event.state_key))
 
                     logger.debug("Sending %s to %r", event, destinations)
 
-                    self.send_pdu(event, destinations)
+                    self._send_pdu(event, destinations)
 
                 yield self.store.update_federation_out_pos(
                     "events", next_token
@@ -168,7 +173,7 @@ def notify_new_events(self, current_id):
         finally:
             self._is_processing = False
 
-    def send_pdu(self, pdu, destinations):
+    def _send_pdu(self, pdu, destinations):
         # We loop through all destinations to see whether we already have
         # a transaction in progress. If we do, stick it in the pending_pdus
         # table and we'll get back to it later.
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 38592d557706..4ca563c85e75 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -81,22 +81,6 @@ def __init__(self, hs):
         # When joining a room we need to queue any events for that room up
         self.room_queues = {}
 
-    def handle_new_event(self, event, destinations):
-        """ Takes in an event from the client to server side, that has already
-        been authed and handled by the state module, and sends it to any
-        remote home servers that may be interested.
-
-        Args:
-            event: The event to send
-            destinations: A list of destinations to send it to
-
-        Returns:
-            Deferred: Resolved when it has successfully been queued for
-            processing.
-        """
-
-        return self.federation_sender.send_pdu(event, destinations)
-
     @log_function
     @defer.inlineCallbacks
     def on_receive_pdu(self, origin, pdu, state=None, auth_chain=None):
@@ -831,25 +815,6 @@ def on_send_join_request(self, origin, pdu):
                 user = UserID.from_string(event.state_key)
                 yield user_joined_room(self.distributor, user, event.room_id)
 
-        new_pdu = event
-
-        users_in_room = yield self.store.get_joined_users_from_context(event, context)
-
-        destinations = set(
-            get_domain_from_id(user_id) for user_id in users_in_room
-            if not self.hs.is_mine_id(user_id)
-        )
-
-        destinations.discard(origin)
-
-        logger.debug(
-            "on_send_join_request: Sending event: %s, signatures: %s",
-            event.event_id,
-            event.signatures,
-        )
-
-        self.federation_sender.send_pdu(new_pdu, destinations)
-
         state_ids = context.prev_state_ids.values()
         auth_chain = yield self.store.get_auth_chain(set(
             [event.event_id] + state_ids
@@ -1056,24 +1021,6 @@ def on_send_leave_request(self, origin, pdu):
                 event, event_stream_id, max_stream_id, extra_users=extra_users
             )
 
-        new_pdu = event
-
-        users_in_room = yield self.store.get_joined_users_from_context(event, context)
-
-        destinations = set(
-            get_domain_from_id(user_id) for user_id in users_in_room
-            if not self.hs.is_mine_id(user_id)
-        )
-        destinations.discard(origin)
-
-        logger.debug(
-            "on_send_leave_request: Sending event: %s, signatures: %s",
-            event.event_id,
-            event.signatures,
-        )
-
-        self.federation_sender.send_pdu(new_pdu, destinations)
-
         defer.returnValue(None)
 
     @defer.inlineCallbacks

From 50934ce4604001898707f75179dd748884659f12 Mon Sep 17 00:00:00 2001
From: Erik Johnston <erik@matrix.org>
Date: Mon, 21 Nov 2016 16:55:23 +0000
Subject: [PATCH 11/21] Comments

---
 synapse/app/federation_sender.py        | 12 ++++++++++++
 synapse/federation/send_queue.py        | 26 +++++++++++++++++++++++++
 synapse/federation/transaction_queue.py |  3 +++
 3 files changed, 41 insertions(+)

diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 6678667c35e5..ba2b4c2615a3 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -215,6 +215,9 @@ def start():
 
 
 class FederationSenderHandler(object):
+    """Processes the replication stream and forwards the appropriate entries
+    to the federation sender.
+    """
     def __init__(self, hs):
         self.store = hs.get_datastore()
         self.federation_sender = hs.get_federation_sender()
@@ -236,16 +239,22 @@ def stream_positions(self):
 
     @defer.inlineCallbacks
     def process_replication(self, result):
+        # The federation stream contains things that we want to send out, e.g.
+        # presence, typing, etc.
         fed_stream = result.get("federation")
         if fed_stream:
             latest_id = int(fed_stream["position"])
 
+            # The federation stream contains a bunch of different types of
+            # rows that need to be handled differently. We parse the rows, put
+            # them into the appropriate collection and then send them off.
             presence_to_send = {}
             keyed_edus = {}
             edus = {}
             failures = {}
             device_destinations = set()
 
+            # Parse the rows in the stream
             for row in fed_stream["rows"]:
                 position, typ, content_js = row
                 content = json.loads(content_js)
@@ -276,6 +285,7 @@ def process_replication(self, result):
                 else:
                     raise Exception("Unrecognised federation type: %r", typ)
 
+            # We've finished collecting, send everything off
             for destination, states in presence_to_send.items():
                 self.federation_sender.send_presence(destination, states)
 
@@ -298,10 +308,12 @@ def process_replication(self, result):
             for destination in device_destinations:
                 self.federation_sender.send_device_messages(destination)
 
+            # Record where we are in the stream.
             yield self.store.update_federation_out_pos(
                 "federation", latest_id
             )
 
+        # We also need to poke the federation sender when new events happen
         event_stream = result.get("events")
         if event_stream:
             latest_pos = event_stream["position"]
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 99b583578083..98cf125cb504 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -13,6 +13,22 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+"""A federation sender that forwards things to be sent across replication to
+a worker process.
+
+It assumes there is a single worker process feeding off of it.
+
+Each row in the replication stream consists of a type and some json, where the
+types indicate whether they are presence, or edus, etc.
+
+Ephemeral or non-event data are queued up in-memory. When the worker requests
+updates since a particular point, all in-memory data from before that point is
+dropped. We also expire things in the queue after 5 minutes, to ensure that a
+dead worker doesn't cause the queues to grow limitlessly.
+
+Events are replicated via a separate events stream.
+"""
+
 from .units import Edu
 
 from blist import sorteddict
@@ -27,6 +43,7 @@
 
 
 class FederationRemoteSendQueue(object):
+    """A drop in replacement for TransactionQueue"""
 
     def __init__(self, hs):
         self.server_name = hs.hostname
@@ -58,6 +75,7 @@ def _next_pos(self):
         return pos
 
     def _clear_queue(self):
+        """Clear the queues for anything older than N minutes"""
         # TODO measure this function time.
 
         FIVE_MINUTES_AGO = 5 * 60 * 1000
@@ -75,6 +93,7 @@ def _clear_queue(self):
         self._clear_queue_before_pos(position_to_delete)
 
     def _clear_queue_before_pos(self, position_to_delete):
+        """Clear all the queues from before a given position"""
         # Delete things out of presence maps
         keys = self.presence_changed.keys()
         i = keys.bisect_left(position_to_delete)
@@ -122,9 +141,13 @@ def _clear_queue_before_pos(self, position_to_delete):
             del self.device_messages[key]
 
     def notify_new_events(self, current_id):
+        """As per TransactionQueue"""
+        # We don't need to replicate this as it gets sent down a different
+        # stream.
         pass
 
     def send_edu(self, destination, edu_type, content, key=None):
+        """As per TransactionQueue"""
         pos = self._next_pos()
 
         edu = Edu(
@@ -142,6 +165,7 @@ def send_edu(self, destination, edu_type, content, key=None):
             self.edus[pos] = edu
 
     def send_presence(self, destination, states):
+        """As per TransactionQueue"""
         pos = self._next_pos()
 
         self.presence_map.update({
@@ -154,11 +178,13 @@ def send_presence(self, destination, states):
         ]
 
     def send_failure(self, failure, destination):
+        """As per TransactionQueue"""
         pos = self._next_pos()
 
         self.failures[pos] = (destination, str(failure))
 
     def send_device_messages(self, destination):
+        """As per TransactionQueue"""
         pos = self._next_pos()
         self.device_messages[pos] = destination
 
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 0b3fdc10671f..c94c74a67e60 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -131,6 +131,9 @@ def can_send_to(self, destination):
 
     @defer.inlineCallbacks
     def notify_new_events(self, current_id):
+        """This gets called when we have some new events we might want to
+        send out to other servers.
+        """
         self._last_poked_id = max(current_id, self._last_poked_id)
 
         if self._is_processing:

From 88d85ebae14dd26f411e6ed9ac04f8dc5d5cc326 Mon Sep 17 00:00:00 2001
From: Erik Johnston <erik@matrix.org>
Date: Mon, 21 Nov 2016 17:34:43 +0000
Subject: [PATCH 12/21] Add some metrics

---
 synapse/federation/send_queue.py | 116 ++++++++++++++++++-------------
 1 file changed, 68 insertions(+), 48 deletions(-)

diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 98cf125cb504..76e4c5cd805c 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -31,10 +31,16 @@
 
 from .units import Edu
 
+from synapse.util.metrics import Measure
+import synapse.metrics
+
 from blist import sorteddict
 import ujson
 
 
+metrics = synapse.metrics.get_metrics_for(__name__)
+
+
 PRESENCE_TYPE = "p"
 KEYED_EDU_TYPE = "k"
 EDU_TYPE = "e"
@@ -49,8 +55,6 @@ def __init__(self, hs):
         self.server_name = hs.hostname
         self.clock = hs.get_clock()
 
-        # TODO: Add metrics for size of lists below
-
         self.presence_map = {}
         self.presence_changed = sorteddict()
 
@@ -61,10 +65,24 @@ def __init__(self, hs):
 
         self.failures = sorteddict()
 
+        self.device_messages = sorteddict()
+
         self.pos = 1
         self.pos_time = sorteddict()
 
-        self.device_messages = sorteddict()
+        # EVERYTHING IS SAD. In particular, python only makes new scopes when
+        # we make a new function, so we need to make a new function so the inner
+        def register(name, queue):
+            metrics.register_callback(
+                queue_name + "_size",
+                lambda: len(queue),
+            )
+
+        for queue_name in [
+            "presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed",
+            "edus", "failures", "device_messages", "pos_time",
+        ]:
+            register(queue_name, getattr(self, queue_name))
 
         self.clock.looping_call(self._clear_queue, 30 * 1000)
 
@@ -76,7 +94,6 @@ def _next_pos(self):
 
     def _clear_queue(self):
         """Clear the queues for anything older than N minutes"""
-        # TODO measure this function time.
 
         FIVE_MINUTES_AGO = 5 * 60 * 1000
         now = self.clock.time_msec()
@@ -94,51 +111,54 @@ def _clear_queue(self):
 
     def _clear_queue_before_pos(self, position_to_delete):
         """Clear all the queues from before a given position"""
-        # Delete things out of presence maps
-        keys = self.presence_changed.keys()
-        i = keys.bisect_left(position_to_delete)
-        for key in keys[:i]:
-            del self.presence_changed[key]
-
-        user_ids = set(
-            user_id for uids in self.presence_changed.values() for _, user_id in uids
-        )
-
-        to_del = [user_id for user_id in self.presence_map if user_id not in user_ids]
-        for user_id in to_del:
-            del self.presence_map[user_id]
-
-        # Delete things out of keyed edus
-        keys = self.keyed_edu_changed.keys()
-        i = keys.bisect_left(position_to_delete)
-        for key in keys[:i]:
-            del self.keyed_edu_changed[key]
-
-        live_keys = set()
-        for edu_key in self.keyed_edu_changed.values():
-            live_keys.add(edu_key)
-
-        to_del = [edu_key for edu_key in self.keyed_edu if edu_key not in live_keys]
-        for edu_key in to_del:
-            del self.keyed_edu[edu_key]
-
-        # Delete things out of edu map
-        keys = self.edus.keys()
-        i = keys.bisect_left(position_to_delete)
-        for key in keys[:i]:
-            del self.edus[key]
-
-        # Delete things out of failure map
-        keys = self.failures.keys()
-        i = keys.bisect_left(position_to_delete)
-        for key in keys[:i]:
-            del self.failures[key]
+        with Measure(self.clock, "send_queue._clear"):
+            # Delete things out of presence maps
+            keys = self.presence_changed.keys()
+            i = keys.bisect_left(position_to_delete)
+            for key in keys[:i]:
+                del self.presence_changed[key]
+
+            user_ids = set(
+                user_id for uids in self.presence_changed.values() for _, user_id in uids
+            )
 
-        # Delete things out of device map
-        keys = self.device_messages.keys()
-        i = keys.bisect_left(position_to_delete)
-        for key in keys[:i]:
-            del self.device_messages[key]
+            to_del = [
+                user_id for user_id in self.presence_map if user_id not in user_ids
+            ]
+            for user_id in to_del:
+                del self.presence_map[user_id]
+
+            # Delete things out of keyed edus
+            keys = self.keyed_edu_changed.keys()
+            i = keys.bisect_left(position_to_delete)
+            for key in keys[:i]:
+                del self.keyed_edu_changed[key]
+
+            live_keys = set()
+            for edu_key in self.keyed_edu_changed.values():
+                live_keys.add(edu_key)
+
+            to_del = [edu_key for edu_key in self.keyed_edu if edu_key not in live_keys]
+            for edu_key in to_del:
+                del self.keyed_edu[edu_key]
+
+            # Delete things out of edu map
+            keys = self.edus.keys()
+            i = keys.bisect_left(position_to_delete)
+            for key in keys[:i]:
+                del self.edus[key]
+
+            # Delete things out of failure map
+            keys = self.failures.keys()
+            i = keys.bisect_left(position_to_delete)
+            for key in keys[:i]:
+                del self.failures[key]
+
+            # Delete things out of device map
+            keys = self.device_messages.keys()
+            i = keys.bisect_left(position_to_delete)
+            for key in keys[:i]:
+                del self.device_messages[key]
 
     def notify_new_events(self, current_id):
         """As per TransactionQueue"""

From 73dc099645f918ae79458995e5e4b28a6980ed8f Mon Sep 17 00:00:00 2001
From: Erik Johnston <erik@matrix.org>
Date: Mon, 21 Nov 2016 17:37:59 +0000
Subject: [PATCH 13/21] Add federation-sender to sytest

---
 jenkins-dendron-postgres.sh | 1 +
 1 file changed, 1 insertion(+)

diff --git a/jenkins-dendron-postgres.sh b/jenkins-dendron-postgres.sh
index 70edae43285b..55ff31fd18f9 100755
--- a/jenkins-dendron-postgres.sh
+++ b/jenkins-dendron-postgres.sh
@@ -22,3 +22,4 @@ export SYNAPSE_CACHE_FACTOR=1
     --federation-reader \
     --client-reader \
     --appservice \
+    --federation-sender \

From 51e89709aa310287f07921392ba09c9cb062bd48 Mon Sep 17 00:00:00 2001
From: Erik Johnston <erik@matrix.org>
Date: Mon, 21 Nov 2016 17:59:39 +0000
Subject: [PATCH 14/21] Comments

---
 synapse/federation/send_queue.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 76e4c5cd805c..ed2b03fad452 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -72,6 +72,8 @@ def __init__(self, hs):
 
         # EVERYTHING IS SAD. In particular, python only makes new scopes when
         # we make a new function, so we need to make a new function so the inner
+        # lambda binds to the queue rather than to the name of the queue which
+        # changes. ARGH.
         def register(name, queue):
             metrics.register_callback(
                 queue_name + "_size",

From 90565d015e97a494f516cc6f06596ca5c6d490ec Mon Sep 17 00:00:00 2001
From: Erik Johnston <erik@matrix.org>
Date: Tue, 22 Nov 2016 17:45:44 +0000
Subject: [PATCH 15/21] Invalidate retry cache in both directions

---
 synapse/replication/expire_cache.py           | 60 +++++++++++++++++++
 synapse/replication/resource.py               |  2 +
 synapse/replication/slave/storage/_base.py    | 19 ++++++
 .../replication/slave/storage/transactions.py |  9 +--
 synapse/storage/transactions.py               | 48 +++++++++++----
 synapse/util/retryutils.py                    | 21 ++++---
 6 files changed, 132 insertions(+), 27 deletions(-)
 create mode 100644 synapse/replication/expire_cache.py

diff --git a/synapse/replication/expire_cache.py b/synapse/replication/expire_cache.py
new file mode 100644
index 000000000000..c05a50d7a680
--- /dev/null
+++ b/synapse/replication/expire_cache.py
@@ -0,0 +1,60 @@
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.http.server import respond_with_json_bytes, request_handler
+from synapse.http.servlet import parse_json_object_from_request
+
+from twisted.web.resource import Resource
+from twisted.web.server import NOT_DONE_YET
+
+
+class ExpireCacheResource(Resource):
+    """
+    HTTP endpoint for expiring storage caches.
+
+    POST /_synapse/replication/expire_cache HTTP/1.1
+    Content-Type: application/json
+
+    {
+        "invalidate": [
+            {
+                "name": "func_name",
+                "keys": ["key1", "key2"]
+            }
+        ]
+    }
+    """
+
+    def __init__(self, hs):
+        Resource.__init__(self)  # Resource is old-style, so no super()
+
+        self.store = hs.get_datastore()
+        self.version_string = hs.version_string
+        self.clock = hs.get_clock()
+
+    def render_POST(self, request):
+        self._async_render_POST(request)
+        return NOT_DONE_YET
+
+    @request_handler()
+    def _async_render_POST(self, request):
+        content = parse_json_object_from_request(request)
+
+        for row in content["invalidate"]:
+            name = row["name"]
+            keys = tuple(row["keys"])
+
+            getattr(self.store, name).invalidate(keys)
+
+        respond_with_json_bytes(request, 200, "{}")
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index e708811326d2..b05ca62710db 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -17,6 +17,7 @@
 from synapse.http.server import request_handler, finish_request
 from synapse.replication.pusher_resource import PusherResource
 from synapse.replication.presence_resource import PresenceResource
+from synapse.replication.expire_cache import ExpireCacheResource
 from synapse.api.errors import SynapseError
 
 from twisted.web.resource import Resource
@@ -124,6 +125,7 @@ def __init__(self, hs):
 
         self.putChild("remove_pushers", PusherResource(hs))
         self.putChild("syncing_users", PresenceResource(hs))
+        self.putChild("expire_cache", ExpireCacheResource(hs))
 
     def render_GET(self, request):
         self._async_render_GET(request)
diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py
index f19540d6bbb1..18076e0f3b54 100644
--- a/synapse/replication/slave/storage/_base.py
+++ b/synapse/replication/slave/storage/_base.py
@@ -34,6 +34,9 @@ def __init__(self, db_conn, hs):
         else:
             self._cache_id_gen = None
 
+        self.expire_cache_url = hs.config.worker_replication_url + "/expire_cache"
+        self.http_client = hs.get_simple_http_client()
+
     def stream_positions(self):
         pos = {}
         if self._cache_id_gen:
@@ -54,3 +57,19 @@ def process_replication(self, result):
                     logger.info("Got unexpected cache_func: %r", cache_func)
             self._cache_id_gen.advance(int(stream["position"]))
         return defer.succeed(None)
+
+    def _invalidate_cache_and_stream(self, txn, cache_func, keys):
+        txn.call_after(cache_func.invalidate, keys)
+        txn.call_after(self._send_invalidation_poke, cache_func, keys)
+
+    @defer.inlineCallbacks
+    def _send_invalidation_poke(self, cache_func, keys):
+        try:
+            yield self.http_client.post_json_get_json(self.expire_cache_url, {
+                "invalidate": [{
+                    "name": cache_func.__name__,
+                    "keys": list(keys),
+                }]
+            })
+        except:
+            logger.exception("Failed to poke on expire_cache")
diff --git a/synapse/replication/slave/storage/transactions.py b/synapse/replication/slave/storage/transactions.py
index d92cea4ab1e0..fbb58f35da0d 100644
--- a/synapse/replication/slave/storage/transactions.py
+++ b/synapse/replication/slave/storage/transactions.py
@@ -13,7 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer
 from ._base import BaseSlavedStore
 from synapse.storage import DataStore
 from synapse.storage.transactions import TransactionStore
@@ -22,12 +21,10 @@
 class TransactionStore(BaseSlavedStore):
     get_destination_retry_timings = TransactionStore.__dict__[
         "get_destination_retry_timings"
-    ].orig
+    ]
     _get_destination_retry_timings = DataStore._get_destination_retry_timings.__func__
+    set_destination_retry_timings = DataStore.set_destination_retry_timings.__func__
+    _set_destination_retry_timings = DataStore._set_destination_retry_timings.__func__
 
     prep_send_transaction = DataStore.prep_send_transaction.__func__
     delivered_txn = DataStore.delivered_txn.__func__
-
-    # For now, don't record the destination rety timings
-    def set_destination_retry_timings(*args, **kwargs):
-        return defer.succeed(None)
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index adab520c7840..ee2efb0d36e1 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 from ._base import SQLBaseStore
+from synapse.storage.engines import PostgresEngine
 from synapse.util.caches.descriptors import cached
 
 from twisted.internet import defer
@@ -200,25 +201,48 @@ def set_destination_retry_timings(self, destination,
 
     def _set_destination_retry_timings(self, txn, destination,
                                        retry_last_ts, retry_interval):
-        txn.call_after(self.get_destination_retry_timings.invalidate, (destination,))
+        self.database_engine.lock_table(txn, "destinations")
 
-        self._simple_upsert_txn(
+        self._invalidate_cache_and_stream(
+            txn, self.get_destination_retry_timings, (destination,)
+        )
+
+        # We need to be careful here as the data may have changed from under us
+        # due to a worker setting the timings.
+
+        prev_row = self._simple_select_one_txn(
             txn,
-            "destinations",
+            table="destinations",
             keyvalues={
                 "destination": destination,
             },
-            values={
-                "retry_last_ts": retry_last_ts,
-                "retry_interval": retry_interval,
-            },
-            insertion_values={
-                "destination": destination,
-                "retry_last_ts": retry_last_ts,
-                "retry_interval": retry_interval,
-            }
+            retcols=("retry_last_ts", "retry_interval"),
+            allow_none=True,
         )
 
+        if not prev_row:
+            self._simple_insert_txn(
+                txn,
+                table="destinations",
+                values={
+                    "destination": destination,
+                    "retry_last_ts": retry_last_ts,
+                    "retry_interval": retry_interval,
+                }
+            )
+        elif retry_interval == 0 or prev_row["retry_interval"] < retry_interval:
+            self._simple_update_one_txn(
+                txn,
+                "destinations",
+                keyvalues={
+                    "destination": destination,
+                },
+                updatevalues={
+                    "retry_last_ts": retry_last_ts,
+                    "retry_interval": retry_interval,
+                },
+            )
+
     def get_destinations_needing_retry(self):
         """Get all destinations which are due a retry for sending a transaction.
 
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 49527f4d21da..46ef5a8ec79b 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -121,12 +121,6 @@ def __enter__(self):
         pass
 
     def __exit__(self, exc_type, exc_val, exc_tb):
-        def err(failure):
-            logger.exception(
-                "Failed to store set_destination_retry_timings",
-                failure.value
-            )
-
         valid_err_code = False
         if exc_type is not None and issubclass(exc_type, CodeMessageException):
             valid_err_code = 0 <= exc_val.code < 500
@@ -151,6 +145,15 @@ def err(failure):
 
             retry_last_ts = int(self.clock.time_msec())
 
-        self.store.set_destination_retry_timings(
-            self.destination, retry_last_ts, self.retry_interval
-        ).addErrback(err)
+        @defer.inlineCallbacks
+        def store_retry_timings():
+            try:
+                yield self.store.set_destination_retry_timings(
+                    self.destination, retry_last_ts, self.retry_interval
+                )
+            except:
+                logger.exception(
+                    "Failed to store set_destination_retry_timings",
+                )
+
+        store_retry_timings()

From 54fed21c049ba89d71242e8c8fc0133fe703395c Mon Sep 17 00:00:00 2001
From: Erik Johnston <erik@matrix.org>
Date: Tue, 22 Nov 2016 18:18:31 +0000
Subject: [PATCH 16/21] Fix tests and flake8

---
 synapse/storage/transactions.py | 1 -
 tests/utils.py                  | 1 +
 2 files changed, 1 insertion(+), 1 deletion(-)

diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index ee2efb0d36e1..809fdd311f10 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -14,7 +14,6 @@
 # limitations under the License.
 
 from ._base import SQLBaseStore
-from synapse.storage.engines import PostgresEngine
 from synapse.util.caches.descriptors import cached
 
 from twisted.internet import defer
diff --git a/tests/utils.py b/tests/utils.py
index bf6449a0fc57..ab2252d24c49 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -53,6 +53,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs):
         config.trusted_third_party_id_servers = []
         config.room_invite_state_types = []
         config.password_providers = []
+        config.worker_replication_url = ""
 
     config.use_frozen_dicts = True
     config.database_config = {"name": "sqlite3"}

From 4c79a63fd76e982e5e60b22c7efd15b6e3cf9915 Mon Sep 17 00:00:00 2001
From: Erik Johnston <erik@matrix.org>
Date: Wed, 23 Nov 2016 10:40:44 +0000
Subject: [PATCH 17/21] Explicit federation ack

---
 synapse/app/federation_sender.py |  5 ++++-
 synapse/federation/send_queue.py | 13 +++++++++++--
 synapse/replication/resource.py  | 15 ++++++++++-----
 3 files changed, 25 insertions(+), 8 deletions(-)

diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index ba2b4c2615a3..dcdbe79a177a 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -235,7 +235,10 @@ def on_start(self):
     @defer.inlineCallbacks
     def stream_positions(self):
         stream_id = yield self.store.get_federation_out_pos("federation")
-        defer.returnValue({"federation": stream_id})
+        defer.returnValue({
+            "federation": stream_id,
+            "federation_ack": stream_id,
+        })
 
     @defer.inlineCallbacks
     def process_replication(self, result):
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index ed2b03fad452..5c9f7a86f0aa 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -213,7 +213,15 @@ def send_device_messages(self, destination):
     def get_current_token(self):
         return self.pos - 1
 
-    def get_replication_rows(self, token, limit):
+    def get_replication_rows(self, token, limit, federation_ack=None):
+        """
+        Args:
+            token (int)
+            limit (int)
+            federation_ack (int): Optional. The position up to which the
+                worker has explicitly acknowledged handling. Allows us to
+                drop data from before that point.
+        """
         # TODO: Handle limit.
 
         # To handle restarts where we wrap around
@@ -224,7 +232,8 @@ def get_replication_rows(self, token, limit):
 
         # There should be only one reader, so lets delete everything its
         # acknowledged its seen.
-        self._clear_queue_before_pos(token)
+        if federation_ack:
+            self._clear_queue_before_pos(federation_ack)
 
         # Fetch changed presence
         keys = self.presence_changed.keys()
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index b05ca62710db..cb9697e37878 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -171,8 +171,13 @@ def _async_render_GET(self, request):
         }
         request_streams["streams"] = parse_string(request, "streams")
 
+        federation_ack = parse_integer(request, "federation_ack", None)
+
         def replicate():
-            return self.replicate(request_streams, limit)
+            return self.replicate(
+                request_streams, limit,
+                federation_ack=federation_ack
+            )
 
         writer = yield self.notifier.wait_for_replication(replicate, timeout)
         result = writer.finish()
@@ -190,7 +195,7 @@ def replicate():
         finish_request(request)
 
     @defer.inlineCallbacks
-    def replicate(self, request_streams, limit):
+    def replicate(self, request_streams, limit, federation_ack=None):
         writer = _Writer()
         current_token = yield self.current_replication_token()
         logger.debug("Replicating up to %r", current_token)
@@ -209,7 +214,7 @@ def replicate(self, request_streams, limit):
         yield self.caches(writer, current_token, limit, request_streams)
         yield self.to_device(writer, current_token, limit, request_streams)
         yield self.public_rooms(writer, current_token, limit, request_streams)
-        self.federation(writer, current_token, limit, request_streams)
+        self.federation(writer, current_token, limit, request_streams, federation_ack)
         self.streams(writer, current_token, request_streams)
 
         logger.debug("Replicated %d rows", writer.total)
@@ -473,7 +478,7 @@ def public_rooms(self, writer, current_token, limit, request_streams):
                 "position", "room_id", "visibility"
             ), position=upto_token)
 
-    def federation(self, writer, current_token, limit, request_streams):
+    def federation(self, writer, current_token, limit, request_streams, federation_ack):
         if self.config.send_federation:
             return
 
@@ -483,7 +488,7 @@ def federation(self, writer, current_token, limit, request_streams):
 
         if federation is not None and federation != current_position:
             federation_rows = self.federation_sender.get_replication_rows(
-                federation, limit,
+                federation, limit, federation_ack=federation_ack,
             )
             upto_token = _position_from_rows(federation_rows, current_position)
             writer.write_header_and_rows("federation", federation_rows, (

From 4d9b5c60f958320fb80f968a312eb83cf48258d5 Mon Sep 17 00:00:00 2001
From: Erik Johnston <erik@matrix.org>
Date: Wed, 23 Nov 2016 11:11:41 +0000
Subject: [PATCH 18/21] Comment

---
 synapse/app/federation_sender.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index dcdbe79a177a..80ea4c8062fd 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -237,6 +237,9 @@ def stream_positions(self):
         stream_id = yield self.store.get_federation_out_pos("federation")
         defer.returnValue({
             "federation": stream_id,
+
+            # Ack the stuff we've "processed". This should only be called
+            # from one process.
             "federation_ack": stream_id,
         })
 

From 26072df6af7ca37b8e6e5f340a00e695de5c93d5 Mon Sep 17 00:00:00 2001
From: Erik Johnston <erik@matrix.org>
Date: Wed, 23 Nov 2016 14:09:47 +0000
Subject: [PATCH 19/21] Ensure only main or federation_sender process can send
 federation traffic

---
 synapse/notifier.py             | 11 +++++++++--
 synapse/replication/resource.py |  2 +-
 synapse/server.py               | 13 +++++++++++--
 3 files changed, 21 insertions(+), 5 deletions(-)

diff --git a/synapse/notifier.py b/synapse/notifier.py
index d528d1c1e0f0..054ca59ad29c 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -143,7 +143,12 @@ def __init__(self, hs):
 
         self.clock = hs.get_clock()
         self.appservice_handler = hs.get_application_service_handler()
-        self.federation_sender = hs.get_federation_sender()
+
+        if hs.should_send_federation():
+            self.federation_sender = hs.get_federation_sender()
+        else:
+            self.federation_sender = None
+
         self.state_handler = hs.get_state_handler()
 
         self.clock.looping_call(
@@ -220,7 +225,9 @@ def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
         """Notify any user streams that are interested in this room event"""
         # poke any interested application service.
         self.appservice_handler.notify_interested_services(room_stream_id)
-        self.federation_sender.notify_new_events(room_stream_id)
+
+        if self.federation_sender:
+            self.federation_sender.notify_new_events(room_stream_id)
 
         if event.type == EventTypes.Member and event.membership == Membership.JOIN:
             self._user_joined_room(event.state_key, event.room_id)
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index cb9697e37878..d79b421cba13 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -460,7 +460,7 @@ def to_device(self, writer, current_token, limit, request_streams):
             )
             upto_token = _position_from_rows(to_device_rows, current_position)
             writer.write_header_and_rows("to_device", to_device_rows, (
-                "position", "entity",
+                "position", "user_id", "device_id", "message_json"
             ), position=upto_token)
 
     @defer.inlineCallbacks
diff --git a/synapse/server.py b/synapse/server.py
index 6c57ab3e18f0..ef75ab434c49 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -274,14 +274,23 @@ def build_federation_transport_client(self):
         return TransportLayerClient(self)
 
     def build_federation_sender(self):
-        if self.config.send_federation:
+        if self.should_send_federation():
             return TransactionQueue(self)
-        else:
+        elif not self.config.worker_app:
             return FederationRemoteSendQueue(self)
+        else:
+            raise Exception("Workers cannot send federation traffic")
 
     def remove_pusher(self, app_id, push_key, user_id):
         return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
 
+    def should_send_federation(self):
+        "Should this server be sending federation traffic directly?"
+        return self.config.send_federation and (
+            not self.config.worker_app
+            or self.config.worker_app == "synapse.app.federation_sender"
+        )
+
 
 def _make_dependency_method(depname):
     def _get(hs):

From ee5e8d71acb2a21c85c433246b685af0b5bb6c58 Mon Sep 17 00:00:00 2001
From: Erik Johnston <erik@matrix.org>
Date: Wed, 23 Nov 2016 14:57:07 +0000
Subject: [PATCH 20/21] Fix tests

---
 tests/utils.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/tests/utils.py b/tests/utils.py
index ab2252d24c49..2d0bd205fdd1 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -54,6 +54,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs):
         config.room_invite_state_types = []
         config.password_providers = []
         config.worker_replication_url = ""
+        config.worker_app = None
 
     config.use_frozen_dicts = True
     config.database_config = {"name": "sqlite3"}

From feec71826523deb63ca6b43cdcecc8edf8710775 Mon Sep 17 00:00:00 2001
From: Erik Johnston <erik@matrix.org>
Date: Wed, 23 Nov 2016 15:14:24 +0000
Subject: [PATCH 21/21] Shuffle receipt handler around so that worker apps
 don't need to load it

---
 synapse/federation/replication.py        | 1 -
 synapse/handlers/__init__.py             | 2 --
 synapse/handlers/federation.py           | 1 -
 synapse/handlers/initial_sync.py         | 7 ++++---
 synapse/rest/client/v2_alpha/receipts.py | 2 +-
 synapse/server.py                        | 5 +++++
 tests/replication/test_resource.py       | 2 +-
 7 files changed, 11 insertions(+), 9 deletions(-)

diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index 797c4bedbfe6..62d865ec4bee 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -64,7 +64,6 @@ def __init__(self, hs, transport_layer):
         self._clock = hs.get_clock()
 
         self.transaction_actions = TransactionActions(self.store)
-        self._transaction_queue = hs.get_federation_sender()
 
         self.hs = hs
 
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index 63d05f25310b..5ad408f5494b 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -24,7 +24,6 @@
 from .directory import DirectoryHandler
 from .admin import AdminHandler
 from .identity import IdentityHandler
-from .receipts import ReceiptsHandler
 from .search import SearchHandler
 
 
@@ -56,7 +55,6 @@ def __init__(self, hs):
         self.profile_handler = ProfileHandler(hs)
         self.directory_handler = DirectoryHandler(hs)
         self.admin_handler = AdminHandler(hs)
-        self.receipts_handler = ReceiptsHandler(hs)
         self.identity_handler = IdentityHandler(hs)
         self.search_handler = SearchHandler(hs)
         self.room_context_handler = RoomContextHandler(hs)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 4ca563c85e75..771ab3bc43cb 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -71,7 +71,6 @@ def __init__(self, hs):
 
         self.store = hs.get_datastore()
         self.replication_layer = hs.get_replication_layer()
-        self.federation_sender = hs.get_federation_sender()
         self.state_handler = hs.get_state_handler()
         self.server_name = hs.hostname
         self.keyring = hs.get_keyring()
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index fbfa5a028180..e0ade4c164d7 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -372,11 +372,12 @@ def get_presence():
 
         @defer.inlineCallbacks
         def get_receipts():
-            receipts_handler = self.hs.get_handlers().receipts_handler
-            receipts = yield receipts_handler.get_receipts_for_room(
+            receipts = yield self.store.get_linearized_receipts_for_room(
                 room_id,
-                now_token.receipt_key
+                to_key=now_token.receipt_key,
             )
+            if not receipts:
+                receipts = []
             defer.returnValue(receipts)
 
         presence, receipts, (messages, token) = yield defer.gatherResults(
diff --git a/synapse/rest/client/v2_alpha/receipts.py b/synapse/rest/client/v2_alpha/receipts.py
index 891cef99c684..1fbff2edd8fc 100644
--- a/synapse/rest/client/v2_alpha/receipts.py
+++ b/synapse/rest/client/v2_alpha/receipts.py
@@ -36,7 +36,7 @@ def __init__(self, hs):
         super(ReceiptRestServlet, self).__init__()
         self.hs = hs
         self.auth = hs.get_auth()
-        self.receipts_handler = hs.get_handlers().receipts_handler
+        self.receipts_handler = hs.get_receipts_handler()
         self.presence_handler = hs.get_presence_handler()
 
     @defer.inlineCallbacks
diff --git a/synapse/server.py b/synapse/server.py
index ef75ab434c49..0bfb4112698c 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -47,6 +47,7 @@
 from synapse.handlers.typing import TypingHandler
 from synapse.handlers.events import EventHandler, EventStreamHandler
 from synapse.handlers.initial_sync import InitialSyncHandler
+from synapse.handlers.receipts import ReceiptsHandler
 from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory
 from synapse.http.matrixfederationclient import MatrixFederationHttpClient
 from synapse.notifier import Notifier
@@ -129,6 +130,7 @@ def build_DEPENDENCY(self)
         'media_repository',
         'federation_transport_client',
         'federation_sender',
+        'receipts_handler',
     ]
 
     def __init__(self, hostname, **kwargs):
@@ -281,6 +283,9 @@ def build_federation_sender(self):
         else:
             raise Exception("Workers cannot send federation traffic")
 
+    def build_receipts_handler(self):
+        return ReceiptsHandler(self)
+
     def remove_pusher(self, app_id, push_key, user_id):
         return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
 
diff --git a/tests/replication/test_resource.py b/tests/replication/test_resource.py
index f406934a62e4..93b9fad0125f 100644
--- a/tests/replication/test_resource.py
+++ b/tests/replication/test_resource.py
@@ -103,7 +103,7 @@ def test_receipts(self):
         room_id = yield self.create_room()
         event_id = yield self.send_text_message(room_id, "Hello, World")
         get = self.get(receipts="-1")
-        yield self.hs.get_handlers().receipts_handler.received_client_receipt(
+        yield self.hs.get_receipts_handler().received_client_receipt(
             room_id, "m.read", self.user_id, event_id
         )
         code, body = yield get