-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] A new ChainIndexer
that replaces the fragmented MsgIndex, EthTxHashIndex and EventsIndex
#12388
Conversation
@@ -23,15 +23,6 @@ const DefaultDbFilename = "msgindex.db" | |||
|
|||
var log = logging.Logger("msgindex") | |||
|
|||
var ddls = []string{ | |||
`CREATE TABLE IF NOT EXISTS messages ( | |||
cid VARCHAR(80) PRIMARY KEY ON CONFLICT REPLACE, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We will completely remove the msgIndex once this PR lands.
event_index INTEGER NOT NULL, | ||
emitter_addr BLOB NOT NULL, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
space/tab issue here btw
if err != nil { | ||
return xerrors.Errorf("error unreverting events for tipset: %w", err) | ||
} | ||
if rows > 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, so we're using this as our indicator of whether we've seen these events before and won't bother checking individual events like we do now, this seems sensible I think and avoids overhead, although it has more confidence that we have a consistent state!
|
||
if err == sql.ErrNoRows { | ||
// wait till head is processed and retry | ||
if err := ci.waitTillHeadIndexed(ctx); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The msg could be in the mpool, right? that's about the only other place that legitimate messages might be. This call will block for up to 30s, is that a reasonable thing to do in this case? Shouldn't we just bail and say "not found" earlier under the assumption that we'll pick it up in the mpool (when you eventually implement that)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rvagg Eventually, mpool msgs will also be in this DB once I wire it up. This is to handle the case where user is asking for something in the HEAD.
return ems, nil | ||
} | ||
|
||
func (ci *ChainIndexer) GetMsgInfo(ctx context.Context, msg_cid cid.Cid) (MsgInfo, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hopefully the linter is going to have issues with your underscore in msg_cid
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, trying out some new tooling. Will fix.
} | ||
|
||
func (ci *ChainIndexer) waitTillHeadIndexed(ctx context.Context) error { | ||
ctx, cancel := context.WithTimeout(ctx, 30*time.Second) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we just use EpochDurationSeconds
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rvagg Honestly, this has nothing to do with an epoch. I think we can change the timeout to 5seconds. The idea is that the Index should reflect the heaviest block we have in our statestore that's it. We're not waiting for a new block to come in here.
The "waitTill" up to 30s concerns me a little in these APIs. I think the problem we're trying to solve is the async updating of the index, correct? So we might be querying something that's just happened but it hasn't propagated to the db. But these waits could be up to 30s to find out if something just doesn't exist. I wonder if there's an alternative strategy here in checking with the status of the chain synchronously. We could do something like:
Then we deal with the async updating but only have to wait until what has already been processed has propagated. Does that work? |
@rvagg That is the plan and what I'm trying to do here i.e. ensure that the Index has Indexed the current heaviest tipset in the chainstore before retrying a read if the first read attempt fails. We don't need to wait for a new block to come in here. I will update the code/docs/timeout to make this more clearer. |
reverted INTEGER NOT NULL, | ||
message_cid BLOB NOT NULL, | ||
message_index INTEGER NOT NULL, | ||
events_processed INTEGER NOT NULL, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's the purpose of this field btw?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rvagg Eventually, want the event read API to differentiate between "events processed and no events" vs "yet to process events". This is a simple denormalised field that will help with that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For a tipset T, events in T will be processed when we index a tipset U such that Parent(U)=T i.e. when tipset T is first "executed" and it's execution tipset is indexed. This field just makes it easy to track that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but do we use this anywhere? I'm not seeing it, what's the plan?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rvagg Yeah lemme add a "ReadEventsAPI" to show how it'll be used and wire it into the EventFilterManager
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This PR was to show the general direction I am taking. Next one will be code complete.
ChainIndexer
that replaces the fragmented MsgIndex, EthTxHashIndex and EventsIndex
Maybe we should allow the [WIP] prefix in PR-titles as well, so that this PR-checker is not so noisy? |
@Stebalien In case you feel like taking a look. |
@rjan90 : I think ideally we'd allow [WIP] in the title when the PR is in draft but then have a tighter check when it's out of draft? |
Thanks for the awesome writeup @aarshkshah1992 about where this is going and what is remaining. Very thorough but easy to understand. Good stuff! I'm excited to see this land. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please update the PR title to match https://github.com/filecoin-project/lotus/blob/master/CONTRIBUTING.md#pr-title-conventions
@github-actions No sorry, cannot bro ❤️ |
This PR implements a new
ChainIndexer
in Lotus that indexes tipsets, messages, transaction hashes, and events by consuming theChainNotify
API.It aims to replace and subsume the existing
MsgIndex
,EventsIndex
, andEthTxHashIndex
, which are currently fragmented across multiple databases and have several known issues documented in filecoin-project/lotus#12293.Key Features
The
ChainIndexer
offers the following key features:Here are the next implementation steps going ahead to get this PR in a ready for review/ready for testing state.
Switch RPC APIs to use the Chain Index
ChainIndexer
instead of theMsgIndex
,EthTxHashIndex
andEventsIndex
.EventFilterManager
will read events from theChainIndexer
and prefill all registered filters rather than depending on the Indexer to do the pre-filling of filters.ChainIndexer
will listen to Mpool message addition updates to index the corresponding ETH Tx Hash. TheEthTxHashManager
will no longer be used for this.Read APIs Should Account for the Async Nature of Indexing
T
only indexes events inT-1
because of deferred execution.ETH RPC APIs Should Only Expose Executed Tipsets and Messages
T
are executed in tipsetT + 1
.T
are also executed in tipsetT
.Removing Re-orged Tipsets That Are No Longer Part of the Canonical Chain
ChainIndexer
will periodically prune all permanently re-orged/reverted tipsets from the index. It can do this by simply pruning all tipsets at a height less than(current head - finality policy - some buffer)
.Garbage Collection
ChainIndexer
can perform periodic GC based on this configuration.ChainIndexer
because of the use ofFOREIGN KEY ON CASCADE DELETES
, as described in SQLite Foreign Keys.Snapshot Hydration
Automated Backfilling
ChainIndex
for which the corresponding state exists in the statestore.Observer
with that tipset as the current head.Observer
.ChainIndexer
will observe the(Apply, Revert)
path between its last non-reverted indexed tipset and the current heaviest tipset in the chainstore before processing real-time updates, effectively performing automated backfilling.Simplify Indexing Config
Migration from Old Indices to the New ChainIndex
lotus-shed
utility that allows users to migrate existing indices to the newChainIndexer
database. This command should only be executed when the Lotus node is offline to ensure data consistency and avoid potential conflicts.ChainIndexer
. This approach offers several benefits:Solid Unit Tests, itests and testing on a calibnet node