Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keep batch state in Minibatcher #668

Merged
merged 1 commit into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 45 additions & 18 deletions disperser/batcher/minibatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ type MinibatcherConfig struct {
MaxNumRetriesPerDispersal uint
}

type BatchState struct {
BatchID uuid.UUID
ReferenceBlockNumber uint
BlobHeaders []*core.BlobHeader
BlobMetadata []*disperser.BlobMetadata
OperatorState *core.IndexedOperatorState
NumMinibatches uint
}

type Minibatcher struct {
MinibatcherConfig

Expand All @@ -33,8 +42,9 @@ type Minibatcher struct {
Pool common.WorkerPool

// local state
Batches map[uuid.UUID]*BatchState
ReferenceBlockNumber uint
BatchID uuid.UUID
CurrentBatchID uuid.UUID
MinibatchIndex uint

ethClient common.EthClient
Expand All @@ -48,7 +58,6 @@ func NewMinibatcher(
dispatcher disperser.Dispatcher,
chainState core.IndexedChainState,
assignmentCoordinator core.AssignmentCoordinator,
// aggregator core.SignatureAggregator,
encodingStreamer *EncodingStreamer,
ethClient common.EthClient,
workerpool common.WorkerPool,
Expand All @@ -64,8 +73,9 @@ func NewMinibatcher(
EncodingStreamer: encodingStreamer,
Pool: workerpool,

Batches: make(map[uuid.UUID]*BatchState),
ReferenceBlockNumber: 0,
BatchID: uuid.Nil,
CurrentBatchID: uuid.Nil,
MinibatchIndex: 0,

ethClient: ethClient,
Expand Down Expand Up @@ -104,6 +114,15 @@ func (b *Minibatcher) Start(ctx context.Context) error {
return nil
}

func (b *Minibatcher) PopBatchState(batchID uuid.UUID) *BatchState {
batchState, ok := b.Batches[batchID]
if !ok {
return nil
}
delete(b.Batches, batchID)
return batchState
}

func (b *Minibatcher) handleFailure(ctx context.Context, blobMetadatas []*disperser.BlobMetadata, reason FailReason) error {
var result *multierror.Error
numPermanentFailures := 0
Expand Down Expand Up @@ -139,43 +158,51 @@ func (b *Minibatcher) HandleSingleBatch(ctx context.Context) error {
return err
}
log.Debug("CreateMinibatch took", "duration", time.Since(stageTimer).String())

// Processing new full batch
if b.ReferenceBlockNumber < batch.BatchHeader.ReferenceBlockNumber {
// Update status of the previous batch
if b.BatchID != uuid.Nil {
err = b.MinibatchStore.UpdateBatchStatus(ctx, b.BatchID, BatchStatusFormed)
if b.CurrentBatchID != uuid.Nil {
err = b.MinibatchStore.UpdateBatchStatus(ctx, b.CurrentBatchID, BatchStatusFormed)
if err != nil {
_ = b.handleFailure(ctx, batch.BlobMetadata, FailReason("error updating batch status"))
return fmt.Errorf("error updating batch status: %w", err)
}
}

// Create new batch
b.BatchID, err = uuid.NewV7()
// Reset local batch state and create new batch
b.CurrentBatchID, err = uuid.NewV7()
if err != nil {
_ = b.handleFailure(ctx, batch.BlobMetadata, FailReason("error generating batch UUID"))
return fmt.Errorf("error generating batch ID: %w", err)
}
batchHeaderHash, err := batch.BatchHeader.GetBatchHeaderHash()
if err != nil {
_ = b.handleFailure(ctx, batch.BlobMetadata, FailReason("error getting batch header hash"))
return fmt.Errorf("error getting batch header hash: %w", err)
}
b.MinibatchIndex = 0
b.ReferenceBlockNumber = batch.BatchHeader.ReferenceBlockNumber
err = b.MinibatchStore.PutBatch(ctx, &BatchRecord{
ID: b.BatchID,
ID: b.CurrentBatchID,
CreatedAt: time.Now().UTC(),
ReferenceBlockNumber: b.ReferenceBlockNumber,
HeaderHash: batchHeaderHash,
Status: BatchStatusPending,
})
if err != nil {
_ = b.handleFailure(ctx, batch.BlobMetadata, FailReason("error storing batch record"))
return fmt.Errorf("error storing batch record: %w", err)
}
b.Batches[b.CurrentBatchID] = &BatchState{
BatchID: b.CurrentBatchID,
ReferenceBlockNumber: b.ReferenceBlockNumber,
BlobHeaders: make([]*core.BlobHeader, 0),
BlobMetadata: make([]*disperser.BlobMetadata, 0),
OperatorState: batch.State,
NumMinibatches: 0,
}
}

// Accumulate batch metadata
batchState := b.Batches[b.CurrentBatchID]
batchState.BlobHeaders = append(batchState.BlobHeaders, batch.BlobHeaders...)
batchState.BlobMetadata = append(batchState.BlobMetadata, batch.BlobMetadata...)
batchState.NumMinibatches++

// Store minibatch record
blobHeaderHashes := make([][32]byte, 0, len(batch.EncodedBlobs))
batchSize := int64(0)
Expand All @@ -189,7 +216,7 @@ func (b *Minibatcher) HandleSingleBatch(ctx context.Context) error {
batchSize += blob.BlobHeader.EncodedSizeAllQuorums()
}
err = b.MinibatchStore.PutMinibatch(ctx, &MinibatchRecord{
BatchID: b.BatchID,
BatchID: b.CurrentBatchID,
MinibatchIndex: b.MinibatchIndex,
BlobHeaderHashes: blobHeaderHashes,
BatchSize: uint64(batchSize),
Expand All @@ -201,9 +228,9 @@ func (b *Minibatcher) HandleSingleBatch(ctx context.Context) error {
}

// Dispatch encoded batch
log.Debug("Dispatching encoded batch...", "batchID", b.BatchID, "minibatchIndex", b.MinibatchIndex, "referenceBlockNumber", b.ReferenceBlockNumber, "numBlobs", len(batch.EncodedBlobs))
log.Debug("Dispatching encoded batch...", "batchID", b.CurrentBatchID, "minibatchIndex", b.MinibatchIndex, "referenceBlockNumber", b.ReferenceBlockNumber, "numBlobs", len(batch.EncodedBlobs))
stageTimer = time.Now()
b.DisperseBatch(ctx, batch.State, batch.EncodedBlobs, batch.BatchHeader, b.BatchID, b.MinibatchIndex)
b.DisperseBatch(ctx, batch.State, batch.EncodedBlobs, batch.BatchHeader, b.CurrentBatchID, b.MinibatchIndex)
log.Debug("DisperseBatch took", "duration", time.Since(stageTimer).String())

h, err := batch.State.OperatorState.Hash()
Expand Down
141 changes: 118 additions & 23 deletions disperser/batcher/minibatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,47 +136,142 @@ func TestDisperseMinibatch(t *testing.T) {
_, _ = queueBlob(t, ctx, &blob1, c.blobStore)
_, _ = queueBlob(t, ctx, &blob2, c.blobStore)

// Start the batcher
out := make(chan batcher.EncodingResultOrStatus)
err := c.encodingStreamer.RequestEncoding(ctx, out)
assert.NoError(t, err)
err = c.encodingStreamer.ProcessEncodedBlobs(ctx, <-out)
encoded1 := <-out
err = c.encodingStreamer.ProcessEncodedBlobs(ctx, encoded1)
assert.NoError(t, err)
err = c.encodingStreamer.ProcessEncodedBlobs(ctx, <-out)
encoded2 := <-out
err = c.encodingStreamer.ProcessEncodedBlobs(ctx, encoded2)
assert.NoError(t, err)
count, _ := c.encodingStreamer.EncodedBlobstore.GetEncodedResultSize()
assert.Equal(t, 2, count)

err = c.minibatcher.HandleSingleBatch(ctx)
assert.NoError(t, err)
assert.NotNil(t, c.minibatcher.BatchID)
assert.NotNil(t, c.minibatcher.CurrentBatchID)
assert.Equal(t, c.minibatcher.MinibatchIndex, uint(1))
assert.Equal(t, c.minibatcher.ReferenceBlockNumber, initialBlock)
assert.Len(t, c.minibatcher.Batches, 1)
assert.Equal(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].BatchID, c.minibatcher.CurrentBatchID)
assert.Equal(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].NumMinibatches, uint(1))
assert.Equal(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].ReferenceBlockNumber, initialBlock)
assert.Len(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].BlobHeaders, 2)
assert.ElementsMatch(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].BlobMetadata, []*disperser.BlobMetadata{encoded1.BlobMetadata, encoded2.BlobMetadata})

b, err := c.minibatchStore.GetBatch(ctx, c.minibatcher.BatchID)
// Second minibatch
blob3 := makeTestBlob([]*core.SecurityParam{{
QuorumID: 0,
AdversaryThreshold: 80,
ConfirmationThreshold: 100,
}})
_, _ = queueBlob(t, ctx, &blob3, c.blobStore)
err = c.encodingStreamer.RequestEncoding(ctx, out)
assert.NoError(t, err)
encoded3 := <-out
err = c.encodingStreamer.ProcessEncodedBlobs(ctx, encoded3)
assert.NoError(t, err)
err = c.minibatcher.HandleSingleBatch(ctx)
assert.NoError(t, err)
assert.NotNil(t, c.minibatcher.CurrentBatchID)
assert.Equal(t, c.minibatcher.MinibatchIndex, uint(2))
assert.Equal(t, c.minibatcher.ReferenceBlockNumber, initialBlock)
assert.Len(t, c.minibatcher.Batches, 1)
assert.Equal(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].BatchID, c.minibatcher.CurrentBatchID)
assert.Equal(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].NumMinibatches, uint(2))
assert.Equal(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].ReferenceBlockNumber, initialBlock)
assert.Len(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].BlobHeaders, 3)
assert.ElementsMatch(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].BlobMetadata, []*disperser.BlobMetadata{encoded1.BlobMetadata, encoded2.BlobMetadata, encoded3.BlobMetadata})
assert.NotNil(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].OperatorState)

