Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chain ranged export #9192

Closed
wants to merge 9 commits into from
Closed

Chain ranged export #9192

wants to merge 9 commits into from

Conversation

frrist
Copy link
Member

@frrist frrist commented Aug 20, 2022

Related Issues

Proposed Changes

Additional Info

Checklist

Before you mark the PR ready for review, please make sure that:

  • All commits have a clear commit message.
  • The PR title is in the form of of <PR type>: <area>: <change being made>
    • example: fix: mempool: Introduce a cache for valid signatures
    • PR type: fix, feat, INTERFACE BREAKING CHANGE, CONSENSUS BREAKING, build, chore, ci, docs,perf, refactor, revert, style, test
    • area: api, chain, state, vm, data transfer, market, mempool, message, block production, multisig, networking, paychan, proving, sealing, wallet, deps
  • This PR has tests for new functionality or change in behaviour
  • If new user-facing features are introduced, clear usage guidelines and / or documentation updates should be included in https://lotus.filecoin.io or Discussion Tutorials.
  • CI is green

@frrist frrist requested a review from travisperson September 8, 2022 15:09
@frrist frrist self-assigned this Sep 23, 2022
Dag
)

func (s *walkScheduler) work(ctx context.Context, todo *walkTask, results chan *walkResult) error {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method needs review

@@ -587,6 +590,80 @@ func (m *ChainModule) ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.M
return cm.VMMessage(), nil
}

func (a ChainAPI) ChainExportRangeInternal(ctx context.Context, head, tail types.TipSetKey, cfg *api.ChainExportConfig) error {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this file needs review

@@ -24,6 +28,35 @@ func (cs *ChainStore) UnionStore() bstore.Blockstore {
return bstore.Union(cs.stateBlockstore, cs.chainBlockstore)
}

func (cs *ChainStore) ExportRange(ctx context.Context, head, tail *types.TipSet, messages, receipts, stateroots bool, workers int64, cacheSize int, w io.Writer) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ensure io.Writer is buffered.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

cfg *walkSchedulerConfig
}

func (s *walkScheduler) enqueueIfNew(task *walkTask) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to pass pointer?

close(s.in)
}()
for {
if n := len(s.stack) - 1; n >= 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment: this does depth traversal (using the stack). The two channels (in out), we assume, add backpressure so that stack couldn't grow indifinetely if the disk is not fast enough (not completely sure).

Doing breadth-first search might allow not using the stack, and directly relying on the in channel as FIFO queue.

Dag
)

func (s *walkScheduler) work(ctx context.Context, todo *walkTask, results chan *walkResult) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

try avoid pointers for walkTas and walkResult.

}

pw, ctx := newWalkScheduler(ctx, store, cfg, tasks...)
results := make(chan *walkResult)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make me a good channel buffer on this shit forrest

results <- &walkResult{c: todo.c}

// extract relevant dags to walk from the block
if todo.taskType == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

write: Block

log.Infow("block export", "height", b.Height)
}
if b.Height == 0 {
log.Infow("GENESIS")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps exporting including height 0 crashes... prob not related to this part of the code.

pw.startScheduler(ctx)
pw.startWorkers(ctx, results)

done := make(chan struct{})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not used


done := make(chan struct{})
var cbErr error
go func() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

potential goroutine leak if things error somewhere, as close(results) is down below.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment on lines 180 to 249
defer func() {
log.Infow("walkScheduler shutting down")
close(s.out)
// Because the workers may have exited early (due to the context being canceled).
for range s.out {
s.taskWg.Done()
}
// Because the workers may have enqueued additional tasks.
for range s.in {
s.taskWg.Done()
}
// now, the waitgroup should be at 0, and the goroutine that was _waiting_ on it should have exited.
log.Infow("walkScheduler stopped")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to re-approach synchronization here. Seems last info message never happens. Things hang on ctrl-c.

@hsanjuan
Copy link
Contributor

At the end of the export the following is printer:

      INFO    chainstore      store/snapshot.go:283   block export    {"height": "897000"}
2022-10-26T16:07:50.734Z        INFO    chainstore      store/snapshot.go:283   block export    {"height": "897000"}
2022-10-26T16:07:50.735Z        INFO    chainstore      store/snapshot.go:283   block export    {"height": "897000"}
2022-10-26T16:07:50.735Z        INFO    chainstore      store/snapshot.go:283   block export    {"height": "897000"}
2022-10-26T16:07:50.735Z        INFO    chainstore      store/snapshot.go:283   block export    {"height": "897000"}
....

where height goes down to 0. That is very confusing since we are only doing a range export. Are we missing a check to finish on the starting height?

}
if b.Height%1000 == 0 {
if b.Height%1_000 == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmmm....what does this mean?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

emit a log message every 1000 blocks
the underscore is serving as a comma for readability https://go.dev/doc/go1.13#language

@travisperson travisperson marked this pull request as ready for review November 29, 2022 18:03
@travisperson travisperson requested a review from a team as a code owner November 29, 2022 18:03
@frrist
Copy link
Member Author

frrist commented Dec 1, 2022

One outstanding issue with this PR is that it sometimes cannot be stopped with Ctrl+C

@jennijuju jennijuju force-pushed the frrist/chain-export-range branch from ad6ef57 to 0193c97 Compare December 2, 2022 03:03
@jennijuju jennijuju added the P2 P2: Should be resolved label Dec 8, 2022
@jennijuju
Copy link
Member

We need this before nv18 for stable snapshot service

@@ -171,6 +171,10 @@ type FullNode interface {
// If oldmsgskip is set, messages from before the requested roots are also not included.
ChainExport(ctx context.Context, nroots abi.ChainEpoch, oldmsgskip bool, tsk types.TipSetKey) (<-chan []byte, error) //perm:read

ChainExportRange(ctx context.Context, head, tail types.TipSetKey, cfg *ChainExportConfig) (<-chan []byte, error) //perm:read
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

filecoin-project/go-jsonrpc#88 adds support for calling methods on the client.

A callback accepting bytes on the client would be better as that lets you have backpressure, and is arguably more robust in general.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(see https://github.com/filecoin-project/lotus/pull/10027/files for an example of how it's integrated in lotus)

@hsanjuan hsanjuan mentioned this pull request Jan 30, 2023
7 tasks
@hsanjuan
Copy link
Contributor

I'm continuing this on #10145 . I'm leaving this branch untouched since we are actually using it in production now (well, a different rebase on v0.19.0).

@hsanjuan hsanjuan closed this Jan 30, 2023
@rjan90 rjan90 deleted the frrist/chain-export-range branch February 3, 2025 08:48
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
P2 P2: Should be resolved
Projects
No open projects
Status: Done
Development

Successfully merging this pull request may close these issues.

5 participants