Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: a new ChainIndexer to index tipsets, messages and events #12421

Merged
merged 128 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
128 commits
Select commit Hold shift + click to select a range
fb19d79
chain index complete for msgs and txns
aarshkshah1992 Aug 29, 2024
58569d6
dont need observer changes for now
aarshkshah1992 Aug 29, 2024
5a3f76f
changes
aarshkshah1992 Aug 29, 2024
9ea48f3
fix tests
aarshkshah1992 Aug 29, 2024
1e3a9d5
fix tests
aarshkshah1992 Aug 29, 2024
4c34bc7
use th right context
aarshkshah1992 Aug 29, 2024
285ce26
index empty tipsets correctly
aarshkshah1992 Aug 30, 2024
12e67fe
implement automated backfilling
aarshkshah1992 Aug 30, 2024
3377987
add event indexing and remove all old indices
aarshkshah1992 Sep 4, 2024
e6331da
fix test
aarshkshah1992 Sep 4, 2024
f1f24c8
revert deployment test changes
aarshkshah1992 Sep 5, 2024
3f09e1e
revert test changes and better error handling for eth tx index lookups
aarshkshah1992 Sep 5, 2024
c3865b7
fix sql statments naming convention
aarshkshah1992 Sep 5, 2024
8ec7cd2
address review for Index GC
aarshkshah1992 Sep 5, 2024
a1c5201
more changes as per review
aarshkshah1992 Sep 5, 2024
e0831ee
changes as per review
aarshkshah1992 Sep 5, 2024
06ca87a
fix config
aarshkshah1992 Sep 5, 2024
c30079e
mark events as reverted during reconciliation
aarshkshah1992 Sep 5, 2024
01d78e3
better reconciliation; pens down and code complete; also reconcile ev…
aarshkshah1992 Sep 6, 2024
994a717
fix tests
aarshkshah1992 Sep 6, 2024
ad9bcb1
improve config and docs
aarshkshah1992 Sep 6, 2024
0c9a0ca
improve docs and error handling
aarshkshah1992 Sep 6, 2024
89cedb2
improve read logic
aarshkshah1992 Sep 6, 2024
a2a2b76
improve docs
aarshkshah1992 Sep 6, 2024
44af9f8
better logging and handle ennable event storage
aarshkshah1992 Sep 9, 2024
6608b80
improve logs and index init proc
aarshkshah1992 Sep 9, 2024
625d8c8
better logging
aarshkshah1992 Sep 9, 2024
fed08b0
fix bugs based on calibnet testing
aarshkshah1992 Sep 9, 2024
a5c56c1
create sqliite Indices
aarshkshah1992 Sep 10, 2024
7acd481
gc should be based on epochs
aarshkshah1992 Sep 10, 2024
821dcd4
fix event query
aarshkshah1992 Sep 10, 2024
cde46cb
foreign keys should be enabled on the DB
aarshkshah1992 Sep 10, 2024
727dae3
reverted tipsets should be removed as part of GC
aarshkshah1992 Sep 10, 2024
c07784d
release read lock
aarshkshah1992 Sep 10, 2024
896048a
make it easy to backfill an empty index using reconciliation
aarshkshah1992 Sep 10, 2024
602f660
better docs for reconciliation
aarshkshah1992 Sep 10, 2024
9750571
Merge remote-tracking branch 'origin/master' into feat/msg-eth-tx-index
aarshkshah1992 Sep 12, 2024
13c2824
fix conflicts with master
aarshkshah1992 Sep 12, 2024
37d6746
Apply suggestions from code review
aarshkshah1992 Sep 13, 2024
c4490bb
fix go mod
aarshkshah1992 Sep 13, 2024
93a8b76
fix formatting
aarshkshah1992 Sep 13, 2024
6f8530e
revert config changes
aarshkshah1992 Sep 13, 2024
627aff2
address changes in observer
aarshkshah1992 Sep 13, 2024
7244b66
remove top level chainindex package
aarshkshah1992 Sep 13, 2024
531cd38
changes as per review
aarshkshah1992 Sep 13, 2024
77fc462
changes as per review
aarshkshah1992 Sep 13, 2024
c2e5f68
changes as per review
aarshkshah1992 Sep 13, 2024
286af22
handle index with reverted tipsets during reconciliation
aarshkshah1992 Sep 13, 2024
d67a30a
changes as per review
aarshkshah1992 Sep 13, 2024
5f5ef3a
fix type of max reconcile epoch
aarshkshah1992 Sep 13, 2024
f5a5c61
changes to reconciliation as per review
aarshkshah1992 Sep 14, 2024
730d00a
log ipld error
aarshkshah1992 Sep 14, 2024
c099abf
better logging of progress
aarshkshah1992 Sep 14, 2024
951ce77
disable chain indexer hydrate from snapshot based on config
aarshkshah1992 Sep 14, 2024
ad6c086
always populate index
aarshkshah1992 Sep 14, 2024
52e104d
make config easy to reason about
aarshkshah1992 Sep 14, 2024
9c6c728
fix config
aarshkshah1992 Sep 14, 2024
1da1e07
fix messaging
aarshkshah1992 Sep 14, 2024
efe90f8
revert config changes
aarshkshah1992 Sep 14, 2024
c121321
Apply suggestions from code review
aarshkshah1992 Sep 16, 2024
2ff1d42
changes as per review
aarshkshah1992 Sep 16, 2024
c945bb5
make error messages homogenous
aarshkshah1992 Sep 16, 2024
432e09a
fix indentation
aarshkshah1992 Sep 16, 2024
af9bc23
changes as per review
aarshkshah1992 Sep 16, 2024
6d84b03
feat: recompute tipset to generate missing events if event indexing i…
aarshkshah1992 Sep 16, 2024
b9f1583
better docs for gc retention epoch
aarshkshah1992 Sep 16, 2024
1921abd
imrpove DB handling (#12485)
aarshkshah1992 Sep 19, 2024
c03c4c7
Merge branch 'master' into feat/msg-eth-tx-index
aarshkshah1992 Sep 30, 2024
b7966e3
fix conflict
aarshkshah1992 Sep 30, 2024
da76042
fix lite node config for indexer
aarshkshah1992 Oct 1, 2024
1e8f8ce
exclude reverted events from eth get logs if client queries by epoch
aarshkshah1992 Oct 11, 2024
7b304a4
Simply addressing for event lookups in the index.
aarshkshah1992 Oct 11, 2024
5e2dba4
Merge remote-tracking branch 'origin/master' into feat/msg-eth-tx-index
aarshkshah1992 Oct 14, 2024
593e724
Apply suggestions from code review
aarshkshah1992 Oct 14, 2024
aa2aa4c
Apply suggestions from code review
aarshkshah1992 Oct 14, 2024
0339c46
fix tests
aarshkshah1992 Oct 14, 2024
105d85b
Apply suggestions from code review
aarshkshah1992 Oct 14, 2024
a9788f5
feat: migration("re-indexing"), backfilling and diasgnostics tooling …
aarshkshah1992 Oct 14, 2024
fcb6012
fix lint
aarshkshah1992 Oct 14, 2024
e3be131
Apply suggestions from code review
aarshkshah1992 Oct 15, 2024
8f0ee3c
Apply suggestions from code review
aarshkshah1992 Oct 15, 2024
04ac694
remove reverted flag from RPC
aarshkshah1992 Oct 15, 2024
f92b03c
Apply suggestions from code review
aarshkshah1992 Oct 15, 2024
5b58fe5
fix testing of events and dummy chain store
aarshkshah1992 Oct 15, 2024
86a8d26
remove lotus shed commands for old Indices
aarshkshah1992 Oct 15, 2024
aff1a6c
change type of event counts to uint64
aarshkshah1992 Oct 15, 2024
fe76994
only recompute events if theyre not found
aarshkshah1992 Oct 15, 2024
361d7bf
short-circuit empty events path for older tipsets
aarshkshah1992 Oct 15, 2024
f628cb2
chain indexer must be enabled if ETH RPC is enabled
aarshkshah1992 Oct 15, 2024
c96f4ed
change name of message_id column to id in tipset_message table
aarshkshah1992 Oct 15, 2024
cb7150c
only expose SetRecomputeTipSetStateFunc
aarshkshah1992 Oct 16, 2024
5575a1d
dont block on head indexing for reading messages
aarshkshah1992 Oct 16, 2024
e2013a0
document why we're only checking for missing events for a single tipset
aarshkshah1992 Oct 16, 2024
fcda11b
document when we query for reverted events
aarshkshah1992 Oct 16, 2024
ee6e589
simplify event collection
aarshkshah1992 Oct 16, 2024
8aad8e4
Apply suggestions from code review
aarshkshah1992 Oct 16, 2024
ab4de90
fix test
aarshkshah1992 Oct 16, 2024
bc97ebe
change event_id to id in the event table
aarshkshah1992 Oct 16, 2024
06e139f
change head indexed timeout
aarshkshah1992 Oct 16, 2024
947e6d1
remove deprecated config options
aarshkshah1992 Oct 16, 2024
5887cdf
fail ETH RPC calls if ChainIndexer is disabled
aarshkshah1992 Oct 16, 2024
03627aa
fix docs
aarshkshah1992 Oct 16, 2024
b4939ae
remove the tipset key cid func from lotus shed
aarshkshah1992 Oct 16, 2024
091574c
Merge remote-tracking branch 'origin/master' into feat/msg-eth-tx-index
aarshkshah1992 Oct 16, 2024
54d6632
address review comments
aarshkshah1992 Oct 16, 2024
1135da7
Apply suggestions from code review
aarshkshah1992 Oct 17, 2024
6dd3fcc
chore(events): remove unnecessary DisableRealTimeFilterAPI (#12610)
rvagg Oct 17, 2024
16123bc
feat(cli): add --quiet to chainindex validate-backfill + cleanups (#1…
rvagg Oct 17, 2024
488202b
fix tests
aarshkshah1992 Oct 17, 2024
8e6229e
Apply suggestions from code review
aarshkshah1992 Oct 17, 2024
5836ccb
error type for disabled chainindexer
aarshkshah1992 Oct 17, 2024
d0fb6e2
Merge remote-tracking branch 'origin/master' into feat/msg-eth-tx-index
aarshkshah1992 Oct 17, 2024
70aaef4
fix(chainindex): recompute tipset when we find no receipts
rvagg Oct 18, 2024
702f1f5
fix(chainindexer): backfilling should halt when chain state data is m…
aarshkshah1992 Oct 21, 2024
bd72eec
reduce log noise
aarshkshah1992 Oct 21, 2024
c3937d7
Merge remote-tracking branch 'origin/master' into feat/msg-eth-tx-index
aarshkshah1992 Oct 21, 2024
0311749
make jen
aarshkshah1992 Oct 21, 2024
fe16f30
make jen
aarshkshah1992 Oct 21, 2024
4bdce5f
docs: finishing chain-indexer-overview-for-operators.md (#12600)
BigLep Oct 22, 2024
599c2a9
remove go mod replace
aarshkshah1992 Oct 22, 2024
e84aa38
remove unnecessary changes from CHANGELOG
aarshkshah1992 Oct 22, 2024
24698c5
fix test
aarshkshah1992 Oct 22, 2024
4b6a582
compare events AMT root (#12632)
aarshkshah1992 Oct 23, 2024
5489ae9
fix(chainindex): retry transaction if database connection is lost (#1…
aarshkshah1992 Oct 29, 2024
cd1778c
Merge branch 'master' of github.com:filecoin-project/lotus into feat/…
aarshkshah1992 Oct 31, 2024
55bbe7a
fix gateway itest: no chainindexer for lite nodes
aarshkshah1992 Oct 31, 2024
16575e3
fix changelog
aarshkshah1992 Oct 31, 2024
a0e97c9
Merge branch 'master' into feat/msg-eth-tx-index
aarshkshah1992 Oct 31, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions chain/events/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,24 @@
return nil
}

// ObserveAndBlock registers the observer and returns the current tipset along with a handle function.
// The observer is guaranteed to observe events starting at this tipset.
// The returned handle function should be called by the client when it's ready to receive updates.
//
// This function should only be called by the client after the observer has been started.
// Note that the Observer will block all clients from recieving tipset updates until the handle is called.

Check failure on line 252 in chain/events/observer.go

View workflow job for this annotation

GitHub Actions / Check (lint-all)

`recieving` is a misspelling of `receiving` (misspell)
func (o *observer) ObserveAndBlock(obs TipSetObserver) (*types.TipSet, func()) {
o.lk.Lock()
o.observers = append(o.observers, obs)
currentHead := o.head

unlockHandle := func() {
o.lk.Unlock()
}

return currentHead, unlockHandle
}

// Observe registers the observer, and returns the current tipset. The observer is guaranteed to
// observe events starting at this tipset.
//
Expand Down
4 changes: 3 additions & 1 deletion chain/gen/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/vm"
"github.com/filecoin-project/lotus/chain/wallet"
"github.com/filecoin-project/lotus/chainindex"
"github.com/filecoin-project/lotus/cmd/lotus-seed/seed"
"github.com/filecoin-project/lotus/genesis"
"github.com/filecoin-project/lotus/journal"
Expand Down Expand Up @@ -258,7 +259,8 @@ func NewGeneratorWithSectorsAndUpgradeSchedule(numSectors int, us stmgr.UpgradeS
//return nil, xerrors.Errorf("creating drand beacon: %w", err)
//}

sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), sys, us, beac, ds, index.DummyMsgIndex)
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), sys, us, beac, ds,
index.DummyMsgIndex, chainindex.DummyIndexer)
if err != nil {
return nil, xerrors.Errorf("initing stmgr: %w", err)
}
Expand Down
18 changes: 4 additions & 14 deletions chain/index/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,36 +6,26 @@ import (

"github.com/ipfs/go-cid"

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chainindex"
)

var ErrNotFound = errors.New("message not found")
var ErrClosed = errors.New("index closed")

// MsgInfo is the Message metadata the index tracks.
type MsgInfo struct {
// the message this record refers to
Message cid.Cid
// the tipset where this message was included
TipSet cid.Cid
// the epoch where this message was included
Epoch abi.ChainEpoch
}

// MsgIndex is the interface to the message index
type MsgIndex interface {
// GetMsgInfo retrieves the message metadata through the index.
// The lookup is done using the onchain message Cid; that is the signed message Cid
// for SECP messages and unsigned message Cid for BLS messages.
GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, error)
GetMsgInfo(ctx context.Context, m cid.Cid) (*chainindex.MsgInfo, error)
// Close closes the index
Close() error
}

type dummyMsgIndex struct{}

func (dummyMsgIndex) GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, error) {
return MsgInfo{}, ErrNotFound
func (dummyMsgIndex) GetMsgInfo(ctx context.Context, m cid.Cid) (*chainindex.MsgInfo, error) {
return nil, ErrNotFound
}

func (dummyMsgIndex) Close() error {
Expand Down
31 changes: 17 additions & 14 deletions chain/index/msgindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chainindex"
"github.com/filecoin-project/lotus/lib/sqlite"
)

Expand Down Expand Up @@ -88,7 +89,7 @@ type headChange struct {
app []*types.TipSet
}

func NewMsgIndex(lctx context.Context, path string, cs ChainStore) (MsgIndex, error) {
func NewMsgIndex(lctx context.Context, path string, cs ChainStore, enableWrites bool) (MsgIndex, error) {
rvagg marked this conversation as resolved.
Show resolved Hide resolved
db, exists, err := sqlite.Open(path)
if err != nil {
return nil, xerrors.Errorf("failed to setup message index db: %w", err)
Expand Down Expand Up @@ -124,13 +125,15 @@ func NewMsgIndex(lctx context.Context, path string, cs ChainStore) (MsgIndex, er
return nil, xerrors.Errorf("error preparing msgindex database statements: %w", err)
}

rnf := store.WrapHeadChangeCoalescer(
msgIndex.onHeadChange,
CoalesceMinDelay,
CoalesceMaxDelay,
CoalesceMergeInterval,
)
cs.SubscribeHeadChanges(rnf)
if enableWrites {
rnf := store.WrapHeadChangeCoalescer(
msgIndex.onHeadChange,
CoalesceMinDelay,
CoalesceMaxDelay,
CoalesceMergeInterval,
)
cs.SubscribeHeadChanges(rnf)
}

msgIndex.workers.Add(1)
go msgIndex.background(ctx)
Expand Down Expand Up @@ -431,12 +434,12 @@ func (x *msgIndex) doApply(ctx context.Context, tx *sql.Tx, ts *types.TipSet) er
}

// interface
func (x *msgIndex) GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, error) {
func (x *msgIndex) GetMsgInfo(ctx context.Context, m cid.Cid) (*chainindex.MsgInfo, error) {
x.closeLk.RLock()
defer x.closeLk.RUnlock()

if x.closed {
return MsgInfo{}, ErrClosed
return nil, ErrClosed
}

var (
Expand All @@ -449,18 +452,18 @@ func (x *msgIndex) GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, error) {
err := row.Scan(&tipset, &epoch)
switch {
case err == sql.ErrNoRows:
return MsgInfo{}, ErrNotFound
return nil, ErrNotFound

case err != nil:
return MsgInfo{}, xerrors.Errorf("error querying msgindex database: %w", err)
return nil, xerrors.Errorf("error querying msgindex database: %w", err)
}

tipsetCid, err := cid.Decode(tipset)
if err != nil {
return MsgInfo{}, xerrors.Errorf("error decoding tipset cid: %w", err)
return nil, xerrors.Errorf("error decoding tipset cid: %w", err)
}

return MsgInfo{
return &chainindex.MsgInfo{
Message: m,
TipSet: tipsetCid,
Epoch: abi.ChainEpoch(epoch),
Expand Down
8 changes: 4 additions & 4 deletions chain/index/msgindex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestBasicMsgIndex(t *testing.T) {
tmp := t.TempDir()
t.Cleanup(func() { _ = os.RemoveAll(tmp) })

msgIndex, err := NewMsgIndex(context.Background(), tmp+"/msgindex.db", cs)
msgIndex, err := NewMsgIndex(context.Background(), tmp+"/msgindex.db", cs, true)
require.NoError(t, err)

defer msgIndex.Close() //nolint
Expand Down Expand Up @@ -58,7 +58,7 @@ func TestReorgMsgIndex(t *testing.T) {
tmp := t.TempDir()
t.Cleanup(func() { _ = os.RemoveAll(tmp) })

msgIndex, err := NewMsgIndex(context.Background(), tmp+"/msgindex.db", cs)
msgIndex, err := NewMsgIndex(context.Background(), tmp+"/msgindex.db", cs, true)
require.NoError(t, err)

defer msgIndex.Close() //nolint
Expand Down Expand Up @@ -103,7 +103,7 @@ func TestReconcileMsgIndex(t *testing.T) {
tmp := t.TempDir()
t.Cleanup(func() { _ = os.RemoveAll(tmp) })

msgIndex, err := NewMsgIndex(context.Background(), tmp+"/msgindex.db", cs)
msgIndex, err := NewMsgIndex(context.Background(), tmp+"/msgindex.db", cs, true)
require.NoError(t, err)

for i := 0; i < 10; i++ {
Expand All @@ -130,7 +130,7 @@ func TestReconcileMsgIndex(t *testing.T) {
require.NoError(t, err)

// reopen to reconcile
msgIndex, err = NewMsgIndex(context.Background(), tmp+"/msgindex.db", cs)
msgIndex, err = NewMsgIndex(context.Background(), tmp+"/msgindex.db", cs, true)
require.NoError(t, err)

defer msgIndex.Close() //nolint
Expand Down
10 changes: 7 additions & 3 deletions chain/stmgr/forks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
. "github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/vm"
"github.com/filecoin-project/lotus/chainindex"
_ "github.com/filecoin-project/lotus/lib/sigs/bls"
_ "github.com/filecoin-project/lotus/lib/sigs/secp"
)
Expand Down Expand Up @@ -169,7 +170,7 @@ func TestForkHeightTriggers(t *testing.T) {
}

return st.Flush(ctx)
}}}, cg.BeaconSchedule(), datastore.NewMapDatastore(), index.DummyMsgIndex)
}}}, cg.BeaconSchedule(), datastore.NewMapDatastore(), index.DummyMsgIndex, chainindex.DummyIndexer)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -287,7 +288,7 @@ func testForkRefuseCall(t *testing.T, nullsBefore, nullsAfter int) {
root cid.Cid, height abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
migrationCount++
return root, nil
}}}, cg.BeaconSchedule(), datastore.NewMapDatastore(), index.DummyMsgIndex)
}}}, cg.BeaconSchedule(), datastore.NewMapDatastore(), index.DummyMsgIndex, chainindex.DummyIndexer)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -519,7 +520,7 @@ func TestForkPreMigration(t *testing.T) {
return nil
},
}}},
}, cg.BeaconSchedule(), datastore.NewMapDatastore(), index.DummyMsgIndex)
}, cg.BeaconSchedule(), datastore.NewMapDatastore(), index.DummyMsgIndex, chainindex.DummyIndexer)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -595,6 +596,7 @@ func TestDisablePreMigration(t *testing.T) {
cg.BeaconSchedule(),
datastore.NewMapDatastore(),
index.DummyMsgIndex,
chainindex.DummyIndexer,
)
require.NoError(t, err)
require.NoError(t, sm.Start(context.Background()))
Expand Down Expand Up @@ -650,6 +652,7 @@ func TestMigrtionCache(t *testing.T) {
cg.BeaconSchedule(),
metadataDs,
index.DummyMsgIndex,
chainindex.DummyIndexer,
)
require.NoError(t, err)
require.NoError(t, sm.Start(context.Background()))
Expand Down Expand Up @@ -703,6 +706,7 @@ func TestMigrtionCache(t *testing.T) {
cg.BeaconSchedule(),
metadataDs,
index.DummyMsgIndex,
chainindex.DummyIndexer,
)
require.NoError(t, err)
sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Interface, error) {
Expand Down
8 changes: 6 additions & 2 deletions chain/stmgr/searchwait.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,13 @@ func (sm *StateManager) SearchForMessage(ctx context.Context, head *types.TipSet
}

func (sm *StateManager) searchForIndexedMsg(ctx context.Context, mcid cid.Cid, m types.ChainMsg) (*types.TipSet, *types.MessageReceipt, cid.Cid, error) {
minfo, err := sm.msgIndex.GetMsgInfo(ctx, mcid)
minfo, err := sm.chainIndexer.GetMsgInfo(ctx, mcid)
if err != nil {
return nil, nil, cid.Undef, xerrors.Errorf("error looking up message in index: %w", err)
// If chainIndexer fails, fallback to msgIndex
minfo, err = sm.msgIndex.GetMsgInfo(ctx, mcid)
if err != nil {
return nil, nil, cid.Undef, xerrors.Errorf("error looking up message in indexes: %w", err)
}
}

// check the height against the current tipset; minimum execution confidence requires that the
Expand Down
12 changes: 8 additions & 4 deletions chain/stmgr/stmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/vm"
"github.com/filecoin-project/lotus/chainindex"

// Used for genesis.
msig0 "github.com/filecoin-project/specs-actors/actors/builtin/multisig"
Expand Down Expand Up @@ -156,7 +157,8 @@ type StateManager struct {
tsExecMonitor ExecMonitor
beacon beacon.Schedule

msgIndex index.MsgIndex
msgIndex index.MsgIndex
chainIndexer chainindex.Indexer

// We keep a small cache for calls to ExecutionTrace which helps improve
// performance for node operators like exchanges and block explorers
Expand All @@ -177,7 +179,8 @@ type tipSetCacheEntry struct {
invocTrace []*api.InvocResult
}

func NewStateManager(cs *store.ChainStore, exec Executor, sys vm.SyscallBuilder, us UpgradeSchedule, beacon beacon.Schedule, metadataDs dstore.Batching, msgIndex index.MsgIndex) (*StateManager, error) {
func NewStateManager(cs *store.ChainStore, exec Executor, sys vm.SyscallBuilder, us UpgradeSchedule, beacon beacon.Schedule,
metadataDs dstore.Batching, msgIndex index.MsgIndex, chainIndexer chainindex.Indexer) (*StateManager, error) {
// If we have upgrades, make sure they're in-order and make sense.
if err := us.Validate(); err != nil {
return nil, err
Expand Down Expand Up @@ -243,12 +246,13 @@ func NewStateManager(cs *store.ChainStore, exec Executor, sys vm.SyscallBuilder,
},
compWait: make(map[string]chan struct{}),
msgIndex: msgIndex,
chainIndexer: chainIndexer,
execTraceCache: execTraceCache,
}, nil
}

func NewStateManagerWithUpgradeScheduleAndMonitor(cs *store.ChainStore, exec Executor, sys vm.SyscallBuilder, us UpgradeSchedule, b beacon.Schedule, em ExecMonitor, metadataDs dstore.Batching, msgIndex index.MsgIndex) (*StateManager, error) {
sm, err := NewStateManager(cs, exec, sys, us, b, metadataDs, msgIndex)
func NewStateManagerWithUpgradeScheduleAndMonitor(cs *store.ChainStore, exec Executor, sys vm.SyscallBuilder, us UpgradeSchedule, b beacon.Schedule, em ExecMonitor, metadataDs dstore.Batching, msgIndex index.MsgIndex, chainIndexer chainindex.Indexer) (*StateManager, error) {
sm, err := NewStateManager(cs, exec, sys, us, b, metadataDs, msgIndex, chainIndexer)
if err != nil {
return nil, err
}
Expand Down
4 changes: 3 additions & 1 deletion chain/store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chainindex"
"github.com/filecoin-project/lotus/node/repo"
)

Expand Down Expand Up @@ -216,7 +217,8 @@ func TestChainExportImportFull(t *testing.T) {
t.Fatal("imported chain differed from exported chain")
}

sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), nil, filcns.DefaultUpgradeSchedule(), cg.BeaconSchedule(), ds, index.DummyMsgIndex)
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), nil, filcns.DefaultUpgradeSchedule(), cg.BeaconSchedule(),
ds, index.DummyMsgIndex, chainindex.DummyIndexer)
if err != nil {
t.Fatal(err)
}
Expand Down
55 changes: 55 additions & 0 deletions chainindex/ddls.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package chainindex
rvagg marked this conversation as resolved.
Show resolved Hide resolved

const DefaultDbFilename = "chainindex.db"

const (
rvagg marked this conversation as resolved.
Show resolved Hide resolved
stmtGetNonRevertedMessageInfo = "SELECT tipset_key_cid, height FROM tipset_message WHERE message_cid = ? AND reverted = 0"
stmtGetMsgCidFromEthHash = "SELECT message_cid FROM eth_tx_hash WHERE tx_hash = ?"
stmtInsertEthTxHash = "INSERT INTO eth_tx_hash (tx_hash, message_cid) VALUES (?, ?) ON CONFLICT (tx_hash) DO UPDATE SET inserted_at = CURRENT_TIMESTAMP"

stmtInsertTipsetMessage = "INSERT INTO tipset_message (tipset_key_cid, height, reverted, message_cid, message_index) VALUES (?, ?, ?, ?, ?) ON CONFLICT (tipset_key_cid, message_cid) DO UPDATE SET reverted = 0"

stmtTipsetExists = "SELECT EXISTS(SELECT 1 FROM tipset_message WHERE tipset_key_cid = ?)"
stmtTipsetUnRevert = "UPDATE tipset_message SET reverted = 0 WHERE tipset_key_cid = ?"

stmtRevertTipset = "UPDATE tipset_message SET reverted = 1 WHERE tipset_key_cid = ?"

stmtGetMaxNonRevertedTipset = "SELECT tipset_key_cid FROM tipset_message WHERE reverted = 0 ORDER BY height DESC LIMIT 1"

stmtRemoveRevertedTipsetsBeforeHeight = "DELETE FROM tipset_message WHERE height < ? AND reverted = 1"
stmtRemoveTipsetsBeforeHeight = "DELETE FROM tipset_message WHERE height < ?"

stmtDeleteEthHashesOlderThan = `DELETE FROM eth_tx_hash WHERE inserted_at < datetime('now', ?);`

stmtRevertTipsetsFromHeight = "UPDATE tipset_message SET reverted = 1 WHERE height >= ?"

stmtCountMessages = "SELECT COUNT(*) FROM tipset_message"

stmtMinNonRevertedHeight = `SELECT MIN(height) FROM tipset_message WHERE reverted = 0`
rvagg marked this conversation as resolved.
Show resolved Hide resolved

stmtTipsetExistsNotReverted = `SELECT EXISTS(SELECT 1 FROM tipset_message WHERE tipset_key_cid = ? AND reverted = 0)`
)

var ddls = []string{
`CREATE TABLE IF NOT EXISTS tipset_message (
message_id INTEGER PRIMARY KEY,
rvagg marked this conversation as resolved.
Show resolved Hide resolved
tipset_key_cid BLOB NOT NULL,
height INTEGER NOT NULL,
rvagg marked this conversation as resolved.
Show resolved Hide resolved
reverted INTEGER NOT NULL,
message_cid BLOB,
rvagg marked this conversation as resolved.
Show resolved Hide resolved
message_index INTEGER,
UNIQUE (tipset_key_cid, message_cid)
)`,

`CREATE TABLE IF NOT EXISTS eth_tx_hash (
tx_hash TEXT PRIMARY KEY,
message_cid BLOB NOT NULL,
inserted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)`,

`CREATE INDEX IF NOT EXISTS insertion_time_index ON eth_tx_hash (inserted_at)`,

`CREATE INDEX IF NOT EXISTS idx_message_cid ON tipset_message (message_cid)`,

`CREATE INDEX IF NOT EXISTS idx_tipset_key_cid ON tipset_message (tipset_key_cid)`,
}
Loading
Loading