diff --git a/raiden/network/transport/matrix/transport.py b/raiden/network/transport/matrix/transport.py index 6520a299725..3751774743e 100644 --- a/raiden/network/transport/matrix/transport.py +++ b/raiden/network/transport/matrix/transport.py @@ -440,27 +440,7 @@ def start( # type: ignore self._initialize_room_inventory() self._initialize_broadcast_rooms() self._initialize_whitelist(whitelist) - - broadcast_filter_id = self._client.create_sync_filter( - not_rooms=self._broadcast_rooms.values() - ) - self._client.set_sync_filter_id(broadcast_filter_id) - - def on_success(greenlet: gevent.Greenlet) -> None: - if greenlet in self.greenlets: - self.greenlets.remove(greenlet) - - self._client.start_listener_thread( - timeout_ms=self._config.sync_timeout, latency_ms=self._config.sync_latency - ) - assert isinstance(self._client.sync_worker, gevent.Greenlet) - self._client.sync_worker.link_exception(self.on_error) - self._client.sync_worker.link_value(on_success) - - assert isinstance(self._client.message_worker, gevent.Greenlet) - self._client.message_worker.link_exception(self.on_error) - self._client.message_worker.link_value(on_success) - self.greenlets = [self._client.sync_worker, self._client.message_worker] + self._initialize_sync() # (re)start any _RetryQueue which was initialized before start for retrier in self._address_to_retrier.values(): @@ -851,15 +831,58 @@ def _initialize_broadcast_rooms(self) -> None: msg = "To join the broadcast rooms the Matrix client to be properly authenticated." assert self._user_id, msg + pool = Pool(size=10) + + def _join_broadcast_room(client: GMatrixClient, broadcast_room_alias: str) -> None: + self.log.debug("Joining broadcast room", broadcast_room_alias=broadcast_room_alias) + self._broadcast_rooms[room_name] = join_broadcast_room( + client=client, broadcast_room_alias=broadcast_room_alias + ) + for suffix in self._config.broadcast_rooms: room_name = make_room_alias(self.chain_id, suffix) broadcast_room_alias = f"#{room_name}:{self._server_name}" if room_name not in self._broadcast_rooms: - self.log.debug("Joining broadcast room", broadcast_room_alias=broadcast_room_alias) - self._broadcast_rooms[room_name] = join_broadcast_room( - client=self._client, broadcast_room_alias=broadcast_room_alias - ) + pool.apply_async(_join_broadcast_room, args=(self._client, broadcast_room_alias)) + + pool.join(raise_error=True) + + def _initialize_sync(self) -> None: + msg = "_initialize_sync requires the GMatrixClient to be properly authenticated." + assert self._user_id, msg + + msg = "The sync thread must not be started twice" + assert self._client.sync_worker is None, msg + assert self._client.message_worker is None, msg + + msg = ( + "The node must have joined the broadcast rooms before starting the " + "sync thread, since that is necessary to properly generate the " + "filters." + ) + assert self._broadcast_rooms, msg + + broadcast_filter_id = self._client.create_sync_filter( + not_rooms=self._broadcast_rooms.values() + ) + self._client.set_sync_filter_id(broadcast_filter_id) + + def on_success(greenlet: gevent.Greenlet) -> None: + if greenlet in self.greenlets: + self.greenlets.remove(greenlet) + + self._client.start_listener_thread( + timeout_ms=self._config.sync_timeout, latency_ms=self._config.sync_latency + ) + assert isinstance(self._client.sync_worker, gevent.Greenlet) + self._client.sync_worker.link_exception(self.on_error) + self._client.sync_worker.link_value(on_success) + + assert isinstance(self._client.message_worker, gevent.Greenlet) + self._client.message_worker.link_exception(self.on_error) + self._client.message_worker.link_value(on_success) + self.greenlets = [self._client.sync_worker, self._client.message_worker] def _initialize_whitelist(self, whitelist: List[Address]) -> None: msg = (