Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BatchConfirmer] Apply state transitions for full batches #691

Merged
merged 2 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 40 additions & 29 deletions disperser/batcher/batch_confirmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/core/types"
"github.com/google/uuid"
"github.com/hashicorp/go-multierror"
)

Expand Down Expand Up @@ -135,6 +136,9 @@ func (b *BatchConfirmer) updateConfirmationInfo(
if txnReceipt.BlockNumber == nil {
return nil, errors.New("error getting transaction receipt block number")
}
if batchData.batchID == uuid.Nil {
return nil, errors.New("failed to process confirmed batch: batch ID from transaction manager metadata is nil")
}
if len(batchData.blobs) == 0 {
return nil, errors.New("failed to process confirmed batch: no blobs from transaction manager metadata")
}
Expand Down Expand Up @@ -170,15 +174,7 @@ func (b *BatchConfirmer) updateConfirmationInfo(
if isBlobAttested(batchData.aggSig.QuorumResults, batchData.blobHeaders[blobIndex]) {
status = disperser.Confirmed
// generate inclusion proof
blobHeader := batchData.blobHeaders[blobIndex]

blobHeaderHash, err := blobHeader.GetBlobHeaderHash()
if err != nil {
b.logger.Error("failed to get blob header hash", "err", err)
blobsToRetry = append(blobsToRetry, batchData.blobs[blobIndex])
continue
}
merkleProof, err := batchData.merkleTree.GenerateProof(blobHeaderHash[:], 0)
Comment on lines -173 to -181
Copy link
Contributor Author

@ian-shim ian-shim Aug 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Proof should be generated based on leaf index, not the leaf data

merkleProof, err := batchData.merkleTree.GenerateProofWithIndex(uint64(blobIndex), 0)
if err != nil {
b.logger.Error("failed to generate blob header inclusion proof", "err", err)
blobsToRetry = append(blobsToRetry, batchData.blobs[blobIndex])
Expand All @@ -188,13 +184,14 @@ func (b *BatchConfirmer) updateConfirmationInfo(
}

confirmationInfo := &disperser.ConfirmationInfo{
BatchHeaderHash: headerHash,
BlobIndex: uint32(blobIndex),
SignatoryRecordHash: core.ComputeSignatoryRecordHash(uint32(batchData.batchHeader.ReferenceBlockNumber), batchData.aggSig.NonSigners),
ReferenceBlockNumber: uint32(batchData.batchHeader.ReferenceBlockNumber),
BatchRoot: batchData.batchHeader.BatchRoot[:],
BlobInclusionProof: proof,
BlobCommitment: &batchData.blobHeaders[blobIndex].BlobCommitments,
BatchHeaderHash: headerHash,
BlobIndex: uint32(blobIndex),
SignatoryRecordHash: core.ComputeSignatoryRecordHash(uint32(batchData.batchHeader.ReferenceBlockNumber), batchData.aggSig.NonSigners),
ReferenceBlockNumber: uint32(batchData.batchHeader.ReferenceBlockNumber),
BatchRoot: batchData.batchHeader.BatchRoot[:],
BlobInclusionProof: proof,
BlobCommitment: &batchData.blobHeaders[blobIndex].BlobCommitments,
// This is onchain, external batch ID, which is different from the internal representation of batch UUID
Comment on lines +187 to +194
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No change in this block. Just adding comment in L194

BatchID: uint32(batchID),
ConfirmationTxnHash: txnReceipt.TxHash,
ConfirmationBlockNumber: uint32(txnReceipt.BlockNumber.Uint64()),
Expand Down Expand Up @@ -235,12 +232,15 @@ func (b *BatchConfirmer) ProcessConfirmedBatch(ctx context.Context, receiptOrErr
if len(blobs) == 0 {
return errors.New("failed to process confirmed batch: no blobs from transaction manager metadata")
}
if confirmationMetadata.batchID == uuid.Nil {
return errors.New("failed to process confirmed batch: batch ID from transaction manager metadata is nil")
}
if receiptOrErr.Err != nil {
_ = b.handleFailure(ctx, blobs, FailConfirmBatch)
_ = b.handleFailure(ctx, confirmationMetadata.batchID, blobs, FailConfirmBatch)
return fmt.Errorf("failed to confirm batch onchain: %w", receiptOrErr.Err)
}
if confirmationMetadata.aggSig == nil {
_ = b.handleFailure(ctx, blobs, FailNoAggregatedSignature)
_ = b.handleFailure(ctx, confirmationMetadata.batchID, blobs, FailNoAggregatedSignature)
return errors.New("failed to process confirmed batch: aggSig from transaction manager metadata is nil")
}
b.logger.Info("received ConfirmBatch transaction receipt", "blockNumber", receiptOrErr.Receipt.BlockNumber, "txnHash", receiptOrErr.Receipt.TxHash.Hex())
Expand All @@ -249,23 +249,28 @@ func (b *BatchConfirmer) ProcessConfirmedBatch(ctx context.Context, receiptOrErr
stageTimer := time.Now()
blobsToRetry, err := b.updateConfirmationInfo(ctx, confirmationMetadata, receiptOrErr.Receipt)
if err != nil {
_ = b.handleFailure(ctx, blobs, FailUpdateConfirmationInfo)
_ = b.handleFailure(ctx, confirmationMetadata.batchID, blobs, FailUpdateConfirmationInfo)
return fmt.Errorf("failed to update confirmation info: %w", err)
}
if len(blobsToRetry) > 0 {
b.logger.Error("failed to update confirmation info", "failed", len(blobsToRetry), "total", len(blobs))
_ = b.handleFailure(ctx, blobsToRetry, FailUpdateConfirmationInfo)
_ = b.handleFailure(ctx, confirmationMetadata.batchID, blobsToRetry, FailUpdateConfirmationInfo)
} else {
err = b.MinibatchStore.UpdateBatchStatus(ctx, confirmationMetadata.batchID, BatchStatusAttested)
if err != nil {
b.logger.Error("error updating batch status", "err", err)
}
}
b.logger.Debug("Update confirmation info took", "duration", time.Since(stageTimer).String())
batchSize := int64(0)
for _, blobMeta := range blobs {
batchSize += int64(blobMeta.RequestMetadata.BlobSize)
}
b.logger.Debug("Update confirmation info took", "duration", time.Since(stageTimer).String(), "batchSize", batchSize)

return nil
}

func (b *BatchConfirmer) handleFailure(ctx context.Context, blobMetadatas []*disperser.BlobMetadata, reason FailReason) error {
func (b *BatchConfirmer) handleFailure(ctx context.Context, batchID uuid.UUID, blobMetadatas []*disperser.BlobMetadata, reason FailReason) error {
var result *multierror.Error
numPermanentFailures := 0
for _, metadata := range blobMetadatas {
Expand All @@ -283,6 +288,11 @@ func (b *BatchConfirmer) handleFailure(ctx context.Context, blobMetadatas []*dis
numPermanentFailures++
}

err := b.MinibatchStore.UpdateBatchStatus(ctx, batchID, BatchStatusFailed)
if err != nil {
b.logger.Error("error updating batch status", "err", err)
}

// Return the error(s)
return result.ErrorOrNil()
}
Expand Down Expand Up @@ -340,7 +350,8 @@ func (b *BatchConfirmer) HandleSingleBatch(ctx context.Context) error {
return errors.New("batch not dispersed")
}

// Get batch state
// Try getting batch state from minibatcher cache
// TODO(ian-shim): If not found, get it from the minibatch store
batchState := b.Minibatcher.PopBatchState(batch.ID)
if batchState == nil {
return fmt.Errorf("no batch state found for batch %s", batch.ID)
Expand Down Expand Up @@ -389,10 +400,9 @@ func (b *BatchConfirmer) HandleSingleBatch(ctx context.Context) error {

// Aggregate the signatures
b.logger.Debug("Aggregating signatures...")

quorumAttestation, err := b.Aggregator.ReceiveSignatures(ctx, batchState.OperatorState, batchHeaderHash, replyChan)
if err != nil {
_ = b.handleFailure(ctx, batchState.BlobMetadata, FailAggregateSignatures)
_ = b.handleFailure(ctx, batch.ID, batchState.BlobMetadata, FailAggregateSignatures)
return fmt.Errorf("error receiving and validating signatures: %w", err)
}
operatorCount := make(map[core.QuorumID]int)
Expand All @@ -416,7 +426,7 @@ func (b *BatchConfirmer) HandleSingleBatch(ctx context.Context) error {
numPassed, passedQuorums := numBlobsAttestedByQuorum(quorumAttestation.QuorumResults, batchState.BlobHeaders)
// TODO(mooselumph): Determine whether to confirm the batch based on the number of successes
if numPassed == 0 {
_ = b.handleFailure(ctx, batchState.BlobMetadata, FailNoSignatures)
_ = b.handleFailure(ctx, batch.ID, batchState.BlobMetadata, FailNoSignatures)
return errors.New("no blobs received sufficient signatures")
}

Expand All @@ -429,26 +439,27 @@ func (b *BatchConfirmer) HandleSingleBatch(ctx context.Context) error {
// Aggregate the signatures across only the non-empty quorums. Excluding empty quorums reduces the gas cost.
aggSig, err := b.Aggregator.AggregateSignatures(ctx, b.ChainState, batchHeader.ReferenceBlockNumber, quorumAttestation, nonEmptyQuorums)
if err != nil {
_ = b.handleFailure(ctx, batchState.BlobMetadata, FailAggregateSignatures)
_ = b.handleFailure(ctx, batch.ID, batchState.BlobMetadata, FailAggregateSignatures)
return fmt.Errorf("error aggregating signatures: %w", err)
}

b.logger.Debug("Confirming batch...")

txn, err := b.Transactor.BuildConfirmBatchTxn(ctx, batchHeader, aggSig.QuorumResults, aggSig)
if err != nil {
_ = b.handleFailure(ctx, batchState.BlobMetadata, FailConfirmBatch)
_ = b.handleFailure(ctx, batch.ID, batchState.BlobMetadata, FailConfirmBatch)
return fmt.Errorf("error building confirmBatch transaction: %w", err)
}
err = b.TransactionManager.ProcessTransaction(ctx, NewTxnRequest(txn, "confirmBatch", big.NewInt(0), confirmationMetadata{
batchID: batch.ID,
batchHeader: batchHeader,
blobs: batchState.BlobMetadata,
blobHeaders: batchState.BlobHeaders,
merkleTree: merkleTree,
aggSig: aggSig,
}))
if err != nil {
_ = b.handleFailure(ctx, batchState.BlobMetadata, FailConfirmBatch)
_ = b.handleFailure(ctx, batch.ID, batchState.BlobMetadata, FailConfirmBatch)
return fmt.Errorf("error sending confirmBatch transaction: %w", err)
}

Expand Down
Loading
Loading