diff --git a/synapse/handlers/worker_lock.py b/synapse/handlers/worker_lock.py index 7795e7a981d4..e6ff226e5ba6 100644 --- a/synapse/handlers/worker_lock.py +++ b/synapse/handlers/worker_lock.py @@ -51,8 +51,8 @@ def __init__(self, hs: "HomeServer") -> None: self._reactor = hs.get_reactor() self._store = hs.get_datastores().main self._clock = hs.get_clock() - self._replication_handler = hs.get_replication_command_handler() self._notifier = hs.get_notifier() + self._instance_name = hs.get_instance_name() # Map from lock name/key to set of `WaitingLock` that are active for # that lock. @@ -143,10 +143,11 @@ def notify_lock_released(self, lock_name: str, lock_key: str) -> None: Pokes both the notifier and replication. """ - self._replication_handler.send_lock_released(lock_name, lock_key) - self._notifier.notify_lock_released(lock_name, lock_key) + self._notifier.notify_lock_released(self._instance_name, lock_name, lock_key) - def _on_lock_released(self, lock_name: str, lock_key: str) -> None: + def _on_lock_released( + self, instance_name: str, lock_name: str, lock_key: str + ) -> None: """Called when a lock has been released. Wakes up any locks that might be waiting on this. diff --git a/synapse/notifier.py b/synapse/notifier.py index 818aa61cd999..68115bca7061 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -235,7 +235,7 @@ def __init__(self, hs: "HomeServer"): self._third_party_rules = hs.get_module_api_callbacks().third_party_event_rules # List of callbacks to be notified when a lock is released - self._lock_released_callback: List[Callable[[str, str], None]] = [] + self._lock_released_callback: List[Callable[[str, str, str], None]] = [] self.clock = hs.get_clock() self.appservice_handler = hs.get_application_service_handler() @@ -788,14 +788,18 @@ def notify_remote_server_up(self, server: str) -> None: # that any in flight requests can be immediately retried. self._federation_client.wake_destination(server) - def add_lock_released_callback(self, callback: Callable[[str, str], None]) -> None: + def add_lock_released_callback( + self, callback: Callable[[str, str, str], None] + ) -> None: """Add a function to be called whenever we are notified about a released lock.""" self._lock_released_callback.append(callback) - def notify_lock_released(self, lock_name: str, lock_key: str) -> None: + def notify_lock_released( + self, instance_name: str, lock_name: str, lock_key: str + ) -> None: """Notify the callbacks that a lock has been released.""" for cb in self._lock_released_callback: - cb(lock_name, lock_key) + cb(instance_name, lock_name, lock_key) @attr.s(auto_attribs=True) diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index fafb9a495fe8..10f5c98ff8a9 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -427,27 +427,29 @@ class LockReleasedCommand(Command): Format:: - LOCK_RELEASED ["", ""] + LOCK_RELEASED ["", "", ""] """ NAME = "LOCK_RELEASED" def __init__( self, + instance_name: str, lock_name: str, lock_key: str, ): + self.instance_name = instance_name self.lock_name = lock_name self.lock_key = lock_key @classmethod def from_line(cls: Type["LockReleasedCommand"], line: str) -> "LockReleasedCommand": - lock_name, lock_key = json_decoder.decode(line) + instance_name, lock_name, lock_key = json_decoder.decode(line) - return cls(lock_name, lock_key) + return cls(instance_name, lock_name, lock_key) def to_line(self) -> str: - return json_encoder.encode([self.lock_name, self.lock_key]) + return json_encoder.encode([self.instance_name, self.lock_name, self.lock_key]) _COMMANDS: Tuple[Type[Command], ...] = ( diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 022311f800c9..2e1477d54814 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -249,6 +249,8 @@ def __init__(self, hs: "HomeServer"): if self._is_master or self._should_insert_client_ips: self.subscribe_to_channel("USER_IP") + self._notifier.add_lock_released_callback(self.on_lock_released) + def subscribe_to_channel(self, channel_name: str) -> None: """ Indicates that we wish to subscribe to a Redis channel by name. @@ -653,7 +655,12 @@ def on_LOCK_RELEASED( self, conn: IReplicationConnection, cmd: LockReleasedCommand ) -> None: """Called when we get a new LOCK_RELEASED command.""" - self._notifier.notify_lock_released(cmd.lock_name, cmd.lock_key) + if cmd.instance_name == self._instance_name: + return + + self._notifier.notify_lock_released( + cmd.instance_name, cmd.lock_name, cmd.lock_key + ) def new_connection(self, connection: IReplicationConnection) -> None: """Called when we have a new connection.""" @@ -761,9 +768,12 @@ def stream_update(self, stream_name: str, token: Optional[int], data: Any) -> No """ self.send_command(RdataCommand(stream_name, self._instance_name, token, data)) - def send_lock_released(self, lock_name: str, lock_key: str) -> None: + def on_lock_released( + self, instance_name: str, lock_name: str, lock_key: str + ) -> None: """Called when we released a lock and should notify other instances.""" - self.send_command(LockReleasedCommand(lock_name, lock_key)) + if instance_name == self._instance_name: + self.send_command(LockReleasedCommand(instance_name, lock_name, lock_key)) UpdateToken = TypeVar("UpdateToken")