
[Chunk Data Pack Pruner] Add Block Iterator #6858

Open · wants to merge 15 commits into base: master
114 changes: 114 additions & 0 deletions module/block_iterator.go
@@ -0,0 +1,114 @@
package module

import (
"fmt"

"github.com/onflow/flow-go/model/flow"
)

// IterateJob defines the range of blocks to iterate over.
// The range could be either a view-based range or a height-based range.
// When specifying the range, the start and end are inclusive, and the end must be greater than
// or equal to the start.
type IterateJob struct {
Contributor comment (suggestion):

Suggested change:
  - type IterateJob struct {
  + type IterationRange struct {

Start uint64 // the start of the range
End uint64 // the end of the range
}

// IteratorJobCreator is an interface for creating iterate jobs
type IteratorJobCreator interface {
// CreateJob takes a progress reader which is used to read the progress of the iterator
// and returns an iterate job that specifies the range of blocks to iterate over
CreateJob(IterateProgressReader) (IterateJob, error)
}

// IterateProgressReader reads the progress of the iterator, useful for resuming the iteration
// after restart
type IterateProgressReader interface {
// ReadNext reads the next block to iterate.
// The caller must ensure the reader is created by the IterateProgressInitializer,
// otherwise ReadNext will return an exception.
ReadNext() (uint64, error)
Contributor comment:

Suggested change:
  - ReadNext() (uint64, error)
  + LoadState() (uint64, error)

}

// IterateProgressWriter saves the progress of the iterator
type IterateProgressWriter interface {
Contributor comment:
Maybe a bit overkill to have separate interfaces for read and save in this case, since you always need both for iterating.

Member Author reply:
You can see in this PR how the writer and reader are separated.

The reader is used by the job creator to read the start height and create the height range; it doesn't need the writer to update progress.

The writer is used by the iterator to save the iterated height. Since the iteration range is decided by the input (IterateJob), the iterator doesn't need the reader to read progress from storage. (A short sketch illustrating this split follows the IterateProgressWriter interface below.)

// SaveNext persists the next block to be iterated
SaveNext(uint64) error
Contributor comment:

Suggested change:
  - SaveNext(uint64) error
  + SaveState(uint64) error

}
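
The sketch below is not part of this diff; it only illustrates the reader/writer split discussed in the thread above: a job creator needs just the read side of the progress, while the iterator needs just the write side. The type name heightJobCreator and the latestSealedHeight bound are hypothetical.

package example

import (
	"fmt"

	"github.com/onflow/flow-go/module"
)

// heightJobCreator is a hypothetical IteratorJobCreator: it only needs the
// progress reader to determine where the next iteration should start.
type heightJobCreator struct {
	latestSealedHeight func() (uint64, error) // hypothetical source of the range end
}

var _ module.IteratorJobCreator = (*heightJobCreator)(nil)

func (c *heightJobCreator) CreateJob(reader module.IterateProgressReader) (module.IterateJob, error) {
	// read-only access to the saved progress; the writer is not needed here
	start, err := reader.ReadNext()
	if err != nil {
		return module.IterateJob{}, fmt.Errorf("failed to read next height: %w", err)
	}

	end, err := c.latestSealedHeight()
	if err != nil {
		return module.IterateJob{}, fmt.Errorf("failed to get latest sealed height: %w", err)
	}

	return module.IterateJob{Start: start, End: end}, nil
}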

// IterateProgressInitializer is an interface for initializing the progress of the iterator.
// An initializer must be used to ensure the initial next block to be iterated is saved in
// storage before creating the block iterator.
type IterateProgressInitializer interface {
Init() (IterateProgressReader, IterateProgressWriter, error)
}

// BlockIterator is an interface for iterating over blocks
type BlockIterator interface {
Member Author comment:
The BlockIterator interface can be implemented as a height-based iterator or a view-based iterator.

The block iterator can not only be used by the chunk data pack pruner, but also, in the future, to implement a protocol state pruner.

The height-based iterator is easy to implement; however, it can't guarantee that all data is pruned, since it doesn't iterate unfinalized blocks. The view-based iterator can guarantee that all blocks are pruned, but it's more complicated to implement.

In this PR, I first implement the height-based iterator. For chunk data packs it's OK to prune only by height; for protocol state, however, it's better to prune by view and ensure a more thorough pruning.

// Next returns the next block in the iterator
// Note: a block will only be iterated once in a single iteration, however
// if the iteration is interrupted (e.g. by a restart), the iterator can be
// resumed from the last checkpoint, which might result in the same block being
// iterated again.
Next() (blockID flow.Identifier, hasNext bool, exception error)
Contributor comment:
What do you think about making this a Go iterator? Instead of Next() it could be:

func (b *HeightIterator) Range() iter.Seq2[flow.Identifier, error] {
	return func(yield func(flow.Identifier, error) bool) {
		for b.nextHeight <= b.endHeight {
			next, err := b.headers.BlockIDByHeight(b.nextHeight)
			if err != nil {
				yield(flow.ZeroID, fmt.Errorf("failed to fetch block at height %v: %w", b.nextHeight, err))
				return
			}
		
			b.nextHeight++
		
			if !yield(next, nil) {
				return
			}
		}
	}
}

then

for blockID, err := range heightIterator.Range() {
	...
}

Contributor comment:
I like this idea!

Member Author reply:
This is a Go 1.23 feature, right? 1.22 doesn't seem to have it yet.

Thanks, I will add a TODO so we can switch to this once we upgrade to 1.23.


// Checkpoint saves the current state of the iterator
// so that it can be resumed later.
// When Checkpoint is called, SaveNext is called with the next block A,
// so that after a restart the iterator will resume from A.
Checkpoint() error
}

// IteratorCreator is an interface for creating block iterators
type IteratorCreator interface {
// CreateIterator takes an iterate job, which specifies the range of blocks to iterate over,
// and a progress writer, which is used to save the progress of the iterator,
// and returns a block iterator that can be used to iterate over the blocks.
// Note: it's up to the implementation to decide how often the progress is saved;
// consider the trade-off between performance and progress saving.
// If the progress is saved too often, it might impact iteration performance; however,
// if the progress is only saved at the end of the iteration, then an interrupted iteration
// will start from the beginning of the range again,
// which means some blocks might be iterated multiple times.
CreateIterator(IterateJob, IterateProgressWriter) (BlockIterator, error)
}
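
To make the trade-off in the CreateIterator note concrete, here is a minimal caller-side sketch (not part of this diff): progress is checkpointed every checkpointInterval blocks, so at most that many blocks are re-iterated after a restart. The function name iterateAndPrune, the prune callback, and the interval value are hypothetical.

package example

import (
	"fmt"

	"github.com/onflow/flow-go/model/flow"
	"github.com/onflow/flow-go/module"
)

// iterateAndPrune drains the iterator, pruning each block, and checkpoints
// progress periodically rather than after every block or only at the end.
func iterateAndPrune(iter module.BlockIterator, prune func(flow.Identifier) error) error {
	const checkpointInterval = 1000 // hypothetical value

	for count := 1; ; count++ {
		blockID, hasNext, err := iter.Next()
		if err != nil {
			return fmt.Errorf("failed to get next block: %w", err)
		}
		if !hasNext {
			break
		}

		if err := prune(blockID); err != nil {
			return fmt.Errorf("failed to prune block %v: %w", blockID, err)
		}

		// save progress every checkpointInterval blocks: after a restart, at most
		// checkpointInterval blocks are iterated again
		if count%checkpointInterval == 0 {
			if err := iter.Checkpoint(); err != nil {
				return fmt.Errorf("failed to checkpoint: %w", err)
			}
		}
	}

	// final checkpoint so a completed range is not re-iterated
	return iter.Checkpoint()
}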

type IteratorFactory struct {
progressReader IterateProgressReader
progressWriter IterateProgressWriter
creator IteratorCreator
jobCreator IteratorJobCreator
}

func NewIteratorFactory(
Member Author comment:
Once the interfaces in the arguments are implemented, the logic to create the BlockIterator can be reused. That's why I put this function here along with the interface definitions, so that it's clear how the interfaces will be used for creating the block iterator.

Contributor comment:
Why not just call this function NewBlockIterator and return a BlockIterator?

Member Author reply:
There will be two NewBlockIterator implementations, NewHeightBasedBlockIterator and NewViewBasedBlockIterator, and both of them will need to implement progress initialization and create a job with a range of heights / views. This logic is the same for both, so extracting the IteratorFactory lets it be reused.

In other words, there will be one IteratorFactory and many different BlockIterator creators. (A usage sketch follows this file's diff.)

initializer IterateProgressInitializer,
creator IteratorCreator,
jobCreator IteratorJobCreator,
) (*IteratorFactory, error) {
progressReader, progressWriter, err := initializer.Init()
if err != nil {
return nil, fmt.Errorf("failed to initialize progress: %w", err)
}

return &IteratorFactory{
progressReader: progressReader,
progressWriter: progressWriter,
creator: creator,
jobCreator: jobCreator,
}, nil
}

func (f *IteratorFactory) Create() (BlockIterator, error) {
job, err := f.jobCreator.CreateJob(f.progressReader)
if err != nil {
return nil, fmt.Errorf("failed to create job for block iteration: %w", err)
}

iterator, err := f.creator.CreateIterator(job, f.progressWriter)
if err != nil {
return nil, fmt.Errorf("failed to create block iterator: %w", err)
}

return iterator, nil
}
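
As a companion to the discussion above ("one IteratorFactory, many different BlockIterator creators"), here is a hypothetical wiring sketch showing how the same factory would be reused with different creators. The helper name buildIterator, and the idea of swapping a height-based creator for a view-based one, are assumptions rather than code from this PR.

package example

import (
	"fmt"

	"github.com/onflow/flow-go/module"
)

// buildIterator wires an initializer, a creator (e.g. height-based today,
// view-based later) and a job creator into the shared IteratorFactory, then
// creates a BlockIterator for the next range of blocks.
func buildIterator(
	initializer module.IterateProgressInitializer,
	creator module.IteratorCreator,
	jobCreator module.IteratorJobCreator,
) (module.BlockIterator, error) {
	factory, err := module.NewIteratorFactory(initializer, creator, jobCreator)
	if err != nil {
		return nil, fmt.Errorf("failed to create iterator factory: %w", err)
	}

	iterator, err := factory.Create()
	if err != nil {
		return nil, fmt.Errorf("failed to create block iterator: %w", err)
	}

	return iterator, nil
}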
76 changes: 76 additions & 0 deletions module/block_iterator/height_based/iterator.go
@@ -0,0 +1,76 @@
package height_based

import (
"context"
"fmt"

"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/storage"
)

type HeightIterator struct {
Contributor comment:
This is not concurrency safe. Should it be? If not, can you add a warning to the godoc?

Member Author reply:
Yeah, I suppose we should have only one iterator for a task.

// dependencies
headers storage.Headers
Contributor comment:
Only the BlockIDByHeight function is needed from headers. Consider just using a func (height) flow.Identifier.

Member Author reply:
That's a good idea. Actually, if I change it into a GetBlockIDByHeight function, the whole HeightIterator will be almost identical to a ViewIterator, meaning we could use HeightIterator as a ViewIterator by passing a GetBlockIDByView function as GetBlockIDByHeight. (A sketch of this idea follows this file's diff.)

progress module.IterateProgressWriter // saves the next height to be iterated, so iteration can be resumed after a restart

// config
endHeight uint64
ctx context.Context

// state
nextHeight uint64
}

var _ module.BlockIterator = (*HeightIterator)(nil)

// caller must ensure that both job.Start and job.End are finalized heights
func NewHeightIterator(
headers storage.Headers,
progress module.IterateProgressWriter,
ctx context.Context,
job module.IterateJob,
) (module.BlockIterator, error) {
return &HeightIterator{
headers: headers,
progress: progress,
endHeight: job.End,
ctx: ctx,
nextHeight: job.Start,
}, nil
}

// Next returns the next block ID in the iteration.
// It iterates from lower heights to higher heights.
// Note: since blocks are looked up by finalized height, only one (finalized) block is
// returned per height; unfinalized sibling blocks at the same height are not iterated.
func (b *HeightIterator) Next() (flow.Identifier, bool, error) {
// exit when the context is done
select {
case <-b.ctx.Done():
Contributor comment:
Why do we need context here? Since it's only used for this check, and only at the beginning of the function call, it seems like we should make the check the caller's responsibility.

return flow.ZeroID, false, nil
default:
}

if b.nextHeight > b.endHeight {
return flow.ZeroID, false, nil
}

// TODO: use storage operation instead to avoid hitting cache
next, err := b.headers.BlockIDByHeight(b.nextHeight)
if err != nil {
return flow.ZeroID, false, fmt.Errorf("failed to fetch block at height %v: %w", b.nextHeight, err)
}

b.nextHeight++

return next, true, nil
}

// Checkpoint saves the iteration progress to storage
func (b *HeightIterator) Checkpoint() error {
err := b.progress.SaveNext(b.nextHeight)
if err != nil {
return fmt.Errorf("failed to save progress at view %v: %w", b.nextHeight, err)
}
return nil
}
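
The following sketch illustrates the reviewer's suggestion above: if the iterator depends only on a lookup function rather than on storage.Headers, the same type could serve as either a height-based or a view-based iterator. The type IndexedBlockIterator and the lookup signature are assumptions for illustration, not code from this PR.

package example

import (
	"fmt"

	"github.com/onflow/flow-go/model/flow"
	"github.com/onflow/flow-go/module"
)

// GetBlockIDByIndex is a hypothetical lookup: headers.BlockIDByHeight for a
// height-based iterator, or a by-view lookup for a view-based one (the latter
// would additionally need to handle views that have no block).
type GetBlockIDByIndex func(uint64) (flow.Identifier, error)

// IndexedBlockIterator iterates an inclusive [next, end] range of indices
// (heights or views), resolving each index to a block ID via getBlockID.
type IndexedBlockIterator struct {
	getBlockID GetBlockIDByIndex
	progress   module.IterateProgressWriter
	end        uint64 // inclusive end of the range
	next       uint64 // next index to iterate
}

var _ module.BlockIterator = (*IndexedBlockIterator)(nil)

func (b *IndexedBlockIterator) Next() (flow.Identifier, bool, error) {
	if b.next > b.end {
		return flow.ZeroID, false, nil
	}

	blockID, err := b.getBlockID(b.next)
	if err != nil {
		return flow.ZeroID, false, fmt.Errorf("failed to fetch block at index %v: %w", b.next, err)
	}

	b.next++
	return blockID, true, nil
}

func (b *IndexedBlockIterator) Checkpoint() error {
	err := b.progress.SaveNext(b.next)
	if err != nil {
		return fmt.Errorf("failed to save progress at index %v: %w", b.next, err)
	}
	return nil
}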
94 changes: 94 additions & 0 deletions module/block_iterator/height_based/iterator_test.go
@@ -0,0 +1,94 @@
package height_based

import (
"context"
"fmt"
"testing"

"github.com/dgraph-io/badger/v2"
"github.com/stretchr/testify/require"

"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/metrics"
storagebadger "github.com/onflow/flow-go/storage/badger"
"github.com/onflow/flow-go/storage/badger/operation"
"github.com/onflow/flow-go/utils/unittest"
)

func TestIterateHeight(t *testing.T) {
unittest.RunWithBadgerDB(t, func(db *badger.DB) {
// create blocks with siblings
b1 := &flow.Header{Height: 1}
b2 := &flow.Header{Height: 2}
b3 := &flow.Header{Height: 3}
bs := []*flow.Header{b1, b2, b3}

// index height
for _, b := range bs {
require.NoError(t, db.Update(operation.IndexBlockHeight(b.Height, b.ID())))
}

progress := &saveNextHeight{}

// create iterator
// b0 is the root block, iterate from b1 to b3
job := module.IterateJob{Start: b1.Height, End: b3.Height}
headers := storagebadger.NewHeaders(&metrics.NoopCollector{}, db)
iter, err := NewHeightIterator(headers, progress, context.Background(), job)
require.NoError(t, err)

// iterate through all blocks
visited := make(map[flow.Identifier]struct{})
Contributor comment:
Can you make this a slice instead of a map, so the verification step can also check that the blocks were visited in the correct order? (See the sketch after this file's diff.)

count := 0
for {
id, ok, err := iter.Next()
require.NoError(t, err)
if !ok {
break
}
visited[id] = struct{}{}

// verify we don't iterate too many blocks
count++
if count > len(bs) {
t.Fatal("visited too many blocks")
}
Contributor comment:
If you use a slice, you can omit this and just compare the final length to len(bs).

}

// verify all blocks are visited
for _, b := range bs {
_, ok := visited[b.ID()]
require.True(t, ok, fmt.Sprintf("block %v is not visited", b.ID()))
delete(visited, b.ID())
}
require.Empty(t, visited)

// save the next height to iterate and verify

require.NoError(t, iter.Checkpoint())

savedNextHeight, err := progress.ReadNext()
require.NoError(t, err)

require.Equal(t, b3.Height+1, savedNextHeight,
fmt.Sprintf("saved next height should be %v, but got %v", b3.Height+1, savedNextHeight))

})
}

type saveNextHeight struct {
savedNextHeight uint64
}

var _ module.IterateProgressWriter = (*saveNextHeight)(nil)
var _ module.IterateProgressReader = (*saveNextHeight)(nil)

func (s *saveNextHeight) SaveNext(height uint64) error {
s.savedNextHeight = height
return nil
}

func (s *saveNextHeight) ReadNext() (uint64, error) {
return s.savedNextHeight, nil
}
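
A possible shape for the slice-based check suggested in the review comments above; this is a sketch against the interfaces in this PR, assuming it lives next to the test, and the helper name collectAll is hypothetical.

package height_based

import (
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/onflow/flow-go/model/flow"
	"github.com/onflow/flow-go/module"
)

// collectAll drains the iterator into a slice, preserving iteration order,
// and fails the test if more than max blocks are returned.
func collectAll(t *testing.T, iter module.BlockIterator, max int) []flow.Identifier {
	visited := make([]flow.Identifier, 0, max)
	for {
		id, ok, err := iter.Next()
		require.NoError(t, err)
		if !ok {
			return visited
		}
		visited = append(visited, id)
		require.LessOrEqual(t, len(visited), max, "visited too many blocks")
	}
}

With such a helper, the test body could assert both membership and order in one step, e.g. require.Equal(t, []flow.Identifier{b1.ID(), b2.ID(), b3.ID()}, collectAll(t, iter, len(bs))).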