b, err := c.minibatchStore.GetBatch(ctx, c.minibatcher.CurrentBatchID)
assert.NoError(t, err)
assert.NotNil(t, b)
assert.Equal(t, c.minibatcher.BatchID, b.ID)
assert.NotNil(t, b.HeaderHash)
assert.Equal(t, c.minibatcher.CurrentBatchID, b.ID)
assert.NotNil(t, b.CreatedAt)
assert.Equal(t, c.minibatcher.ReferenceBlockNumber, b.ReferenceBlockNumber)
mb, err := c.minibatchStore.GetMinibatch(ctx, c.minibatcher.BatchID, 0)
mb, err := c.minibatchStore.GetMinibatch(ctx, c.minibatcher.CurrentBatchID, 0)
assert.NoError(t, err)
assert.NotNil(t, mb)
assert.Equal(t, c.minibatcher.BatchID, mb.BatchID)
assert.Equal(t, c.minibatcher.CurrentBatchID, mb.BatchID)
assert.Equal(t, uint(0), mb.MinibatchIndex)
assert.Len(t, mb.BlobHeaderHashes, 2)
assert.Equal(t, uint64(12800), mb.BatchSize)
assert.Equal(t, c.minibatcher.ReferenceBlockNumber, mb.ReferenceBlockNumber)
mb, err = c.minibatchStore.GetMinibatch(ctx, c.minibatcher.CurrentBatchID, 1)
assert.NoError(t, err)
assert.NotNil(t, mb)
assert.Equal(t, c.minibatcher.CurrentBatchID, mb.BatchID)
assert.Equal(t, uint(1), mb.MinibatchIndex)
assert.Len(t, mb.BlobHeaderHashes, 1)
assert.Equal(t, uint64(7680), mb.BatchSize)
assert.Equal(t, c.minibatcher.ReferenceBlockNumber, mb.ReferenceBlockNumber)

