Skip to content

Commit

Permalink
hyperspace: event index v1=>v2 speed up migration.
Browse files Browse the repository at this point in the history
  • Loading branch information
raulk committed Feb 15, 2023
1 parent 017f328 commit 0127775
Showing 1 changed file with 54 additions and 16 deletions.
70 changes: 54 additions & 16 deletions chain/events/filter/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,13 +417,15 @@ func (ei *EventIndex) RunMigration(from, to int) error {
return nil
}

log.Warnw("[migration v1=>v2] beginning transaction")

tx, err := ei.db.Begin()
if err != nil {
return err
return xerrors.Errorf("failed to begin transaction: %w", err)
}
defer tx.Rollback() //nolint

if _, err = tx.Exec(`ALTER TABLE event_entry ADD COLUMN codec INTEGER`); err != nil {
if _, err := tx.Exec(`ALTER TABLE event_entry ADD COLUMN codec INTEGER`); err != nil {
return xerrors.Errorf("failed to alter table event_entry to add codec column: %w", err)
}

Expand All @@ -434,7 +436,7 @@ func (ei *EventIndex) RunMigration(from, to int) error {
return xerrors.Errorf("failed to update table event_entry to set codec 0x55: %w", err)
}
n, _ := res.RowsAffected()
log.Warnw("[migration v1=>v2] updated %d event entries with codec=0x55", "affected", n)
log.Warnw("[migration v1=>v2] updated event entries with codec=0x55", "affected", n)

var keyRewrites = [][]string{
{"topic1", "t1"},
Expand All @@ -454,7 +456,38 @@ func (ei *EventIndex) RunMigration(from, to int) error {
log.Warnw("[migration v1=>v2] rewrote entry keys", "from", from, "to", to, "affected", n)
}

update, err := tx.Prepare(`UPDATE event_entry SET value = ? WHERE value = ?`)
if _, err := tx.Exec(`DROP TABLE IF EXISTS event_entry_tmp`); err != nil {
return xerrors.Errorf("failed to drop old event_entry_tmp temporary table: %w", err)
}

if _, err := tx.Exec(`CREATE TABLE event_entry_tmp (
event_id INTEGER,
indexed INTEGER NOT NULL,
flags BLOB NOT NULL,
key TEXT NOT NULL,
codec INTEGER,
value BLOB NOT NULL
)`); err != nil {
return xerrors.Errorf("failed to create temporary table: %w", err)
}

// Copy over all data. We will run the iterator from this table.
if _, err := tx.Exec(`INSERT INTO event_entry_tmp SELECT * FROM event_entry`); err != nil {
return xerrors.Errorf("failed to copy over data: %w", err)
}

// Add an index on value.
if _, err := tx.Exec(`CREATE INDEX event_entry_value ON event_entry(value)`); err != nil {
return xerrors.Errorf("failed to create index on event_entry: %w", err)
}

// Add an index on event_id.
if _, err := tx.Exec(`CREATE INDEX event_entry_event_id ON event_entry(event_id)`); err != nil {
return xerrors.Errorf("failed to create index on event_entry: %w", err)
}

// Match by value to be extra safe.
update, err := tx.Prepare(`UPDATE event_entry SET value = ? WHERE event_id = ? and key = ?`)
if err != nil {
return xerrors.Errorf("failed to prepare update statement: %w", err)
}
Expand All @@ -468,21 +501,19 @@ func (ei *EventIndex) RunMigration(from, to int) error {
"t4": {},
}

if _, err = tx.Exec(`CREATE TABLE tmp AS SELECT key, value FROM event_entry`); err != nil {
return xerrors.Errorf("failed to create temporary table: %w", err)
}

// Pull the values out and process them in Go land before updating.
rows, err := tx.Query(`SELECT key, value FROM tmp`)
// Pull the values out out of the original table, process them in Go land before updating, and the update the v2 table.
rows, err := tx.Query(`SELECT event_id, key, value FROM event_entry_tmp`)
if err != nil {
return xerrors.Errorf("failed to select values: %w", err)
}

var i int
for rows.Next() {
var eventId int
var key string
var before []byte
if err := rows.Scan(&key, &before); err != nil {
if err := rows.Scan(&eventId, &key, &before); err != nil {
_ = rows.Close()
return xerrors.Errorf("failed to scan from query: %w", err)
}
reader := bytes.NewReader(before)
Expand All @@ -499,22 +530,29 @@ func (ei *EventIndex) RunMigration(from, to int) error {
copy(pvalue[32-len(after):], after)
after = pvalue
}
if _, err := update.Exec(after, before); err != nil {
return xerrors.Errorf("failed to scan from query: %w", err)
if _, err := update.Exec(after, eventId, key); err != nil {
log.Warnw("failed to update entry", "value_before", before, "value_after", after)
}

if i++; i%500 == 0 {
log.Warnw("[migration v1=>v2] processed values", "count", i)
}
}
_ = rows.Close()

log.Warnw("[migration v1=>v2] processed all values", "count", i)

if _, err = tx.Exec(`DROP TABLE tmp`); err != nil {
return xerrors.Errorf("failed to drop temporary table: %w", err)
if _, err := tx.Exec(`DROP INDEX event_entry_value`); err != nil {
log.Warnw("[migration v1=>v2] failed to drop index event_entry_value; ignoring")
}

log.Warnw("[migration v1=>v2] dropped temporary table")
if _, err := tx.Exec(`DROP INDEX event_entry_event_id`); err != nil {
log.Warnw("[migration v1=>v2] failed to drop index event_entry_event_id; ignoring")
}

if _, err = tx.Exec(`DROP TABLE event_entry_tmp`); err != nil {
return xerrors.Errorf("failed to drop original table: %w", err)
}

if _, err = tx.Exec(`INSERT INTO _meta (version) VALUES (2)`); err != nil {
return xerrors.Errorf("failed to update schema version: %w", err)
Expand Down

0 comments on commit 0127775

Please sign in to comment.