Skip to content

Commit

Permalink
keep batch state in minibatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim committed Jul 27, 2024
1 parent 3d9f547 commit dda4416
Show file tree
Hide file tree
Showing 2 changed files with 162 additions and 41 deletions.
63 changes: 45 additions & 18 deletions disperser/batcher/minibatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ type MinibatcherConfig struct {
MaxNumRetriesPerDispersal uint
}

type BatchState struct {
BatchID uuid.UUID
ReferenceBlockNumber uint
BlobHeaders []*core.BlobHeader
BlobMetadata []*disperser.BlobMetadata
OperatorState *core.IndexedOperatorState
NumMinibatches uint
}

type Minibatcher struct {
MinibatcherConfig

Expand All @@ -33,8 +42,9 @@ type Minibatcher struct {
Pool common.WorkerPool

// local state
Batches map[uuid.UUID]*BatchState
ReferenceBlockNumber uint
BatchID uuid.UUID
CurrentBatchID uuid.UUID
MinibatchIndex uint

ethClient common.EthClient
Expand All @@ -48,7 +58,6 @@ func NewMinibatcher(
dispatcher disperser.Dispatcher,
chainState core.IndexedChainState,
assignmentCoordinator core.AssignmentCoordinator,
// aggregator core.SignatureAggregator,
encodingStreamer *EncodingStreamer,
ethClient common.EthClient,
workerpool common.WorkerPool,
Expand All @@ -64,8 +73,9 @@ func NewMinibatcher(
EncodingStreamer: encodingStreamer,
Pool: workerpool,

Batches: make(map[uuid.UUID]*BatchState),
ReferenceBlockNumber: 0,
BatchID: uuid.Nil,
CurrentBatchID: uuid.Nil,
MinibatchIndex: 0,

ethClient: ethClient,
Expand Down Expand Up @@ -104,6 +114,15 @@ func (b *Minibatcher) Start(ctx context.Context) error {
return nil
}

func (b *Minibatcher) PopBatchState(batchID uuid.UUID) *BatchState {
batchState, ok := b.Batches[batchID]
if !ok {
return nil
}
delete(b.Batches, batchID)
return batchState
}

func (b *Minibatcher) handleFailure(ctx context.Context, blobMetadatas []*disperser.BlobMetadata, reason FailReason) error {
var result *multierror.Error
numPermanentFailures := 0
Expand Down Expand Up @@ -139,43 +158,51 @@ func (b *Minibatcher) HandleSingleBatch(ctx context.Context) error {
return err
}
log.Debug("CreateMinibatch took", "duration", time.Since(stageTimer).String())

// Processing new full batch
if b.ReferenceBlockNumber < batch.BatchHeader.ReferenceBlockNumber {
// Update status of the previous batch
if b.BatchID != uuid.Nil {
err = b.MinibatchStore.UpdateBatchStatus(ctx, b.BatchID, BatchStatusFormed)
if b.CurrentBatchID != uuid.Nil {
err = b.MinibatchStore.UpdateBatchStatus(ctx, b.CurrentBatchID, BatchStatusFormed)
if err != nil {
_ = b.handleFailure(ctx, batch.BlobMetadata, FailReason("error updating batch status"))
return fmt.Errorf("error updating batch status: %w", err)
}
}

// Create new batch
b.BatchID, err = uuid.NewV7()
// Reset local batch state and create new batch
b.CurrentBatchID, err = uuid.NewV7()
if err != nil {
_ = b.handleFailure(ctx, batch.BlobMetadata, FailReason("error generating batch UUID"))
return fmt.Errorf("error generating batch ID: %w", err)
}
batchHeaderHash, err := batch.BatchHeader.GetBatchHeaderHash()
if err != nil {
_ = b.handleFailure(ctx, batch.BlobMetadata, FailReason("error getting batch header hash"))
return fmt.Errorf("error getting batch header hash: %w", err)
}
b.MinibatchIndex = 0
b.ReferenceBlockNumber = batch.BatchHeader.ReferenceBlockNumber
err = b.MinibatchStore.PutBatch(ctx, &BatchRecord{
ID: b.BatchID,
ID: b.CurrentBatchID,
CreatedAt: time.Now().UTC(),
ReferenceBlockNumber: b.ReferenceBlockNumber,
HeaderHash: batchHeaderHash,
Status: BatchStatusPending,
})
if err != nil {
_ = b.handleFailure(ctx, batch.BlobMetadata, FailReason("error storing batch record"))
return fmt.Errorf("error storing batch record: %w", err)
}
b.Batches[b.CurrentBatchID] = &BatchState{
BatchID: b.CurrentBatchID,
ReferenceBlockNumber: b.ReferenceBlockNumber,
BlobHeaders: make([]*core.BlobHeader, 0),
BlobMetadata: make([]*disperser.BlobMetadata, 0),
OperatorState: batch.State,
NumMinibatches: 0,
}
}

// Accumulate batch metadata
batchState := b.Batches[b.CurrentBatchID]
batchState.BlobHeaders = append(batchState.BlobHeaders, batch.BlobHeaders...)
batchState.BlobMetadata = append(batchState.BlobMetadata, batch.BlobMetadata...)
batchState.NumMinibatches++

// Store minibatch record
blobHeaderHashes := make([][32]byte, 0, len(batch.EncodedBlobs))
batchSize := int64(0)
Expand All @@ -189,7 +216,7 @@ func (b *Minibatcher) HandleSingleBatch(ctx context.Context) error {
batchSize += blob.BlobHeader.EncodedSizeAllQuorums()
}
err = b.MinibatchStore.PutMinibatch(ctx, &MinibatchRecord{
BatchID: b.BatchID,
BatchID: b.CurrentBatchID,
MinibatchIndex: b.MinibatchIndex,
BlobHeaderHashes: blobHeaderHashes,
BatchSize: uint64(batchSize),
Expand All @@ -201,9 +228,9 @@ func (b *Minibatcher) HandleSingleBatch(ctx context.Context) error {
}

// Dispatch encoded batch
log.Debug("Dispatching encoded batch...", "batchID", b.BatchID, "minibatchIndex", b.MinibatchIndex, "referenceBlockNumber", b.ReferenceBlockNumber, "numBlobs", len(batch.EncodedBlobs))
log.Debug("Dispatching encoded batch...", "batchID", b.CurrentBatchID, "minibatchIndex", b.MinibatchIndex, "referenceBlockNumber", b.ReferenceBlockNumber, "numBlobs", len(batch.EncodedBlobs))
stageTimer = time.Now()
b.DisperseBatch(ctx, batch.State, batch.EncodedBlobs, batch.BatchHeader, b.BatchID, b.MinibatchIndex)
b.DisperseBatch(ctx, batch.State, batch.EncodedBlobs, batch.BatchHeader, b.CurrentBatchID, b.MinibatchIndex)
log.Debug("DisperseBatch took", "duration", time.Since(stageTimer).String())

h, err := batch.State.OperatorState.Hash()
Expand Down
140 changes: 117 additions & 23 deletions disperser/batcher/minibatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,47 +136,141 @@ func TestDisperseMinibatch(t *testing.T) {
_, _ = queueBlob(t, ctx, &blob1, c.blobStore)
_, _ = queueBlob(t, ctx, &blob2, c.blobStore)

// Start the batcher
out := make(chan batcher.EncodingResultOrStatus)
err := c.encodingStreamer.RequestEncoding(ctx, out)
assert.NoError(t, err)
err = c.encodingStreamer.ProcessEncodedBlobs(ctx, <-out)
encoded1 := <-out
err = c.encodingStreamer.ProcessEncodedBlobs(ctx, encoded1)
assert.NoError(t, err)
err = c.encodingStreamer.ProcessEncodedBlobs(ctx, <-out)
encoded2 := <-out
err = c.encodingStreamer.ProcessEncodedBlobs(ctx, encoded2)
assert.NoError(t, err)
count, _ := c.encodingStreamer.EncodedBlobstore.GetEncodedResultSize()
assert.Equal(t, 2, count)

err = c.minibatcher.HandleSingleBatch(ctx)
assert.NoError(t, err)
assert.NotNil(t, c.minibatcher.BatchID)
assert.NotNil(t, c.minibatcher.CurrentBatchID)
assert.Equal(t, c.minibatcher.MinibatchIndex, uint(1))
assert.Equal(t, c.minibatcher.ReferenceBlockNumber, initialBlock)
assert.Len(t, c.minibatcher.Batches, 1)
assert.Equal(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].BatchID, c.minibatcher.CurrentBatchID)
assert.Equal(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].NumMinibatches, uint(1))
assert.Equal(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].ReferenceBlockNumber, initialBlock)
assert.Len(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].BlobHeaders, 2)
assert.ElementsMatch(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].BlobMetadata, []*disperser.BlobMetadata{encoded1.BlobMetadata, encoded2.BlobMetadata})

b, err := c.minibatchStore.GetBatch(ctx, c.minibatcher.BatchID)
// Second minibatch
blob3 := makeTestBlob([]*core.SecurityParam{{
QuorumID: 0,
AdversaryThreshold: 80,
ConfirmationThreshold: 100,
}})
_, _ = queueBlob(t, ctx, &blob3, c.blobStore)
err = c.encodingStreamer.RequestEncoding(ctx, out)
assert.NoError(t, err)
encoded3 := <-out
err = c.encodingStreamer.ProcessEncodedBlobs(ctx, encoded3)
assert.NoError(t, err)
err = c.minibatcher.HandleSingleBatch(ctx)
assert.NoError(t, err)
assert.NotNil(t, c.minibatcher.CurrentBatchID)
assert.Equal(t, c.minibatcher.MinibatchIndex, uint(2))
assert.Equal(t, c.minibatcher.ReferenceBlockNumber, initialBlock)
assert.Len(t, c.minibatcher.Batches, 1)
assert.Equal(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].BatchID, c.minibatcher.CurrentBatchID)
assert.Equal(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].NumMinibatches, uint(2))
assert.Equal(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].ReferenceBlockNumber, initialBlock)
assert.Len(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].BlobHeaders, 3)
assert.ElementsMatch(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].BlobMetadata, []*disperser.BlobMetadata{encoded1.BlobMetadata, encoded2.BlobMetadata, encoded3.BlobMetadata})
assert.NotNil(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].OperatorState)

