diff --git a/changelog.d/14665.misc b/changelog.d/14665.misc new file mode 100644 index 000000000000..2b7c96143d1f --- /dev/null +++ b/changelog.d/14665.misc @@ -0,0 +1 @@ +Change `handle_new_client_event` signature so that a 429 does not reach clients on `PartialStateConflictError`, and internally retry when needed instead. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 3398fcaf7d47..04a16bd839ab 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1343,32 +1343,50 @@ async def exchange_third_party_invite( ) EventValidator().validate_builder(builder) - event, context = await self.event_creation_handler.create_new_client_event( - builder=builder - ) - event, context = await self.add_display_name_to_third_party_invite( - room_version_obj, event_dict, event, context - ) + # Try 2 times, the first one could fail with PartialStateConflictError + # in send_membership_event, cf comment in except block. + for _ in range(2): + try: + ( + event, + context, + ) = await self.event_creation_handler.create_new_client_event( + builder=builder + ) - EventValidator().validate_new(event, self.config) + event, context = await self.add_display_name_to_third_party_invite( + room_version_obj, event_dict, event, context + ) - # We need to tell the transaction queue to send this out, even - # though the sender isn't a local user. - event.internal_metadata.send_on_behalf_of = self.hs.hostname + EventValidator().validate_new(event, self.config) - try: - validate_event_for_room_version(event) - await self._event_auth_handler.check_auth_rules_from_context(event) - except AuthError as e: - logger.warning("Denying new third party invite %r because %s", event, e) - raise e + # We need to tell the transaction queue to send this out, even + # though the sender isn't a local user. + event.internal_metadata.send_on_behalf_of = self.hs.hostname - await self._check_signature(event, context) + try: + validate_event_for_room_version(event) + await self._event_auth_handler.check_auth_rules_from_context( + event + ) + except AuthError as e: + logger.warning( + "Denying new third party invite %r because %s", event, e + ) + raise e - # We retrieve the room member handler here as to not cause a cyclic dependency - member_handler = self.hs.get_room_member_handler() - await member_handler.send_membership_event(None, event, context) + await self._check_signature(event, context) + + # We retrieve the room member handler here as to not cause a cyclic dependency + member_handler = self.hs.get_room_member_handler() + await member_handler.send_membership_event(None, event, context) + + break + except PartialStateConflictError: + # Persisting couldn't happen because the room got un-partial stated + # in the meantime and context needs to be recomputed, so let's do so. + pass else: destinations = {x.split(":", 1)[-1] for x in (sender_user_id, room_id)} @@ -1400,28 +1418,43 @@ async def on_exchange_third_party_invite_request( room_version_obj, event_dict ) - event, context = await self.event_creation_handler.create_new_client_event( - builder=builder - ) - event, context = await self.add_display_name_to_third_party_invite( - room_version_obj, event_dict, event, context - ) + # Try 2 times, the first one could fail with PartialStateConflictError + # in send_membership_event, cf comment in except block. + for _ in range(2): + try: + ( + event, + context, + ) = await self.event_creation_handler.create_new_client_event( + builder=builder + ) + event, context = await self.add_display_name_to_third_party_invite( + room_version_obj, event_dict, event, context + ) - try: - validate_event_for_room_version(event) - await self._event_auth_handler.check_auth_rules_from_context(event) - except AuthError as e: - logger.warning("Denying third party invite %r because %s", event, e) - raise e - await self._check_signature(event, context) + try: + validate_event_for_room_version(event) + await self._event_auth_handler.check_auth_rules_from_context(event) + except AuthError as e: + logger.warning("Denying third party invite %r because %s", event, e) + raise e + await self._check_signature(event, context) + + # We need to tell the transaction queue to send this out, even + # though the sender isn't a local user. + event.internal_metadata.send_on_behalf_of = get_domain_from_id( + event.sender + ) - # We need to tell the transaction queue to send this out, even - # though the sender isn't a local user. - event.internal_metadata.send_on_behalf_of = get_domain_from_id(event.sender) + # We retrieve the room member handler here as to not cause a cyclic dependency + member_handler = self.hs.get_room_member_handler() + await member_handler.send_membership_event(None, event, context) - # We retrieve the room member handler here as to not cause a cyclic dependency - member_handler = self.hs.get_room_member_handler() - await member_handler.send_membership_event(None, event, context) + break + except PartialStateConflictError: + # Persisting couldn't happen because the room got un-partial stated + # in the meantime and context needs to be recomputed, so let's do so. + pass async def add_display_name_to_third_party_invite( self, diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 5cbe89f4fddf..3556c58e0496 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -37,7 +37,6 @@ AuthError, Codes, ConsentNotGivenError, - LimitExceededError, NotFoundError, ShadowBanError, SynapseError, @@ -998,60 +997,70 @@ async def create_and_send_nonmember_event( event.internal_metadata.stream_ordering, ) - event, context = await self.create_event( - requester, - event_dict, - txn_id=txn_id, - allow_no_prev_events=allow_no_prev_events, - prev_event_ids=prev_event_ids, - state_event_ids=state_event_ids, - outlier=outlier, - historical=historical, - depth=depth, - ) + # Try 2 times, the first one could fail with PartialStateConflictError + # in handle_new_client_event, cf comment in except block. + for _ in range(2): + try: + event, context = await self.create_event( + requester, + event_dict, + txn_id=txn_id, + allow_no_prev_events=allow_no_prev_events, + prev_event_ids=prev_event_ids, + state_event_ids=state_event_ids, + outlier=outlier, + historical=historical, + depth=depth, + ) - assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % ( - event.sender, - ) + assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % ( + event.sender, + ) - spam_check_result = await self.spam_checker.check_event_for_spam(event) - if spam_check_result != self.spam_checker.NOT_SPAM: - if isinstance(spam_check_result, tuple): - try: - [code, dict] = spam_check_result - raise SynapseError( - 403, - "This message had been rejected as probable spam", - code, - dict, - ) - except ValueError: - logger.error( - "Spam-check module returned invalid error value. Expecting [code, dict], got %s", - spam_check_result, - ) + spam_check_result = await self.spam_checker.check_event_for_spam(event) + if spam_check_result != self.spam_checker.NOT_SPAM: + if isinstance(spam_check_result, tuple): + try: + [code, dict] = spam_check_result + raise SynapseError( + 403, + "This message had been rejected as probable spam", + code, + dict, + ) + except ValueError: + logger.error( + "Spam-check module returned invalid error value. Expecting [code, dict], got %s", + spam_check_result, + ) - raise SynapseError( - 403, - "This message has been rejected as probable spam", - Codes.FORBIDDEN, - ) + raise SynapseError( + 403, + "This message has been rejected as probable spam", + Codes.FORBIDDEN, + ) - # Backwards compatibility: if the return value is not an error code, it - # means the module returned an error message to be included in the - # SynapseError (which is now deprecated). - raise SynapseError( - 403, - spam_check_result, - Codes.FORBIDDEN, + # Backwards compatibility: if the return value is not an error code, it + # means the module returned an error message to be included in the + # SynapseError (which is now deprecated). + raise SynapseError( + 403, + spam_check_result, + Codes.FORBIDDEN, + ) + + ev = await self.handle_new_client_event( + requester=requester, + events_and_context=[(event, context)], + ratelimit=ratelimit, + ignore_shadow_ban=ignore_shadow_ban, ) - ev = await self.handle_new_client_event( - requester=requester, - events_and_context=[(event, context)], - ratelimit=ratelimit, - ignore_shadow_ban=ignore_shadow_ban, - ) + break + except PartialStateConflictError: + # Persisting couldn't happen because the room got un-partial stated + # in the meantime and context needs to be recomputed, so let's do so. + pass # we know it was persisted, so must have a stream ordering assert ev.internal_metadata.stream_ordering @@ -1355,7 +1364,7 @@ async def handle_new_client_event( Raises: ShadowBanError if the requester has been shadow-banned. - SynapseError(503) if attempting to persist a partial state event in + PartialStateConflictError if attempting to persist a partial state event in a room that has been un-partial stated. """ extra_users = extra_users or [] @@ -1417,34 +1426,23 @@ async def handle_new_client_event( # We now persist the event (and update the cache in parallel, since we # don't want to block on it). event, context = events_and_context[0] - try: - result, _ = await make_deferred_yieldable( - gather_results( - ( - run_in_background( - self._persist_events, - requester=requester, - events_and_context=events_and_context, - ratelimit=ratelimit, - extra_users=extra_users, - ), - run_in_background( - self.cache_joined_hosts_for_events, events_and_context - ).addErrback( - log_failure, "cache_joined_hosts_for_event failed" - ), + result, _ = await make_deferred_yieldable( + gather_results( + ( + run_in_background( + self._persist_events, + requester=requester, + events_and_context=events_and_context, + ratelimit=ratelimit, + extra_users=extra_users, ), - consumeErrors=True, - ) - ).addErrback(unwrapFirstError) - except PartialStateConflictError as e: - # The event context needs to be recomputed. - # Turn the error into a 429, as a hint to the client to try again. - logger.info( - "Room %s was un-partial stated while persisting client event.", - event.room_id, + run_in_background( + self.cache_joined_hosts_for_events, events_and_context + ).addErrback(log_failure, "cache_joined_hosts_for_event failed"), + ), + consumeErrors=True, ) - raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0) + ).addErrback(unwrapFirstError) return result @@ -2005,26 +2003,36 @@ async def _send_dummy_event_for_room(self, room_id: str) -> bool: for user_id in members: requester = create_requester(user_id, authenticated_entity=self.server_name) try: - event, context = await self.create_event( - requester, - { - "type": EventTypes.Dummy, - "content": {}, - "room_id": room_id, - "sender": user_id, - }, - ) + # Try 2 times, the first one could fail with PartialStateConflictError + # in handle_new_client_event, cf comment in except block. + for _ in range(2): + try: + event, context = await self.create_event( + requester, + { + "type": EventTypes.Dummy, + "content": {}, + "room_id": room_id, + "sender": user_id, + }, + ) - event.internal_metadata.proactively_send = False + event.internal_metadata.proactively_send = False - # Since this is a dummy-event it is OK if it is sent by a - # shadow-banned user. - await self.handle_new_client_event( - requester, - events_and_context=[(event, context)], - ratelimit=False, - ignore_shadow_ban=True, - ) + # Since this is a dummy-event it is OK if it is sent by a + # shadow-banned user. + await self.handle_new_client_event( + requester, + events_and_context=[(event, context)], + ratelimit=False, + ignore_shadow_ban=True, + ) + + break + except PartialStateConflictError: + # Persisting couldn't happen because the room got un-partial stated + # in the meantime and context needs to be recomputed, so let's do so. + pass return True except AuthError: logger.info( diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 6dcfd86fdf36..17dc18f77aa5 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -62,6 +62,7 @@ from synapse.handlers.relations import BundledAggregations from synapse.module_api import NOT_SPAM from synapse.rest.admin._base import assert_user_is_admin +from synapse.storage.databases.main.events import PartialStateConflictError from synapse.storage.state import StateFilter from synapse.streams import EventSource from synapse.types import ( @@ -1121,17 +1122,29 @@ async def create_event( ) creation_content.update({"creator": creator_id}) - creation_event, creation_context = await create_event( - EventTypes.Create, creation_content, False - ) logger.debug("Sending %s in new room", EventTypes.Member) - ev = await self.event_creation_handler.handle_new_client_event( - requester=creator, - events_and_context=[(creation_event, creation_context)], - ratelimit=False, - ignore_shadow_ban=True, - ) + + # Try 2 times, the first one could fail with PartialStateConflictError + # in handle_new_client_event, cf comment in except block. + for _ in range(2): + try: + creation_event, creation_context = await create_event( + EventTypes.Create, creation_content, False + ) + + ev = await self.event_creation_handler.handle_new_client_event( + requester=creator, + events_and_context=[(creation_event, creation_context)], + ratelimit=False, + ignore_shadow_ban=True, + ) + + break + except PartialStateConflictError: + # Persisting couldn't happen because the room got un-partial stated + # in the meantime and context needs to be recomputed, so let's do so. + pass last_sent_event_id = ev.event_id member_event_id, _ = await self.room_member_handler.update_membership( @@ -1160,123 +1173,139 @@ async def create_event( ) current_state_group = event_to_state[member_event_id] - events_to_send = [] - # We treat the power levels override specially as this needs to be one - # of the first events that get sent into a room. - pl_content = initial_state.pop((EventTypes.PowerLevels, ""), None) - if pl_content is not None: - power_event, power_context = await create_event( - EventTypes.PowerLevels, pl_content, True - ) - current_state_group = power_context._state_group - events_to_send.append((power_event, power_context)) - else: - power_level_content: JsonDict = { - "users": {creator_id: 100}, - "users_default": 0, - "events": { - EventTypes.Name: 50, - EventTypes.PowerLevels: 100, - EventTypes.RoomHistoryVisibility: 100, - EventTypes.CanonicalAlias: 50, - EventTypes.RoomAvatar: 50, - EventTypes.Tombstone: 100, - EventTypes.ServerACL: 100, - EventTypes.RoomEncryption: 100, - }, - "events_default": 0, - "state_default": 50, - "ban": 50, - "kick": 50, - "redact": 50, - "invite": 50, - "historical": 100, - } - - if config["original_invitees_have_ops"]: - for invitee in invite_list: - power_level_content["users"][invitee] = 100 - - # If the user supplied a preset name e.g. "private_chat", - # we apply that preset - power_level_content.update(config["power_level_content_override"]) - - # If the server config contains default_power_level_content_override, - # and that contains information for this room preset, apply it. - if self._default_power_level_content_override: - override = self._default_power_level_content_override.get(preset_config) - if override is not None: - power_level_content.update(override) - - # Finally, if the user supplied specific permissions for this room, - # apply those. - if power_level_content_override: - power_level_content.update(power_level_content_override) - pl_event, pl_context = await create_event( - EventTypes.PowerLevels, - power_level_content, - True, - ) - current_state_group = pl_context._state_group - events_to_send.append((pl_event, pl_context)) + # Try 2 times, the first one could fail with PartialStateConflictError + # in handle_new_client_event, cf comment in except block. + for _ in range(2): + try: + events_to_send = [] + # We treat the power levels override specially as this needs to be one + # of the first events that get sent into a room. + pl_content = initial_state.pop((EventTypes.PowerLevels, ""), None) + if pl_content is not None: + power_event, power_context = await create_event( + EventTypes.PowerLevels, pl_content, True + ) + current_state_group = power_context._state_group + events_to_send.append((power_event, power_context)) + else: + power_level_content: JsonDict = { + "users": {creator_id: 100}, + "users_default": 0, + "events": { + EventTypes.Name: 50, + EventTypes.PowerLevels: 100, + EventTypes.RoomHistoryVisibility: 100, + EventTypes.CanonicalAlias: 50, + EventTypes.RoomAvatar: 50, + EventTypes.Tombstone: 100, + EventTypes.ServerACL: 100, + EventTypes.RoomEncryption: 100, + }, + "events_default": 0, + "state_default": 50, + "ban": 50, + "kick": 50, + "redact": 50, + "invite": 50, + "historical": 100, + } + + if config["original_invitees_have_ops"]: + for invitee in invite_list: + power_level_content["users"][invitee] = 100 + + # If the user supplied a preset name e.g. "private_chat", + # we apply that preset + power_level_content.update(config["power_level_content_override"]) + + # If the server config contains default_power_level_content_override, + # and that contains information for this room preset, apply it. + if self._default_power_level_content_override: + override = self._default_power_level_content_override.get( + preset_config + ) + if override is not None: + power_level_content.update(override) + + # Finally, if the user supplied specific permissions for this room, + # apply those. + if power_level_content_override: + power_level_content.update(power_level_content_override) + pl_event, pl_context = await create_event( + EventTypes.PowerLevels, + power_level_content, + True, + ) + current_state_group = pl_context._state_group + events_to_send.append((pl_event, pl_context)) + + if room_alias and (EventTypes.CanonicalAlias, "") not in initial_state: + room_alias_event, room_alias_context = await create_event( + EventTypes.CanonicalAlias, + {"alias": room_alias.to_string()}, + True, + ) + current_state_group = room_alias_context._state_group + events_to_send.append((room_alias_event, room_alias_context)) + + if (EventTypes.JoinRules, "") not in initial_state: + join_rules_event, join_rules_context = await create_event( + EventTypes.JoinRules, + {"join_rule": config["join_rules"]}, + True, + ) + current_state_group = join_rules_context._state_group + events_to_send.append((join_rules_event, join_rules_context)) + + if (EventTypes.RoomHistoryVisibility, "") not in initial_state: + visibility_event, visibility_context = await create_event( + EventTypes.RoomHistoryVisibility, + {"history_visibility": config["history_visibility"]}, + True, + ) + current_state_group = visibility_context._state_group + events_to_send.append((visibility_event, visibility_context)) + + if config["guest_can_join"]: + if (EventTypes.GuestAccess, "") not in initial_state: + guest_access_event, guest_access_context = await create_event( + EventTypes.GuestAccess, + {EventContentFields.GUEST_ACCESS: GuestAccess.CAN_JOIN}, + True, + ) + current_state_group = guest_access_context._state_group + events_to_send.append( + (guest_access_event, guest_access_context) + ) + + for (etype, state_key), content in initial_state.items(): + event, context = await create_event( + etype, content, True, state_key=state_key + ) + current_state_group = context._state_group + events_to_send.append((event, context)) + + if config["encrypted"]: + encryption_event, encryption_context = await create_event( + EventTypes.RoomEncryption, + {"algorithm": RoomEncryptionAlgorithms.DEFAULT}, + True, + state_key="", + ) + events_to_send.append((encryption_event, encryption_context)) - if room_alias and (EventTypes.CanonicalAlias, "") not in initial_state: - room_alias_event, room_alias_context = await create_event( - EventTypes.CanonicalAlias, {"alias": room_alias.to_string()}, True - ) - current_state_group = room_alias_context._state_group - events_to_send.append((room_alias_event, room_alias_context)) - - if (EventTypes.JoinRules, "") not in initial_state: - join_rules_event, join_rules_context = await create_event( - EventTypes.JoinRules, - {"join_rule": config["join_rules"]}, - True, - ) - current_state_group = join_rules_context._state_group - events_to_send.append((join_rules_event, join_rules_context)) - - if (EventTypes.RoomHistoryVisibility, "") not in initial_state: - visibility_event, visibility_context = await create_event( - EventTypes.RoomHistoryVisibility, - {"history_visibility": config["history_visibility"]}, - True, - ) - current_state_group = visibility_context._state_group - events_to_send.append((visibility_event, visibility_context)) - - if config["guest_can_join"]: - if (EventTypes.GuestAccess, "") not in initial_state: - guest_access_event, guest_access_context = await create_event( - EventTypes.GuestAccess, - {EventContentFields.GUEST_ACCESS: GuestAccess.CAN_JOIN}, - True, + last_event = await self.event_creation_handler.handle_new_client_event( + creator, + events_to_send, + ignore_shadow_ban=True, + ratelimit=False, ) - current_state_group = guest_access_context._state_group - events_to_send.append((guest_access_event, guest_access_context)) - - for (etype, state_key), content in initial_state.items(): - event, context = await create_event( - etype, content, True, state_key=state_key - ) - current_state_group = context._state_group - events_to_send.append((event, context)) - - if config["encrypted"]: - encryption_event, encryption_context = await create_event( - EventTypes.RoomEncryption, - {"algorithm": RoomEncryptionAlgorithms.DEFAULT}, - True, - state_key="", - ) - events_to_send.append((encryption_event, encryption_context)) - last_event = await self.event_creation_handler.handle_new_client_event( - creator, - events_to_send, - ignore_shadow_ban=True, - ratelimit=False, - ) + break + except PartialStateConflictError: + # Persisting couldn't happen because the room got un-partial stated + # in the meantime and context needs to be recomputed, so let's do so. + pass assert last_event.internal_metadata.stream_ordering is not None return last_event.internal_metadata.stream_ordering, last_event.event_id, depth diff --git a/synapse/handlers/room_batch.py b/synapse/handlers/room_batch.py index 411a6fb22fdb..c73d2adaad47 100644 --- a/synapse/handlers/room_batch.py +++ b/synapse/handlers/room_batch.py @@ -375,6 +375,8 @@ async def persist_historical_events( # Events are sorted by (topological_ordering, stream_ordering) # where topological_ordering is just depth. for (event, context) in reversed(events_to_persist): + # This call can't raise `PartialStateConflictError` since we forbid + # use of the historical batch API during partial state await self.event_creation_handler.handle_new_client_event( await self.create_requester_for_user_id_from_app_service( event.sender, app_service_requester.app_service diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 6ad2b38b8f96..ca35027e4d38 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -34,6 +34,7 @@ from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN from synapse.logging import opentracing from synapse.module_api import NOT_SPAM +from synapse.storage.databases.main.events import PartialStateConflictError from synapse.storage.state import StateFilter from synapse.types import ( JsonDict, @@ -392,60 +393,78 @@ async def _local_membership_update( event_pos = await self.store.get_position_for_event(existing_event_id) return existing_event_id, event_pos.stream - event, context = await self.event_creation_handler.create_event( - requester, - { - "type": EventTypes.Member, - "content": content, - "room_id": room_id, - "sender": requester.user.to_string(), - "state_key": user_id, - # For backwards compatibility: - "membership": membership, - "origin_server_ts": origin_server_ts, - }, - txn_id=txn_id, - allow_no_prev_events=allow_no_prev_events, - prev_event_ids=prev_event_ids, - state_event_ids=state_event_ids, - depth=depth, - require_consent=require_consent, - outlier=outlier, - historical=historical, - ) - - prev_state_ids = await context.get_prev_state_ids( - StateFilter.from_types([(EventTypes.Member, None)]) - ) + # Try 2 times, the first one could fail with PartialStateConflictError, + # cf comment in except block. + for _ in range(2): + try: + event, context = await self.event_creation_handler.create_event( + requester, + { + "type": EventTypes.Member, + "content": content, + "room_id": room_id, + "sender": requester.user.to_string(), + "state_key": user_id, + # For backwards compatibility: + "membership": membership, + "origin_server_ts": origin_server_ts, + }, + txn_id=txn_id, + allow_no_prev_events=allow_no_prev_events, + prev_event_ids=prev_event_ids, + state_event_ids=state_event_ids, + depth=depth, + require_consent=require_consent, + outlier=outlier, + historical=historical, + ) - prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None) + prev_state_ids = await context.get_prev_state_ids( + StateFilter.from_types([(EventTypes.Member, None)]) + ) - if event.membership == Membership.JOIN: - newly_joined = True - if prev_member_event_id: - prev_member_event = await self.store.get_event(prev_member_event_id) - newly_joined = prev_member_event.membership != Membership.JOIN - - # Only rate-limit if the user actually joined the room, otherwise we'll end - # up blocking profile updates. - if newly_joined and ratelimit: - await self._join_rate_limiter_local.ratelimit(requester) - await self._join_rate_per_room_limiter.ratelimit( - requester, key=room_id, update=False + prev_member_event_id = prev_state_ids.get( + (EventTypes.Member, user_id), None ) - with opentracing.start_active_span("handle_new_client_event"): - result_event = await self.event_creation_handler.handle_new_client_event( - requester, - events_and_context=[(event, context)], - extra_users=[target], - ratelimit=ratelimit, - ) - if event.membership == Membership.LEAVE: - if prev_member_event_id: - prev_member_event = await self.store.get_event(prev_member_event_id) - if prev_member_event.membership == Membership.JOIN: - await self._user_left_room(target, room_id) + if event.membership == Membership.JOIN: + newly_joined = True + if prev_member_event_id: + prev_member_event = await self.store.get_event( + prev_member_event_id + ) + newly_joined = prev_member_event.membership != Membership.JOIN + + # Only rate-limit if the user actually joined the room, otherwise we'll end + # up blocking profile updates. + if newly_joined and ratelimit: + await self._join_rate_limiter_local.ratelimit(requester) + await self._join_rate_per_room_limiter.ratelimit( + requester, key=room_id, update=False + ) + with opentracing.start_active_span("handle_new_client_event"): + result_event = ( + await self.event_creation_handler.handle_new_client_event( + requester, + events_and_context=[(event, context)], + extra_users=[target], + ratelimit=ratelimit, + ) + ) + + if event.membership == Membership.LEAVE: + if prev_member_event_id: + prev_member_event = await self.store.get_event( + prev_member_event_id + ) + if prev_member_event.membership == Membership.JOIN: + await self._user_left_room(target, room_id) + + break + except PartialStateConflictError: + # Persisting couldn't happen because the room got un-partial stated + # in the meantime and context needs to be recomputed, so let's do so. + pass # we know it was persisted, so should have a stream ordering assert result_event.internal_metadata.stream_ordering @@ -1234,6 +1253,8 @@ async def send_membership_event( ratelimit: Whether to rate limit this request. Raises: SynapseError if there was a problem changing the membership. + PartialStateConflictError: if attempting to persist a partial state event in + a room that has been un-partial stated. """ target_user = UserID.from_string(event.state_key) room_id = event.room_id @@ -1863,21 +1884,34 @@ async def _generate_local_out_of_band_leave( list(previous_membership_event.auth_event_ids()) + prev_event_ids ) - event, context = await self.event_creation_handler.create_event( - requester, - event_dict, - txn_id=txn_id, - prev_event_ids=prev_event_ids, - auth_event_ids=auth_event_ids, - outlier=True, - ) - event.internal_metadata.out_of_band_membership = True + # Try 2 times, the first one could fail with PartialStateConflictError + # in handle_new_client_event, cf comment in except block. + for _ in range(2): + try: + event, context = await self.event_creation_handler.create_event( + requester, + event_dict, + txn_id=txn_id, + prev_event_ids=prev_event_ids, + auth_event_ids=auth_event_ids, + outlier=True, + ) + event.internal_metadata.out_of_band_membership = True + + result_event = ( + await self.event_creation_handler.handle_new_client_event( + requester, + events_and_context=[(event, context)], + extra_users=[UserID.from_string(target_user)], + ) + ) + + break + except PartialStateConflictError: + # Persisting couldn't happen because the room got un-partial stated + # in the meantime and context needs to be recomputed, so let's do so. + pass - result_event = await self.event_creation_handler.handle_new_client_event( - requester, - events_and_context=[(event, context)], - extra_users=[UserID.from_string(target_user)], - ) # we know it was persisted, so must have a stream ordering assert result_event.internal_metadata.stream_ordering