// Create a new minibatch with increased reference block number
// Test that the previous batch is marked as formed and that the new batch is created with the correct reference block number
_, _ = queueBlob(t, ctx, &blob1, c.blobStore)
_, _ = queueBlob(t, ctx, &blob2, c.blobStore)

err = c.encodingStreamer.UpdateReferenecBlock(initialBlock + 10)
assert.NoError(t, err)
err = c.encodingStreamer.RequestEncoding(ctx, out)
assert.NoError(t, err)
encoded4 := <-out
err = c.encodingStreamer.ProcessEncodedBlobs(ctx, encoded4)
assert.NoError(t, err)
encoded5 := <-out
err = c.encodingStreamer.ProcessEncodedBlobs(ctx, encoded5)
assert.NoError(t, err)
err = c.minibatcher.HandleSingleBatch(ctx)
assert.NoError(t, err)
assert.NotNil(t, c.minibatcher.CurrentBatchID)

c.pool.StopWait()
c.dispatcher.AssertNumberOfCalls(t, "SendBlobsToOperator", 2)
dispersalRequests, err := c.minibatchStore.GetMinibatchDispersalRequests(ctx, c.minibatcher.BatchID, 0)

// previous batch should be marked as formed
b, err = c.minibatchStore.GetBatch(ctx, b.ID)
assert.NoError(t, err)
assert.NotNil(t, b)
assert.Equal(t, b.Status, batcher.BatchStatusFormed)

