Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Add a consistency check on events read from the database (#12620)
Browse files (browse the repository at this point in the history)
I've seen a few errors which can only plausibly be explained by the calculated
event ID for an event being different from the ID of the event in the
database. It should be cheap to check this, so let's do so and raise an
exception.
  • Loading branch information
richvdh authored May 3, 2022
1 parent 9ce51a4 commit 96e0cdb
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 22 deletions.
1 change: 1 addition & 0 deletions changelog.d/12620.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add a consistency check on events which we read from the database.
12 changes: 12 additions & 0 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1094,6 +1094,18 @@ async def _get_events_from_db(
original_ev.internal_metadata.stream_ordering = row.stream_ordering
original_ev.internal_metadata.outlier = row.outlier

# Consistency check: if the content of the event has been modified in the
# database, then the calculated event ID will not match the event id in the
# database.
if original_ev.event_id != event_id:
# it's difficult to see what to do here. Pretty much all bets are off
# if Synapse cannot rely on the consistency of its database.
raise RuntimeError(
f"Database corruption: Event {event_id} in room {d['room_id']} "
f"from the database appears to have been modified (calculated "
f"event id {original_ev.event_id})"
)

event_map[event_id] = original_ev

# finally, we can decide whether each one needs redacting, and build
Expand Down
59 changes: 37 additions & 22 deletions tests/storage/databases/main/test_events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@
# limitations under the License.
import json
from contextlib import contextmanager
from typing import Generator, Tuple
from typing import Generator, List, Tuple
from unittest import mock

from twisted.enterprise.adbapi import ConnectionPool
from twisted.internet.defer import CancelledError, Deferred, ensureDeferred
from twisted.test.proto_helpers import MemoryReactor

from synapse.api.room_versions import EventFormatVersions, RoomVersions
from synapse.events import make_event_from_dict
from synapse.logging.context import LoggingContext
from synapse.rest import admin
from synapse.rest.client import login, room
Expand Down Expand Up @@ -49,23 +50,28 @@ def prepare(self, reactor, clock, hs):
)
)

for idx, (rid, eid) in enumerate(
self.event_ids: List[str] = []
for idx, rid in enumerate(
(
("room1", "event10"),
("room1", "event11"),
("room1", "event12"),
("room2", "event20"),
"room1",
"room1",
"room1",
"room2",
)
):
event_json = {"type": f"test {idx}", "room_id": rid}
event = make_event_from_dict(event_json, room_version=RoomVersions.V4)
event_id = event.event_id

self.get_success(
self.store.db_pool.simple_insert(
"events",
{
"event_id": eid,
"event_id": event_id,
"room_id": rid,
"topological_ordering": idx,
"stream_ordering": idx,
"type": "test",
"type": event.type,
"processed": True,
"outlier": False,
},
Expand All @@ -75,41 +81,44 @@ def prepare(self, reactor, clock, hs):
self.store.db_pool.simple_insert(
"event_json",
{
"event_id": eid,
"event_id": event_id,
"room_id": rid,
"json": json.dumps({"type": "test", "room_id": rid}),
"json": json.dumps(event_json),
"internal_metadata": "{}",
"format_version": 3,
},
)
)
self.event_ids.append(event_id)

def test_simple(self):
with LoggingContext(name="test") as ctx:
res = self.get_success(
self.store.have_seen_events("room1", ["event10", "event19"])
self.store.have_seen_events("room1", [self.event_ids[0], "event19"])
)
self.assertEqual(res, {"event10"})
self.assertEqual(res, {self.event_ids[0]})

# that should result in a single db query
self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)

# a second lookup of the same events should cause no queries
with LoggingContext(name="test") as ctx:
res = self.get_success(
self.store.have_seen_events("room1", ["event10", "event19"])
self.store.have_seen_events("room1", [self.event_ids[0], "event19"])
)
self.assertEqual(res, {"event10"})
self.assertEqual(res, {self.event_ids[0]})
self.assertEqual(ctx.get_resource_usage().db_txn_count, 0)

def test_query_via_event_cache(self):
# fetch an event into the event cache
self.get_success(self.store.get_event("event10"))
self.get_success(self.store.get_event(self.event_ids[0]))

# looking it up should now cause no db hits
with LoggingContext(name="test") as ctx:
res = self.get_success(self.store.have_seen_events("room1", ["event10"]))
self.assertEqual(res, {"event10"})
res = self.get_success(
self.store.have_seen_events("room1", [self.event_ids[0]])
)
self.assertEqual(res, {self.event_ids[0]})
self.assertEqual(ctx.get_resource_usage().db_txn_count, 0)


Expand Down Expand Up @@ -167,7 +176,6 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer):
self.store: EventsWorkerStore = hs.get_datastores().main

self.room_id = f"!room:{hs.hostname}"
self.event_ids = [f"event{i}" for i in range(20)]

self._populate_events()

Expand All @@ -190,8 +198,14 @@ def _populate_events(self) -> None:
)
)

self.event_ids = [f"event{i}" for i in range(20)]
for idx, event_id in enumerate(self.event_ids):
self.event_ids: List[str] = []
for idx in range(20):
event_json = {
"type": f"test {idx}",
"room_id": self.room_id,
}
event = make_event_from_dict(event_json, room_version=RoomVersions.V4)
event_id = event.event_id
self.get_success(
self.store.db_pool.simple_upsert(
"events",
Expand All @@ -201,7 +215,7 @@ def _populate_events(self) -> None:
"room_id": self.room_id,
"topological_ordering": idx,
"stream_ordering": idx,
"type": "test",
"type": event.type,
"processed": True,
"outlier": False,
},
Expand All @@ -213,12 +227,13 @@ def _populate_events(self) -> None:
{"event_id": event_id},
{
"room_id": self.room_id,
"json": json.dumps({"type": "test", "room_id": self.room_id}),
"json": json.dumps(event_json),
"internal_metadata": "{}",
"format_version": EventFormatVersions.V3,
},
)
)
self.event_ids.append(event_id)

@contextmanager
def _outage(self) -> Generator[None, None, None]:
Expand Down

0 comments on commit 96e0cdb

Please sign in to comment.