Skip to content

Commit

Permalink
hyperspace: event index v1=>v2 speed up migration.
Browse files Browse the repository at this point in the history
  • Loading branch information
raulk committed Feb 15, 2023
1 parent 017f328 commit a0030a0
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 24 deletions.
93 changes: 69 additions & 24 deletions chain/events/filter/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,24 +417,47 @@ func (ei *EventIndex) RunMigration(from, to int) error {
return nil
}

tx, err := ei.db.Begin()
if err != nil {
return err
if _, err := ei.db.Exec(`DROP TABLE IF EXISTS event_entry_v2`); err != nil {
return xerrors.Errorf("failed to drop old event_entry_v2 temporary table: %w", err)
}
defer tx.Rollback() //nolint

if _, err = tx.Exec(`ALTER TABLE event_entry ADD COLUMN codec INTEGER`); err != nil {
return xerrors.Errorf("failed to alter table event_entry to add codec column: %w", err)
// Create a temporary table that is a clone of the original table.
if _, err := ei.db.Exec(`CREATE TABLE IF NOT EXISTS event_entry_v2 (
event_id INTEGER,
indexed INTEGER NOT NULL,
flags BLOB NOT NULL,
key TEXT NOT NULL,
value BLOB NOT NULL
)`); err != nil {
return xerrors.Errorf("failed to create temporary table: %w", err)
}

if _, err := ei.db.Exec(`CREATE INDEX event_entry_value ON event_entry_v2(value)`); err != nil {
return xerrors.Errorf("failed to create index on event_entry: %w", err)
}

if _, err := ei.db.Exec(`CREATE INDEX event_entry_event_id ON event_entry_v2(event_id)`); err != nil {
return xerrors.Errorf("failed to create index on event_entry: %w", err)
}

if _, err := ei.db.Exec(`INSERT INTO event_entry_v2(event_id, indexed, flags, key, value) SELECT * FROM event_entry`); err != nil {
return xerrors.Errorf("failed to copy all rows to temporary table: %w", err)
}

// Operate on the temporary table.

if _, err := ei.db.Exec(`ALTER TABLE event_entry_v2 ADD COLUMN codec INTEGER`); err != nil {
return xerrors.Errorf("failed to alter table event_entry_v2 to add codec column: %w", err)
}

log.Warnw("[migration v1=>v2] event_entry table schema updated")
log.Warnw("[migration v1=>v2] event_entry_v2 table schema updated")

res, err := tx.Exec(`UPDATE event_entry SET codec = ? WHERE codec IS NULL`, 0x55)
res, err := ei.db.Exec(`UPDATE event_entry_v2 SET codec = ? WHERE codec IS NULL`, 0x55)
if err != nil {
return xerrors.Errorf("failed to update table event_entry to set codec 0x55: %w", err)
return xerrors.Errorf("failed to update table event_entry_v2 to set codec 0x55: %w", err)
}
n, _ := res.RowsAffected()
log.Warnw("[migration v1=>v2] updated %d event entries with codec=0x55", "affected", n)
log.Warnw("[migration v1=>v2] updated event entries with codec=0x55", "affected", n)

var keyRewrites = [][]string{
{"topic1", "t1"},
Expand All @@ -446,15 +469,15 @@ func (ei *EventIndex) RunMigration(from, to int) error {

for _, r := range keyRewrites {
from, to := r[0], r[1]
res, err := tx.Exec(`UPDATE event_entry SET key = ? WHERE key = ?`, to, from)
res, err := ei.db.Exec(`UPDATE event_entry_v2 SET key = ? WHERE key = ?`, to, from)
if err != nil {
return xerrors.Errorf("failed to update entries from key %s to key %s: %w", from, to, err)
}
n, _ := res.RowsAffected()
log.Warnw("[migration v1=>v2] rewrote entry keys", "from", from, "to", to, "affected", n)
}

update, err := tx.Prepare(`UPDATE event_entry SET value = ? WHERE value = ?`)
update, err := ei.db.Prepare(`UPDATE event_entry_v2 SET value = ? WHERE event_id = ? and value = ?`)
if err != nil {
return xerrors.Errorf("failed to prepare update statement: %w", err)
}
Expand All @@ -468,21 +491,20 @@ func (ei *EventIndex) RunMigration(from, to int) error {
"t4": {},
}

if _, err = tx.Exec(`CREATE TABLE tmp AS SELECT key, value FROM event_entry`); err != nil {
return xerrors.Errorf("failed to create temporary table: %w", err)
}

// Pull the values out and process them in Go land before updating.
rows, err := tx.Query(`SELECT key, value FROM tmp`)
// Pull the values out out of the original table, process them in Go land before updating, and the update the v2 table.
rows, err := ei.db.Query(`SELECT event_id, key, value FROM event_entry`)
if err != nil {
_ = rows.Close()
return xerrors.Errorf("failed to select values: %w", err)
}

var i int
for rows.Next() {
var eventId int
var key string
var before []byte
if err := rows.Scan(&key, &before); err != nil {
if err := rows.Scan(&eventId, &key, &before); err != nil {
_ = rows.Close()
return xerrors.Errorf("failed to scan from query: %w", err)
}
reader := bytes.NewReader(before)
Expand All @@ -499,22 +521,45 @@ func (ei *EventIndex) RunMigration(from, to int) error {
copy(pvalue[32-len(after):], after)
after = pvalue
}
if _, err := update.Exec(after, before); err != nil {
return xerrors.Errorf("failed to scan from query: %w", err)
if _, err := update.Exec(after, eventId, before); err != nil {
log.Warnw("failed to update entry", "value_before", before, "value_after", after)
}

if i++; i%500 == 0 {
log.Warnw("[migration v1=>v2] processed values", "count", i)
}
}
_ = rows.Close()

log.Warnw("[migration v1=>v2] processed all values", "count", i)

if _, err = tx.Exec(`DROP TABLE tmp`); err != nil {
return xerrors.Errorf("failed to drop temporary table: %w", err)
if _, err := ei.db.Exec(`DROP INDEX event_entry_event_id`); err != nil {
log.Warnw("[migration v1=>v2] failed to drop index event_entry_event_id; ignoring")
}

if _, err := ei.db.Exec(`DROP INDEX event_entry_value`); err != nil {
log.Warnw("[migration v1=>v2] failed to drop index event_entry_value; ignoring")
}

// Critical section.

log.Warnw("[migration v1=>v2] beginning transaction")

tx, err := ei.db.Begin()
if err != nil {
return xerrors.Errorf("failed to begin transaction: %w", err)
}
defer tx.Rollback() //nolint

if _, err = tx.Exec(`DROP TABLE event_entry`); err != nil {
return xerrors.Errorf("failed to drop original table: %w", err)
}

if _, err = tx.Exec(`ALTER TABLE event_entry_v2 RENAME TO event_entry`); err != nil {
return xerrors.Errorf("failed to rename table: %w", err)
}

log.Warnw("[migration v1=>v2] dropped temporary table")
log.Warnw("[migration v1=>v2] replaced original table")

if _, err = tx.Exec(`INSERT INTO _meta (version) VALUES (2)`); err != nil {
return xerrors.Errorf("failed to update schema version: %w", err)
Expand Down
13 changes: 13 additions & 0 deletions chain/events/filter/index_migration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package filter

import (
"fmt"
"testing"

logging "github.com/ipfs/go-log/v2"
)

func TestMigration(t *testing.T) {
_ = logging.SetLogLevel("event_index", "INFO")
fmt.Println(NewEventIndex("/Users/raul/hyperspace-test/sqlite/events.db"))
}

0 comments on commit a0030a0

Please sign in to comment.