b, err := c.minibatchStore.GetBatch(ctx, c.minibatcher.CurrentBatchID)
assert.NoError(t, err)
assert.NotNil(t, b)
assert.Equal(t, c.minibatcher.BatchID, b.ID)
assert.NotNil(t, b.HeaderHash)
assert.Equal(t, c.minibatcher.CurrentBatchID, b.ID)
assert.NotNil(t, b.CreatedAt)
assert.Equal(t, c.minibatcher.ReferenceBlockNumber, b.ReferenceBlockNumber)
mb, err := c.minibatchStore.GetMinibatch(ctx, c.minibatcher.BatchID, 0)
mb, err := c.minibatchStore.GetMinibatch(ctx, c.minibatcher.CurrentBatchID, 0)
assert.NoError(t, err)
assert.NotNil(t, mb)
assert.Equal(t, c.minibatcher.BatchID, mb.BatchID)
assert.Equal(t, c.minibatcher.CurrentBatchID, mb.BatchID)
assert.Equal(t, uint(0), mb.MinibatchIndex)
assert.Len(t, mb.BlobHeaderHashes, 2)
assert.Equal(t, uint64(12800), mb.BatchSize)
assert.Equal(t, c.minibatcher.ReferenceBlockNumber, mb.ReferenceBlockNumber)
mb, err = c.minibatchStore.GetMinibatch(ctx, c.minibatcher.CurrentBatchID, 1)
assert.NoError(t, err)
assert.NotNil(t, mb)
assert.Equal(t, c.minibatcher.CurrentBatchID, mb.BatchID)
assert.Equal(t, uint(1), mb.MinibatchIndex)
assert.Len(t, mb.BlobHeaderHashes, 1)
assert.Equal(t, uint64(7680), mb.BatchSize)
assert.Equal(t, c.minibatcher.ReferenceBlockNumber, mb.ReferenceBlockNumber)

