Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1095 from matrix-org/erikj/batch_edus
Browse files Browse the repository at this point in the history
Clobber EDUs in send queue
  • Loading branch information
erikjohnston authored Sep 12, 2016
2 parents 4162f82 + 3265def commit 555460a
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 20 deletions.
8 changes: 6 additions & 2 deletions synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,12 @@ def send_pdu(self, pdu, destinations):
pdu.event_id
)

def send_presence(self, destination, states):
if destination != self.server_name:
self._transaction_queue.enqueue_presence(destination, states)

@log_function
def send_edu(self, destination, edu_type, content):
def send_edu(self, destination, edu_type, content, key=None):
edu = Edu(
origin=self.server_name,
destination=destination,
Expand All @@ -134,7 +138,7 @@ def send_edu(self, destination, edu_type, content):
sent_edus_counter.inc()

# TODO, add errback, etc.
self._transaction_queue.enqueue_edu(edu)
self._transaction_queue.enqueue_edu(edu, key=key)
return defer.succeed(None)

@log_function
Expand Down
50 changes: 47 additions & 3 deletions synapse/federation/transaction_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
get_retry_limiter, NotRetryingDestination,
)
from synapse.util.metrics import measure_func
from synapse.handlers.presence import format_user_presence_state
import synapse.metrics

import logging
Expand Down Expand Up @@ -69,13 +70,21 @@ def __init__(self, hs, transport_layer):
# destination -> list of tuple(edu, deferred)
self.pending_edus_by_dest = edus = {}

# Presence needs to be separate as we send single aggragate EDUs
self.pending_presence_by_dest = presence = {}
self.pending_edus_keyed_by_dest = edus_keyed = {}

metrics.register_callback(
"pending_pdus",
lambda: sum(map(len, pdus.values())),
)
metrics.register_callback(
"pending_edus",
lambda: sum(map(len, edus.values())),
lambda: (
sum(map(len, edus.values()))
+ sum(map(len, presence.values()))
+ sum(map(len, edus_keyed.values()))
),
)

# destination -> list of tuple(failure, deferred)
Expand Down Expand Up @@ -130,13 +139,27 @@ def enqueue_pdu(self, pdu, destinations, order):
self._attempt_new_transaction, destination
)

def enqueue_edu(self, edu):
def enqueue_presence(self, destination, states):
self.pending_presence_by_dest.setdefault(destination, {}).update({
state.user_id: state for state in states
})

preserve_context_over_fn(
self._attempt_new_transaction, destination
)

def enqueue_edu(self, edu, key=None):
destination = edu.destination

if not self.can_send_to(destination):
return

self.pending_edus_by_dest.setdefault(destination, []).append(edu)
if key:
self.pending_edus_keyed_by_dest.setdefault(
destination, {}
)[(edu.edu_type, key)] = edu
else:
self.pending_edus_by_dest.setdefault(destination, []).append(edu)

preserve_context_over_fn(
self._attempt_new_transaction, destination
Expand Down Expand Up @@ -190,8 +213,13 @@ def _attempt_new_transaction(self, destination):
while True:
pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
pending_edus = self.pending_edus_by_dest.pop(destination, [])
pending_presence = self.pending_presence_by_dest.pop(destination, {})
pending_failures = self.pending_failures_by_dest.pop(destination, [])

pending_edus.extend(
self.pending_edus_keyed_by_dest.pop(destination, {}).values()
)

limiter = yield get_retry_limiter(
destination,
self.clock,
Expand All @@ -203,6 +231,22 @@ def _attempt_new_transaction(self, destination):
)

pending_edus.extend(device_message_edus)
if pending_presence:
pending_edus.append(
Edu(
origin=self.server_name,
destination=destination,
edu_type="m.presence",
content={
"push": [
format_user_presence_state(
presence, self.clock.time_msec()
)
for presence in pending_presence.values()
]
},
)
)

if pending_pdus:
logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
Expand Down
20 changes: 5 additions & 15 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -625,18 +625,8 @@ def _push_to_remotes(self, hosts_to_states):
Args:
hosts_to_states (dict): Mapping `server_name` -> `[UserPresenceState]`
"""
now = self.clock.time_msec()
for host, states in hosts_to_states.items():
self.federation.send_edu(
destination=host,
edu_type="m.presence",
content={
"push": [
_format_user_presence_state(state, now)
for state in states
]
}
)
self.federation.send_presence(host, states)

@defer.inlineCallbacks
def incoming_presence(self, origin, content):
Expand Down Expand Up @@ -723,13 +713,13 @@ def get_states(self, target_user_ids, as_event=False):
defer.returnValue([
{
"type": "m.presence",
"content": _format_user_presence_state(state, now),
"content": format_user_presence_state(state, now),
}
for state in updates
])
else:
defer.returnValue([
_format_user_presence_state(state, now) for state in updates
format_user_presence_state(state, now) for state in updates
])

@defer.inlineCallbacks
Expand Down Expand Up @@ -988,7 +978,7 @@ def should_notify(old_state, new_state):
return False


def _format_user_presence_state(state, now):
def format_user_presence_state(state, now):
"""Convert UserPresenceState to a format that can be sent down to clients
and to other servers.
"""
Expand Down Expand Up @@ -1101,7 +1091,7 @@ def get_new_events(self, user, from_key, room_ids=None, include_offline=True,
defer.returnValue(([
{
"type": "m.presence",
"content": _format_user_presence_state(s, now),
"content": format_user_presence_state(s, now),
}
for s in updates.values()
if include_offline or s.state != PresenceState.OFFLINE
Expand Down
1 change: 1 addition & 0 deletions synapse/handlers/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ def _push_remotes(self, receipts):
}
},
},
key=(room_id, receipt_type, user_id),
)

@defer.inlineCallbacks
Expand Down
1 change: 1 addition & 0 deletions synapse/handlers/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ def _push_update(self, room_id, user_id, typing):
"user_id": user_id,
"typing": typing,
},
key=(room_id, user_id),
))

yield preserve_context_over_deferred(
Expand Down

0 comments on commit 555460a

Please sign in to comment.