Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix background reindex of origin_server_ts #1145

Merged
merged 1 commit into from
Sep 29, 2016
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 29 additions & 15 deletions synapse/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1355,39 +1355,53 @@ def reindex_search_txn(txn):
min_stream_id = rows[-1][0]
event_ids = [row[1] for row in rows]

events = self._get_events_txn(txn, event_ids)
rows_to_update = []

rows = []
for event in events:
try:
event_id = event.event_id
origin_server_ts = event.origin_server_ts
except (KeyError, AttributeError):
# If the event is missing a necessary field then
# skip over it.
continue
chunks = [
event_ids[i:i + 100]
for i in xrange(0, len(event_ids), 100)
]
for chunk in chunks:
ev_rows = self._simple_select_many_txn(
txn,
table="event_json",
column="event_id",
iterable=chunk,
retcols=["event_id", "json"],
keyvalues={},
)

rows.append((origin_server_ts, event_id))
for row in ev_rows:
event_id = row["event_id"]
event_json = json.loads(row["json"])
try:
origin_server_ts = event_json["origin_server_ts"]
except (KeyError, AttributeError):
# If the event is missing a necessary field then
# skip over it.
continue

rows_to_update.append((origin_server_ts, event_id))

sql = (
"UPDATE events SET origin_server_ts = ? WHERE event_id = ?"
)

for index in range(0, len(rows), INSERT_CLUMP_SIZE):
clump = rows[index:index + INSERT_CLUMP_SIZE]
for index in range(0, len(rows_to_update), INSERT_CLUMP_SIZE):
clump = rows_to_update[index:index + INSERT_CLUMP_SIZE]
txn.executemany(sql, clump)

progress = {
"target_min_stream_id_inclusive": target_min_stream_id,
"max_stream_id_exclusive": min_stream_id,
"rows_inserted": rows_inserted + len(rows)
"rows_inserted": rows_inserted + len(rows_to_update)
}

self._background_update_progress_txn(
txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress
)

return len(rows)
return len(rows_to_update)

result = yield self.runInteraction(
self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn
Expand Down