// Create a new minibatch with increased reference block number
// Test that the previous batch is marked as formed and that the new batch is created with the correct reference block number
_, _ = queueBlob(t, ctx, &blob1, c.blobStore)
_, _ = queueBlob(t, ctx, &blob2, c.blobStore)

c.encodingStreamer.UpdateReferenecBlock(initialBlock + 10)

Check failure on line 216 in disperser/batcher/minibatcher_test.go

View workflow job for this annotation

GitHub Actions / Linter

Error return value of `c.encodingStreamer.UpdateReferenecBlock` is not checked (errcheck)
err = c.encodingStreamer.RequestEncoding(ctx, out)
assert.NoError(t, err)
encoded4 := <-out
err = c.encodingStreamer.ProcessEncodedBlobs(ctx, encoded4)
assert.NoError(t, err)
encoded5 := <-out
err = c.encodingStreamer.ProcessEncodedBlobs(ctx, encoded5)
assert.NoError(t, err)
err = c.minibatcher.HandleSingleBatch(ctx)
assert.NoError(t, err)
assert.NotNil(t, c.minibatcher.CurrentBatchID)

c.pool.StopWait()
c.dispatcher.AssertNumberOfCalls(t, "SendBlobsToOperator", 2)
dispersalRequests, err := c.minibatchStore.GetMinibatchDispersalRequests(ctx, c.minibatcher.BatchID, 0)

