This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Merge branch 'develop' into anoa/device_list_update_fixes
* develop:
  Revert 085ae34
  Add a DUMMY stage to captcha-only registration flow
  Make Prometheus snippet less confusing on the metrics collection doc (#4288)
  Set syslog identifiers in systemd units (#5023)
  Run Black on the tests again (#5170)
  Add AllowEncodedSlashes to apache (#5068)
  remove instructions for jessie installation (#5164)
  Run `black` on per_destination_queue
  Limit the number of EDUs in transactions to 100 as expected by receiver (#5138)
anoadragon453 committed May 10, 2019
2 parents c988c1e + c2bb747 commit 7770494
Showing 67 changed files with 924 additions and 1,248 deletions.
21 changes: 5 additions & 16 deletions INSTALL.md
Original file line number Diff line number Diff line change
@@ -257,9 +257,8 @@ https://github.com/spantaleev/matrix-docker-ansible-deploy
#### Matrix.org packages

Matrix.org provides Debian/Ubuntu packages of the latest stable version of
Synapse via https://packages.matrix.org/debian/. To use them:

For Debian 9 (Stretch), Ubuntu 16.04 (Xenial), and later:
Synapse via https://packages.matrix.org/debian/. They are available for Debian
9 (Stretch), Ubuntu 16.04 (Xenial), and later. To use them:

```
sudo apt install -y lsb-release wget apt-transport-https
@@ -270,26 +269,16 @@ sudo apt update
sudo apt install matrix-synapse-py3
```

For Debian 8 (Jessie):

```
sudo apt install -y lsb-release wget apt-transport-https
sudo wget -O /etc/apt/trusted.gpg.d/matrix-org-archive-keyring.gpg https://packages.matrix.org/debian/matrix-org-archive-keyring.gpg
echo "deb [signed-by=5586CCC0CBBBEFC7A25811ADF473DD4473365DE1] https://packages.matrix.org/debian/ $(lsb_release -cs) main" |
sudo tee /etc/apt/sources.list.d/matrix-org.list
sudo apt update
sudo apt install matrix-synapse-py3
```

The fingerprint of the repository signing key is AAF9AE843A7584B5A3E4CD2BCF45A512DE2DA058.

**Note**: if you followed a previous version of these instructions which
recommended using `apt-key add` to add an old key from
`https://matrix.org/packages/debian/`, you should note that this key has been
revoked. You should remove the old key with `sudo apt-key remove
C35EB17E1EAE708E6603A9B3AD0592FE47F0DF61`, and follow the above instructions to
update your configuration.

The fingerprint of the repository signing key (as shown by `gpg
/usr/share/keyrings/matrix-org-archive-keyring.gpg`) is
`AAF9AE843A7584B5A3E4CD2BCF45A512DE2DA058`.

#### Downstream Debian/Ubuntu packages

3 changes: 3 additions & 0 deletions changelog.d/5023.feature
@@ -0,0 +1,3 @@
Configure the example systemd units to have a log identifier of `matrix-synapse`
instead of the executable name, `python`.
Contributed by Christoph Müller.
1 change: 1 addition & 0 deletions changelog.d/5138.bugfix
@@ -0,0 +1 @@
Limit the number of EDUs in transactions to 100 as expected by synapse. Thanks to @superboum for this work!
1 change: 1 addition & 0 deletions changelog.d/5170.misc
@@ -0,0 +1 @@
Run `black` on the tests directory.
1 change: 1 addition & 0 deletions contrib/systemd-with-workers/system/[email protected]
@@ -12,6 +12,7 @@ ExecStart=/opt/venvs/matrix-synapse/bin/python -m synapse.app.%i --config-path=/
ExecReload=/bin/kill -HUP $MAINPID
Restart=always
RestartSec=3
SyslogIdentifier=matrix-synapse-%i

[Install]
WantedBy=matrix-synapse.service
1 change: 1 addition & 0 deletions contrib/systemd-with-workers/system/matrix-synapse.service
@@ -11,6 +11,7 @@ ExecStart=/opt/venvs/matrix-synapse/bin/python -m synapse.app.homeserver --confi
ExecReload=/bin/kill -HUP $MAINPID
Restart=always
RestartSec=3
SyslogIdentifier=matrix-synapse

[Install]
WantedBy=matrix.target
2 changes: 1 addition & 1 deletion contrib/systemd/matrix-synapse.service
@@ -22,10 +22,10 @@ Group=nogroup

WorkingDirectory=/opt/synapse
ExecStart=/opt/synapse/env/bin/python -m synapse.app.homeserver --config-path=/opt/synapse/homeserver.yaml
SyslogIdentifier=matrix-synapse

# adjust the cache factor if necessary
# Environment=SYNAPSE_CACHE_FACTOR=2.0

[Install]
WantedBy=multi-user.target

7 changes: 7 additions & 0 deletions debian/changelog
@@ -1,3 +1,10 @@
matrix-synapse-py3 (0.99.3.2+nmu1) UNRELEASED; urgency=medium

[ Christoph Müller ]
* Configure the systemd units to have a log identifier of `matrix-synapse`

-- Christoph Müller <[email protected]> Wed, 17 Apr 2019 16:17:32 +0200

matrix-synapse-py3 (0.99.3.2) stable; urgency=medium

* New synapse release 0.99.3.2.
1 change: 1 addition & 0 deletions debian/matrix-synapse.service
@@ -11,6 +11,7 @@ ExecStart=/opt/venvs/matrix-synapse/bin/python -m synapse.app.homeserver --confi
ExecReload=/bin/kill -HUP $MAINPID
Restart=always
RestartSec=3
SyslogIdentifier=matrix-synapse

[Install]
WantedBy=multi-user.target
5 changes: 4 additions & 1 deletion docs/metrics-howto.rst
@@ -48,7 +48,10 @@ How to monitor Synapse metrics using Prometheus
- job_name: "synapse"
metrics_path: "/_synapse/metrics"
static_configs:
- targets: ["my.server.here:9092"]
- targets: ["my.server.here:port"]

where ``my.server.here`` is the IP address of Synapse, and ``port`` is the listener port
configured with the ``metrics`` resource.

If your Prometheus is older than 1.5.2, you will need to replace
``static_configs`` in the above with ``target_groups``.
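
The scrape config above collects whatever Synapse serves at ``/_synapse/metrics``, which uses the Prometheus text exposition format. As a rough illustration of that format, the sketch below splits one metric line into name, labels, and value; the parser is a simplification of the real grammar (no escaping or comments), and the sample label and value are invented, though the metric name does appear in this commit:

```python
# A simplified parser for one line of the Prometheus text exposition
# format (the format served at /_synapse/metrics). It ignores escaping
# and comments, so it is only a sketch of the real grammar.
def parse_metric_line(line):
    """Split 'name{label="v",...} value' into (name, labels, value)."""
    if "{" in line:
        name, rest = line.split("{", 1)
        labels_str, value = rest.rsplit("} ", 1)
        labels = {}
        for pair in labels_str.split(","):
            key, val = pair.split("=", 1)
            labels[key] = val.strip('"')
    else:
        name, value = line.rsplit(" ", 1)
        labels = {}
    return name, labels, float(value)

# Sample input is hypothetical output from the metrics endpoint.
name, labels, value = parse_metric_line(
    'synapse_federation_client_sent_edus{type="m.typing"} 42'
)
```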
2 changes: 2 additions & 0 deletions docs/reverse_proxy.rst
@@ -69,6 +69,7 @@ Let's assume that we expect clients to connect to our server at
SSLEngine on
ServerName matrix.example.com;

AllowEncodedSlashes NoDecode
ProxyPass /_matrix http://127.0.0.1:8008/_matrix nocanon
ProxyPassReverse /_matrix http://127.0.0.1:8008/_matrix
</VirtualHost>
@@ -77,6 +78,7 @@ Let's assume that we expect clients to connect to our server at
SSLEngine on
ServerName example.com;
AllowEncodedSlashes NoDecode
ProxyPass /_matrix http://127.0.0.1:8008/_matrix nocanon
ProxyPassReverse /_matrix http://127.0.0.1:8008/_matrix
</VirtualHost>
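
``AllowEncodedSlashes NoDecode`` matters here because Matrix APIs put percent-encoded slashes (``%2F``) inside single path components, and without it Apache decodes them before proxying, changing the path the homeserver sees. A small standalone sketch of the problem (the room alias is made up for illustration):

```python
from urllib.parse import quote, unquote

# Hypothetical identifier containing a slash; Matrix clients
# percent-encode it so it travels as a single path segment.
alias = "#some/room:example.com"
segment = quote(alias, safe="")      # one opaque path segment
assert "/" not in segment            # the slash is now %2F

# If the proxy decodes the segment before forwarding -- the behaviour
# that AllowEncodedSlashes NoDecode (plus ProxyPass ... nocanon)
# prevents -- the path gains an extra component and no longer matches
# the original request:
decoded = unquote(segment)
assert "/" in decoded
```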
124 changes: 66 additions & 58 deletions synapse/federation/sender/per_destination_queue.py
@@ -33,12 +33,14 @@
from synapse.storage import UserPresenceState
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter

# This is defined in the Matrix spec and enforced by the receiver.
MAX_EDUS_PER_TRANSACTION = 100

logger = logging.getLogger(__name__)


sent_edus_counter = Counter(
"synapse_federation_client_sent_edus",
"Total number of EDUs successfully sent",
"synapse_federation_client_sent_edus", "Total number of EDUs successfully sent"
)

sent_edus_by_type = Counter(
@@ -58,6 +60,7 @@ class PerDestinationQueue(object):
destination (str): the server_name of the destination that we are managing
transmission for.
"""

def __init__(self, hs, transaction_manager, destination):
self._server_name = hs.hostname
self._clock = hs.get_clock()
@@ -68,17 +71,17 @@ def __init__(self, hs, transaction_manager, destination):
self.transmission_loop_running = False

# a list of tuples of (pending pdu, order)
self._pending_pdus = [] # type: list[tuple[EventBase, int]]
self._pending_edus = [] # type: list[Edu]
self._pending_pdus = [] # type: list[tuple[EventBase, int]]
self._pending_edus = [] # type: list[Edu]

# Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered
# based on their key (e.g. typing events by room_id)
# Map of (edu_type, key) -> Edu
self._pending_edus_keyed = {} # type: dict[tuple[str, str], Edu]
self._pending_edus_keyed = {} # type: dict[tuple[str, str], Edu]

# Map of user_id -> UserPresenceState of pending presence to be sent to this
# destination
self._pending_presence = {} # type: dict[str, UserPresenceState]
self._pending_presence = {} # type: dict[str, UserPresenceState]

# room_id -> receipt_type -> user_id -> receipt_dict
self._pending_rrs = {}
@@ -120,9 +123,7 @@ def send_presence(self, states):
Args:
states (iterable[UserPresenceState]): presence to send
"""
self._pending_presence.update({
state.user_id: state for state in states
})
self._pending_presence.update({state.user_id: state for state in states})
self.attempt_new_transaction()

def queue_read_receipt(self, receipt):
@@ -132,14 +133,9 @@ def queue_read_receipt(self, receipt):
Args:
receipt (synapse.api.receipt_info.ReceiptInfo): receipt to be queued
"""
self._pending_rrs.setdefault(
receipt.room_id, {},
).setdefault(
self._pending_rrs.setdefault(receipt.room_id, {}).setdefault(
receipt.receipt_type, {}
)[receipt.user_id] = {
"event_ids": receipt.event_ids,
"data": receipt.data,
}
)[receipt.user_id] = {"event_ids": receipt.event_ids, "data": receipt.data}

def flush_read_receipts_for_room(self, room_id):
# if we don't have any read-receipts for this room, it may be that we've already
@@ -170,10 +166,7 @@ def attempt_new_transaction(self):
# request at which point pending_pdus just keeps growing.
# we need application-layer timeouts of some flavour of these
# requests
logger.debug(
"TX [%s] Transaction already in progress",
self._destination
)
logger.debug("TX [%s] Transaction already in progress", self._destination)
return

logger.debug("TX [%s] Starting transaction loop", self._destination)
@@ -197,7 +190,8 @@ def _transaction_transmission_loop(self):
pending_pdus = []
while True:
device_message_edus, device_stream_id, dev_list_id = (
yield self._get_new_device_messages()
# We have to keep 2 free slots for presence and rr_edus
yield self._get_new_device_messages(MAX_EDUS_PER_TRANSACTION - 2)
)

# BEGIN CRITICAL SECTION
@@ -216,19 +210,9 @@ def _transaction_transmission_loop(self):

pending_edus = []

pending_edus.extend(self._get_rr_edus(force_flush=False))

# We can only include at most 100 EDUs per transactions
pending_edus.extend(self._pop_pending_edus(100 - len(pending_edus)))

pending_edus.extend(
self._pending_edus_keyed.values()
)

self._pending_edus_keyed = {}

pending_edus.extend(device_message_edus)

# rr_edus and pending_presence take at most one slot each
pending_edus.extend(self._get_rr_edus(force_flush=False))
pending_presence = self._pending_presence
self._pending_presence = {}
if pending_presence:
@@ -248,9 +232,23 @@ def _transaction_transmission_loop(self):
)
)

pending_edus.extend(device_message_edus)
pending_edus.extend(
self._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus))
)
while (
len(pending_edus) < MAX_EDUS_PER_TRANSACTION
and self._pending_edus_keyed
):
_, val = self._pending_edus_keyed.popitem()
pending_edus.append(val)

if pending_pdus:
logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
self._destination, len(pending_pdus))
logger.debug(
"TX [%s] len(pending_pdus_by_dest[dest]) = %d",
self._destination,
len(pending_pdus),
)

if not pending_pdus and not pending_edus:
logger.debug("TX [%s] Nothing to send", self._destination)
@@ -259,7 +257,7 @@ def _transaction_transmission_loop(self):

# if we've decided to send a transaction anyway, and we have room, we
# may as well send any pending RRs
if len(pending_edus) < 100:
if len(pending_edus) < MAX_EDUS_PER_TRANSACTION:
pending_edus.extend(self._get_rr_edus(force_flush=True))

# END CRITICAL SECTION
@@ -303,22 +301,25 @@ def _transaction_transmission_loop(self):
except HttpResponseException as e:
logger.warning(
"TX [%s] Received %d response to transaction: %s",
self._destination, e.code, e,
self._destination,
e.code,
e,
)
except RequestSendFailed as e:
logger.warning("TX [%s] Failed to send transaction: %s", self._destination, e)
logger.warning(
"TX [%s] Failed to send transaction: %s", self._destination, e
)

for p, _ in pending_pdus:
logger.info("Failed to send event %s to %s", p.event_id,
self._destination)
logger.info(
"Failed to send event %s to %s", p.event_id, self._destination
)
except Exception:
logger.exception(
"TX [%s] Failed to send transaction",
self._destination,
)
logger.exception("TX [%s] Failed to send transaction", self._destination)
for p, _ in pending_pdus:
logger.info("Failed to send event %s to %s", p.event_id,
self._destination)
logger.info(
"Failed to send event %s to %s", p.event_id, self._destination
)
finally:
# We want to be *very* sure we clear this after we stop processing
self.transmission_loop_running = False
@@ -346,33 +347,40 @@ def _pop_pending_edus(self, limit):
return pending_edus

@defer.inlineCallbacks
def _get_new_device_messages(self):
last_device_stream_id = self._last_device_stream_id
to_device_stream_id = self._store.get_to_device_stream_token()
contents, stream_id = yield self._store.get_new_device_msgs_for_remote(
self._destination, last_device_stream_id, to_device_stream_id
def _get_new_device_messages(self, limit):
last_device_list = self._last_device_list_stream_id
# Will return at most 20 entries
now_stream_id, results = yield self._store.get_devices_by_remote(
self._destination, last_device_list, limit=limit - 1,
)
edus = [
Edu(
origin=self._server_name,
destination=self._destination,
edu_type="m.direct_to_device",
edu_type="m.device_list_update",
content=content,
)
for content in contents
for content in results
]

last_device_list = self._last_device_list_stream_id
now_stream_id, results = yield self._store.get_devices_by_remote(
self._destination, last_device_list, MAX_EDUS_PER_TRANSACTION
assert len(edus) <= limit, "get_devices_by_remote returned too many EDUs"

last_device_stream_id = self._last_device_stream_id
to_device_stream_id = self._store.get_to_device_stream_token()
contents, stream_id = yield self._store.get_new_device_msgs_for_remote(
self._destination,
last_device_stream_id,
to_device_stream_id,
limit - len(edus),
)
edus.extend(
Edu(
origin=self._server_name,
destination=self._destination,
edu_type="m.device_list_update",
edu_type="m.direct_to_device",
content=content,
)
for content in results
for content in contents
)

defer.returnValue((edus, stream_id, now_stream_id))
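
Taken together, the loop now fills each transaction in priority order under the 100-EDU cap: read receipts and presence keep one slot each, device messages are fetched with a budget of ``MAX_EDUS_PER_TRANSACTION - 2``, ordinary pending EDUs fill the remainder, and keyed EDUs are popped while room remains. A simplified, self-contained sketch of that budgeting (plain lists stand in for Synapse's queue state; the function name and arguments are illustrative, not Synapse's API):

```python
# Self-contained sketch of the transaction budgeting added in this
# commit; not Synapse's actual code.
MAX_EDUS_PER_TRANSACTION = 100  # cap enforced by the receiving server

def fill_transaction(rr_edus, presence_edu, device_edus,
                     pending_edus, keyed_edus):
    edus = list(rr_edus[:1])        # read receipts: at most one slot
    if presence_edu is not None:
        edus.append(presence_edu)   # presence: at most one slot
    # device messages were fetched with limit = MAX - 2, so the two
    # reserved slots above can never push us over the cap
    edus.extend(device_edus[:MAX_EDUS_PER_TRANSACTION - 2])
    # ordinary pending EDUs fill whatever room is left
    edus.extend(pending_edus[:MAX_EDUS_PER_TRANSACTION - len(edus)])
    # keyed EDUs (e.g. typing notifications) are popped while room remains
    while len(edus) < MAX_EDUS_PER_TRANSACTION and keyed_edus:
        edus.append(keyed_edus.pop())
    return edus
```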
2 changes: 1 addition & 1 deletion synapse/storage/deviceinbox.py
@@ -118,7 +118,7 @@ def delete_messages_for_device_txn(txn):
defer.returnValue(count)

def get_new_device_msgs_for_remote(
self, destination, last_stream_id, current_stream_id, limit=100
self, destination, last_stream_id, current_stream_id, limit
):
"""
Args:
4 changes: 2 additions & 2 deletions synapse/storage/devices.py
@@ -96,7 +96,7 @@ def get_devices_by_remote(self, destination, from_stream_id, limit=100):
)

def _get_devices_by_remote_txn(
self, txn, destination, from_stream_id, now_stream_id, limit=100
self, txn, destination, from_stream_id, now_stream_id, limit
):
# We retrieve n+1 devices from the list of outbound pokes where n is our
# maximum. We then check if the very last device has the same stream_id as the
@@ -116,7 +116,7 @@ def _get_devices_by_remote_txn(
""" % (limit + 1)
txn.execute(sql, (destination, from_stream_id, now_stream_id, False))

duplicate_updates = [r for r in txn]
duplicate_updates = list(txn)

# Return if there are no updates to send out
if len(duplicate_updates) == 0:
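
The ``limit + 1`` in the SQL above is a common pagination trick: fetch one extra row so the code can tell whether the cut would split a group of updates sharing a stream_id. A standalone sketch of the technique (names are illustrative, not Synapse's query):

```python
# Standalone sketch of the "limit + 1" pagination trick: fetch one
# extra row to detect whether the page boundary falls inside a group
# of rows sharing a stream_id.
def page_updates(rows, limit):
    """rows: (stream_id, payload) tuples sorted by stream_id.
    Returns (page, next_from_id); a stream_id group is never split
    across pages (the degenerate all-one-group case yields an empty
    page, which real code must handle separately)."""
    fetched = rows[:limit + 1]               # SELECT ... LIMIT limit + 1
    if len(fetched) <= limit:
        # the stream is exhausted; everything fits in one page
        return fetched, fetched[-1][0] if fetched else None
    if fetched[limit - 1][0] == fetched[limit][0]:
        # the cut lands inside a stream_id group: drop the whole
        # trailing group so it is re-fetched intact next time
        boundary = fetched[limit][0]
        page = [r for r in fetched[:limit] if r[0] != boundary]
    else:
        page = fetched[:limit]
    return page, page[-1][0] if page else None
```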