// new batch should be created
assert.NotEqual(t, c.minibatcher.CurrentBatchID, b.ID)
assert.Equal(t, c.minibatcher.MinibatchIndex, uint(1))
assert.Equal(t, c.minibatcher.ReferenceBlockNumber, initialBlock+10)
assert.Len(t, c.minibatcher.Batches, 2)
assert.Equal(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].BatchID, c.minibatcher.CurrentBatchID)
assert.Equal(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].NumMinibatches, uint(1))
assert.Equal(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].ReferenceBlockNumber, initialBlock+10)
assert.Len(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].BlobHeaders, 2)
assert.ElementsMatch(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].BlobMetadata, []*disperser.BlobMetadata{encoded4.BlobMetadata, encoded5.BlobMetadata})
assert.NotNil(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].OperatorState)

newBatch, err := c.minibatchStore.GetBatch(ctx, c.minibatcher.CurrentBatchID)
assert.NoError(t, err)
assert.NotNil(t, newBatch)
assert.Equal(t, newBatch.ReferenceBlockNumber, initialBlock+10)
assert.Equal(t, newBatch.Status, batcher.BatchStatusPending)

// Test PopBatchState
batchState := c.minibatcher.PopBatchState(b.ID)
assert.NotNil(t, batchState)
assert.Equal(t, batchState.BatchID, b.ID)
assert.Equal(t, batchState.ReferenceBlockNumber, initialBlock)
assert.Equal(t, batchState.NumMinibatches, uint(2))
assert.Len(t, batchState.BlobHeaders, 3)
assert.ElementsMatch(t, batchState.BlobMetadata, []*disperser.BlobMetadata{encoded1.BlobMetadata, encoded2.BlobMetadata, encoded3.BlobMetadata})
assert.NotNil(t, batchState.OperatorState)
assert.Len(t, c.minibatcher.Batches, 1)
assert.Nil(t, c.minibatcher.Batches[b.ID])