// previous batch should be marked as formed
b, err = c.minibatchStore.GetBatch(ctx, b.ID)
assert.NoError(t, err)
assert.NotNil(t, b)
assert.Equal(t, b.Status, batcher.BatchStatusFormed)

// new batch should be created
assert.NotEqual(t, c.minibatcher.CurrentBatchID, b.ID)
assert.Equal(t, c.minibatcher.MinibatchIndex, uint(1))
assert.Equal(t, c.minibatcher.ReferenceBlockNumber, initialBlock+10)
assert.Len(t, c.minibatcher.Batches, 2)
assert.Equal(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].BatchID, c.minibatcher.CurrentBatchID)
assert.Equal(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].NumMinibatches, uint(1))
assert.Equal(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].ReferenceBlockNumber, initialBlock+10)
assert.Len(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].BlobHeaders, 2)
assert.ElementsMatch(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].BlobMetadata, []*disperser.BlobMetadata{encoded4.BlobMetadata, encoded5.BlobMetadata})
assert.NotNil(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].OperatorState)

newBatch, err := c.minibatchStore.GetBatch(ctx, c.minibatcher.CurrentBatchID)
assert.NoError(t, err)
assert.NotNil(t, newBatch)
assert.Equal(t, newBatch.ReferenceBlockNumber, initialBlock+10)
assert.Equal(t, newBatch.Status, batcher.BatchStatusPending)

// Test PopBatchState
batchState := c.minibatcher.PopBatchState(b.ID)
assert.NotNil(t, batchState)
assert.Equal(t, batchState.BatchID, b.ID)
assert.Equal(t, batchState.ReferenceBlockNumber, initialBlock)
assert.Equal(t, batchState.NumMinibatches, uint(2))
assert.Len(t, batchState.BlobHeaders, 3)
assert.ElementsMatch(t, batchState.BlobMetadata, []*disperser.BlobMetadata{encoded1.BlobMetadata, encoded2.BlobMetadata, encoded3.BlobMetadata})
assert.NotNil(t, batchState.OperatorState)
assert.Len(t, c.minibatcher.Batches, 1)
assert.Nil(t, c.minibatcher.Batches[b.ID])

