Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

hyperspace: event index v1=>v2 speed up migration. #10276

Merged
merged 1 commit into from
Feb 15, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 54 additions & 16 deletions chain/events/filter/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,13 +417,15 @@ func (ei *EventIndex) RunMigration(from, to int) error {
return nil
}

log.Warnw("[migration v1=>v2] beginning transaction")

tx, err := ei.db.Begin()
if err != nil {
return err
return xerrors.Errorf("failed to begin transaction: %w", err)
}
defer tx.Rollback() //nolint

if _, err = tx.Exec(`ALTER TABLE event_entry ADD COLUMN codec INTEGER`); err != nil {
if _, err := tx.Exec(`ALTER TABLE event_entry ADD COLUMN codec INTEGER`); err != nil {
return xerrors.Errorf("failed to alter table event_entry to add codec column: %w", err)
}

Expand All @@ -434,7 +436,7 @@ func (ei *EventIndex) RunMigration(from, to int) error {
return xerrors.Errorf("failed to update table event_entry to set codec 0x55: %w", err)
}
n, _ := res.RowsAffected()
log.Warnw("[migration v1=>v2] updated %d event entries with codec=0x55", "affected", n)
log.Warnw("[migration v1=>v2] updated event entries with codec=0x55", "affected", n)

var keyRewrites = [][]string{
{"topic1", "t1"},
Expand All @@ -454,7 +456,38 @@ func (ei *EventIndex) RunMigration(from, to int) error {
log.Warnw("[migration v1=>v2] rewrote entry keys", "from", from, "to", to, "affected", n)
}

update, err := tx.Prepare(`UPDATE event_entry SET value = ? WHERE value = ?`)
if _, err := tx.Exec(`DROP TABLE IF EXISTS event_entry_tmp`); err != nil {
return xerrors.Errorf("failed to drop old event_entry_tmp temporary table: %w", err)
}

if _, err := tx.Exec(`CREATE TABLE event_entry_tmp (
event_id INTEGER,
indexed INTEGER NOT NULL,
flags BLOB NOT NULL,
key TEXT NOT NULL,
codec INTEGER,
value BLOB NOT NULL
)`); err != nil {
return xerrors.Errorf("failed to create temporary table: %w", err)
}

// Copy over all data. We will run the iterator from this table.
if _, err := tx.Exec(`INSERT INTO event_entry_tmp SELECT * FROM event_entry`); err != nil {
return xerrors.Errorf("failed to copy over data: %w", err)
}

// Add an index on value.
if _, err := tx.Exec(`CREATE INDEX event_entry_value ON event_entry(value)`); err != nil {
return xerrors.Errorf("failed to create index on event_entry: %w", err)
}

// Add an index on event_id.
if _, err := tx.Exec(`CREATE INDEX event_entry_event_id ON event_entry(event_id)`); err != nil {
return xerrors.Errorf("failed to create index on event_entry: %w", err)
}

// Match by value to be extra safe.
update, err := tx.Prepare(`UPDATE event_entry SET value = ? WHERE event_id = ? and key = ?`)
if err != nil {
return xerrors.Errorf("failed to prepare update statement: %w", err)
}
Expand All @@ -468,21 +501,19 @@ func (ei *EventIndex) RunMigration(from, to int) error {
"t4": {},
}

if _, err = tx.Exec(`CREATE TABLE tmp AS SELECT key, value FROM event_entry`); err != nil {
return xerrors.Errorf("failed to create temporary table: %w", err)
}

// Pull the values out and process them in Go land before updating.
rows, err := tx.Query(`SELECT key, value FROM tmp`)
// Pull the values out out of the original table, process them in Go land before updating, and the update the v2 table.
rows, err := tx.Query(`SELECT event_id, key, value FROM event_entry_tmp`)
if err != nil {
return xerrors.Errorf("failed to select values: %w", err)
}

var i int
for rows.Next() {
var eventId int
var key string
var before []byte
if err := rows.Scan(&key, &before); err != nil {
if err := rows.Scan(&eventId, &key, &before); err != nil {
_ = rows.Close()
return xerrors.Errorf("failed to scan from query: %w", err)
}
reader := bytes.NewReader(before)
Expand All @@ -499,22 +530,29 @@ func (ei *EventIndex) RunMigration(from, to int) error {
copy(pvalue[32-len(after):], after)
after = pvalue
}
if _, err := update.Exec(after, before); err != nil {
return xerrors.Errorf("failed to scan from query: %w", err)
if _, err := update.Exec(after, eventId, key); err != nil {
log.Warnw("failed to update entry", "value_before", before, "value_after", after)
}

if i++; i%500 == 0 {
log.Warnw("[migration v1=>v2] processed values", "count", i)
}
}
_ = rows.Close()

log.Warnw("[migration v1=>v2] processed all values", "count", i)

if _, err = tx.Exec(`DROP TABLE tmp`); err != nil {
return xerrors.Errorf("failed to drop temporary table: %w", err)
if _, err := tx.Exec(`DROP INDEX event_entry_value`); err != nil {
log.Warnw("[migration v1=>v2] failed to drop index event_entry_value; ignoring")
}

log.Warnw("[migration v1=>v2] dropped temporary table")
if _, err := tx.Exec(`DROP INDEX event_entry_event_id`); err != nil {
log.Warnw("[migration v1=>v2] failed to drop index event_entry_event_id; ignoring")
}

if _, err = tx.Exec(`DROP TABLE event_entry_tmp`); err != nil {
return xerrors.Errorf("failed to drop original table: %w", err)
}

if _, err = tx.Exec(`INSERT INTO _meta (version) VALUES (2)`); err != nil {
return xerrors.Errorf("failed to update schema version: %w", err)
Expand Down