c.dispatcher.AssertNumberOfCalls(t, "SendBlobsToOperator", 6)
dispersalRequests, err := c.minibatchStore.GetMinibatchDispersalRequests(ctx, b.ID, 0)
assert.NoError(t, err)
assert.Len(t, dispersalRequests, 2)
opIDs := make([]core.OperatorID, 2)
for i, req := range dispersalRequests {
assert.Equal(t, req.BatchID, c.minibatcher.BatchID)
assert.Equal(t, req.BatchID, b.ID)
assert.Equal(t, req.MinibatchIndex, uint(0))
assert.Equal(t, req.NumBlobs, uint(2))
assert.NotNil(t, req.Socket)
Expand All @@ -185,11 +280,11 @@ func TestDisperseMinibatch(t *testing.T) {
}
assert.ElementsMatch(t, opIDs, []core.OperatorID{opId0, opId1})

dispersalResponses, err := c.minibatchStore.GetMinibatchDispersalResponses(ctx, c.minibatcher.BatchID, 0)
dispersalResponses, err := c.minibatchStore.GetMinibatchDispersalResponses(ctx, b.ID, 0)
assert.NoError(t, err)
assert.Len(t, dispersalResponses, 2)
for _, resp := range dispersalResponses {
assert.Equal(t, resp.BatchID, c.minibatcher.BatchID)
assert.Equal(t, resp.BatchID, b.ID)
assert.Equal(t, resp.MinibatchIndex, uint(0))
assert.NotNil(t, resp.RespondedAt)
assert.NoError(t, resp.Error)
Expand Down Expand Up @@ -240,34 +335,34 @@ func TestDisperseMinibatchFailure(t *testing.T) {

err = c.minibatcher.HandleSingleBatch(ctx)
assert.NoError(t, err)
assert.NotNil(t, c.minibatcher.BatchID)
assert.NotNil(t, c.minibatcher.CurrentBatchID)
assert.Equal(t, c.minibatcher.MinibatchIndex, uint(1))
assert.Equal(t, c.minibatcher.ReferenceBlockNumber, initialBlock)

b, err := c.minibatchStore.GetBatch(ctx, c.minibatcher.BatchID)
b, err := c.minibatchStore.GetBatch(ctx, c.minibatcher.CurrentBatchID)
assert.NoError(t, err)
assert.NotNil(t, b)
assert.Equal(t, c.minibatcher.BatchID, b.ID)
assert.Equal(t, c.minibatcher.CurrentBatchID, b.ID)
assert.NotNil(t, b.HeaderHash)
assert.NotNil(t, b.CreatedAt)
assert.Equal(t, c.minibatcher.ReferenceBlockNumber, b.ReferenceBlockNumber)
mb, err := c.minibatchStore.GetMinibatch(ctx, c.minibatcher.BatchID, 0)
mb, err := c.minibatchStore.GetMinibatch(ctx, c.minibatcher.CurrentBatchID, 0)
assert.NoError(t, err)
assert.NotNil(t, mb)
assert.Equal(t, c.minibatcher.BatchID, mb.BatchID)
assert.Equal(t, c.minibatcher.CurrentBatchID, mb.BatchID)
assert.Equal(t, uint(0), mb.MinibatchIndex)
assert.Len(t, mb.BlobHeaderHashes, 2)
assert.Equal(t, uint64(12800), mb.BatchSize)
assert.Equal(t, c.minibatcher.ReferenceBlockNumber, mb.ReferenceBlockNumber)

c.pool.StopWait()
c.dispatcher.AssertNumberOfCalls(t, "SendBlobsToOperator", 2)
dispersalRequests, err := c.minibatchStore.GetMinibatchDispersalRequests(ctx, c.minibatcher.BatchID, 0)
dispersalRequests, err := c.minibatchStore.GetMinibatchDispersalRequests(ctx, c.minibatcher.CurrentBatchID, 0)
assert.NoError(t, err)
assert.Len(t, dispersalRequests, 2)
opIDs := make([]core.OperatorID, 2)
for i, req := range dispersalRequests {
assert.Equal(t, req.BatchID, c.minibatcher.BatchID)
assert.Equal(t, req.BatchID, c.minibatcher.CurrentBatchID)
assert.Equal(t, req.MinibatchIndex, uint(0))
assert.Equal(t, req.NumBlobs, uint(2))
assert.NotNil(t, req.Socket)
Expand All @@ -276,11 +371,11 @@ func TestDisperseMinibatchFailure(t *testing.T) {
}
assert.ElementsMatch(t, opIDs, []core.OperatorID{opId0, opId1})

dispersalResponses, err := c.minibatchStore.GetMinibatchDispersalResponses(ctx, c.minibatcher.BatchID, 0)
dispersalResponses, err := c.minibatchStore.GetMinibatchDispersalResponses(ctx, c.minibatcher.CurrentBatchID, 0)
assert.NoError(t, err)
assert.Len(t, dispersalResponses, 2)
for _, resp := range dispersalResponses {
assert.Equal(t, resp.BatchID, c.minibatcher.BatchID)
assert.Equal(t, resp.BatchID, c.minibatcher.CurrentBatchID)
assert.Equal(t, resp.MinibatchIndex, uint(0))
assert.NotNil(t, resp.RespondedAt)
assert.NoError(t, resp.Error)
Expand Down
Loading