From 187ab28611546321e02770944c86f30ee2bc742a Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 20 Jul 2021 19:56:44 -0500 Subject: [PATCH] Messy: Fix undefined state_group for federated historical events ``` 2021-07-13 02:27:57,810 - synapse.handlers.federation - 1248 - ERROR - GET-4 - Failed to backfill from hs1 because NOT NULL constraint failed: event_to_state_groups.state_group Traceback (most recent call last): File "/usr/local/lib/python3.8/site-packages/synapse/handlers/federation.py", line 1216, in try_backfill await self.backfill( File "/usr/local/lib/python3.8/site-packages/synapse/handlers/federation.py", line 1035, in backfill await self._auth_and_persist_event(dest, event, context, backfilled=True) File "/usr/local/lib/python3.8/site-packages/synapse/handlers/federation.py", line 2222, in _auth_and_persist_event await self._run_push_actions_and_persist_event(event, context, backfilled) File "/usr/local/lib/python3.8/site-packages/synapse/handlers/federation.py", line 2244, in _run_push_actions_and_persist_event await self.persist_events_and_notify( File "/usr/local/lib/python3.8/site-packages/synapse/handlers/federation.py", line 3290, in persist_events_and_notify events, max_stream_token = await self.storage.persistence.persist_events( File "/usr/local/lib/python3.8/site-packages/synapse/logging/opentracing.py", line 774, in _trace_inner return await func(*args, **kwargs) File "/usr/local/lib/python3.8/site-packages/synapse/storage/persist_events.py", line 320, in persist_events ret_vals = await yieldable_gather_results(enqueue, partitioned.items()) File "/usr/local/lib/python3.8/site-packages/synapse/storage/persist_events.py", line 237, in handle_queue_loop ret = await self._per_item_callback( File "/usr/local/lib/python3.8/site-packages/synapse/storage/persist_events.py", line 577, in _persist_event_batch await self.persist_events_store._persist_events_and_state_updates( File "/usr/local/lib/python3.8/site-packages/synapse/storage/databases/main/events.py", line 176, in _persist_events_and_state_updates await self.db_pool.runInteraction( File "/usr/local/lib/python3.8/site-packages/synapse/storage/database.py", line 681, in runInteraction result = await self.runWithConnection( File "/usr/local/lib/python3.8/site-packages/synapse/storage/database.py", line 770, in runWithConnection return await make_deferred_yieldable( File "/usr/local/lib/python3.8/site-packages/twisted/python/threadpool.py", line 238, in inContext result = inContext.theWork() # type: ignore[attr-defined] File "/usr/local/lib/python3.8/site-packages/twisted/python/threadpool.py", line 254, in inContext.theWork = lambda: context.call( # type: ignore[attr-defined] File "/usr/local/lib/python3.8/site-packages/twisted/python/context.py", line 118, in callWithContext return self.currentContext().callWithContext(ctx, func, *args, **kw) File "/usr/local/lib/python3.8/site-packages/twisted/python/context.py", line 83, in callWithContext return func(*args, **kw) File "/usr/local/lib/python3.8/site-packages/twisted/enterprise/adbapi.py", line 293, in _runWithConnection compat.reraise(excValue, excTraceback) File "/usr/local/lib/python3.8/site-packages/twisted/python/deprecate.py", line 298, in deprecatedFunction return function(*args, **kwargs) File "/usr/local/lib/python3.8/site-packages/twisted/python/compat.py", line 403, in reraise raise exception.with_traceback(traceback) File "/usr/local/lib/python3.8/site-packages/twisted/enterprise/adbapi.py", line 284, in _runWithConnection result = func(conn, *args, **kw) File "/usr/local/lib/python3.8/site-packages/synapse/storage/database.py", line 765, in inner_func return func(db_conn, *args, **kwargs) File "/usr/local/lib/python3.8/site-packages/synapse/storage/database.py", line 549, in new_transaction r = func(cursor, *args, **kwargs) File "/usr/local/lib/python3.8/site-packages/synapse/logging/utils.py", line 69, in wrapped return f(*args, **kwargs) File "/usr/local/lib/python3.8/site-packages/synapse/storage/databases/main/events.py", line 385, in _persist_events_txn self._store_event_state_mappings_txn(txn, events_and_contexts) File "/usr/local/lib/python3.8/site-packages/synapse/storage/databases/main/events.py", line 2065, in _store_event_state_mappings_txn self.db_pool.simple_insert_many_txn( File "/usr/local/lib/python3.8/site-packages/synapse/storage/database.py", line 923, in simple_insert_many_txn txn.execute_batch(sql, vals) File "/usr/local/lib/python3.8/site-packages/synapse/storage/database.py", line 280, in execute_batch self.executemany(sql, args) File "/usr/local/lib/python3.8/site-packages/synapse/storage/database.py", line 300, in executemany self._do_execute(self.txn.executemany, sql, *args) File "/usr/local/lib/python3.8/site-packages/synapse/storage/database.py", line 330, in _do_execute return func(sql, *args) sqlite3.IntegrityError: NOT NULL constraint failed: event_to_state_groups.state_group ``` --- synapse/handlers/federation.py | 68 +++++++++++++++++++++--- synapse/state/__init__.py | 16 ++++++ synapse/storage/databases/main/events.py | 9 ++++ 3 files changed, 87 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index d3823e24dea7..11b500f15179 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -924,7 +924,11 @@ async def _handle_marker_event(self, origin: str, marker_event: EventBase): origin, marker_event.room_id, [insertion_event_id], + # outlier=False, ) + # await self._get_state_after_missing_prev_event( + # origin, marker_event.room_id, insertion_event_id + # ) insertion_event = await self.store.get_event( insertion_event_id, allow_none=True @@ -1078,15 +1082,27 @@ async def backfill( # Step 2: Persist the rest of the events in the chunk one by one events.sort(key=lambda e: e.depth) + logger.info("backfill: events=%s", events) for event in events: if event in events_to_state: continue # For paranoia we ensure that these events are marked as # non-outliers + logger.info( + "backfill: persist event_id=%s (%s) outlier=%s", + event.event_id, + event.type, + event.internal_metadata.is_outlier(), + ) assert not event.internal_metadata.is_outlier() context = await self.state_handler.compute_event_context(event) + logger.info( + "backfill: context event_id=%s state_group=%s", + event.event_id, + context.state_group, + ) # We store these one at a time since each event depends on the # previous to work out the state. @@ -1383,7 +1399,12 @@ async def try_backfill(domains: List[str]) -> bool: return False async def _get_events_and_persist( - self, destination: str, room_id: str, events: Iterable[str] + self, + destination: str, + room_id: str, + events: Iterable[str], + # TODO: check if still used + outlier: bool = True, ) -> None: """Fetch the given events from a server, and persist them as outliers. @@ -1405,7 +1426,7 @@ async def get_event(event_id: str): [destination], event_id, room_version, - outlier=True, + outlier=outlier, ) if event is None: logger.warning( @@ -2278,6 +2299,11 @@ async def _auth_and_persist_event( server. backfilled: True if the event was backfilled. """ + logger.info( + "_auth_and_persist_event: before event_id=%s state_group=%s", + event.event_id, + context.state_group, + ) context = await self._check_event_auth( origin, event, @@ -2286,6 +2312,11 @@ async def _auth_and_persist_event( auth_events=auth_events, backfilled=backfilled, ) + logger.info( + "_auth_and_persist_event: after event_id=%s state_group=%s", + event.event_id, + context.state_group, + ) await self._run_push_actions_and_persist_event(event, context, backfilled) @@ -2667,9 +2698,19 @@ async def _check_event_auth( auth_events[(c.type, c.state_key)] = c try: + logger.info( + "_check_event_auth: before event_id=%s state_group=%s", + event.event_id, + context.state_group, + ) context = await self._update_auth_events_and_context_for_auth( origin, event, context, auth_events ) + logger.info( + "_check_event_auth: after event_id=%s state_group=%s", + event.event_id, + context.state_group, + ) except Exception: # We don't really mind if the above fails, so lets not fail # processing if it does. However, it really shouldn't fail so @@ -2756,7 +2797,11 @@ async def _update_auth_events_and_context_for_auth( if missing_auth: # If we don't have all the auth events, we need to get them. - logger.info("auth_events contains unknown events: %s", missing_auth) + logger.info( + "auth_events contains unknown events for event_id=%s, missing_auth=%s", + event.event_id, + missing_auth, + ) try: try: remote_auth_chain = await self.federation_client.get_event_auth( @@ -2793,9 +2838,13 @@ async def _update_auth_events_and_context_for_auth( event.event_id, e.event_id, ) - context = await self.state_handler.compute_event_context(e) + # XXX: Main fix is here. It was computing context for the missing auth event + # and re-assigning to the `context` variable used for the main event + missing_auth_context = ( + await self.state_handler.compute_event_context(e) + ) await self._auth_and_persist_event( - origin, e, context, auth_events=auth + origin, e, missing_auth_context, auth_events=auth ) if e.event_id in event_auth_events: @@ -2806,13 +2855,20 @@ async def _update_auth_events_and_context_for_auth( except Exception: logger.exception("Failed to get auth chain") + logger.info( + "_update_auth_events_and_context_for_auth: check outlier event_id=%s outlier=%s", + event.event_id, + event.internal_metadata.is_outlier(), + ) if event.internal_metadata.is_outlier(): # XXX: given that, for an outlier, we'll be working with the # event's *claimed* auth events rather than those we calculated: # (a) is there any point in this test, since different_auth below will # obviously be empty # (b) alternatively, why don't we do it earlier? - logger.info("Skipping auth_event fetch for outlier") + logger.info( + "Skipping auth_event fetch for outlier event_id=%s", event.event_id + ) return context different_auth = event_auth_events.difference( diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index a1770f620e59..98565156a65b 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -324,6 +324,13 @@ async def compute_event_context( entry = await self.resolve_state_groups_for_events( event.room_id, event.prev_event_ids() ) + logger.info( + "compute_event_context: resolve_state_groups_for_events\nstate_ids_before_event=%s\nstate_group_before_event=%s\nstate_group_before_event_prev_group=%s\ndeltas_to_state_group_before_event=%s", + entry.state, + entry.state_group, + entry.prev_group, + entry.delta_ids, + ) state_ids_before_event = entry.state state_group_before_event = entry.state_group @@ -359,6 +366,10 @@ async def compute_event_context( # if not event.is_state(): + logger.info( + "compute_event_context: returning with state_group_before_event=%s", + state_group_before_event, + ) return EventContext.with_state( state_group_before_event=state_group_before_event, state_group=state_group_before_event, @@ -390,6 +401,11 @@ async def compute_event_context( current_state_ids=state_ids_after_event, ) + logger.info( + "compute_event_context: after\nstate_group_after_event=%s", + state_group_after_event, + ) + return EventContext.with_state( state_group=state_group_after_event, state_group_before_event=state_group_before_event, diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index c3b6164c360a..84270e771772 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -2032,6 +2032,13 @@ def _store_event_state_mappings_txn( ): state_groups = {} for event, context in events_and_contexts: + + logger.info( + "creating state_groups grsesegr event_id=%s outlier=%s %s", + event.event_id, + event.internal_metadata.is_outlier(), + event, + ) if event.internal_metadata.is_outlier(): continue @@ -2043,6 +2050,8 @@ def _store_event_state_mappings_txn( state_groups[event.event_id] = context.state_group + logger.info("state_groups asdfasdf %s", state_groups) + self.db_pool.simple_insert_many_txn( txn, table="event_to_state_groups",