Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: events index: record processed epochs and tipsets for events and eth_get_log blocks till requested tipset has been indexed #12080

Merged
merged 20 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
262 changes: 255 additions & 7 deletions chain/events/filter/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"sort"
"strings"
"sync"
"time"

"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -61,9 +62,13 @@ var ddls = []string{
value BLOB NOT NULL
)`,

createTableEventsSeen,

createIndexEventEntryIndexedKey,
createIndexEventEntryCodecValue,
createIndexEventEntryEventId,
createIndexEventsSeenHeight,
createIndexEventsSeenTipsetKeyCid,

// metadata containing version of schema
`CREATE TABLE IF NOT EXISTS _meta (
Expand All @@ -75,20 +80,24 @@ var ddls = []string{
`INSERT OR IGNORE INTO _meta (version) VALUES (3)`,
`INSERT OR IGNORE INTO _meta (version) VALUES (4)`,
`INSERT OR IGNORE INTO _meta (version) VALUES (5)`,
`INSERT OR IGNORE INTO _meta (version) VALUES (6)`,
}

var (
log = logging.Logger("filter")
)

const (
schemaVersion = 5
schemaVersion = 6

eventExists = `SELECT MAX(id) FROM event WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?`
insertEvent = `INSERT OR IGNORE INTO event(height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)`
insertEntry = `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)`
revertEventsInTipset = `UPDATE event SET reverted=true WHERE height=? AND tipset_key=?`
restoreEvent = `UPDATE event SET reverted=false WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?`
revertEventSeen = `UPDATE events_seen SET reverted=true WHERE height=? AND tipset_key_cid=?`
restoreEventSeen = `UPDATE events_seen SET reverted=false WHERE height=? AND tipset_key_cid=?`
insertEventsSeen = `INSERT OR IGNORE INTO events_seen(height, tipset_key_cid, reverted) VALUES(?, ?, ?)`
aarshkshah1992 marked this conversation as resolved.
Show resolved Hide resolved

createIndexEventEmitterAddr = `CREATE INDEX IF NOT EXISTS event_emitter_addr ON event (emitter_addr)`
createIndexEventTipsetKeyCid = `CREATE INDEX IF NOT EXISTS event_tipset_key_cid ON event (tipset_key_cid);`
Expand All @@ -98,6 +107,16 @@ const (
createIndexEventEntryIndexedKey = `CREATE INDEX IF NOT EXISTS event_entry_indexed_key ON event_entry (indexed, key);`
createIndexEventEntryCodecValue = `CREATE INDEX IF NOT EXISTS event_entry_codec_value ON event_entry (codec, value);`
createIndexEventEntryEventId = `CREATE INDEX IF NOT EXISTS event_entry_event_id ON event_entry(event_id);`

createTableEventsSeen = `CREATE TABLE IF NOT EXISTS events_seen (
id INTEGER PRIMARY KEY,
height INTEGER NOT NULL,
tipset_key_cid BLOB NOT NULL,
reverted INTEGER NOT NULL
)`

createIndexEventsSeenHeight = `CREATE INDEX IF NOT EXISTS events_seen_height ON events_seen (height);`
createIndexEventsSeenTipsetKeyCid = `CREATE INDEX IF NOT EXISTS events_seen_tipset_key_cid ON events_seen (tipset_key_cid);`
rvagg marked this conversation as resolved.
Show resolved Hide resolved
)

type EventIndex struct {
Expand All @@ -108,6 +127,25 @@ type EventIndex struct {
stmtInsertEntry *sql.Stmt
stmtRevertEventsInTipset *sql.Stmt
stmtRestoreEvent *sql.Stmt
stmtInsertEventsSeen *sql.Stmt
stmtRevertEventSeen *sql.Stmt
stmtRestoreEventSeen *sql.Stmt

mu sync.Mutex
subIdCounter uint64
updateSubs map[uint64]*updateSub
}

type updateSub struct {
ctx context.Context
ch chan *EventIndexUpdate
cancel context.CancelFunc
}

type EventIndexUpdate struct {
Height uint64
aarshkshah1992 marked this conversation as resolved.
Show resolved Hide resolved
TipsetCid cid.Cid
aarshkshah1992 marked this conversation as resolved.
Show resolved Hide resolved
Reverted bool
}

func (ei *EventIndex) initStatements() (err error) {
Expand Down Expand Up @@ -136,6 +174,21 @@ func (ei *EventIndex) initStatements() (err error) {
return xerrors.Errorf("prepare stmtRestoreEvent: %w", err)
}

ei.stmtInsertEventsSeen, err = ei.db.Prepare(insertEventsSeen)
if err != nil {
return xerrors.Errorf("prepare stmtInsertEventsSeen: %w", err)
}

ei.stmtRevertEventSeen, err = ei.db.Prepare(revertEventSeen)
if err != nil {
return xerrors.Errorf("prepare stmtRevertEventSeen: %w", err)
}

ei.stmtRestoreEventSeen, err = ei.db.Prepare(restoreEventSeen)
if err != nil {
return xerrors.Errorf("prepare stmtRestoreEventSeen: %w", err)
}

return nil
}

Expand Down Expand Up @@ -401,9 +454,64 @@ func (ei *EventIndex) migrateToVersion5(ctx context.Context) error {
return xerrors.Errorf("commit transaction: %w", err)
}

log.Infof("Successfully migrated event index from version 4 to version 5 in %s", time.Since(now))
return nil
}

func (ei *EventIndex) migrateToVersion6(ctx context.Context) error {
now := time.Now()

tx, err := ei.db.BeginTx(ctx, nil)
if err != nil {
return xerrors.Errorf("begin transaction: %w", err)
}
defer func() { _ = tx.Rollback() }()

stmtCreateTableEventsSeen, err := tx.PrepareContext(ctx, createTableEventsSeen)
if err != nil {
return xerrors.Errorf("prepare stmtCreateTableEventsSeen: %w", err)
}
_, err = stmtCreateTableEventsSeen.ExecContext(ctx)
if err != nil {
return xerrors.Errorf("create table events_seen: %w", err)
}

_, err = tx.ExecContext(ctx, createIndexEventsSeenHeight)
if err != nil {
return xerrors.Errorf("create index events_seen_height: %w", err)
}
_, err = tx.ExecContext(ctx, createIndexEventsSeenTipsetKeyCid)
if err != nil {
return xerrors.Errorf("create index events_seen_tipset_key_cid: %w", err)
}

// INSERT an entry in the events_seen table for all epochs we do have events for in our DB
_, err = tx.ExecContext(ctx, `
INSERT INTO events_seen (height, tipset_key_cid, reverted)
SELECT e.height, e.tipset_key_cid, e.reverted
FROM event e
WHERE NOT EXISTS (
SELECT 1 FROM events_seen es
WHERE es.height = e.height AND es.tipset_key_cid = e.tipset_key_cid
)
aarshkshah1992 marked this conversation as resolved.
Show resolved Hide resolved
`)
if err != nil {
return xerrors.Errorf("insert events into events_seen: %w", err)
}

_, err = tx.ExecContext(ctx, "INSERT OR IGNORE INTO _meta (version) VALUES (6)")
if err != nil {
return xerrors.Errorf("increment _meta version: %w", err)
}

err = tx.Commit()
if err != nil {
return xerrors.Errorf("commit transaction: %w", err)
}

ei.vacuumDBAndCheckpointWAL(ctx)
rvagg marked this conversation as resolved.
Show resolved Hide resolved

log.Infof("Successfully migrated event index from version 4 to version 5 in %s", time.Since(now))
log.Infof("Successfully migrated event index from version 5 to version 6 in %s", time.Since(now))
return nil
}

Expand Down Expand Up @@ -498,6 +606,16 @@ func NewEventIndex(ctx context.Context, path string, chainStore *store.ChainStor
version = 5
}

if version == 5 {
log.Infof("Upgrading event index from version 5 to version 6")
err = eventIndex.migrateToVersion6(ctx)
if err != nil {
_ = db.Close()
return nil, xerrors.Errorf("could not migrate event index schema from version 5 to version 6: %w", err)
}
version = 6
}

if version != schemaVersion {
_ = db.Close()
return nil, xerrors.Errorf("invalid database version: got %d, expected %d", version, schemaVersion)
Expand All @@ -510,6 +628,8 @@ func NewEventIndex(ctx context.Context, path string, chainStore *store.ChainStor
return nil, xerrors.Errorf("error preparing eventIndex database statements: %w", err)
}

eventIndex.updateSubs = make(map[uint64]*updateSub)

return &eventIndex, nil
}

Expand All @@ -520,6 +640,61 @@ func (ei *EventIndex) Close() error {
return ei.db.Close()
}

func (ei *EventIndex) SubscribeUpdates() (uint64, chan *EventIndexUpdate) {
aarshkshah1992 marked this conversation as resolved.
Show resolved Hide resolved
subCtx, subCancel := context.WithCancel(context.Background())
ch := make(chan *EventIndexUpdate)

tSub := &updateSub{
ctx: subCtx,
cancel: subCancel,
ch: ch,
}

ei.mu.Lock()
subId := ei.subIdCounter
ei.subIdCounter++
ei.updateSubs[subId] = tSub
ei.mu.Unlock()

return subId, tSub.ch
}

func (ei *EventIndex) UnsubscribeUpdates(subId uint64) {
ei.mu.Lock()
tSub, ok := ei.updateSubs[subId]
if !ok {
ei.mu.Unlock()
return
}
delete(ei.updateSubs, subId)
ei.mu.Unlock()

// cancel the subscription
tSub.cancel()
}

func (ei *EventIndex) GetMaxHeightInIndex(ctx context.Context) (uint64, error) {
row := ei.db.QueryRowContext(ctx, "SELECT MAX(height) FROM events_seen")
var maxHeight uint64
err := row.Scan(&maxHeight)
return maxHeight, err
}

func (ei *EventIndex) IsHeightProcessed(ctx context.Context, height uint64) (bool, error) {
mh, err := ei.GetMaxHeightInIndex(ctx)
if err != nil {
return false, err
}
return height <= mh, nil
}

func (ei *EventIndex) IsTipsetProcessed(ctx context.Context, tipsetKeyCid []byte) (bool, error) {
row := ei.db.QueryRowContext(ctx, "SELECT COUNT(*) > 0 FROM events_seen WHERE tipset_key_cid = ?", tipsetKeyCid)
aarshkshah1992 marked this conversation as resolved.
Show resolved Hide resolved
var exists bool
err := row.Scan(&exists)
return exists, err
}

func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool, resolver func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool)) error {
tx, err := ei.db.Begin()
if err != nil {
Expand All @@ -530,16 +705,45 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever

// lets handle the revert case first, since its simpler and we can simply mark all events in this tipset as reverted and return
if revert {
tsKeyCid, err := te.msgTs.Key().Cid()
if err != nil {
return xerrors.Errorf("tipset key cid: %w", err)
}

_, err = tx.Stmt(ei.stmtRevertEventsInTipset).Exec(te.msgTs.Height(), te.msgTs.Key().Bytes())
if err != nil {
return xerrors.Errorf("revert event: %w", err)
}

_, err = tx.Stmt(ei.stmtRevertEventSeen).Exec(te.msgTs.Height(), tsKeyCid.Bytes())
if err != nil {
return xerrors.Errorf("revert event seen: %w", err)
}

err = tx.Commit()
if err != nil {
return xerrors.Errorf("commit transaction: %w", err)
}

ei.mu.Lock()
tSubs := ei.updateSubs
aarshkshah1992 marked this conversation as resolved.
Show resolved Hide resolved
ei.mu.Unlock()

for _, tSub := range tSubs {
tSub := tSub
select {
case tSub.ch <- &EventIndexUpdate{
Height: uint64(te.msgTs.Height()),
TipsetCid: tsKeyCid,
Reverted: true,
}:
case <-tSub.ctx.Done():
// subscription was cancelled, ignore
case <-ctx.Done():
return ctx.Err()
}
}

return nil
}

Expand All @@ -550,6 +754,10 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
if err != nil {
return xerrors.Errorf("load executed messages: %w", err)
}
tsKeyCid, err := te.msgTs.Key().Cid()
if err != nil {
return xerrors.Errorf("tipset key cid: %w", err)
}
aarshkshah1992 marked this conversation as resolved.
Show resolved Hide resolved

eventCount := 0
// iterate over all executed messages in this tipset and insert them into the database if they
Expand All @@ -567,11 +775,6 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
addressLookups[ev.Emitter] = addr
}

tsKeyCid, err := te.msgTs.Key().Cid()
rvagg marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return xerrors.Errorf("tipset key cid: %w", err)
}

