Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Split out federation transaction sending to a worker #1635

Merged
merged 22 commits into from
Nov 23, 2016
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
0e830d3
Rename transaction queue functions to send_*
erikjohnston Nov 16, 2016
daec6fc
Move logic into transaction_queue
erikjohnston Nov 16, 2016
847d5db
Add transaction queue and transport layer to DI
erikjohnston Nov 16, 2016
59ef517
Use new federation_sender DI
erikjohnston Nov 16, 2016
1587b5a
Add initial cut of federation send queue
erikjohnston Nov 4, 2016
ed787cf
Hook up the send queue and create a federation sender worker
erikjohnston Nov 16, 2016
f8ee662
Handle sending events and device messages over federation
erikjohnston Nov 17, 2016
7c9cdb2
Store federation stream positions in the database
erikjohnston Nov 21, 2016
524d61b
Fix tests
erikjohnston Nov 21, 2016
9687e03
Remove explicit calls to send_pdu
erikjohnston Nov 21, 2016
50934ce
Comments
erikjohnston Nov 21, 2016
88d85eb
Add some metrics
erikjohnston Nov 21, 2016
73dc099
Add federation-sender to sytest
erikjohnston Nov 21, 2016
51e8970
Comments
erikjohnston Nov 21, 2016
90565d0
Invalidate retry cache in both directions
erikjohnston Nov 22, 2016
54fed21
Fix tests and flake8
erikjohnston Nov 22, 2016
4c79a63
Explicit federation ack
erikjohnston Nov 23, 2016
4d9b5c6
Comment
erikjohnston Nov 23, 2016
b69f76c
Merge branch 'develop' of github.com:matrix-org/synapse into erikj/sp…
erikjohnston Nov 23, 2016
26072df
Ensure only main or federation_sender process can send federation tra…
erikjohnston Nov 23, 2016
ee5e8d7
Fix tests
erikjohnston Nov 23, 2016
feec718
Shuffle receipt handler around so that worker apps don't need to load it
erikjohnston Nov 23, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix tests
  • Loading branch information
erikjohnston committed Nov 21, 2016
commit 524d61bf7ef293a56201852aa64a16d5c50abd93
3 changes: 3 additions & 0 deletions synapse/federation/transaction_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,9 @@ def send_device_messages(self, destination):
self._attempt_new_transaction, destination
)

def get_current_token(self):
return 0

@defer.inlineCallbacks
def _attempt_new_transaction(self, destination):
# list of (pending_pdu, deferred, order)
Expand Down
4 changes: 2 additions & 2 deletions synapse/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ def _simple_select_one_onecol_txn(cls, txn, table, keyvalues, retcol,
@staticmethod
def _simple_select_onecol_txn(txn, table, keyvalues, retcol):
if keyvalues:
where = " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
else:
where = ""

Expand Down Expand Up @@ -750,7 +750,7 @@ def _simple_update_one(self, table, keyvalues, updatevalues,
@staticmethod
def _simple_update_one_txn(txn, table, keyvalues, updatevalues):
if keyvalues:
where = " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
else:
where = ""

Expand Down
22 changes: 17 additions & 5 deletions tests/storage/test_appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def setUp(self):
event_cache_size=1,
password_providers=[],
)
hs = yield setup_test_homeserver(config=config)
hs = yield setup_test_homeserver(config=config, federation_sender=Mock())

self.as_token = "token1"
self.as_url = "some_url"
Expand Down Expand Up @@ -112,7 +112,7 @@ def setUp(self):
event_cache_size=1,
password_providers=[],
)
hs = yield setup_test_homeserver(config=config)
hs = yield setup_test_homeserver(config=config, federation_sender=Mock())
self.db_pool = hs.get_db_pool()

self.as_list = [
Expand Down Expand Up @@ -443,7 +443,11 @@ def test_unique_works(self):
app_service_config_files=[f1, f2], event_cache_size=1,
password_providers=[]
)
hs = yield setup_test_homeserver(config=config, datastore=Mock())
hs = yield setup_test_homeserver(
config=config,
datastore=Mock(),
federation_sender=Mock()
)

ApplicationServiceStore(hs)

Expand All @@ -456,7 +460,11 @@ def test_duplicate_ids(self):
app_service_config_files=[f1, f2], event_cache_size=1,
password_providers=[]
)
hs = yield setup_test_homeserver(config=config, datastore=Mock())
hs = yield setup_test_homeserver(
config=config,
datastore=Mock(),
federation_sender=Mock()
)

with self.assertRaises(ConfigError) as cm:
ApplicationServiceStore(hs)
Expand All @@ -475,7 +483,11 @@ def test_duplicate_as_tokens(self):
app_service_config_files=[f1, f2], event_cache_size=1,
password_providers=[]
)
hs = yield setup_test_homeserver(config=config, datastore=Mock())
hs = yield setup_test_homeserver(
config=config,
datastore=Mock(),
federation_sender=Mock()
)

with self.assertRaises(ConfigError) as cm:
ApplicationServiceStore(hs)
Expand Down
2 changes: 2 additions & 0 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs):
database_engine=create_engine(config.database_config),
get_db_conn=db_pool.get_db_conn,
room_list_handler=object(),
tls_server_context_factory=Mock(),
**kargs
)
hs.setup()
Expand All @@ -79,6 +80,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs):
version_string="Synapse/tests",
database_engine=create_engine(config.database_config),
room_list_handler=object(),
tls_server_context_factory=Mock(),
**kargs
)

Expand Down