Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: events index: record processed epochs and tipsets for events and eth_get_log blocks till requested tipset has been indexed #12080

Merged
merged 20 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
248 changes: 241 additions & 7 deletions chain/events/filter/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"sort"
"strings"
"sync"
"time"

"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -61,9 +62,13 @@ var ddls = []string{
value BLOB NOT NULL
)`,

createTableEventsSeen,

createIndexEventEntryIndexedKey,
createIndexEventEntryCodecValue,
createIndexEventEntryEventId,
createIndexEventsSeenHeight,
createIndexEventsSeenTipsetKeyCid,

// metadata containing version of schema
`CREATE TABLE IF NOT EXISTS _meta (
Expand All @@ -75,20 +80,26 @@ var ddls = []string{
`INSERT OR IGNORE INTO _meta (version) VALUES (3)`,
`INSERT OR IGNORE INTO _meta (version) VALUES (4)`,
`INSERT OR IGNORE INTO _meta (version) VALUES (5)`,
`INSERT OR IGNORE INTO _meta (version) VALUES (6)`,
}

var (
log = logging.Logger("filter")
)

const (
schemaVersion = 5
schemaVersion = 6

eventExists = `SELECT MAX(id) FROM event WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?`
insertEvent = `INSERT OR IGNORE INTO event(height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)`
insertEntry = `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)`
revertEventsInTipset = `UPDATE event SET reverted=true WHERE height=? AND tipset_key=?`
restoreEvent = `UPDATE event SET reverted=false WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?`
revertEventSeen = `UPDATE events_seen SET reverted=true WHERE height=? AND tipset_key_cid=?`
restoreEventSeen = `UPDATE events_seen SET reverted=false WHERE height=? AND tipset_key_cid=?`
upsertEventsSeen = `INSERT INTO events_seen(height, tipset_key_cid, reverted) VALUES(?, ?, ?) ON CONFLICT(height, tipset_key_cid) DO UPDATE SET reverted=false`
aarshkshah1992 marked this conversation as resolved.
Show resolved Hide resolved
isTipsetProcessed = `SELECT COUNT(*) > 0 FROM events_seen WHERE tipset_key_cid=?`
getMaxHeightInIndex = `SELECT MAX(height) FROM events_seen`

createIndexEventEmitterAddr = `CREATE INDEX IF NOT EXISTS event_emitter_addr ON event (emitter_addr)`
createIndexEventTipsetKeyCid = `CREATE INDEX IF NOT EXISTS event_tipset_key_cid ON event (tipset_key_cid);`
Expand All @@ -98,6 +109,17 @@ const (
createIndexEventEntryIndexedKey = `CREATE INDEX IF NOT EXISTS event_entry_indexed_key ON event_entry (indexed, key);`
createIndexEventEntryCodecValue = `CREATE INDEX IF NOT EXISTS event_entry_codec_value ON event_entry (codec, value);`
createIndexEventEntryEventId = `CREATE INDEX IF NOT EXISTS event_entry_event_id ON event_entry(event_id);`

createTableEventsSeen = `CREATE TABLE IF NOT EXISTS events_seen (
id INTEGER PRIMARY KEY,
height INTEGER NOT NULL,
tipset_key_cid BLOB NOT NULL,
reverted INTEGER NOT NULL,
UNIQUE(height, tipset_key_cid)
)`

createIndexEventsSeenHeight = `CREATE INDEX IF NOT EXISTS events_seen_height ON events_seen (height);`
createIndexEventsSeenTipsetKeyCid = `CREATE INDEX IF NOT EXISTS events_seen_tipset_key_cid ON events_seen (tipset_key_cid);`
rvagg marked this conversation as resolved.
Show resolved Hide resolved
)

type EventIndex struct {
Expand All @@ -108,8 +130,26 @@ type EventIndex struct {
stmtInsertEntry *sql.Stmt
stmtRevertEventsInTipset *sql.Stmt
stmtRestoreEvent *sql.Stmt
stmtUpsertEventsSeen *sql.Stmt
stmtRevertEventSeen *sql.Stmt
stmtRestoreEventSeen *sql.Stmt

stmtIsTipsetProcessed *sql.Stmt
stmtGetMaxHeightInIndex *sql.Stmt

mu sync.Mutex
subIdCounter uint64
updateSubs map[uint64]*updateSub
}

type updateSub struct {
ctx context.Context
ch chan EventIndexUpdated
cancel context.CancelFunc
}

type EventIndexUpdated struct{}

func (ei *EventIndex) initStatements() (err error) {
ei.stmtEventExists, err = ei.db.Prepare(eventExists)
if err != nil {
Expand All @@ -136,6 +176,31 @@ func (ei *EventIndex) initStatements() (err error) {
return xerrors.Errorf("prepare stmtRestoreEvent: %w", err)
}

ei.stmtUpsertEventsSeen, err = ei.db.Prepare(upsertEventsSeen)
if err != nil {
return xerrors.Errorf("prepare stmtUpsertEventsSeen: %w", err)
}

ei.stmtRevertEventSeen, err = ei.db.Prepare(revertEventSeen)
if err != nil {
return xerrors.Errorf("prepare stmtRevertEventSeen: %w", err)
}

ei.stmtRestoreEventSeen, err = ei.db.Prepare(restoreEventSeen)
if err != nil {
return xerrors.Errorf("prepare stmtRestoreEventSeen: %w", err)
}

ei.stmtIsTipsetProcessed, err = ei.db.Prepare(isTipsetProcessed)
if err != nil {
return xerrors.Errorf("prepare isTipsetProcessed: %w", err)
}

ei.stmtGetMaxHeightInIndex, err = ei.db.Prepare(getMaxHeightInIndex)
if err != nil {
return xerrors.Errorf("prepare getMaxHeightInIndex: %w", err)
}

return nil
}

Expand Down Expand Up @@ -401,9 +466,59 @@ func (ei *EventIndex) migrateToVersion5(ctx context.Context) error {
return xerrors.Errorf("commit transaction: %w", err)
}

log.Infof("Successfully migrated event index from version 4 to version 5 in %s", time.Since(now))
return nil
}

func (ei *EventIndex) migrateToVersion6(ctx context.Context) error {
now := time.Now()

tx, err := ei.db.BeginTx(ctx, nil)
if err != nil {
return xerrors.Errorf("begin transaction: %w", err)
}
defer func() { _ = tx.Rollback() }()

stmtCreateTableEventsSeen, err := tx.PrepareContext(ctx, createTableEventsSeen)
if err != nil {
return xerrors.Errorf("prepare stmtCreateTableEventsSeen: %w", err)
}
_, err = stmtCreateTableEventsSeen.ExecContext(ctx)
if err != nil {
return xerrors.Errorf("create table events_seen: %w", err)
}

_, err = tx.ExecContext(ctx, createIndexEventsSeenHeight)
if err != nil {
return xerrors.Errorf("create index events_seen_height: %w", err)
}
_, err = tx.ExecContext(ctx, createIndexEventsSeenTipsetKeyCid)
if err != nil {
return xerrors.Errorf("create index events_seen_tipset_key_cid: %w", err)
}

// INSERT an entry in the events_seen table for all epochs we do have events for in our DB
_, err = tx.ExecContext(ctx, `
INSERT OR IGNORE INTO events_seen (height, tipset_key_cid, reverted)
SELECT DISTINCT height, tipset_key_cid, reverted FROM event
`)
if err != nil {
return xerrors.Errorf("insert events into events_seen: %w", err)
}

_, err = tx.ExecContext(ctx, "INSERT OR IGNORE INTO _meta (version) VALUES (6)")
if err != nil {
return xerrors.Errorf("increment _meta version: %w", err)
}

err = tx.Commit()
if err != nil {
return xerrors.Errorf("commit transaction: %w", err)
}

ei.vacuumDBAndCheckpointWAL(ctx)
rvagg marked this conversation as resolved.
Show resolved Hide resolved

log.Infof("Successfully migrated event index from version 4 to version 5 in %s", time.Since(now))
log.Infof("Successfully migrated event index from version 5 to version 6 in %s", time.Since(now))
return nil
}

Expand Down Expand Up @@ -498,6 +613,16 @@ func NewEventIndex(ctx context.Context, path string, chainStore *store.ChainStor
version = 5
}

if version == 5 {
log.Infof("Upgrading event index from version 5 to version 6")
err = eventIndex.migrateToVersion6(ctx)
if err != nil {
_ = db.Close()
return nil, xerrors.Errorf("could not migrate event index schema from version 5 to version 6: %w", err)
}
version = 6
}

if version != schemaVersion {
_ = db.Close()
return nil, xerrors.Errorf("invalid database version: got %d, expected %d", version, schemaVersion)
Expand All @@ -510,6 +635,8 @@ func NewEventIndex(ctx context.Context, path string, chainStore *store.ChainStor
return nil, xerrors.Errorf("error preparing eventIndex database statements: %w", err)
}

eventIndex.updateSubs = make(map[uint64]*updateSub)

return &eventIndex, nil
}

Expand All @@ -520,6 +647,61 @@ func (ei *EventIndex) Close() error {
return ei.db.Close()
}

func (ei *EventIndex) SubscribeUpdates() (chan EventIndexUpdated, func()) {
subCtx, subCancel := context.WithCancel(context.Background())
ch := make(chan EventIndexUpdated)

tSub := &updateSub{
ctx: subCtx,
cancel: subCancel,
ch: ch,
}

ei.mu.Lock()
subId := ei.subIdCounter
ei.subIdCounter++
ei.updateSubs[subId] = tSub
ei.mu.Unlock()

unSubscribeF := func() {
ei.mu.Lock()
tSub, ok := ei.updateSubs[subId]
if !ok {
ei.mu.Unlock()
return
}
delete(ei.updateSubs, subId)
ei.mu.Unlock()

// cancel the subscription
tSub.cancel()
}

return tSub.ch, unSubscribeF
}

func (ei *EventIndex) GetMaxHeightInIndex(ctx context.Context) (uint64, error) {
row := ei.stmtGetMaxHeightInIndex.QueryRowContext(ctx)
var maxHeight uint64
err := row.Scan(&maxHeight)
return maxHeight, err
}

func (ei *EventIndex) IsHeightProcessed(ctx context.Context, height uint64) (bool, error) {
mh, err := ei.GetMaxHeightInIndex(ctx)
if err != nil {
return false, err
}
return height <= mh, nil
}

func (ei *EventIndex) IsTipsetProcessed(ctx context.Context, tipsetKeyCid []byte) (bool, error) {
row := ei.stmtIsTipsetProcessed.QueryRowContext(ctx, tipsetKeyCid)
var exists bool
err := row.Scan(&exists)
return exists, err
}

func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool, resolver func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool)) error {
tx, err := ei.db.Begin()
if err != nil {
Expand All @@ -528,18 +710,46 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
// rollback the transaction (a no-op if the transaction was already committed)
defer func() { _ = tx.Rollback() }()

tsKeyCid, err := te.msgTs.Key().Cid()
if err != nil {
return xerrors.Errorf("tipset key cid: %w", err)
}

// lets handle the revert case first, since its simpler and we can simply mark all events in this tipset as reverted and return
if revert {
_, err = tx.Stmt(ei.stmtRevertEventsInTipset).Exec(te.msgTs.Height(), te.msgTs.Key().Bytes())
if err != nil {
return xerrors.Errorf("revert event: %w", err)
}

_, err = tx.Stmt(ei.stmtRevertEventSeen).Exec(te.msgTs.Height(), tsKeyCid.Bytes())
if err != nil {
return xerrors.Errorf("revert event seen: %w", err)
}

err = tx.Commit()
if err != nil {
return xerrors.Errorf("commit transaction: %w", err)
}

ei.mu.Lock()
tSubs := make([]*updateSub, 0, len(ei.updateSubs))
for _, tSub := range ei.updateSubs {
tSubs = append(tSubs, tSub)
}
ei.mu.Unlock()

for _, tSub := range tSubs {
tSub := tSub
select {
case tSub.ch <- EventIndexUpdated{}:
case <-tSub.ctx.Done():
// subscription was cancelled, ignore
case <-ctx.Done():
return ctx.Err()
}
}

return nil
}

Expand Down Expand Up @@ -567,11 +777,6 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
addressLookups[ev.Emitter] = addr
}

tsKeyCid, err := te.msgTs.Key().Cid()
rvagg marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return xerrors.Errorf("tipset key cid: %w", err)
}

// check if this event already exists in the database
var entryID sql.NullInt64
err = tx.Stmt(ei.stmtEventExists).QueryRow(
Expand Down Expand Up @@ -651,11 +856,40 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
}
}

// this statement will mark the tipset as processed and will insert a new row if it doesn't exist
// or update the reverted field to false if it does
_, err = tx.Stmt(ei.stmtUpsertEventsSeen).Exec(
te.msgTs.Height(),
tsKeyCid.Bytes(),
false,
aarshkshah1992 marked this conversation as resolved.
Show resolved Hide resolved
)
if err != nil {
return xerrors.Errorf("exec upsert events seen: %w", err)
}

err = tx.Commit()
if err != nil {
return xerrors.Errorf("commit transaction: %w", err)
}

ei.mu.Lock()
tSubs := make([]*updateSub, 0, len(ei.updateSubs))
for _, tSub := range ei.updateSubs {
tSubs = append(tSubs, tSub)
}
ei.mu.Unlock()

for _, tSub := range tSubs {
tSub := tSub
select {
case tSub.ch <- EventIndexUpdated{}:
case <-tSub.ctx.Done():
// subscription was cancelled, ignore
case <-ctx.Done():
return ctx.Err()
}
}

return nil
}

Expand Down
Loading
Loading