Skip to content

Commit

Permalink
parallelize broadcast room join
Browse files Browse the repository at this point in the history
  • Loading branch information
hackaugusto committed Feb 11, 2020
1 parent d8946cf commit 6fa53f1
Showing 1 changed file with 48 additions and 25 deletions.
73 changes: 48 additions & 25 deletions raiden/network/transport/matrix/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,27 +440,7 @@ def start( # type: ignore
self._initialize_room_inventory()
self._initialize_broadcast_rooms()
self._initialize_whitelist(whitelist)

broadcast_filter_id = self._client.create_sync_filter(
not_rooms=self._broadcast_rooms.values()
)
self._client.set_sync_filter_id(broadcast_filter_id)

def on_success(greenlet: gevent.Greenlet) -> None:
if greenlet in self.greenlets:
self.greenlets.remove(greenlet)

self._client.start_listener_thread(
timeout_ms=self._config.sync_timeout, latency_ms=self._config.sync_latency
)
assert isinstance(self._client.sync_worker, gevent.Greenlet)
self._client.sync_worker.link_exception(self.on_error)
self._client.sync_worker.link_value(on_success)

assert isinstance(self._client.message_worker, gevent.Greenlet)
self._client.message_worker.link_exception(self.on_error)
self._client.message_worker.link_value(on_success)
self.greenlets = [self._client.sync_worker, self._client.message_worker]
self._initialize_sync()

# (re)start any _RetryQueue which was initialized before start
for retrier in self._address_to_retrier.values():
Expand Down Expand Up @@ -851,15 +831,58 @@ def _initialize_broadcast_rooms(self) -> None:
msg = "To join the broadcast rooms the Matrix client to be properly authenticated."
assert self._user_id, msg

pool = Pool(size=10)

def _join_broadcast_room(client: GMatrixClient, broadcast_room_alias: str) -> None:
self.log.debug("Joining broadcast room", broadcast_room_alias=broadcast_room_alias)
self._broadcast_rooms[room_name] = join_broadcast_room(
client=client, broadcast_room_alias=broadcast_room_alias
)

for suffix in self._config.broadcast_rooms:
room_name = make_room_alias(self.chain_id, suffix)
broadcast_room_alias = f"#{room_name}:{self._server_name}"

if room_name not in self._broadcast_rooms:
self.log.debug("Joining broadcast room", broadcast_room_alias=broadcast_room_alias)
self._broadcast_rooms[room_name] = join_broadcast_room(
client=self._client, broadcast_room_alias=broadcast_room_alias
)
pool.apply_async(_join_broadcast_room, args=(self._client, broadcast_room_alias))

pool.join(raise_error=True)

def _initialize_sync(self) -> None:
msg = "_initialize_sync requires the GMatrixClient to be properly authenticated."
assert self._user_id, msg

msg = "The sync thread must not be started twice"
assert self._client.sync_worker is None, msg
assert self._client.message_worker is None, msg

msg = (
"The node must have joined the broadcast rooms before starting the "
"sync thread, since that is necessary to properly generate the "
"filters."
)
assert self._broadcast_rooms, msg

broadcast_filter_id = self._client.create_sync_filter(
not_rooms=self._broadcast_rooms.values()
)
self._client.set_sync_filter_id(broadcast_filter_id)

def on_success(greenlet: gevent.Greenlet) -> None:
if greenlet in self.greenlets:
self.greenlets.remove(greenlet)

self._client.start_listener_thread(
timeout_ms=self._config.sync_timeout, latency_ms=self._config.sync_latency
)
assert isinstance(self._client.sync_worker, gevent.Greenlet)
self._client.sync_worker.link_exception(self.on_error)
self._client.sync_worker.link_value(on_success)

assert isinstance(self._client.message_worker, gevent.Greenlet)
self._client.message_worker.link_exception(self.on_error)
self._client.message_worker.link_value(on_success)
self.greenlets = [self._client.sync_worker, self._client.message_worker]

def _initialize_whitelist(self, whitelist: List[Address]) -> None:
msg = (
Expand Down

0 comments on commit 6fa53f1

Please sign in to comment.