Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Deduplicate joins
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed Apr 7, 2016
1 parent 87a3089 commit af03ecf
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 1 deletion.
31 changes: 31 additions & 0 deletions synapse/handlers/room_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
)
from synapse.api.errors import AuthError, SynapseError, Codes
from synapse.util.logcontext import preserve_context_over_fn
from synapse.util.async import Linearizer

from signedjson.sign import verify_signed_json
from signedjson.key import decode_verify_key_bytes
Expand Down Expand Up @@ -60,6 +61,8 @@ class RoomMemberHandler(BaseHandler):
def __init__(self, hs):
super(RoomMemberHandler, self).__init__(hs)

self.member_linearizer = Linearizer()

self.clock = hs.get_clock()

self.distributor = hs.get_distributor()
Expand Down Expand Up @@ -182,6 +185,34 @@ def update_membership(
remote_room_hosts=None,
third_party_signed=None,
ratelimit=True,
):
key = (target, room_id,)

with (yield self.member_linearizer.queue(key)):
result = yield self._update_membership(
requester,
target,
room_id,
action,
txn_id=txn_id,
remote_room_hosts=remote_room_hosts,
third_party_signed=third_party_signed,
ratelimit=ratelimit,
)

defer.returnValue(result)

@defer.inlineCallbacks
def _update_membership(
self,
requester,
target,
room_id,
action,
txn_id=None,
remote_room_hosts=None,
third_party_signed=None,
ratelimit=True,
):
effective_membership_state = action
if action in ["kick", "unban"]:
Expand Down
42 changes: 42 additions & 0 deletions synapse/util/async.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
from .logcontext import PreserveLoggingContext, preserve_fn
from synapse.util import unwrapFirstError

from contextlib import contextmanager


@defer.inlineCallbacks
def sleep(seconds):
Expand Down Expand Up @@ -137,3 +139,43 @@ def _concurrently_execute_inner():
preserve_fn(_concurrently_execute_inner)()
for _ in xrange(limit)
], consumeErrors=True).addErrback(unwrapFirstError)


@contextmanager
def _trigger_defer_manager(d):
try:
yield
finally:
d.callback(None)


class Linearizer(object):
"""Linearizes access to resources based on a key. Useful to ensure only one
thing is happening at a time on a given resource.
Example:
with (yield linearizer.queue("test_key")):
# do some work.
"""
def __init__(self):
self.key_to_defer = {}

@defer.inlineCallbacks
def queue(self, key):
current_defer = self.key_to_defer.get(key)

new_defer = defer.Deferred()
self.key_to_defer[key] = new_defer

def remove_if_current(_):
d = self.key_to_defer.get(key)
if d is new_defer:
self.key_to_defer.pop(key, None)

new_defer.addBoth(remove_if_current)

yield current_defer

defer.returnValue(_trigger_defer_manager(new_defer))
2 changes: 1 addition & 1 deletion synapse/util/caches/response_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def get(self, key):
return None

def set(self, key, deferred):
result = ObservableDeferred(deferred)
result = ObservableDeferred(deferred, consumeErrors=True)
self.pending_result_cache[key] = result

def remove(r):
Expand Down
44 changes: 44 additions & 0 deletions tests/util/test_linearizer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from tests import unittest

from twisted.internet import defer

from synapse.util.async import Linearizer


class LinearizerTestCase(unittest.TestCase):

@defer.inlineCallbacks
def test_linearizer(self):
linearizer = Linearizer()

key = object()

d1 = linearizer.queue(key)
cm1 = yield d1

d2 = linearizer.queue(key)
self.assertFalse(d2.called)

with cm1:
self.assertFalse(d2.called)

self.assertTrue(d2.called)

with (yield d2):
pass

0 comments on commit af03ecf

Please sign in to comment.