Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

parallelize broadcast room join #5717

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 16 additions & 15 deletions raiden/network/transport/matrix/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,21 +104,22 @@ def update_local_aliases(self) -> bool:
"""
server_name = urlparse(self.client.api.base_url).netloc
changed = False
for event_type in ["m.room.aliases", "m.room.canonical_alias"]:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why don't we need the canonical aliases anymore?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't use them anymore

try:
response = self.client.api.get_room_state_type(
self.room_id, event_type, server_name
)
except MatrixRequestError:
continue
if "aliases" in response:
if self.aliases != response["aliases"]:
self.aliases = response["aliases"]
changed = True
if "alias" in response:
if self.canonical_alias != response["alias"]:
self.canonical_alias = response["alias"]
changed = True

try:
response = self.client.api.get_room_state_type(
self.room_id, "m.room.aliases", server_name
)
except MatrixRequestError:
return False

if "aliases" in response:
if self.aliases != response["aliases"]:
self.aliases = response["aliases"]
changed = True
if "alias" in response:
if self.canonical_alias != response["alias"]:
self.canonical_alias = response["alias"]
changed = True
if changed and self.aliases and not self.canonical_alias:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible for a room to not have a canonical alias?

self.canonical_alias = self.aliases[0]
return changed
Expand Down
88 changes: 59 additions & 29 deletions raiden/network/transport/matrix/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,27 +462,7 @@ def start( # type: ignore
self._initialize_broadcast_rooms()
self._initialize_first_sync()
self._initialize_health_check(health_check_list)

broadcast_filter_id = self._client.create_sync_filter(
not_rooms=self._broadcast_rooms.values()
)
self._client.set_sync_filter_id(broadcast_filter_id)

def on_success(greenlet: gevent.Greenlet) -> None:
if greenlet in self.greenlets:
self.greenlets.remove(greenlet)

self._client.start_listener_thread(
timeout_ms=self._config.sync_timeout, latency_ms=self._config.sync_latency
)
assert isinstance(self._client.sync_worker, gevent.Greenlet), MYPY_ANNOTATION
self._client.sync_worker.link_exception(self.on_error)
self._client.sync_worker.link_value(on_success)

assert isinstance(self._client.message_worker, gevent.Greenlet), MYPY_ANNOTATION
self._client.message_worker.link_exception(self.on_error)
self._client.message_worker.link_value(on_success)
self.greenlets = [self._client.sync_worker, self._client.message_worker]
self._initialize_sync()

# (re)start any _RetryQueue which was initialized before start
for retrier in self._address_to_retrier.values():
Expand Down Expand Up @@ -833,15 +813,61 @@ def _initialize_broadcast_rooms(self) -> None:
msg = "To join the broadcast rooms the Matrix client to be properly authenticated."
assert self._user_id, msg

for suffix in self._config.broadcast_rooms:
room_name = make_room_alias(self.chain_id, suffix)
broadcast_room_alias = f"#{room_name}:{self._server_name}"
pool = Pool(size=10)

self.log.debug("Joining broadcast room", broadcast_room_alias=broadcast_room_alias)
self._broadcast_rooms[room_name] = join_broadcast_room(
client=self._client, broadcast_room_alias=broadcast_room_alias
def _join_broadcast_room(transport: MatrixTransport, room_name: str) -> None:
broadcast_room_alias = f"#{room_name}:{transport._server_name}"
transport.log.debug(
"Joining broadcast room", broadcast_room_alias=broadcast_room_alias
)
transport._broadcast_rooms[room_name] = join_broadcast_room(
client=transport._client, broadcast_room_alias=broadcast_room_alias
)

for suffix in self._config.broadcast_rooms:
alias_prefix = make_room_alias(self.chain_id, suffix)

if alias_prefix not in self._broadcast_rooms:
pool.apply_async(_join_broadcast_room, args=(self, alias_prefix))

pool.join(raise_error=True)

def _initialize_sync(self) -> None:
msg = "_initialize_sync requires the GMatrixClient to be properly authenticated."
assert self._user_id, msg

msg = "The sync thread must not be started twice"
assert self._client.sync_worker is None, msg
assert self._client.message_worker is None, msg

msg = (
"The node must have joined the broadcast rooms before starting the "
"sync thread, since that is necessary to properly generate the "
"filters."
)
assert self._broadcast_rooms, msg
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this right? do we join the broadcast rooms if the node is not configured to use the ms and pfs?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will always join at least the discovery room.


broadcast_filter_id = self._client.create_sync_filter(
not_rooms=self._broadcast_rooms.values()
)
self._client.set_sync_filter_id(broadcast_filter_id)

def on_success(greenlet: gevent.Greenlet) -> None:
if greenlet in self.greenlets:
self.greenlets.remove(greenlet)

self._client.start_listener_thread(
timeout_ms=self._config.sync_timeout, latency_ms=self._config.sync_latency
)
assert isinstance(self._client.sync_worker, gevent.Greenlet), MYPY_ANNOTATION
self._client.sync_worker.link_exception(self.on_error)
self._client.sync_worker.link_value(on_success)

assert isinstance(self._client.message_worker, gevent.Greenlet), MYPY_ANNOTATION
self._client.message_worker.link_exception(self.on_error)
self._client.message_worker.link_value(on_success)
self.greenlets = [self._client.sync_worker, self._client.message_worker]

def _initialize_health_check(self, health_check_list: List[Address]) -> None:
msg = (
"Healthcheck requires the Matrix client to be properly "
Expand Down Expand Up @@ -988,7 +1014,11 @@ def _handle_invite(self, room_id: RoomID, state: dict) -> None:

def _handle_member_join(self, room: Room) -> None:
if self._is_broadcast_room(room):
raise AssertionError("Broadcast room events should be filtered in syncs")
raise AssertionError(
f"Broadcast room events should be filtered in syncs: {room.aliases}."
f"Joined Broadcast Rooms: {list(self._broadcast_rooms.keys())}"
f"Should be joined to: {self._config.broadcast_rooms}"
)

if self._has_multiple_partner_addresses(room):
self._leave_unexpected_rooms(
Expand Down Expand Up @@ -1212,7 +1242,7 @@ def _maybe_create_room_for_address(self, address: Address) -> None:

assert self._raiden_service is not None, "_raiden_service not set"

# The rooms creation is asymmetric, only the node with the lower
# The rooms creation is asymetric, only the node with the lower
# address is responsible to create the room. This fixes race conditions
# were the two nodes try to create a room with each other at the same
# time, leading to communications problems if the nodes choose a
Expand Down