c.dispatcher.AssertNumberOfCalls(t, "SendBlobsToOperator", 6)
dispersalRequests, err := c.minibatchStore.GetMinibatchDispersalRequests(ctx, b.ID, 0)
assert.NoError(t, err)
assert.Len(t, dispersalRequests, 2)
opIDs := make([]core.OperatorID, 2)
for i, req := range dispersalRequests {
assert.Equal(t, req.BatchID, c.minibatcher.BatchID)
assert.Equal(t, req.BatchID, b.ID)
assert.Equal(t, req.MinibatchIndex, uint(0))
assert.Equal(t, req.NumBlobs, uint(2))
assert.NotNil(t, req.Socket)
Expand All @@ -185,11 +279,11 @@ func TestDisperseMinibatch(t *testing.T) {
}
assert.ElementsMatch(t, opIDs, []core.OperatorID{opId0, opId1})

dispersalResponses, err := c.minibatchStore.GetMinibatchDispersalResponses(ctx, c.minibatcher.BatchID, 0)
dispersalResponses, err := c.minibatchStore.GetMinibatchDispersalResponses(ctx, b.ID, 0)
assert.NoError(t, err)
assert.Len(t, dispersalResponses, 2)
for _, resp := range dispersalResponses {
assert.Equal(t, resp.BatchID, c.minibatcher.BatchID)
assert.Equal(t, resp.BatchID, b.ID)
assert.Equal(t, resp.MinibatchIndex, uint(0))
assert.NotNil(t, resp.RespondedAt)
assert.NoError(t, resp.Error)
Expand Down Expand Up @@ -240,34 +334,34 @@ func TestDisperseMinibatchFailure(t *testing.T) {

err = c.minibatcher.HandleSingleBatch(ctx)
assert.NoError(t, err)
assert.NotNil(t, c.minibatcher.BatchID)
assert.NotNil(t, c.minibatcher.CurrentBatchID)
assert.Equal(t, c.minibatcher.MinibatchIndex, uint(1))
assert.Equal(t, c.minibatcher.ReferenceBlockNumber, initialBlock)

b, err := c.minibatchStore.GetBatch(ctx, c.minibatcher.BatchID)
b, err := c.minibatchStore.GetBatch(ctx, c.minibatcher.CurrentBatchID)
assert.NoError(t, err)
assert.NotNil(t, b)
assert.Equal(t, c.minibatcher.BatchID, b.ID)
assert.Equal(t, c.minibatcher.CurrentBatchID, b.ID)
assert.NotNil(t, b.HeaderHash)
assert.NotNil(t, b.CreatedAt)
assert.Equal(t, c.minibatcher.ReferenceBlockNumber, b.ReferenceBlockNumber)
mb, err := c.minibatchStore.GetMinibatch(ctx, c.minibatcher.BatchID, 0)
mb, err := c.minibatchStore.GetMinibatch(ctx, c.minibatcher.CurrentBatchID, 0)
assert.NoError(t, err)
assert.NotNil(t, mb)
assert.Equal(t, c.minibatcher.BatchID, mb.BatchID)
assert.Equal(t, c.minibatcher.CurrentBatchID, mb.BatchID)
assert.Equal(t, uint(0), mb.MinibatchIndex)
assert.Len(t, mb.BlobHeaderHashes, 2)
assert.Equal(t, uint64(12800), mb.BatchSize)
assert.Equal(t, c.minibatcher.ReferenceBlockNumber, mb.ReferenceBlockNumber)

c.pool.StopWait()
c.dispatcher.AssertNumberOfCalls(t, "SendBlobsToOperator", 2)
dispersalRequests, err := c.minibatchStore.GetMinibatchDispersalRequests(ctx, c.minibatcher.BatchID, 0)
dispersalRequests, err := c.minibatchStore.GetMinibatchDispersalRequests(ctx, c.minibatcher.CurrentBatchID, 0)
assert.NoError(t, err)
assert.Len(t, dispersalRequests, 2)
opIDs := make([]core.OperatorID, 2)
for i, req := range dispersalRequests {
assert.Equal(t, req.BatchID, c.minibatcher.BatchID)
assert.Equal(t, req.BatchID, c.minibatcher.CurrentBatchID)
assert.Equal(t, req.MinibatchIndex, uint(0))
assert.Equal(t, req.NumBlobs, uint(2))
assert.NotNil(t, req.Socket)
Expand All @@ -276,11 +370,11 @@ func TestDisperseMinibatchFailure(t *testing.T) {
}
assert.ElementsMatch(t, opIDs, []core.OperatorID{opId0, opId1})

dispersalResponses, err := c.minibatchStore.GetMinibatchDispersalResponses(ctx, c.minibatcher.BatchID, 0)
dispersalResponses, err := c.minibatchStore.GetMinibatchDispersalResponses(ctx, c.minibatcher.CurrentBatchID, 0)
assert.NoError(t, err)
assert.Len(t, dispersalResponses, 2)
for _, resp := range dispersalResponses {
assert.Equal(t, resp.BatchID, c.minibatcher.BatchID)
assert.Equal(t, resp.BatchID, c.minibatcher.CurrentBatchID)
assert.Equal(t, resp.MinibatchIndex, uint(0))
assert.NotNil(t, resp.RespondedAt)
assert.NoError(t, resp.Error)
Expand Down

0 comments on commit dda4416

Please sign in to comment.