Skip to content

Commit

Permalink
[BatchConfirmer] Apply state transitions for full batches (#691)
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim authored Aug 13, 2024
1 parent 8ff9fe1 commit 8094491
Show file tree
Hide file tree
Showing 4 changed files with 225 additions and 91 deletions.
69 changes: 40 additions & 29 deletions disperser/batcher/batch_confirmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/core/types"
"github.com/google/uuid"
"github.com/hashicorp/go-multierror"
)

Expand Down Expand Up @@ -135,6 +136,9 @@ func (b *BatchConfirmer) updateConfirmationInfo(
if txnReceipt.BlockNumber == nil {
return nil, errors.New("error getting transaction receipt block number")
}
if batchData.batchID == uuid.Nil {
return nil, errors.New("failed to process confirmed batch: batch ID from transaction manager metadata is nil")
}
if len(batchData.blobs) == 0 {
return nil, errors.New("failed to process confirmed batch: no blobs from transaction manager metadata")
}
Expand Down Expand Up @@ -170,15 +174,7 @@ func (b *BatchConfirmer) updateConfirmationInfo(
if isBlobAttested(batchData.aggSig.QuorumResults, batchData.blobHeaders[blobIndex]) {
status = disperser.Confirmed
// generate inclusion proof
blobHeader := batchData.blobHeaders[blobIndex]

blobHeaderHash, err := blobHeader.GetBlobHeaderHash()
if err != nil {
b.logger.Error("failed to get blob header hash", "err", err)
blobsToRetry = append(blobsToRetry, batchData.blobs[blobIndex])
continue
}
merkleProof, err := batchData.merkleTree.GenerateProof(blobHeaderHash[:], 0)
merkleProof, err := batchData.merkleTree.GenerateProofWithIndex(uint64(blobIndex), 0)
if err != nil {
b.logger.Error("failed to generate blob header inclusion proof", "err", err)
blobsToRetry = append(blobsToRetry, batchData.blobs[blobIndex])
Expand All @@ -188,13 +184,14 @@ func (b *BatchConfirmer) updateConfirmationInfo(
}

confirmationInfo := &disperser.ConfirmationInfo{
BatchHeaderHash: headerHash,
BlobIndex: uint32(blobIndex),
SignatoryRecordHash: core.ComputeSignatoryRecordHash(uint32(batchData.batchHeader.ReferenceBlockNumber), batchData.aggSig.NonSigners),
ReferenceBlockNumber: uint32(batchData.batchHeader.ReferenceBlockNumber),
BatchRoot: batchData.batchHeader.BatchRoot[:],
BlobInclusionProof: proof,
BlobCommitment: &batchData.blobHeaders[blobIndex].BlobCommitments,
BatchHeaderHash: headerHash,
BlobIndex: uint32(blobIndex),
SignatoryRecordHash: core.ComputeSignatoryRecordHash(uint32(batchData.batchHeader.ReferenceBlockNumber), batchData.aggSig.NonSigners),
ReferenceBlockNumber: uint32(batchData.batchHeader.ReferenceBlockNumber),
BatchRoot: batchData.batchHeader.BatchRoot[:],
BlobInclusionProof: proof,
BlobCommitment: &batchData.blobHeaders[blobIndex].BlobCommitments,
// This is onchain, external batch ID, which is different from the internal representation of batch UUID
BatchID: uint32(batchID),
ConfirmationTxnHash: txnReceipt.TxHash,
ConfirmationBlockNumber: uint32(txnReceipt.BlockNumber.Uint64()),
Expand Down Expand Up @@ -235,12 +232,15 @@ func (b *BatchConfirmer) ProcessConfirmedBatch(ctx context.Context, receiptOrErr
if len(blobs) == 0 {
return errors.New("failed to process confirmed batch: no blobs from transaction manager metadata")
}
if confirmationMetadata.batchID == uuid.Nil {
return errors.New("failed to process confirmed batch: batch ID from transaction manager metadata is nil")
}
if receiptOrErr.Err != nil {
_ = b.handleFailure(ctx, blobs, FailConfirmBatch)
_ = b.handleFailure(ctx, confirmationMetadata.batchID, blobs, FailConfirmBatch)
return fmt.Errorf("failed to confirm batch onchain: %w", receiptOrErr.Err)
}
if confirmationMetadata.aggSig == nil {
_ = b.handleFailure(ctx, blobs, FailNoAggregatedSignature)
_ = b.handleFailure(ctx, confirmationMetadata.batchID, blobs, FailNoAggregatedSignature)
return errors.New("failed to process confirmed batch: aggSig from transaction manager metadata is nil")
}
b.logger.Info("received ConfirmBatch transaction receipt", "blockNumber", receiptOrErr.Receipt.BlockNumber, "txnHash", receiptOrErr.Receipt.TxHash.Hex())
Expand All @@ -249,23 +249,28 @@ func (b *BatchConfirmer) ProcessConfirmedBatch(ctx context.Context, receiptOrErr
stageTimer := time.Now()
blobsToRetry, err := b.updateConfirmationInfo(ctx, confirmationMetadata, receiptOrErr.Receipt)
if err != nil {
_ = b.handleFailure(ctx, blobs, FailUpdateConfirmationInfo)
_ = b.handleFailure(ctx, confirmationMetadata.batchID, blobs, FailUpdateConfirmationInfo)
return fmt.Errorf("failed to update confirmation info: %w", err)
}
if len(blobsToRetry) > 0 {
b.logger.Error("failed to update confirmation info", "failed", len(blobsToRetry), "total", len(blobs))
_ = b.handleFailure(ctx, blobsToRetry, FailUpdateConfirmationInfo)
_ = b.handleFailure(ctx, confirmationMetadata.batchID, blobsToRetry, FailUpdateConfirmationInfo)
} else {
err = b.MinibatchStore.UpdateBatchStatus(ctx, confirmationMetadata.batchID, BatchStatusAttested)
if err != nil {
b.logger.Error("error updating batch status", "err", err)
}
}
b.logger.Debug("Update confirmation info took", "duration", time.Since(stageTimer).String())
batchSize := int64(0)
for _, blobMeta := range blobs {
batchSize += int64(blobMeta.RequestMetadata.BlobSize)
}
b.logger.Debug("Update confirmation info took", "duration", time.Since(stageTimer).String(), "batchSize", batchSize)

return nil
}

func (b *BatchConfirmer) handleFailure(ctx context.Context, blobMetadatas []*disperser.BlobMetadata, reason FailReason) error {
func (b *BatchConfirmer) handleFailure(ctx context.Context, batchID uuid.UUID, blobMetadatas []*disperser.BlobMetadata, reason FailReason) error {
var result *multierror.Error
numPermanentFailures := 0
for _, metadata := range blobMetadatas {
Expand All @@ -283,6 +288,11 @@ func (b *BatchConfirmer) handleFailure(ctx context.Context, blobMetadatas []*dis
numPermanentFailures++
}

err := b.MinibatchStore.UpdateBatchStatus(ctx, batchID, BatchStatusFailed)
if err != nil {
b.logger.Error("error updating batch status", "err", err)
}

// Return the error(s)
return result.ErrorOrNil()
}
Expand Down Expand Up @@ -340,7 +350,8 @@ func (b *BatchConfirmer) HandleSingleBatch(ctx context.Context) error {
return errors.New("batch not dispersed")
}

// Get batch state
// Try getting batch state from minibatcher cache
// TODO(ian-shim): If not found, get it from the minibatch store
batchState := b.Minibatcher.PopBatchState(batch.ID)
if batchState == nil {
return fmt.Errorf("no batch state found for batch %s", batch.ID)
Expand Down Expand Up @@ -389,10 +400,9 @@ func (b *BatchConfirmer) HandleSingleBatch(ctx context.Context) error {

// Aggregate the signatures
b.logger.Debug("Aggregating signatures...")

quorumAttestation, err := b.Aggregator.ReceiveSignatures(ctx, batchState.OperatorState, batchHeaderHash, replyChan)
if err != nil {
_ = b.handleFailure(ctx, batchState.BlobMetadata, FailAggregateSignatures)
_ = b.handleFailure(ctx, batch.ID, batchState.BlobMetadata, FailAggregateSignatures)
return fmt.Errorf("error receiving and validating signatures: %w", err)
}
operatorCount := make(map[core.QuorumID]int)
Expand All @@ -416,7 +426,7 @@ func (b *BatchConfirmer) HandleSingleBatch(ctx context.Context) error {
numPassed, passedQuorums := numBlobsAttestedByQuorum(quorumAttestation.QuorumResults, batchState.BlobHeaders)
// TODO(mooselumph): Determine whether to confirm the batch based on the number of successes
if numPassed == 0 {
_ = b.handleFailure(ctx, batchState.BlobMetadata, FailNoSignatures)
_ = b.handleFailure(ctx, batch.ID, batchState.BlobMetadata, FailNoSignatures)
return errors.New("no blobs received sufficient signatures")
}

Expand All @@ -429,26 +439,27 @@ func (b *BatchConfirmer) HandleSingleBatch(ctx context.Context) error {
// Aggregate the signatures across only the non-empty quorums. Excluding empty quorums reduces the gas cost.
aggSig, err := b.Aggregator.AggregateSignatures(ctx, b.ChainState, batchHeader.ReferenceBlockNumber, quorumAttestation, nonEmptyQuorums)
if err != nil {
_ = b.handleFailure(ctx, batchState.BlobMetadata, FailAggregateSignatures)
_ = b.handleFailure(ctx, batch.ID, batchState.BlobMetadata, FailAggregateSignatures)
return fmt.Errorf("error aggregating signatures: %w", err)
}

b.logger.Debug("Confirming batch...")

txn, err := b.Transactor.BuildConfirmBatchTxn(ctx, batchHeader, aggSig.QuorumResults, aggSig)
if err != nil {
_ = b.handleFailure(ctx, batchState.BlobMetadata, FailConfirmBatch)
_ = b.handleFailure(ctx, batch.ID, batchState.BlobMetadata, FailConfirmBatch)
return fmt.Errorf("error building confirmBatch transaction: %w", err)
}
err = b.TransactionManager.ProcessTransaction(ctx, NewTxnRequest(txn, "confirmBatch", big.NewInt(0), confirmationMetadata{
batchID: batch.ID,
batchHeader: batchHeader,
blobs: batchState.BlobMetadata,
blobHeaders: batchState.BlobHeaders,
merkleTree: merkleTree,
aggSig: aggSig,
}))
if err != nil {
_ = b.handleFailure(ctx, batchState.BlobMetadata, FailConfirmBatch)
_ = b.handleFailure(ctx, batch.ID, batchState.BlobMetadata, FailConfirmBatch)
return fmt.Errorf("error sending confirmBatch transaction: %w", err)
}

Expand Down
Loading

0 comments on commit 8094491

Please sign in to comment.