Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Correctly read to-device stream pos on SQLite #16682

Merged
merged 3 commits into from
Nov 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/16682.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Correctly read the to-device stream ID on startup using SQLite.
2 changes: 1 addition & 1 deletion synapse/replication/tcp/streams/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@ def __init__(self, hs: "HomeServer"):
super().__init__(
hs.get_instance_name(),
store.get_all_new_device_messages,
store._device_inbox_id_gen,
store._to_device_msg_id_gen,
)


Expand Down
31 changes: 19 additions & 12 deletions synapse/storage/databases/main/deviceinbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,25 +87,32 @@ def __init__(
self._instance_name in hs.config.worker.writers.to_device
)

self._device_inbox_id_gen: AbstractStreamIdGenerator = (
self._to_device_msg_id_gen: AbstractStreamIdGenerator = (
MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
stream_name="to_device",
instance_name=self._instance_name,
tables=[("device_inbox", "instance_name", "stream_id")],
tables=[
("device_inbox", "instance_name", "stream_id"),
("device_federation_outbox", "instance_name", "stream_id"),
],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we see a problem on postgres then?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because postgres uses a sequence in the DB to generate the IDs. This list of tables is only for enforcing consistency with that sequence at startup time.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah!

sequence_name="device_inbox_sequence",
writers=hs.config.worker.writers.to_device,
)
)
else:
self._can_write_to_device = True
self._device_inbox_id_gen = StreamIdGenerator(
db_conn, hs.get_replication_notifier(), "device_inbox", "stream_id"
self._to_device_msg_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
"device_inbox",
"stream_id",
extra_tables=[("device_federation_outbox", "stream_id")],
)

max_device_inbox_id = self._device_inbox_id_gen.get_current_token()
max_device_inbox_id = self._to_device_msg_id_gen.get_current_token()
device_inbox_prefill, min_device_inbox_id = self.db_pool.get_cache_dict(
db_conn,
"device_inbox",
Expand Down Expand Up @@ -145,8 +152,8 @@ def process_replication_rows(
) -> None:
if stream_name == ToDeviceStream.NAME:
# If replication is happening than postgres must be being used.
assert isinstance(self._device_inbox_id_gen, MultiWriterIdGenerator)
self._device_inbox_id_gen.advance(instance_name, token)
assert isinstance(self._to_device_msg_id_gen, MultiWriterIdGenerator)
self._to_device_msg_id_gen.advance(instance_name, token)
for row in rows:
if row.entity.startswith("@"):
self._device_inbox_stream_cache.entity_has_changed(
Expand All @@ -162,11 +169,11 @@ def process_replication_position(
self, stream_name: str, instance_name: str, token: int
) -> None:
if stream_name == ToDeviceStream.NAME:
self._device_inbox_id_gen.advance(instance_name, token)
self._to_device_msg_id_gen.advance(instance_name, token)
super().process_replication_position(stream_name, instance_name, token)

def get_to_device_stream_token(self) -> int:
return self._device_inbox_id_gen.get_current_token()
return self._to_device_msg_id_gen.get_current_token()

async def get_messages_for_user_devices(
self,
Expand Down Expand Up @@ -801,7 +808,7 @@ def add_messages_txn(
msg.get(EventContentFields.TO_DEVICE_MSGID),
)

async with self._device_inbox_id_gen.get_next() as stream_id:
async with self._to_device_msg_id_gen.get_next() as stream_id:
now_ms = self._clock.time_msec()
await self.db_pool.runInteraction(
"add_messages_to_device_inbox", add_messages_txn, now_ms, stream_id
Expand All @@ -813,7 +820,7 @@ def add_messages_txn(
destination, stream_id
)

return self._device_inbox_id_gen.get_current_token()
return self._to_device_msg_id_gen.get_current_token()

async def add_messages_from_remote_to_device_inbox(
self,
Expand Down Expand Up @@ -857,7 +864,7 @@ def add_messages_txn(
txn, stream_id, local_messages_by_user_then_device
)

async with self._device_inbox_id_gen.get_next() as stream_id:
async with self._to_device_msg_id_gen.get_next() as stream_id:
now_ms = self._clock.time_msec()
await self.db_pool.runInteraction(
"add_messages_from_remote_to_device_inbox",
Expand Down
Loading