// check if this event already exists in the database
var entryID sql.NullInt64
err = tx.Stmt(ei.stmtEventExists).QueryRow(
Expand Down Expand Up @@ -646,16 +849,61 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
if rowsAffected != 1 {
log.Warnf("restored %d events but expected only one to exist", rowsAffected)
}

res, err = tx.Stmt(ei.stmtRestoreEventSeen).Exec(
aarshkshah1992 marked this conversation as resolved.
Show resolved Hide resolved
te.msgTs.Height(),
tsKeyCid.Bytes(),
)
if err != nil {
return xerrors.Errorf("exec restore event seen: %w", err)
}

rowsAffected, err = res.RowsAffected()
if err != nil {
return xerrors.Errorf("error getting rows affected: %s", err)
}
if rowsAffected != 1 {
log.Warnf("restored %d events in events_seen but expected only one to exist", rowsAffected)
}
}
eventCount++
}
}

// add an entry to the event_seen table for this tipset
_, err = tx.Stmt(ei.stmtInsertEventsSeen).Exec(
te.msgTs.Height(),
tsKeyCid.Bytes(),
false,
aarshkshah1992 marked this conversation as resolved.
Show resolved Hide resolved
)
if err != nil {
return xerrors.Errorf("exec insert events seen: %w", err)
}

err = tx.Commit()
if err != nil {
return xerrors.Errorf("commit transaction: %w", err)
}

ei.mu.Lock()
tSubs := ei.updateSubs
ei.mu.Unlock()

for _, tSub := range tSubs {
tSub := tSub
select {
case tSub.ch <- &EventIndexUpdate{
Height: uint64(te.msgTs.Height()),
TipsetCid: tsKeyCid,
Reverted: false,
}:
case <-tSub.ctx.Done():
// subscription was cancelled, ignore
case <-ctx.Done():
return ctx.Err()
}
}

return nil
}

Expand Down
Loading
Loading