diff --git a/module/block_iterator.go b/module/block_iterator.go new file mode 100644 index 00000000000..b10b64f7451 --- /dev/null +++ b/module/block_iterator.go @@ -0,0 +1,121 @@ +package module + +import ( + "fmt" + + "github.com/onflow/flow-go/model/flow" +) + +// IterateRange defines the range of blocks to iterate over +// the range could be either view based range or height based range. +// when specifying the range, the start and end are inclusive, and the end must be greater than or +// equal to the start +type IterateRange struct { + Start uint64 // the start of the range + End uint64 // the end of the range +} + +// IterateRangeCreator is an interface for creating iterate jobs +type IteratorJobCreator interface { + // CreateJob takes a progress reader which is used to read the progress of the iterator + // and returns an iterate job that specifies the range of blocks to iterate over + CreateJob(IterateProgressReader) (IterateRange, error) +} + +// IterateProgressReader reads the progress of the iterator, useful for resuming the iteration +// after restart +type IterateProgressReader interface { + // LoadState reads the next block to iterate + // caller must ensure the reader is created by the IterateProgressInitializer, + // otherwise LoadState would return exception. + LoadState() (uint64, error) +} + +// IterateProgressWriter saves the progress of the iterator +type IterateProgressWriter interface { + // SaveState persists the next block to be iterated + SaveState(uint64) error +} + +// IterateProgressInitializer is an interface for initializing the progress of the iterator +// a initializer must be used to ensures the initial next block to be iterated is saved in +// storage before creating the block iterator +type IterateProgressInitializer interface { + Init() (IterateProgressReader, IterateProgressWriter, error) +} + +// BlockIterator is an interface for iterating over blocks +type BlockIterator interface { + // Next returns the next block in the iterator + // Note: this method is not concurrent-safe + // Note: a block will only be iterated once in a single iteration, however + // if the iteration is interrupted (e.g. by a restart), the iterator can be + // resumed from the last checkpoint, which might result in the same block being + // iterated again. + // TODO: once upgraded to go 1.23, consider using the Range iterator + // Range() iter.Seq2[flow.Identifier, error] + // so that the iterator can be used in a for loop: + // for blockID, err := range heightIterator.Range() + Next() (blockID flow.Identifier, hasNext bool, exception error) + + // Checkpoint saves the current state of the iterator + // so that it can be resumed later + // when Checkpoint is called, if SaveStateFunc is called with block A, + // then after restart, the iterator will resume from A. + // make sure to call this after all the jobs for processing the block IDs returned by + // Next() are completed. + Checkpoint() error +} + +// IteratorCreator is an interface for creating block iterators +type IteratorCreator interface { + // CreateIterator takes iterate job which specifies the range of blocks to iterate over + // and a progress writer which is used to save the progress of the iterator, + // and returns a block iterator that can be used to iterate over the blocks + // Note: it's up to the implementation to decide how often the progress is saved, + // it is wise to consider the trade-off between the performance and the progress saving, + // if the progress is saved too often, it might impact the iteration performance, however, + // if the progress is only saved at the end of the iteration, then if the iteration + // was interrupted, then the iterator will start from the beginning of the range again, + // which means some blocks might be iterated multiple times. + CreateIterator(IterateRange, IterateProgressWriter) (BlockIterator, error) +} + +type IteratorFactory struct { + progressReader IterateProgressReader + progressWriter IterateProgressWriter + creator IteratorCreator + jobCreator IteratorJobCreator +} + +func NewIteratorFactory( + initializer IterateProgressInitializer, + creator IteratorCreator, + jobCreator IteratorJobCreator, +) (*IteratorFactory, error) { + progressReader, progressWriter, err := initializer.Init() + if err != nil { + return nil, fmt.Errorf("failed to initialize progress: %w", err) + } + + return &IteratorFactory{ + progressReader: progressReader, + progressWriter: progressWriter, + creator: creator, + jobCreator: jobCreator, + }, nil +} + +func (f *IteratorFactory) Create() (BlockIterator, error) { + job, err := f.jobCreator.CreateJob(f.progressReader) + if err != nil { + return nil, fmt.Errorf("failed to create job for block iteration: %w", err) + } + + iterator, err := f.creator.CreateIterator(job, f.progressWriter) + if err != nil { + return nil, fmt.Errorf("failed to create block iterator: %w", err) + } + + return iterator, nil +} diff --git a/module/block_iterator/height_based/iterator.go b/module/block_iterator/height_based/iterator.go new file mode 100644 index 00000000000..9afb7bc0fc3 --- /dev/null +++ b/module/block_iterator/height_based/iterator.go @@ -0,0 +1,69 @@ +package height_based + +import ( + "fmt" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module" +) + +// HeightIterator is a block iterator that iterates over blocks by height +// it's not concurrent safe, so don't use it in multiple goroutines +type HeightIterator struct { + // dependencies + getBlockIDByHeight func(uint64) (flow.Identifier, error) + progress module.IterateProgressWriter // for saving the next height to be iterated for resuming the iteration + + // config + endHeight uint64 + + // state + nextHeight uint64 +} + +var _ module.BlockIterator = (*HeightIterator)(nil) + +// caller must ensure that both job.Start and job.End are finalized height +func NewHeightIterator( + getBlockIDByHeight func(uint64) (flow.Identifier, error), + progress module.IterateProgressWriter, + job module.IterateRange, +) (module.BlockIterator, error) { + return &HeightIterator{ + getBlockIDByHeight: getBlockIDByHeight, + progress: progress, + endHeight: job.End, + nextHeight: job.Start, + }, nil +} + +// Next returns the next block ID in the iteration +// it iterates from lower height to higher height. +// when iterating a height, it iterates over all sibling blocks at that height +// Note: this method is not concurrent-safe +func (b *HeightIterator) Next() (flow.Identifier, bool, error) { + if b.nextHeight > b.endHeight { + return flow.ZeroID, false, nil + } + + // TODO: use storage operation instead to avoid hitting cache + next, err := b.getBlockIDByHeight(b.nextHeight) + if err != nil { + return flow.ZeroID, false, fmt.Errorf("failed to fetch block at height %v: %w", b.nextHeight, err) + } + + b.nextHeight++ + + return next, true, nil +} + +// Checkpoint saves the iteration progress to storage +// make sure to call this after all the jobs for processing the block IDs returned by +// Next() are completed. +func (b *HeightIterator) Checkpoint() error { + err := b.progress.SaveState(b.nextHeight) + if err != nil { + return fmt.Errorf("failed to save progress at view %v: %w", b.nextHeight, err) + } + return nil +} diff --git a/module/block_iterator/height_based/iterator_test.go b/module/block_iterator/height_based/iterator_test.go new file mode 100644 index 00000000000..362c4807c48 --- /dev/null +++ b/module/block_iterator/height_based/iterator_test.go @@ -0,0 +1,91 @@ +package height_based + +import ( + "fmt" + "testing" + + "github.com/dgraph-io/badger/v2" + "github.com/stretchr/testify/require" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/module/metrics" + storagebadger "github.com/onflow/flow-go/storage/badger" + "github.com/onflow/flow-go/storage/badger/operation" + "github.com/onflow/flow-go/utils/unittest" +) + +func TestIterateHeight(t *testing.T) { + unittest.RunWithBadgerDB(t, func(db *badger.DB) { + // create blocks with siblings + b1 := &flow.Header{Height: 1} + b2 := &flow.Header{Height: 2} + b3 := &flow.Header{Height: 3} + bs := []*flow.Header{b1, b2, b3} + + // index height + for _, b := range bs { + require.NoError(t, db.Update(operation.IndexBlockHeight(b.Height, b.ID()))) + } + + progress := &saveNextHeight{} + + // create iterator + // b0 is the root block, iterate from b1 to b3 + job := module.IterateRange{Start: b1.Height, End: b3.Height} + headers := storagebadger.NewHeaders(&metrics.NoopCollector{}, db) + iter, err := NewHeightIterator(headers.BlockIDByHeight, progress, job) + require.NoError(t, err) + + // iterate through all blocks + visited := make(map[flow.Identifier]struct{}) + for { + id, ok, err := iter.Next() + require.NoError(t, err) + if !ok { + break + } + + // preventing duplicate visit + _, ok = visited[id] + require.False(t, ok, fmt.Sprintf("block %v is visited twice", id)) + + visited[id] = struct{}{} + } + + // verify all blocks are visited + for _, b := range bs { + _, ok := visited[b.ID()] + require.True(t, ok, fmt.Sprintf("block %v is not visited", b.ID())) + delete(visited, b.ID()) + } + require.Empty(t, visited) + + // save the next to iterate height and verify + + require.NoError(t, iter.Checkpoint()) + + savedNextHeight, err := progress.LoadState() + require.NoError(t, err) + + require.Equal(t, b3.Height+1, savedNextHeight, + fmt.Sprintf("saved next height should be %v, but got %v", b3.Height, savedNextHeight)) + + }) +} + +type saveNextHeight struct { + savedNextHeight uint64 +} + +var _ module.IterateProgressWriter = (*saveNextHeight)(nil) +var _ module.IterateProgressReader = (*saveNextHeight)(nil) + +func (s *saveNextHeight) SaveState(height uint64) error { + s.savedNextHeight = height + return nil +} + +func (s *saveNextHeight) LoadState() (uint64, error) { + return s.savedNextHeight, nil +}