diff --git a/chain/events/filter/index.go b/chain/events/filter/index.go index 1b3beb1cae6..da4668f4df8 100644 --- a/chain/events/filter/index.go +++ b/chain/events/filter/index.go @@ -417,13 +417,15 @@ func (ei *EventIndex) RunMigration(from, to int) error { return nil } + log.Warnw("[migration v1=>v2] beginning transaction") + tx, err := ei.db.Begin() if err != nil { - return err + return xerrors.Errorf("failed to begin transaction: %w", err) } defer tx.Rollback() //nolint - if _, err = tx.Exec(`ALTER TABLE event_entry ADD COLUMN codec INTEGER`); err != nil { + if _, err := tx.Exec(`ALTER TABLE event_entry ADD COLUMN codec INTEGER`); err != nil { return xerrors.Errorf("failed to alter table event_entry to add codec column: %w", err) } @@ -434,7 +436,7 @@ func (ei *EventIndex) RunMigration(from, to int) error { return xerrors.Errorf("failed to update table event_entry to set codec 0x55: %w", err) } n, _ := res.RowsAffected() - log.Warnw("[migration v1=>v2] updated %d event entries with codec=0x55", "affected", n) + log.Warnw("[migration v1=>v2] updated event entries with codec=0x55", "affected", n) var keyRewrites = [][]string{ {"topic1", "t1"}, @@ -454,7 +456,38 @@ func (ei *EventIndex) RunMigration(from, to int) error { log.Warnw("[migration v1=>v2] rewrote entry keys", "from", from, "to", to, "affected", n) } - update, err := tx.Prepare(`UPDATE event_entry SET value = ? WHERE value = ?`) + if _, err := tx.Exec(`DROP TABLE IF EXISTS event_entry_tmp`); err != nil { + return xerrors.Errorf("failed to drop old event_entry_tmp temporary table: %w", err) + } + + if _, err := tx.Exec(`CREATE TABLE event_entry_tmp ( + event_id INTEGER, + indexed INTEGER NOT NULL, + flags BLOB NOT NULL, + key TEXT NOT NULL, + codec INTEGER, + value BLOB NOT NULL + )`); err != nil { + return xerrors.Errorf("failed to create temporary table: %w", err) + } + + // Copy over all data. We will run the iterator from this table. + if _, err := tx.Exec(`INSERT INTO event_entry_tmp SELECT * FROM event_entry`); err != nil { + return xerrors.Errorf("failed to copy over data: %w", err) + } + + // Add an index on value. + if _, err := tx.Exec(`CREATE INDEX event_entry_value ON event_entry(value)`); err != nil { + return xerrors.Errorf("failed to create index on event_entry: %w", err) + } + + // Add an index on event_id. + if _, err := tx.Exec(`CREATE INDEX event_entry_event_id ON event_entry(event_id)`); err != nil { + return xerrors.Errorf("failed to create index on event_entry: %w", err) + } + + // Match by value to be extra safe. + update, err := tx.Prepare(`UPDATE event_entry SET value = ? WHERE event_id = ? and key = ?`) if err != nil { return xerrors.Errorf("failed to prepare update statement: %w", err) } @@ -468,26 +501,24 @@ func (ei *EventIndex) RunMigration(from, to int) error { "t4": {}, } - if _, err = tx.Exec(`CREATE TABLE tmp AS SELECT key, value FROM event_entry`); err != nil { - return xerrors.Errorf("failed to create temporary table: %w", err) - } - - // Pull the values out and process them in Go land before updating. - rows, err := tx.Query(`SELECT key, value FROM tmp`) + // Pull the values out out of the original table, process them in Go land before updating, and the update the v2 table. + rows, err := tx.Query(`SELECT event_id, key, value FROM event_entry_tmp`) if err != nil { return xerrors.Errorf("failed to select values: %w", err) } var i int for rows.Next() { + var eventId int var key string var before []byte - if err := rows.Scan(&key, &before); err != nil { + if err := rows.Scan(&eventId, &key, &before); err != nil { + _ = rows.Close() return xerrors.Errorf("failed to scan from query: %w", err) } reader := bytes.NewReader(before) after, err := cbg.ReadByteArray(reader, 4<<20) - if err != nil || reader.Len() > 0 { + if err != nil { // Before Jan 23 we decoded events before putting them into the database. So // we're just going to assume that that's what's happening here, log an // error, and continue. @@ -499,22 +530,29 @@ func (ei *EventIndex) RunMigration(from, to int) error { copy(pvalue[32-len(after):], after) after = pvalue } - if _, err := update.Exec(after, before); err != nil { - return xerrors.Errorf("failed to scan from query: %w", err) + if _, err := update.Exec(after, eventId, key); err != nil { + log.Warnw("failed to update entry", "value_before", before, "value_after", after) } if i++; i%500 == 0 { log.Warnw("[migration v1=>v2] processed values", "count", i) } } + _ = rows.Close() log.Warnw("[migration v1=>v2] processed all values", "count", i) - if _, err = tx.Exec(`DROP TABLE tmp`); err != nil { - return xerrors.Errorf("failed to drop temporary table: %w", err) + if _, err := tx.Exec(`DROP INDEX event_entry_value`); err != nil { + log.Warnw("[migration v1=>v2] failed to drop index event_entry_value; ignoring") } - log.Warnw("[migration v1=>v2] dropped temporary table") + if _, err := tx.Exec(`DROP INDEX event_entry_event_id`); err != nil { + log.Warnw("[migration v1=>v2] failed to drop index event_entry_event_id; ignoring") + } + + if _, err = tx.Exec(`DROP TABLE event_entry_tmp`); err != nil { + return xerrors.Errorf("failed to drop original table: %w", err) + } if _, err = tx.Exec(`INSERT INTO _meta (version) VALUES (2)`); err != nil { return xerrors.Errorf("failed to update schema version: %w", err)