Skip to content

Commit

Permalink
Create blob minibatch mappings (#687)
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim authored Aug 13, 2024
1 parent 93d776f commit 8ff9fe1
Show file tree
Hide file tree
Showing 2 changed files with 187 additions and 35 deletions.
106 changes: 76 additions & 30 deletions disperser/batcher/minibatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,19 +94,26 @@ func (b *Minibatcher) Start(ctx context.Context) error {
go func() {
ticker := time.NewTicker(b.PullInterval)
defer ticker.Stop()

cancelFuncs := make([]context.CancelFunc, 0)
for {
select {
case <-ctx.Done():
for _, cancel := range cancelFuncs {
cancel()
}
return
case <-ticker.C:
if err := b.HandleSingleBatch(ctx); err != nil {
cancel, err := b.HandleSingleMinibatch(ctx)
if err != nil {
if errors.Is(err, errNoEncodedResults) {
b.logger.Warn("no encoded results to make a batch with")
} else {
b.logger.Error("failed to process a batch", "err", err)
}
}
if cancel != nil {
cancelFuncs = append(cancelFuncs, cancel)
}
}
}
}()
Expand Down Expand Up @@ -145,95 +152,103 @@ func (b *Minibatcher) handleFailure(ctx context.Context, blobMetadatas []*disper
return result.ErrorOrNil()
}

func (b *Minibatcher) HandleSingleBatch(ctx context.Context) error {
func (b *Minibatcher) HandleSingleMinibatch(ctx context.Context) (context.CancelFunc, error) {
log := b.logger
// If too many dispersal requests are pending, skip an iteration
if pending := b.Pool.WaitingQueueSize(); pending > int(b.MaxNumConnections) {
return fmt.Errorf("too many pending requests %d with max number of connections %d. skipping minibatch iteration", pending, b.MaxNumConnections)
return nil, fmt.Errorf("too many pending requests %d with max number of connections %d. skipping minibatch iteration", pending, b.MaxNumConnections)
}
stageTimer := time.Now()
// All blobs in this batch are marked as DISPERSING
batch, err := b.EncodingStreamer.CreateMinibatch(ctx)
minibatch, err := b.EncodingStreamer.CreateMinibatch(ctx)
if err != nil {
return err
return nil, err
}
log.Debug("CreateMinibatch took", "duration", time.Since(stageTimer).String())
// Processing new full batch
if b.ReferenceBlockNumber < batch.BatchHeader.ReferenceBlockNumber {
if b.ReferenceBlockNumber < minibatch.BatchHeader.ReferenceBlockNumber {
// Update status of the previous batch
if b.CurrentBatchID != uuid.Nil {
err = b.MinibatchStore.UpdateBatchStatus(ctx, b.CurrentBatchID, BatchStatusFormed)
if err != nil {
_ = b.handleFailure(ctx, batch.BlobMetadata, FailReason("error updating batch status"))
return fmt.Errorf("error updating batch status: %w", err)
_ = b.handleFailure(ctx, minibatch.BlobMetadata, FailReason("error updating batch status"))
return nil, fmt.Errorf("error updating batch status: %w", err)
}
}

// Reset local batch state and create new batch
b.CurrentBatchID, err = uuid.NewV7()
if err != nil {
_ = b.handleFailure(ctx, batch.BlobMetadata, FailReason("error generating batch UUID"))
return fmt.Errorf("error generating batch ID: %w", err)
_ = b.handleFailure(ctx, minibatch.BlobMetadata, FailReason("error generating batch UUID"))
return nil, fmt.Errorf("error generating batch ID: %w", err)
}
b.MinibatchIndex = 0
b.ReferenceBlockNumber = batch.BatchHeader.ReferenceBlockNumber
b.ReferenceBlockNumber = minibatch.BatchHeader.ReferenceBlockNumber
err = b.MinibatchStore.PutBatch(ctx, &BatchRecord{
ID: b.CurrentBatchID,
CreatedAt: time.Now().UTC(),
ReferenceBlockNumber: b.ReferenceBlockNumber,
Status: BatchStatusPending,
})
if err != nil {
_ = b.handleFailure(ctx, batch.BlobMetadata, FailReason("error storing batch record"))
return fmt.Errorf("error storing batch record: %w", err)
_ = b.handleFailure(ctx, minibatch.BlobMetadata, FailReason("error storing batch record"))
return nil, fmt.Errorf("error storing batch record: %w", err)
}
b.Batches[b.CurrentBatchID] = &BatchState{
BatchID: b.CurrentBatchID,
ReferenceBlockNumber: b.ReferenceBlockNumber,
BlobHeaders: make([]*core.BlobHeader, 0),
BlobMetadata: make([]*disperser.BlobMetadata, 0),
OperatorState: batch.State,
OperatorState: minibatch.State,
NumMinibatches: 0,
}
}

// Accumulate batch metadata
batchState := b.Batches[b.CurrentBatchID]
batchState.BlobHeaders = append(batchState.BlobHeaders, batch.BlobHeaders...)
batchState.BlobMetadata = append(batchState.BlobMetadata, batch.BlobMetadata...)
batchState.BlobHeaders = append(batchState.BlobHeaders, minibatch.BlobHeaders...)
batchState.BlobMetadata = append(batchState.BlobMetadata, minibatch.BlobMetadata...)
batchState.NumMinibatches++

// Store minibatch record
blobHeaderHashes := make([][32]byte, 0, len(batch.EncodedBlobs))
blobHeaderHashes := make([][32]byte, 0, len(minibatch.EncodedBlobs))
batchSize := int64(0)
for _, blob := range batch.EncodedBlobs {
for _, blob := range minibatch.EncodedBlobs {
h, err := blob.BlobHeader.GetBlobHeaderHash()
if err != nil {
_ = b.handleFailure(ctx, batch.BlobMetadata, FailReason("error getting blob header hash"))
return fmt.Errorf("error getting blob header hash: %w", err)
_ = b.handleFailure(ctx, minibatch.BlobMetadata, FailReason("error getting blob header hash"))
return nil, fmt.Errorf("error getting blob header hash: %w", err)
}
blobHeaderHashes = append(blobHeaderHashes, h)
batchSize += blob.BlobHeader.EncodedSizeAllQuorums()
}
err = b.MinibatchStore.PutMinibatch(ctx, &MinibatchRecord{
minibatchRecord := &MinibatchRecord{
BatchID: b.CurrentBatchID,
MinibatchIndex: b.MinibatchIndex,
BlobHeaderHashes: blobHeaderHashes,
BatchSize: uint64(batchSize),
ReferenceBlockNumber: b.ReferenceBlockNumber,
})
}
err = b.MinibatchStore.PutMinibatch(ctx, minibatchRecord)
if err != nil {
_ = b.handleFailure(ctx, batch.BlobMetadata, FailReason("error storing minibatch record"))
return fmt.Errorf("error storing minibatch record: %w", err)
_ = b.handleFailure(ctx, minibatch.BlobMetadata, FailReason("error storing minibatch record"))
return nil, fmt.Errorf("error storing minibatch record: %w", err)
}

// Dispatch encoded batch
log.Debug("Dispatching encoded batch...", "batchID", b.CurrentBatchID, "minibatchIndex", b.MinibatchIndex, "referenceBlockNumber", b.ReferenceBlockNumber, "numBlobs", len(batch.EncodedBlobs))
log.Debug("Dispatching encoded batch...", "batchID", b.CurrentBatchID, "minibatchIndex", b.MinibatchIndex, "referenceBlockNumber", b.ReferenceBlockNumber, "numBlobs", len(minibatch.EncodedBlobs))
stageTimer = time.Now()
b.DisperseBatch(ctx, batch.State, batch.EncodedBlobs, batch.BatchHeader, b.CurrentBatchID, b.MinibatchIndex)
dispersalCtx, cancelDispersal := context.WithCancel(ctx)
storeMappingsChan := make(chan error)
// Store the blob minibatch mappings in parallel
go func() {
err := b.createBlobMinibatchMappings(ctx, minibatchRecord, minibatch.BlobMetadata, minibatch.BlobHeaders)
storeMappingsChan <- err
}()
b.DisperseBatch(dispersalCtx, minibatch.State, minibatch.EncodedBlobs, minibatch.BatchHeader, b.CurrentBatchID, b.MinibatchIndex)
log.Debug("DisperseBatch took", "duration", time.Since(stageTimer).String())

h, err := batch.State.OperatorState.Hash()
h, err := minibatch.State.OperatorState.Hash()
if err != nil {
log.Error("error getting operator state hash", "err", err)
}
Expand All @@ -245,7 +260,14 @@ func (b *Minibatcher) HandleSingleBatch(ctx context.Context) error {

b.MinibatchIndex++

return nil
// Wait for the blob minibatch mappings to be stored then cancel the dispersal process if there was an error
storeMappingsErr := <-storeMappingsChan
if storeMappingsErr != nil {
cancelDispersal()
return nil, fmt.Errorf("error storing blob minibatch mappings: %w", storeMappingsErr)
}

return cancelDispersal, nil
}

func (b *Minibatcher) DisperseBatch(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, batchHeader *core.BatchHeader, batchID uuid.UUID, minibatchIndex uint) {
Expand Down Expand Up @@ -317,8 +339,8 @@ func (b *Minibatcher) SendBlobsToOperatorWithRetries(
for numRetries < maxNumRetries {
requestedAt := time.Now()
ctxWithTimeout, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
signatures, err = b.Dispatcher.SendBlobsToOperator(ctxWithTimeout, blobMessages, batchHeader, op)
cancel()
latencyMs := float64(time.Since(requestedAt).Milliseconds())
if err != nil {
b.logger.Error("error sending chunks to operator", "operator", opID.Hex(), "err", err, "timeout", timeout.String(), "numRetries", numRetries, "maxNumRetries", maxNumRetries)
Expand All @@ -336,3 +358,27 @@ func (b *Minibatcher) SendBlobsToOperatorWithRetries(

return signatures, nil
}

// createBlobMinibatchMappings creates a mapping between blob metadata and blob headers
// and stores it in the minibatch store. It assumes that the blob metadata and blob headers
// are ordered by blob index.
func (b *Minibatcher) createBlobMinibatchMappings(ctx context.Context, minibatch *MinibatchRecord, blobMetadatas []*disperser.BlobMetadata, blobHeaders []*core.BlobHeader) error {
if len(blobMetadatas) != len(blobHeaders) {
return fmt.Errorf("number of blob metadatas and blob headers do not match")
}

mappings := make([]*BlobMinibatchMapping, len(blobMetadatas))
for i, blobMetadata := range blobMetadatas {
blobKey := blobMetadata.GetBlobKey()
blobHeader := blobHeaders[i]
mappings[i] = &BlobMinibatchMapping{
BlobKey: &blobKey,
BatchID: minibatch.BatchID,
MinibatchIndex: minibatch.MinibatchIndex,
BlobIndex: uint(i),
BlobCommitments: blobHeader.BlobCommitments,
BlobQuorumInfos: blobHeader.QuorumInfos,
}
}
return b.MinibatchStore.PutBlobMinibatchMappings(ctx, mappings)
}
116 changes: 111 additions & 5 deletions disperser/batcher/minibatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func TestDisperseMinibatch(t *testing.T) {
count, _ := c.encodingStreamer.EncodedBlobstore.GetEncodedResultSize()
assert.Equal(t, 2, count)

err = c.minibatcher.HandleSingleBatch(ctx)
_, err = c.minibatcher.HandleSingleMinibatch(ctx)
assert.NoError(t, err)
assert.NotNil(t, c.minibatcher.CurrentBatchID)
assert.Equal(t, c.minibatcher.MinibatchIndex, uint(1))
Expand All @@ -162,6 +162,61 @@ func TestDisperseMinibatch(t *testing.T) {
assert.Equal(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].ReferenceBlockNumber, initialBlock)
assert.Len(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].BlobHeaders, 2)
assert.ElementsMatch(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].BlobMetadata, []*disperser.BlobMetadata{encoded1.BlobMetadata, encoded2.BlobMetadata})
blobKey1 := encoded1.BlobMetadata.GetBlobKey()
blobMinibatchMappings, err := c.minibatchStore.GetBlobMinibatchMappings(ctx, blobKey1)
assert.NoError(t, err)
assert.Len(t, blobMinibatchMappings, 1)
mapping1 := blobMinibatchMappings[0]
assert.Equal(t, mapping1.BlobKey, &blobKey1)
assert.Equal(t, mapping1.BatchID, c.minibatcher.CurrentBatchID)
assert.Equal(t, mapping1.MinibatchIndex, uint(0))
assert.Equal(t, mapping1.BlobQuorumInfos, []*core.BlobQuorumInfo{encoded1.BlobQuorumInfo})
serializedCommitment1, err := encoded1.Commitment.Commitment.Serialize()
assert.NoError(t, err)
expectedCommitment1, err := mapping1.Commitment.Serialize()
assert.NoError(t, err)
serializedLengthCommitment1, err := encoded1.Commitment.LengthCommitment.Serialize()
assert.NoError(t, err)
expectedLengthCommitment1, err := mapping1.LengthCommitment.Serialize()
assert.NoError(t, err)
serializedLengthProof1, err := encoded1.Commitment.LengthProof.Serialize()
assert.NoError(t, err)
expectedLengthProof1, err := mapping1.LengthProof.Serialize()
assert.NoError(t, err)
assert.Equal(t, serializedCommitment1, expectedCommitment1)
assert.Equal(t, serializedLengthCommitment1, expectedLengthCommitment1)
assert.Equal(t, serializedLengthProof1, expectedLengthProof1)
blobKey2 := encoded1.BlobMetadata.GetBlobKey()
blobMinibatchMappings, err = c.minibatchStore.GetBlobMinibatchMappings(ctx, blobKey2)
assert.NoError(t, err)
assert.Len(t, blobMinibatchMappings, 1)
mapping2 := blobMinibatchMappings[0]
assert.Equal(t, mapping2.BlobKey, &blobKey2)
assert.Equal(t, mapping2.BatchID, c.minibatcher.CurrentBatchID)
assert.Equal(t, mapping2.MinibatchIndex, uint(0))
assert.Equal(t, mapping2.BlobQuorumInfos, []*core.BlobQuorumInfo{encoded1.BlobQuorumInfo})
if mapping1.BlobIndex != 0 {
assert.Equal(t, mapping2.BlobIndex, uint(1))
} else if mapping1.BlobIndex != 1 {
assert.Equal(t, mapping2.BlobIndex, uint(0))
} else {
t.Fatal("invalid blob index")
}
serializedCommitment2, err := encoded2.Commitment.Commitment.Serialize()
assert.NoError(t, err)
expectedCommitment2, err := mapping2.Commitment.Serialize()
assert.NoError(t, err)
serializedLengthCommitment2, err := encoded2.Commitment.LengthCommitment.Serialize()
assert.NoError(t, err)
expectedLengthCommitment2, err := mapping2.LengthCommitment.Serialize()
assert.NoError(t, err)
serializedLengthProof2, err := encoded2.Commitment.LengthProof.Serialize()
assert.NoError(t, err)
expectedLengthProof2, err := mapping2.LengthProof.Serialize()
assert.NoError(t, err)
assert.Equal(t, serializedCommitment2, expectedCommitment2)
assert.Equal(t, serializedLengthCommitment2, expectedLengthCommitment2)
assert.Equal(t, serializedLengthProof2, expectedLengthProof2)

// Second minibatch
blob3 := makeTestBlob([]*core.SecurityParam{{
Expand All @@ -175,7 +230,7 @@ func TestDisperseMinibatch(t *testing.T) {
encoded3 := <-out
err = c.encodingStreamer.ProcessEncodedBlobs(ctx, encoded3)
assert.NoError(t, err)
err = c.minibatcher.HandleSingleBatch(ctx)
_, err = c.minibatcher.HandleSingleMinibatch(ctx)
assert.NoError(t, err)
assert.NotNil(t, c.minibatcher.CurrentBatchID)
assert.Equal(t, c.minibatcher.MinibatchIndex, uint(2))
Expand Down Expand Up @@ -211,6 +266,16 @@ func TestDisperseMinibatch(t *testing.T) {
assert.Equal(t, uint64(7680), mb.BatchSize)
assert.Equal(t, c.minibatcher.ReferenceBlockNumber, mb.ReferenceBlockNumber)

blobKey3 := encoded3.BlobMetadata.GetBlobKey()
blobMinibatchMappings, err = c.minibatchStore.GetBlobMinibatchMappings(ctx, blobKey3)
assert.NoError(t, err)
assert.Len(t, blobMinibatchMappings, 1)
mapping3 := blobMinibatchMappings[0]
assert.Equal(t, mapping3.BlobKey, &blobKey3)
assert.Equal(t, mapping3.BatchID, c.minibatcher.CurrentBatchID)
assert.Equal(t, mapping3.MinibatchIndex, uint(1))
assert.Equal(t, mapping3.BlobIndex, uint(0))

// Create a new minibatch with increased reference block number
// Test that the previous batch is marked as formed and that the new batch is created with the correct reference block number
_, _ = queueBlob(t, ctx, &blob1, c.blobStore)
Expand All @@ -226,7 +291,7 @@ func TestDisperseMinibatch(t *testing.T) {
encoded5 := <-out
err = c.encodingStreamer.ProcessEncodedBlobs(ctx, encoded5)
assert.NoError(t, err)
err = c.minibatcher.HandleSingleBatch(ctx)
_, err = c.minibatcher.HandleSingleMinibatch(ctx)
assert.NoError(t, err)
assert.NotNil(t, c.minibatcher.CurrentBatchID)

Expand Down Expand Up @@ -336,7 +401,7 @@ func TestDisperseMinibatchFailure(t *testing.T) {
count, _ := c.encodingStreamer.EncodedBlobstore.GetEncodedResultSize()
assert.Equal(t, 2, count)

err = c.minibatcher.HandleSingleBatch(ctx)
_, err = c.minibatcher.HandleSingleMinibatch(ctx)
assert.NoError(t, err)
assert.NotNil(t, c.minibatcher.CurrentBatchID)
assert.Equal(t, c.minibatcher.MinibatchIndex, uint(1))
Expand Down Expand Up @@ -442,6 +507,47 @@ func TestSendBlobsToOperatorWithRetries(t *testing.T) {
assert.Nil(t, signatures)
}

func TestSendBlobsToOperatorWithRetriesCanceled(t *testing.T) {
c := newMinibatcher(t, defaultConfig)
ctx := context.Background()

blob := makeTestBlob([]*core.SecurityParam{{
QuorumID: 0,
AdversaryThreshold: 80,
ConfirmationThreshold: 100,
}})
_, _ = queueBlob(t, ctx, &blob, c.blobStore)

out := make(chan batcher.EncodingResultOrStatus)
err := c.encodingStreamer.RequestEncoding(ctx, out)
assert.NoError(t, err)
err = c.encodingStreamer.ProcessEncodedBlobs(ctx, <-out)
assert.NoError(t, err)
batch, err := c.encodingStreamer.CreateMinibatch(ctx)
assert.NoError(t, err)
minibatchIndex := uint(12)
c.dispatcher.On("SendBlobsToOperator", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, context.Canceled)
c.minibatcher.DisperseBatch(ctx, batch.State, batch.EncodedBlobs, batch.BatchHeader, c.minibatcher.CurrentBatchID, minibatchIndex)
c.pool.StopWait()
requests, err := c.minibatchStore.GetMinibatchDispersalRequests(ctx, c.minibatcher.CurrentBatchID, minibatchIndex)
assert.NoError(t, err)
assert.Len(t, requests, 2)

responses, err := c.minibatchStore.GetMinibatchDispersalResponses(ctx, c.minibatcher.CurrentBatchID, minibatchIndex)
assert.NoError(t, err)
indexedState, err := mockChainState.GetIndexedOperatorState(ctx, initialBlock, []core.QuorumID{0})
assert.NoError(t, err)
assert.Len(t, responses, len(indexedState.IndexedOperators))
for _, response := range responses {
assert.ErrorContains(t, response.Error, "context canceled")
for _, request := range requests {
if request.OperatorID == response.OperatorID {
assert.GreaterOrEqual(t, response.RespondedAt, request.RequestedAt)
}
}
}
}

func TestMinibatcherTooManyPendingRequests(t *testing.T) {
c := newMinibatcher(t, defaultConfig)
ctx := context.Background()
Expand All @@ -450,6 +556,6 @@ func TestMinibatcherTooManyPendingRequests(t *testing.T) {
m, err := batcher.NewMinibatcher(defaultConfig, c.blobStore, c.minibatchStore, c.dispatcher, c.minibatcher.ChainState, c.assignmentCoordinator, c.encodingStreamer, c.ethClient, mockWorkerPool, c.logger)
assert.NoError(t, err)
mockWorkerPool.On("WaitingQueueSize").Return(int(defaultConfig.MaxNumConnections + 1)).Once()
err = m.HandleSingleBatch(ctx)
_, err = m.HandleSingleMinibatch(ctx)
assert.ErrorContains(t, err, "too many pending requests")
}

0 comments on commit 8ff9fe1

Please sign in to comment.