diff --git a/disperser/batcher/batch_confirmer.go b/disperser/batcher/batch_confirmer.go index 0e0512a48d..fed87c640f 100644 --- a/disperser/batcher/batch_confirmer.go +++ b/disperser/batcher/batch_confirmer.go @@ -15,6 +15,7 @@ import ( "github.com/Layr-Labs/eigensdk-go/logging" "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/core/types" + "github.com/google/uuid" "github.com/hashicorp/go-multierror" ) @@ -135,6 +136,9 @@ func (b *BatchConfirmer) updateConfirmationInfo( if txnReceipt.BlockNumber == nil { return nil, errors.New("error getting transaction receipt block number") } + if batchData.batchID == uuid.Nil { + return nil, errors.New("failed to process confirmed batch: batch ID from transaction manager metadata is nil") + } if len(batchData.blobs) == 0 { return nil, errors.New("failed to process confirmed batch: no blobs from transaction manager metadata") } @@ -170,15 +174,7 @@ func (b *BatchConfirmer) updateConfirmationInfo( if isBlobAttested(batchData.aggSig.QuorumResults, batchData.blobHeaders[blobIndex]) { status = disperser.Confirmed // generate inclusion proof - blobHeader := batchData.blobHeaders[blobIndex] - - blobHeaderHash, err := blobHeader.GetBlobHeaderHash() - if err != nil { - b.logger.Error("failed to get blob header hash", "err", err) - blobsToRetry = append(blobsToRetry, batchData.blobs[blobIndex]) - continue - } - merkleProof, err := batchData.merkleTree.GenerateProof(blobHeaderHash[:], 0) + merkleProof, err := batchData.merkleTree.GenerateProofWithIndex(uint64(blobIndex), 0) if err != nil { b.logger.Error("failed to generate blob header inclusion proof", "err", err) blobsToRetry = append(blobsToRetry, batchData.blobs[blobIndex]) @@ -188,13 +184,14 @@ func (b *BatchConfirmer) updateConfirmationInfo( } confirmationInfo := &disperser.ConfirmationInfo{ - BatchHeaderHash: headerHash, - BlobIndex: uint32(blobIndex), - SignatoryRecordHash: core.ComputeSignatoryRecordHash(uint32(batchData.batchHeader.ReferenceBlockNumber), batchData.aggSig.NonSigners), - ReferenceBlockNumber: uint32(batchData.batchHeader.ReferenceBlockNumber), - BatchRoot: batchData.batchHeader.BatchRoot[:], - BlobInclusionProof: proof, - BlobCommitment: &batchData.blobHeaders[blobIndex].BlobCommitments, + BatchHeaderHash: headerHash, + BlobIndex: uint32(blobIndex), + SignatoryRecordHash: core.ComputeSignatoryRecordHash(uint32(batchData.batchHeader.ReferenceBlockNumber), batchData.aggSig.NonSigners), + ReferenceBlockNumber: uint32(batchData.batchHeader.ReferenceBlockNumber), + BatchRoot: batchData.batchHeader.BatchRoot[:], + BlobInclusionProof: proof, + BlobCommitment: &batchData.blobHeaders[blobIndex].BlobCommitments, + // This is onchain, external batch ID, which is different from the internal representation of batch UUID BatchID: uint32(batchID), ConfirmationTxnHash: txnReceipt.TxHash, ConfirmationBlockNumber: uint32(txnReceipt.BlockNumber.Uint64()), @@ -235,12 +232,15 @@ func (b *BatchConfirmer) ProcessConfirmedBatch(ctx context.Context, receiptOrErr if len(blobs) == 0 { return errors.New("failed to process confirmed batch: no blobs from transaction manager metadata") } + if confirmationMetadata.batchID == uuid.Nil { + return errors.New("failed to process confirmed batch: batch ID from transaction manager metadata is nil") + } if receiptOrErr.Err != nil { - _ = b.handleFailure(ctx, blobs, FailConfirmBatch) + _ = b.handleFailure(ctx, confirmationMetadata.batchID, blobs, FailConfirmBatch) return fmt.Errorf("failed to confirm batch onchain: %w", receiptOrErr.Err) } if confirmationMetadata.aggSig == nil { - _ = b.handleFailure(ctx, blobs, FailNoAggregatedSignature) + _ = b.handleFailure(ctx, confirmationMetadata.batchID, blobs, FailNoAggregatedSignature) return errors.New("failed to process confirmed batch: aggSig from transaction manager metadata is nil") } b.logger.Info("received ConfirmBatch transaction receipt", "blockNumber", receiptOrErr.Receipt.BlockNumber, "txnHash", receiptOrErr.Receipt.TxHash.Hex()) @@ -249,23 +249,28 @@ func (b *BatchConfirmer) ProcessConfirmedBatch(ctx context.Context, receiptOrErr stageTimer := time.Now() blobsToRetry, err := b.updateConfirmationInfo(ctx, confirmationMetadata, receiptOrErr.Receipt) if err != nil { - _ = b.handleFailure(ctx, blobs, FailUpdateConfirmationInfo) + _ = b.handleFailure(ctx, confirmationMetadata.batchID, blobs, FailUpdateConfirmationInfo) return fmt.Errorf("failed to update confirmation info: %w", err) } if len(blobsToRetry) > 0 { b.logger.Error("failed to update confirmation info", "failed", len(blobsToRetry), "total", len(blobs)) - _ = b.handleFailure(ctx, blobsToRetry, FailUpdateConfirmationInfo) + _ = b.handleFailure(ctx, confirmationMetadata.batchID, blobsToRetry, FailUpdateConfirmationInfo) + } else { + err = b.MinibatchStore.UpdateBatchStatus(ctx, confirmationMetadata.batchID, BatchStatusAttested) + if err != nil { + b.logger.Error("error updating batch status", "err", err) + } } - b.logger.Debug("Update confirmation info took", "duration", time.Since(stageTimer).String()) batchSize := int64(0) for _, blobMeta := range blobs { batchSize += int64(blobMeta.RequestMetadata.BlobSize) } + b.logger.Debug("Update confirmation info took", "duration", time.Since(stageTimer).String(), "batchSize", batchSize) return nil } -func (b *BatchConfirmer) handleFailure(ctx context.Context, blobMetadatas []*disperser.BlobMetadata, reason FailReason) error { +func (b *BatchConfirmer) handleFailure(ctx context.Context, batchID uuid.UUID, blobMetadatas []*disperser.BlobMetadata, reason FailReason) error { var result *multierror.Error numPermanentFailures := 0 for _, metadata := range blobMetadatas { @@ -283,6 +288,11 @@ func (b *BatchConfirmer) handleFailure(ctx context.Context, blobMetadatas []*dis numPermanentFailures++ } + err := b.MinibatchStore.UpdateBatchStatus(ctx, batchID, BatchStatusFailed) + if err != nil { + b.logger.Error("error updating batch status", "err", err) + } + // Return the error(s) return result.ErrorOrNil() } @@ -340,7 +350,8 @@ func (b *BatchConfirmer) HandleSingleBatch(ctx context.Context) error { return errors.New("batch not dispersed") } - // Get batch state + // Try getting batch state from minibatcher cache + // TODO(ian-shim): If not found, get it from the minibatch store batchState := b.Minibatcher.PopBatchState(batch.ID) if batchState == nil { return fmt.Errorf("no batch state found for batch %s", batch.ID) @@ -389,10 +400,9 @@ func (b *BatchConfirmer) HandleSingleBatch(ctx context.Context) error { // Aggregate the signatures b.logger.Debug("Aggregating signatures...") - quorumAttestation, err := b.Aggregator.ReceiveSignatures(ctx, batchState.OperatorState, batchHeaderHash, replyChan) if err != nil { - _ = b.handleFailure(ctx, batchState.BlobMetadata, FailAggregateSignatures) + _ = b.handleFailure(ctx, batch.ID, batchState.BlobMetadata, FailAggregateSignatures) return fmt.Errorf("error receiving and validating signatures: %w", err) } operatorCount := make(map[core.QuorumID]int) @@ -416,7 +426,7 @@ func (b *BatchConfirmer) HandleSingleBatch(ctx context.Context) error { numPassed, passedQuorums := numBlobsAttestedByQuorum(quorumAttestation.QuorumResults, batchState.BlobHeaders) // TODO(mooselumph): Determine whether to confirm the batch based on the number of successes if numPassed == 0 { - _ = b.handleFailure(ctx, batchState.BlobMetadata, FailNoSignatures) + _ = b.handleFailure(ctx, batch.ID, batchState.BlobMetadata, FailNoSignatures) return errors.New("no blobs received sufficient signatures") } @@ -429,7 +439,7 @@ func (b *BatchConfirmer) HandleSingleBatch(ctx context.Context) error { // Aggregate the signatures across only the non-empty quorums. Excluding empty quorums reduces the gas cost. aggSig, err := b.Aggregator.AggregateSignatures(ctx, b.ChainState, batchHeader.ReferenceBlockNumber, quorumAttestation, nonEmptyQuorums) if err != nil { - _ = b.handleFailure(ctx, batchState.BlobMetadata, FailAggregateSignatures) + _ = b.handleFailure(ctx, batch.ID, batchState.BlobMetadata, FailAggregateSignatures) return fmt.Errorf("error aggregating signatures: %w", err) } @@ -437,10 +447,11 @@ func (b *BatchConfirmer) HandleSingleBatch(ctx context.Context) error { txn, err := b.Transactor.BuildConfirmBatchTxn(ctx, batchHeader, aggSig.QuorumResults, aggSig) if err != nil { - _ = b.handleFailure(ctx, batchState.BlobMetadata, FailConfirmBatch) + _ = b.handleFailure(ctx, batch.ID, batchState.BlobMetadata, FailConfirmBatch) return fmt.Errorf("error building confirmBatch transaction: %w", err) } err = b.TransactionManager.ProcessTransaction(ctx, NewTxnRequest(txn, "confirmBatch", big.NewInt(0), confirmationMetadata{ + batchID: batch.ID, batchHeader: batchHeader, blobs: batchState.BlobMetadata, blobHeaders: batchState.BlobHeaders, @@ -448,7 +459,7 @@ func (b *BatchConfirmer) HandleSingleBatch(ctx context.Context) error { aggSig: aggSig, })) if err != nil { - _ = b.handleFailure(ctx, batchState.BlobMetadata, FailConfirmBatch) + _ = b.handleFailure(ctx, batch.ID, batchState.BlobMetadata, FailConfirmBatch) return fmt.Errorf("error sending confirmBatch transaction: %w", err) } diff --git a/disperser/batcher/batch_confirmer_test.go b/disperser/batcher/batch_confirmer_test.go index 04fe1e3b11..dc1048e36a 100644 --- a/disperser/batcher/batch_confirmer_test.go +++ b/disperser/batcher/batch_confirmer_test.go @@ -2,10 +2,12 @@ package batcher_test import ( "context" + "encoding/hex" "math/big" "testing" "time" + "github.com/Layr-Labs/eigenda/common" cmock "github.com/Layr-Labs/eigenda/common/mock" "github.com/Layr-Labs/eigenda/core" coremock "github.com/Layr-Labs/eigenda/core/mock" @@ -16,6 +18,7 @@ import ( batmock "github.com/Layr-Labs/eigenda/disperser/batcher/mock" "github.com/Layr-Labs/eigenda/disperser/common/inmem" dmock "github.com/Layr-Labs/eigenda/disperser/mock" + "github.com/Layr-Labs/eigenda/encoding" "github.com/Layr-Labs/eigensdk-go/logging" gethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -25,6 +28,11 @@ import ( "github.com/stretchr/testify/mock" ) +var ( + assignmentCoordinator = &core.StdAssignmentCoordinator{} + encoderProver encoding.Prover = nil +) + type batchConfirmerComponents struct { batchConfirmer *bat.BatchConfirmer blobStore disperser.BlobStore @@ -40,7 +48,8 @@ type batchConfirmerComponents struct { func makeBatchConfirmer(t *testing.T) *batchConfirmerComponents { logger := logging.NewNoopLogger() - asgn := &core.StdAssignmentCoordinator{} + // logger, err := common.NewLogger(common.DefaultLoggerConfig()) + // assert.NoError(t, err) transactor := &coremock.MockTransactor{} agg, err := core.NewStdSignatureAggregator(logger, transactor) assert.NoError(t, err) @@ -51,15 +60,15 @@ func makeBatchConfirmer(t *testing.T) *batchConfirmerComponents { txnManager := batmock.NewTxnManager() minibatchStore := batcherinmem.NewMinibatchStore(logger) encodingWorkerPool := workerpool.New(10) - p, err := makeTestProver() + encoderProver, err = makeTestProver() assert.NoError(t, err) - encoderClient := disperser.NewLocalEncoderClient(p) + encoderClient := disperser.NewLocalEncoderClient(encoderProver) metrics := bat.NewMetrics("9100", logger) trigger := bat.NewEncodedSizeNotifier( make(chan struct{}, 1), 10*1024*1024, ) - encodingStreamer, err := bat.NewEncodingStreamer(streamerConfig, blobStore, mockChainState, encoderClient, asgn, trigger, encodingWorkerPool, metrics.EncodingStreamerMetrics, logger) + encodingStreamer, err := bat.NewEncodingStreamer(streamerConfig, blobStore, mockChainState, encoderClient, assignmentCoordinator, trigger, encodingWorkerPool, metrics.EncodingStreamerMetrics, logger) assert.NoError(t, err) pool := workerpool.New(int(10)) minibatcher, err := bat.NewMinibatcher(bat.MinibatcherConfig{ @@ -67,7 +76,7 @@ func makeBatchConfirmer(t *testing.T) *batchConfirmerComponents { MaxNumConnections: 10, MaxNumRetriesPerBlob: 2, MaxNumRetriesPerDispersal: 1, - }, blobStore, minibatchStore, dispatcher, mockChainState, asgn, encodingStreamer, ethClient, pool, logger) + }, blobStore, minibatchStore, dispatcher, mockChainState, assignmentCoordinator, encodingStreamer, ethClient, pool, logger) assert.NoError(t, err) config := bat.BatchConfirmerConfig{ @@ -79,7 +88,7 @@ func makeBatchConfirmer(t *testing.T) *batchConfirmerComponents { SRSOrder: 3000, MaxNumRetriesPerBlob: 2, } - b, err := bat.NewBatchConfirmer(config, blobStore, minibatchStore, dispatcher, mockChainState, asgn, encodingStreamer, agg, ethClient, transactor, txnManager, minibatcher, logger) + b, err := bat.NewBatchConfirmer(config, blobStore, minibatchStore, dispatcher, mockChainState, assignmentCoordinator, encodingStreamer, agg, ethClient, transactor, txnManager, minibatcher, logger) assert.NoError(t, err) ethClient.On("BlockNumber").Return(uint64(initialBlock), nil) @@ -98,8 +107,56 @@ func makeBatchConfirmer(t *testing.T) *batchConfirmerComponents { } } +func generateBlobAndHeader(t *testing.T, operatorState *core.OperatorState, securityParams []*core.SecurityParam) (*core.Blob, *core.BlobHeader) { + assert.NotNil(t, operatorState) + assert.Greater(t, len(securityParams), 0) + assert.NotNil(t, assignmentCoordinator) + assert.NotNil(t, encoderProver) + + blob := makeTestBlob(securityParams) + blobLength := encoding.GetBlobLength(uint(len(blob.Data))) + blobHeader := &core.BlobHeader{} + blobQuorumInfo := &core.BlobQuorumInfo{} + chunkLength := uint(0) + var err error + for _, sp := range securityParams { + chunkLength, err = assignmentCoordinator.CalculateChunkLength(operatorState, blobLength, streamerConfig.TargetNumChunks, sp) + assert.NoError(t, err) + blobQuorumInfo = &core.BlobQuorumInfo{ + SecurityParam: *sp, + ChunkLength: chunkLength, + } + blobHeader.QuorumInfos = append(blobHeader.QuorumInfos, blobQuorumInfo) + } + + // use the latest security param to encode the blob + _, info, err := assignmentCoordinator.GetAssignments(operatorState, blobLength, blobQuorumInfo) + assert.NoError(t, err) + params := encoding.ParamsFromMins(chunkLength, info.TotalChunks) + commits, _, err := encoderProver.EncodeAndProve(blob.Data, params) + assert.NoError(t, err) + blobHeader.BlobCommitments = commits + return &blob, blobHeader +} + func TestBatchConfirmerIteration(t *testing.T) { components := makeBatchConfirmer(t) + ctx := context.Background() + operatorState := components.chainData.GetTotalOperatorState(ctx, initialBlock) + blob1, blobHeader1 := generateBlobAndHeader(t, operatorState.OperatorState, []*core.SecurityParam{{ + QuorumID: 0, + AdversaryThreshold: 80, + ConfirmationThreshold: 100, + }}) + blobHeaderHash1, err := blobHeader1.GetBlobHeaderHash() + assert.NoError(t, err) + blob2, blobHeader2 := generateBlobAndHeader(t, operatorState.OperatorState, []*core.SecurityParam{{ + QuorumID: 1, + AdversaryThreshold: 70, + ConfirmationThreshold: 100, + }}) + blobHeaderHash2, err := blobHeader2.GetBlobHeaderHash() + assert.NoError(t, err) b := components.batchConfirmer batchID, err := uuid.NewV7() assert.NoError(t, err) @@ -112,42 +169,65 @@ func TestBatchConfirmerIteration(t *testing.T) { AggregatePubKey: &core.G2Point{}, AggregateSignature: &core.Signature{}, } + + // Set up batch and minibatch err = b.MinibatchStore.PutBatch(context.Background(), batch) assert.NoError(t, err) err = b.MinibatchStore.PutMinibatch(context.Background(), &bat.MinibatchRecord{ BatchID: batchID, MinibatchIndex: 0, - BlobHeaderHashes: [][32]byte{{1}, {2}}, - BatchSize: 0, + BlobHeaderHashes: [][32]byte{blobHeaderHash1}, + BatchSize: 1, ReferenceBlockNumber: uint(initialBlock), }) assert.NoError(t, err) err = b.MinibatchStore.PutMinibatch(context.Background(), &bat.MinibatchRecord{ BatchID: batchID, MinibatchIndex: 1, - BlobHeaderHashes: [][32]byte{{3}, {4}}, - BatchSize: 0, + BlobHeaderHashes: [][32]byte{blobHeaderHash2}, + BatchSize: 1, ReferenceBlockNumber: uint(initialBlock), }) assert.NoError(t, err) - operatorState := components.chainData.GetTotalOperatorState(context.Background(), 0) - + requestedAt1, blobKey1 := queueBlob(t, ctx, blob1, components.blobStore) + _, blobKey2 := queueBlob(t, ctx, blob2, components.blobStore) + meta1, err := components.blobStore.GetBlobMetadata(ctx, blobKey1) + assert.NoError(t, err) + meta2, err := components.blobStore.GetBlobMetadata(ctx, blobKey2) + assert.NoError(t, err) + batchHeader1 := &core.BatchHeader{ + ReferenceBlockNumber: initialBlock, + BatchRoot: [32]byte{}, + } + _, err = batchHeader1.SetBatchRoot([]*core.BlobHeader{blobHeader1}) + assert.NoError(t, err) + batchHeaderHash1, err := batchHeader1.GetBatchHeaderHash() + assert.NoError(t, err) + batchHeader2 := &core.BatchHeader{ + ReferenceBlockNumber: initialBlock, + BatchRoot: [32]byte{}, + } + _, err = batchHeader2.SetBatchRoot([]*core.BlobHeader{blobHeader2}) + assert.NoError(t, err) + batchHeaderHash2, err := batchHeader2.GetBatchHeaderHash() + assert.NoError(t, err) + // Set up dispersal requests and responses for opID, opInfo := range operatorState.PrivateOperators { req0 := &bat.DispersalRequest{ BatchID: batchID, MinibatchIndex: 0, OperatorID: opID, Socket: opInfo.DispersalPort, - NumBlobs: 2, + NumBlobs: 1, RequestedAt: time.Now(), - BlobHash: "0", - MetadataHash: "0", + BlobHash: blobKey1.BlobHash, + MetadataHash: blobKey1.MetadataHash, } err = b.MinibatchStore.PutDispersalRequest(context.Background(), req0) assert.NoError(t, err) err = b.MinibatchStore.PutDispersalResponse(context.Background(), &bat.DispersalResponse{ DispersalRequest: *req0, - Signatures: []*core.Signature{opInfo.KeyPair.SignMessage([32]byte{0})}, + Signatures: []*core.Signature{opInfo.KeyPair.SignMessage(batchHeaderHash1)}, RespondedAt: time.Now(), Error: nil, }) @@ -158,72 +238,38 @@ func TestBatchConfirmerIteration(t *testing.T) { MinibatchIndex: 1, OperatorID: opID, Socket: opInfo.DispersalPort, - NumBlobs: 2, + NumBlobs: 1, RequestedAt: time.Now(), - BlobHash: "1", - MetadataHash: "1", + BlobHash: blobKey2.BlobHash, + MetadataHash: blobKey2.MetadataHash, } err = b.MinibatchStore.PutDispersalRequest(context.Background(), req1) assert.NoError(t, err) err = b.MinibatchStore.PutDispersalResponse(context.Background(), &bat.DispersalResponse{ DispersalRequest: *req1, - Signatures: []*core.Signature{opInfo.KeyPair.SignMessage([32]byte{1})}, + Signatures: []*core.Signature{opInfo.KeyPair.SignMessage(batchHeaderHash2)}, RespondedAt: time.Now(), Error: nil, }) assert.NoError(t, err) } + // Set up batch state b.Minibatcher.Batches[batchID] = &bat.BatchState{ BatchID: batchID, ReferenceBlockNumber: uint(initialBlock), BlobHeaders: []*core.BlobHeader{ - { - AccountID: "0", - QuorumInfos: []*core.BlobQuorumInfo{ - { - SecurityParam: core.SecurityParam{ - QuorumID: 0, - AdversaryThreshold: 30, - ConfirmationThreshold: 80, - }, - }, - { - SecurityParam: core.SecurityParam{ - QuorumID: 1, - AdversaryThreshold: 30, - ConfirmationThreshold: 80, - }, - }, - }, - }, - { - AccountID: "1", - QuorumInfos: []*core.BlobQuorumInfo{ - { - SecurityParam: core.SecurityParam{ - QuorumID: 0, - AdversaryThreshold: 30, - ConfirmationThreshold: 80, - }, - }, - { - SecurityParam: core.SecurityParam{ - QuorumID: 1, - AdversaryThreshold: 30, - ConfirmationThreshold: 80, - }, - }, - }, - }, + blobHeader1, + blobHeader2, }, - BlobMetadata: []*disperser.BlobMetadata{}, + BlobMetadata: []*disperser.BlobMetadata{meta1, meta2}, OperatorState: operatorState.IndexedOperatorState, NumMinibatches: 2, } + // Receive signatures signChan := make(chan core.SigningMessage, 4) - batchHeaderHash := [32]byte{93, 156, 41, 17, 3, 78, 159, 243, 222, 111, 54, 107, 237, 48, 243, 176, 224, 151, 96, 151, 159, 99, 118, 186, 53, 192, 72, 59, 160, 73, 7, 213} + batchHeaderHash := [32]byte{138, 1, 226, 93, 51, 120, 236, 124, 91, 206, 100, 187, 237, 1, 193, 151, 137, 131, 30, 218, 139, 24, 221, 105, 141, 253, 242, 13, 239, 199, 179, 42} for opID, opInfo := range operatorState.PrivateOperators { signChan <- core.SigningMessage{ Signature: opInfo.KeyPair.SignMessage(batchHeaderHash), @@ -241,11 +287,82 @@ func TestBatchConfirmerIteration(t *testing.T) { assert.NotNil(t, args.Get(1).(*core.BatchHeader).BatchRoot) assert.Equal(t, args.Get(2).(map[uint8]*core.QuorumResult)[0].PercentSigned, uint8(100)) assert.Equal(t, args.Get(2).(map[uint8]*core.QuorumResult)[1].PercentSigned, uint8(100)) + + aggSig := args[3].(*core.SignatureAggregation) + assert.Empty(t, aggSig.NonSigners) + assert.Len(t, aggSig.QuorumAggPubKeys, 2) + assert.Contains(t, aggSig.QuorumAggPubKeys, core.QuorumID(0)) + assert.Contains(t, aggSig.QuorumAggPubKeys, core.QuorumID(1)) + assert.Equal(t, aggSig.QuorumResults, map[core.QuorumID]*core.QuorumResult{ + core.QuorumID(0): { + QuorumID: core.QuorumID(0), + PercentSigned: uint8(100), + }, + core.QuorumID(1): { + QuorumID: core.QuorumID(1), + PercentSigned: uint8(100), + }, + }) }, ).Return(txn, nil) components.txnManager.On("ProcessTransaction").Return(nil) - err = b.HandleSingleBatch(context.Background()) + err = b.HandleSingleBatch(ctx) + assert.NoError(t, err) + + // Validate batch confirmation + assert.Greater(t, len(components.txnManager.Requests), 0) + // logData should be encoding 3 and 0 + logData, err := hex.DecodeString("00000000000000000000000000000000000000000000000000000000000000030000000000000000000000000000000000000000000000000000000000000000") assert.NoError(t, err) + txHash := gethcommon.HexToHash("0x1234") + blockNumber := big.NewInt(123) + err = b.ProcessConfirmedBatch(ctx, &bat.ReceiptOrErr{ + Receipt: &types.Receipt{ + Logs: []*types.Log{ + { + Topics: []gethcommon.Hash{common.BatchConfirmedEventSigHash, gethcommon.HexToHash("1234")}, + Data: logData, + }, + }, + BlockNumber: blockNumber, + TxHash: txHash, + }, + Err: nil, + Metadata: components.txnManager.Requests[len(components.txnManager.Requests)-1].Metadata, + }) + assert.NoError(t, err) + // Check that the blob was processed + meta1, err = components.blobStore.GetBlobMetadata(ctx, blobKey1) + assert.NoError(t, err) + assert.Equal(t, blobKey1, meta1.GetBlobKey()) + assert.Equal(t, requestedAt1, meta1.RequestMetadata.RequestedAt) + assert.Equal(t, disperser.Confirmed, meta1.BlobStatus) + assert.Equal(t, meta1.ConfirmationInfo.BatchID, uint32(3)) + assert.Equal(t, meta1.ConfirmationInfo.ConfirmationTxnHash, txHash) + assert.Equal(t, meta1.ConfirmationInfo.ConfirmationBlockNumber, uint32(blockNumber.Int64())) + + meta2, err = components.blobStore.GetBlobMetadata(ctx, blobKey2) + assert.NoError(t, err) + assert.Equal(t, blobKey2, meta2.GetBlobKey()) + assert.Equal(t, disperser.Confirmed, meta2.BlobStatus) + + res, err := components.encodingStreamer.EncodedBlobstore.GetEncodingResult(meta1.GetBlobKey(), 0) + assert.ErrorContains(t, err, "no such key") + assert.Nil(t, res) + res, err = components.encodingStreamer.EncodedBlobstore.GetEncodingResult(meta2.GetBlobKey(), 1) + assert.ErrorContains(t, err, "no such key") + assert.Nil(t, res) + count, size := components.encodingStreamer.EncodedBlobstore.GetEncodedResultSize() + assert.Equal(t, 0, count) + assert.Equal(t, uint64(0), size) + + batch, err = components.minibatchStore.GetBatch(context.Background(), batch.ID) + assert.NoError(t, err) + assert.Equal(t, batch.Status, bat.BatchStatusAttested) + batches, err := components.minibatchStore.GetBatchesByStatus(context.Background(), bat.BatchStatusAttested) + assert.NoError(t, err) + assert.Equal(t, len(batches), 1) + assert.Equal(t, batches[0].ID, batch.ID) } func TestBatchConfirmerIterationFailure(t *testing.T) { @@ -474,4 +591,7 @@ func TestBatchConfirmerInsufficientSignatures(t *testing.T) { components.transactor.On("OperatorIDToAddress").Return(gethcommon.Address{}, nil) err = b.HandleSingleBatch(context.Background()) assert.ErrorContains(t, err, "no blobs received sufficient signatures") + batch, err = components.minibatchStore.GetBatch(context.Background(), batch.ID) + assert.NoError(t, err) + assert.Equal(t, batch.Status, bat.BatchStatusFailed) } diff --git a/disperser/batcher/batcher.go b/disperser/batcher/batcher.go index e55ce932f7..e4b0fc20a0 100644 --- a/disperser/batcher/batcher.go +++ b/disperser/batcher/batcher.go @@ -16,6 +16,7 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/core/types" "github.com/gammazero/workerpool" + "github.com/google/uuid" "github.com/hashicorp/go-multierror" "github.com/prometheus/client_golang/prometheus" "github.com/wealdtech/go-merkletree/v2" @@ -399,6 +400,7 @@ func (b *Batcher) handleFailure(ctx context.Context, blobMetadatas []*disperser. } type confirmationMetadata struct { + batchID uuid.UUID batchHeader *core.BatchHeader blobs []*disperser.BlobMetadata blobHeaders []*core.BlobHeader @@ -507,6 +509,7 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error { return fmt.Errorf("HandleSingleBatch: error building confirmBatch transaction: %w", err) } err = b.TransactionManager.ProcessTransaction(ctx, NewTxnRequest(txn, "confirmBatch", big.NewInt(0), confirmationMetadata{ + batchID: uuid.Nil, batchHeader: batch.BatchHeader, blobs: batch.BlobMetadata, blobHeaders: batch.BlobHeaders, diff --git a/disperser/batcher/minibatch_store.go b/disperser/batcher/minibatch_store.go index f909d9c6ea..7d15a45799 100644 --- a/disperser/batcher/minibatch_store.go +++ b/disperser/batcher/minibatch_store.go @@ -17,7 +17,7 @@ type BatchStatus uint // Formed: the batch has been formed and no more minibatches can be added. Implies that all minibatch records and dispersal request records have been created. // // Attested: the batch has been attested. -// Failed: the batch has failed. +// Failed: the batch has failed. Retries will be attempted at the blob level. // // The batch lifecycle is as follows: // Pending -> Formed -> Attested