-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: chain indexer todos [skip changelog] #12462
Changes from 3 commits
632c620
e84a794
1b57672
0d22bcf
8f2fa81
f0b4739
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -57,9 +57,10 @@ | |
} | ||
|
||
func (si *SqliteIndexer) ChainValidateIndex(ctx context.Context, epoch abi.ChainEpoch, backfill bool) (*types.IndexValidation, error) { | ||
if !si.started { | ||
return nil, xerrors.Errorf("ChainValidateIndex can only be called after the indexer has been started") | ||
if !si.started && si.isClosed() { | ||
return nil, xerrors.Errorf("ChainValidateIndex can only be called after the indexer has been started and not closed") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use separate if statements for the two conditions so that each error message can be more granular. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
} | ||
|
||
si.writerLk.Lock() | ||
defer si.writerLk.Unlock() | ||
|
||
|
@@ -126,17 +127,124 @@ | |
return nil, xerrors.Errorf("failed to cast tipset key cid: %w", err) | ||
} | ||
|
||
if indexedTsKeyCid != expectedTsKeyCid { | ||
if !indexedTsKeyCid.Equals(expectedTsKeyCid) { | ||
return nil, xerrors.Errorf("index corruption: non-reverted tipset at height %d has key %s, but canonical chain has %s", epoch, indexedTsKeyCid, expectedTsKeyCid) | ||
} | ||
|
||
// indexedTsKeyCid and expectedTsKeyCid are the same, so we can use `expectedTs` to fetch the indexed data | ||
indexedData, err := si.getIndexedTipSetData(ctx, expectedTs.Key()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just pass There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
if err != nil { | ||
return nil, xerrors.Errorf("failed to get tipset message and event counts at height %d: %w", expectedTs.Height(), err) | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ensure There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ideally, it will only be nil if error is there and if there is error and There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, but there is no harm in checking it and returning an error if it is nil here, so the user's node doesn't crash if there exists a bug that causes a nil value here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
if err = si.verifyIndexedData(ctx, expectedTs, indexedData); err != nil { | ||
return nil, xerrors.Errorf("failed to verify indexed data at height %d: %w", expectedTs.Height(), err) | ||
} | ||
|
||
return &types.IndexValidation{ | ||
TipsetKey: expectedTs.Key().String(), | ||
Height: uint64(expectedTs.Height()), | ||
// TODO Other fields | ||
TipsetKey: expectedTs.Key().String(), | ||
Height: uint64(expectedTs.Height()), | ||
TotalMessages: uint64(indexedData.nonRevertedMessageCount), | ||
TotalEvents: uint64(indexedData.nonRevertedEventCount), | ||
EventsReverted: indexedData.hasRevertedEvents, | ||
}, nil | ||
} | ||
|
||
type indexedTipSetData struct { | ||
nonRevertedMessageCount int | ||
nonRevertedEventCount int | ||
hasRevertedEvents bool | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add a test for these fields to We already call this API there and so this should be easy. See
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The test is failing right now because |
||
} | ||
|
||
// getIndexedTipSetData fetches the indexed tipset data for a tipset | ||
func (si *SqliteIndexer) getIndexedTipSetData(ctx context.Context, tsKey types.TipSetKey) (*indexedTipSetData, error) { | ||
cid, err := tsKey.Cid() | ||
if err != nil { | ||
return nil, xerrors.Errorf("failed to get tipset key cid: %w", err) | ||
} | ||
tsKeyBytes := cid.Bytes() | ||
|
||
var data indexedTipSetData | ||
err = withTx(ctx, si.db, func(tx *sql.Tx) error { | ||
if err = tx.Stmt(si.stmts.getNonRevertedTipsetMessageCountStmt).QueryRowContext(ctx, tsKeyBytes).Scan(&data.nonRevertedMessageCount); err != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You don't need to use a transaction here as you are only reading from the DB. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was using a transaction so that I get a consistent view of the DB for querying. |
||
return xerrors.Errorf("failed to query non reverted message count: %w", err) | ||
} | ||
|
||
if err = tx.Stmt(si.stmts.getNonRevertedTipsetEventCountStmt).QueryRowContext(ctx, tsKeyBytes).Scan(&data.nonRevertedEventCount); err != nil { | ||
return xerrors.Errorf("failed to query non reverted event count: %w", err) | ||
} | ||
|
||
// we are only fetching non-reverted events, so if there are no non-reverted events, | ||
// then we need to check if the events were reverted. | ||
if data.nonRevertedEventCount == 0 { | ||
if err = tx.Stmt(si.stmts.hasRevertedEventsStmt).QueryRowContext(ctx, tsKeyBytes).Scan(&data.hasRevertedEvents); err != nil { | ||
return xerrors.Errorf("failed to check for reverted events: %w", err) | ||
} | ||
} | ||
|
||
return nil | ||
}) | ||
|
||
return &data, err | ||
} | ||
|
||
// verifyIndexedData verifies that the indexed data for a tipset is correct | ||
// by comparing the number of messages and events in the chainstore to the number of messages and events indexed. | ||
// NOTE: Events are loaded from the executed messages of the tipset at the next epoch (ts.Height() + 1). | ||
func (si *SqliteIndexer) verifyIndexedData(ctx context.Context, ts *types.TipSet, indexedData *indexedTipSetData) (err error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ah I see why you want a struct for this, but it's still just 2 ints and it might even be clearer to provide them as named arguments here |
||
tsKeyCid, err := ts.Key().Cid() | ||
if err != nil { | ||
return xerrors.Errorf("failed to get tipset key cid: %w", err) | ||
} | ||
|
||
msgs, err := si.cs.MessagesForTipset(ctx, ts) | ||
if err != nil { | ||
return xerrors.Errorf("failed to get messages for tipset: %w", err) | ||
} | ||
|
||
msgCount := len(msgs) | ||
if msgCount != indexedData.nonRevertedMessageCount { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You don't need to call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was about to ask this as well. So here we are getting There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Look at what the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just checked, you're correct it's the same thing, I will update. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
return xerrors.Errorf("tipset message count mismatch: chainstore has %d, index has %d", msgCount, indexedData.nonRevertedMessageCount) | ||
} | ||
|
||
// get the tipset where the messages of `ts` will be executed (deferred execution) | ||
executionTs, err := si.cs.GetTipsetByHeight(ctx, ts.Height()+1, nil, false) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. see discussion in #12447 (comment), same problem here I think, particularly as you get close to the tip of the chain There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @rvagg Which is why we error out here if the parent of this tipset is not the original tipset. We can then ask the user to retry as that means that the chain has forked between the two tipsets. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think we want to re-execute the tipset/regenerate events here. |
||
if err != nil { | ||
return xerrors.Errorf("failed to get tipset by height: %w", err) | ||
} | ||
|
||
eParentTsKeyCid, err := executionTs.Parents().Cid() | ||
if err != nil { | ||
return xerrors.Errorf("failed to get execution tipset parent key cid: %w", err) | ||
} | ||
|
||
// the parent tipset of the execution tipset should be the same as the indexed tipset (`ts` should be the parent of `executionTs`) | ||
if !eParentTsKeyCid.Equals(tsKeyCid) { | ||
return xerrors.Errorf("execution tipset parent key mismatch: chainstore has %s, index has %s", eParentTsKeyCid, tsKeyCid) | ||
} | ||
|
||
executedMsgs, err := si.loadExecutedMessages(ctx, ts, executionTs) | ||
if err != nil { | ||
return xerrors.Errorf("failed to load executed messages: %w", err) | ||
} | ||
|
||
totalEventsCount := 0 | ||
for _, emsg := range executedMsgs { | ||
totalEventsCount += len(emsg.evs) | ||
} | ||
|
||
if totalEventsCount != indexedData.nonRevertedEventCount { | ||
return xerrors.Errorf("tipset event count mismatch: chainstore has %d, index has %d", totalEventsCount, indexedData.nonRevertedEventCount) | ||
} | ||
|
||
totalExecutedMsgCount := len(executedMsgs) | ||
if totalExecutedMsgCount != int(indexedData.nonRevertedMessageCount) { | ||
return xerrors.Errorf("tipset executed message count mismatch: chainstore has %d, index has %d", totalExecutedMsgCount, indexedData.nonRevertedMessageCount) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (si *SqliteIndexer) backfillMissingTipset(ctx context.Context, ts *types.TipSet, backfill bool) (*types.IndexValidation, error) { | ||
if !backfill { | ||
return nil, xerrors.Errorf("missing tipset at height %d in the chain index, set backfill to true to backfill", ts.Height()) | ||
|
@@ -159,9 +267,17 @@ | |
return nil, xerrors.Errorf("error applying tipset: %w", err) | ||
} | ||
|
||
indexedData, err := si.getIndexedTipSetData(ctx, ts.Key()) | ||
if err != nil { | ||
return nil, xerrors.Errorf("failed to get tipset message and event counts at height %d: %w", ts.Height(), err) | ||
} | ||
|
||
return &types.IndexValidation{ | ||
TipsetKey: ts.Key().String(), | ||
Height: uint64(ts.Height()), | ||
Backfilled: true, | ||
TipsetKey: ts.Key().String(), | ||
Height: uint64(ts.Height()), | ||
Backfilled: true, | ||
TotalMessages: indexedData.nonRevertedMessageCount, | ||
Check failure on line 279 in chain/index/api.go
|
||
TotalEvents: indexedData.nonRevertedEventCount, | ||
Check failure on line 280 in chain/index/api.go
|
||
EventsReverted: indexedData.hasRevertedEvents, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @akaladarshi I think this variable will confuse users more than help them. Let's just get rid of it and simply have a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you mean rename the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. Let's remove There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Then, I think we can also remove the query for checking, if the events were reverted. if data.nonRevertedEventCount == 0 {
if err = tx.Stmt(si.stmts.hasRevertedEventsStmt).QueryRowContext(ctx, tsKeyCidBytes).Scan(&data.hasRevertedEvents); err != nil {
return xerrors.Errorf("failed to check for reverted events: %w", err)
}
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
}, nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -82,5 +82,8 @@ func preparedStatementMapping(ps *preparedStatements) map[**sql.Stmt]string { | |
&ps.hasNullRoundAtHeightStmt: "SELECT NOT EXISTS(SELECT 1 FROM tipset_message WHERE height = ?)", | ||
&ps.getNonRevertedTipsetAtHeightStmt: "SELECT tipset_key_cid FROM tipset_message WHERE height = ? AND reverted = 0", | ||
&ps.countTipsetsAtHeightStmt: "SELECT COUNT(CASE WHEN reverted = 1 THEN 1 END) AS reverted_count, COUNT(CASE WHEN reverted = 0 THEN 1 END) AS non_reverted_count FROM (SELECT tipset_key_cid, MAX(reverted) AS reverted FROM tipset_message WHERE height = ? GROUP BY tipset_key_cid) AS unique_tipsets", | ||
&ps.getNonRevertedTipsetMessageCountStmt: "SELECT COUNT(*) FROM tipset_message WHERE tipset_key_cid = ? AND reverted = 0", | ||
&ps.getNonRevertedTipsetEventCountStmt: "SELECT COUNT(*) FROM event WHERE message_id IN (SELECT message_id FROM tipset_message WHERE tipset_key_cid = ? AND reverted = 0)", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @akaladarshi What if the tipset is NOT reverted but the events are reverted? This query only ensures that the tipset is NOT reverted. Does it have any implications on the correctness of the code/API? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When you say There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. would it be corruption if one were reverted and the other not? is that something we should also be checking for? |
||
&ps.hasRevertedEventsStmt: "SELECT EXISTS (SELECT 1 FROM event WHERE reverted = 1 AND message_id IN (SELECT message_id FROM tipset_message WHERE tipset_key_cid = ?))", | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -48,6 +48,10 @@ type preparedStatements struct { | |
hasNullRoundAtHeightStmt *sql.Stmt | ||
getNonRevertedTipsetAtHeightStmt *sql.Stmt | ||
countTipsetsAtHeightStmt *sql.Stmt | ||
|
||
getNonRevertedTipsetMessageCountStmt *sql.Stmt | ||
getNonRevertedTipsetEventCountStmt *sql.Stmt | ||
hasRevertedEventsStmt *sql.Stmt | ||
} | ||
|
||
type SqliteIndexer struct { | ||
|
@@ -173,12 +177,9 @@ func (si *SqliteIndexer) initStatements() error { | |
} | ||
|
||
func (si *SqliteIndexer) IndexEthTxHash(ctx context.Context, txHash ethtypes.EthHash, msgCid cid.Cid) error { | ||
si.closeLk.RLock() | ||
if si.closed { | ||
si.closeLk.RUnlock() | ||
if si.isClosed() { | ||
return ErrClosed | ||
} | ||
si.closeLk.RUnlock() | ||
|
||
return withTx(ctx, si.db, func(tx *sql.Tx) error { | ||
return si.indexEthTxHash(ctx, tx, txHash, msgCid) | ||
|
@@ -199,12 +200,10 @@ func (si *SqliteIndexer) IndexSignedMessage(ctx context.Context, msg *types.Sign | |
if msg.Signature.Type != crypto.SigTypeDelegated { | ||
return nil | ||
} | ||
si.closeLk.RLock() | ||
if si.closed { | ||
si.closeLk.RUnlock() | ||
|
||
if si.isClosed() { | ||
return ErrClosed | ||
} | ||
si.closeLk.RUnlock() | ||
|
||
return withTx(ctx, si.db, func(tx *sql.Tx) error { | ||
return si.indexSignedMessage(ctx, tx, msg) | ||
|
@@ -227,7 +226,7 @@ func (si *SqliteIndexer) indexSignedMessage(ctx context.Context, tx *sql.Tx, msg | |
|
||
func (si *SqliteIndexer) Apply(ctx context.Context, from, to *types.TipSet) error { | ||
si.closeLk.RLock() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. double lock here because you do it in |
||
if si.closed { | ||
if si.isClosed() { | ||
si.closeLk.RUnlock() | ||
return ErrClosed | ||
} | ||
|
@@ -346,12 +345,9 @@ func (si *SqliteIndexer) restoreTipsetIfExists(ctx context.Context, tx *sql.Tx, | |
} | ||
|
||
func (si *SqliteIndexer) Revert(ctx context.Context, from, to *types.TipSet) error { | ||
si.closeLk.RLock() | ||
if si.closed { | ||
si.closeLk.RUnlock() | ||
if si.isClosed() { | ||
return ErrClosed | ||
} | ||
si.closeLk.RUnlock() | ||
|
||
si.writerLk.Lock() | ||
defer si.writerLk.Unlock() | ||
|
@@ -398,3 +394,9 @@ func (si *SqliteIndexer) Revert(ctx context.Context, from, to *types.TipSet) err | |
|
||
return nil | ||
} | ||
|
||
func (si *SqliteIndexer) isClosed() bool { | ||
si.closeLk.RLock() | ||
defer si.closeLk.RUnlock() | ||
return si.closed | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OR closed. Not AND, right?