diff --git a/core/eth/tx.go b/core/eth/tx.go index c8e47fd496..083110a01d 100644 --- a/core/eth/tx.go +++ b/core/eth/tx.go @@ -417,7 +417,7 @@ func (t *Transactor) GetOperatorStakesForQuorums(ctx context.Context, quorums [] // BuildConfirmBatchTxn builds a transaction to confirm a batch header and signature aggregation. The signature aggregation must satisfy the quorum thresholds // specified in the batch header. If the signature aggregation does not satisfy the quorum thresholds, the transaction will fail. // Note that this function returns a transaction without publishing it to the blockchain. The caller is responsible for publishing the transaction. -func (t *Transactor) BuildConfirmBatchTxn(ctx context.Context, batchHeader core.BatchHeader, quorums map[core.QuorumID]*core.QuorumResult, signatureAggregation core.SignatureAggregation) (*types.Transaction, error) { +func (t *Transactor) BuildConfirmBatchTxn(ctx context.Context, batchHeader *core.BatchHeader, quorums map[core.QuorumID]*core.QuorumResult, signatureAggregation *core.SignatureAggregation) (*types.Transaction, error) { quorumNumbers := quorumParamsToQuorumNumbers(quorums) nonSignerOperatorIds := make([][32]byte, len(signatureAggregation.NonSigners)) for i := range signatureAggregation.NonSigners { @@ -496,7 +496,7 @@ func (t *Transactor) BuildConfirmBatchTxn(ctx context.Context, batchHeader core. // ConfirmBatch confirms a batch header and signature aggregation. The signature aggregation must satisfy the quorum thresholds // specified in the batch header. If the signature aggregation does not satisfy the quorum thresholds, the transaction will fail. -func (t *Transactor) ConfirmBatch(ctx context.Context, batchHeader core.BatchHeader, quorums map[core.QuorumID]*core.QuorumResult, signatureAggregation core.SignatureAggregation) (*types.Receipt, error) { +func (t *Transactor) ConfirmBatch(ctx context.Context, batchHeader *core.BatchHeader, quorums map[core.QuorumID]*core.QuorumResult, signatureAggregation *core.SignatureAggregation) (*types.Receipt, error) { tx, err := t.BuildConfirmBatchTxn(ctx, batchHeader, quorums, signatureAggregation) if err != nil { t.Logger.Error("Failed to build a ConfirmBatch txn", "err", err) diff --git a/core/mock/tx.go b/core/mock/tx.go index 53f0ae1ad7..7b234242c5 100644 --- a/core/mock/tx.go +++ b/core/mock/tx.go @@ -71,7 +71,13 @@ func (t *MockTransactor) GetOperatorStakesForQuorums(ctx context.Context, quorum return result.(core.OperatorStakes), args.Error(1) } -func (t *MockTransactor) ConfirmBatch(ctx context.Context, batchHeader core.BatchHeader, quorums map[core.QuorumID]*core.QuorumResult, signatureAggregation core.SignatureAggregation) (*types.Receipt, error) { +func (t *MockTransactor) BuildConfirmBatchTxn(ctx context.Context, batchHeader *core.BatchHeader, quorums map[core.QuorumID]*core.QuorumResult, signatureAggregation *core.SignatureAggregation) (*types.Transaction, error) { + args := t.Called() + result := args.Get(0) + return result.(*types.Transaction), args.Error(1) +} + +func (t *MockTransactor) ConfirmBatch(ctx context.Context, batchHeader *core.BatchHeader, quorums map[core.QuorumID]*core.QuorumResult, signatureAggregation *core.SignatureAggregation) (*types.Receipt, error) { args := t.Called() var receipt *types.Receipt if args.Get(0) != nil { diff --git a/core/tx.go b/core/tx.go index 9269a69460..dfd494bee8 100644 --- a/core/tx.go +++ b/core/tx.go @@ -62,9 +62,12 @@ type Transactor interface { // The indices of the operators within each quorum are also returned. GetOperatorStakesForQuorums(ctx context.Context, quorums []QuorumID, blockNumber uint32) (OperatorStakes, error) + // BuildConfirmBatchTxn builds a transaction to confirm a batch header and signature aggregation. + BuildConfirmBatchTxn(ctx context.Context, batchHeader *BatchHeader, quorums map[QuorumID]*QuorumResult, signatureAggregation *SignatureAggregation) (*types.Transaction, error) + // ConfirmBatch confirms a batch header and signature aggregation. The signature aggregation must satisfy the quorum thresholds // specified in the batch header. If the signature aggregation does not satisfy the quorum thresholds, the transaction will fail. - ConfirmBatch(ctx context.Context, batchHeader BatchHeader, quorums map[QuorumID]*QuorumResult, signatureAggregation SignatureAggregation) (*types.Receipt, error) + ConfirmBatch(ctx context.Context, batchHeader *BatchHeader, quorums map[QuorumID]*QuorumResult, signatureAggregation *SignatureAggregation) (*types.Receipt, error) // GetBlockStaleMeasure returns the BLOCK_STALE_MEASURE defined onchain. GetBlockStaleMeasure(ctx context.Context) (uint32, error) diff --git a/disperser/batcher/batcher.go b/disperser/batcher/batcher.go index 958973c464..f6ae4cc598 100644 --- a/disperser/batcher/batcher.go +++ b/disperser/batcher/batcher.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "math" + "math/big" "time" "github.com/Layr-Labs/eigenda/common" @@ -63,13 +64,14 @@ type Batcher struct { Queue disperser.BlobStore Dispatcher disperser.Dispatcher - Confirmer disperser.BatchConfirmer EncoderClient disperser.EncoderClient ChainState core.IndexedChainState AssignmentCoordinator core.AssignmentCoordinator Aggregator core.SignatureAggregator EncodingStreamer *EncodingStreamer + Transactor core.Transactor + TransactionManager TxnManager Metrics *Metrics ethClient common.EthClient @@ -82,13 +84,14 @@ func NewBatcher( timeoutConfig TimeoutConfig, queue disperser.BlobStore, dispatcher disperser.Dispatcher, - confirmer disperser.BatchConfirmer, chainState core.IndexedChainState, assignmentCoordinator core.AssignmentCoordinator, encoderClient disperser.EncoderClient, aggregator core.SignatureAggregator, ethClient common.EthClient, finalizer Finalizer, + transactor core.Transactor, + txnManager TxnManager, logger common.Logger, metrics *Metrics, ) (*Batcher, error) { @@ -114,13 +117,14 @@ func NewBatcher( Queue: queue, Dispatcher: dispatcher, - Confirmer: confirmer, EncoderClient: encoderClient, ChainState: chainState, AssignmentCoordinator: assignmentCoordinator, Aggregator: aggregator, EncodingStreamer: encodingStreamer, + Transactor: transactor, + TransactionManager: txnManager, Metrics: metrics, ethClient: ethClient, @@ -142,6 +146,24 @@ func (b *Batcher) Start(ctx context.Context) error { return err } batchTrigger := b.EncodingStreamer.EncodedSizeNotifier + + go func() { + receiptChan := b.TransactionManager.ReceiptChan() + for { + select { + case <-ctx.Done(): + return + case receiptOrErr := <-receiptChan: + b.logger.Info("received response from transaction manager", "receipt", receiptOrErr.Receipt, "err", receiptOrErr.Err) + err := b.ProcessConfirmedBatch(ctx, receiptOrErr) + if err != nil { + b.logger.Error("failed to process confirmed batch", "err", err) + } + } + } + }() + b.TransactionManager.Start(ctx) + b.finalizer.Start(ctx) go func() { @@ -177,6 +199,136 @@ func (b *Batcher) Start(ctx context.Context) error { return nil } +// updateConfirmationInfo updates the confirmation info for each blob in the batch and returns failed blobs to retry. +func (b *Batcher) updateConfirmationInfo(ctx context.Context, batch *batch, aggSig *core.SignatureAggregation, txnReceipt *types.Receipt) ([]*disperser.BlobMetadata, error) { + if txnReceipt.BlockNumber == nil { + return nil, fmt.Errorf("HandleSingleBatch: error getting transaction receipt block number") + } + headerHash, err := batch.BatchHeader.GetBatchHeaderHash() + if err != nil { + return nil, fmt.Errorf("HandleSingleBatch: error getting batch header hash: %w", err) + } + batchID, err := b.getBatchID(ctx, txnReceipt) + if err != nil { + return nil, fmt.Errorf("HandleSingleBatch: error fetching batch ID: %w", err) + } + + blobsToRetry := make([]*disperser.BlobMetadata, 0) + var updateConfirmationInfoErr error + + for blobIndex, metadata := range batch.BlobMetadata { + // Mark the blob failed if it didn't get enough signatures. + status := disperser.InsufficientSignatures + + var proof []byte + if isBlobAttested(aggSig.QuorumResults, batch.BlobHeaders[blobIndex]) { + status = disperser.Confirmed + // generate inclusion proof + if blobIndex >= len(batch.BlobHeaders) { + b.logger.Error("HandleSingleBatch: error confirming blobs: blob header not found in batch", "index", blobIndex) + blobsToRetry = append(blobsToRetry, batch.BlobMetadata[blobIndex]) + continue + } + blobHeader := batch.BlobHeaders[blobIndex] + + blobHeaderHash, err := blobHeader.GetBlobHeaderHash() + if err != nil { + b.logger.Error("HandleSingleBatch: failed to get blob header hash", "err", err) + blobsToRetry = append(blobsToRetry, batch.BlobMetadata[blobIndex]) + continue + } + merkleProof, err := batch.MerkleTree.GenerateProof(blobHeaderHash[:], 0) + if err != nil { + b.logger.Error("HandleSingleBatch: failed to generate blob header inclusion proof", "err", err) + blobsToRetry = append(blobsToRetry, batch.BlobMetadata[blobIndex]) + continue + } + proof = serializeProof(merkleProof) + } + + confirmationInfo := &disperser.ConfirmationInfo{ + BatchHeaderHash: headerHash, + BlobIndex: uint32(blobIndex), + SignatoryRecordHash: core.ComputeSignatoryRecordHash(uint32(batch.BatchHeader.ReferenceBlockNumber), aggSig.NonSigners), + ReferenceBlockNumber: uint32(batch.BatchHeader.ReferenceBlockNumber), + BatchRoot: batch.BatchHeader.BatchRoot[:], + BlobInclusionProof: proof, + BlobCommitment: &batch.BlobHeaders[blobIndex].BlobCommitments, + BatchID: uint32(batchID), + ConfirmationTxnHash: txnReceipt.TxHash, + ConfirmationBlockNumber: uint32(txnReceipt.BlockNumber.Uint64()), + Fee: []byte{0}, // No fee + QuorumResults: aggSig.QuorumResults, + BlobQuorumInfos: batch.BlobHeaders[blobIndex].QuorumInfos, + } + + if status == disperser.Confirmed { + if _, updateConfirmationInfoErr = b.Queue.MarkBlobConfirmed(ctx, metadata, confirmationInfo); updateConfirmationInfoErr == nil { + b.Metrics.UpdateCompletedBlob(int(metadata.RequestMetadata.BlobSize), disperser.Confirmed) + // remove encoded blob from storage so we don't disperse it again + b.EncodingStreamer.RemoveEncodedBlob(metadata) + } + } else if status == disperser.InsufficientSignatures { + if _, updateConfirmationInfoErr = b.Queue.MarkBlobInsufficientSignatures(ctx, metadata, confirmationInfo); updateConfirmationInfoErr == nil { + b.Metrics.UpdateCompletedBlob(int(metadata.RequestMetadata.BlobSize), disperser.InsufficientSignatures) + // remove encoded blob from storage so we don't disperse it again + b.EncodingStreamer.RemoveEncodedBlob(metadata) + } + } else { + updateConfirmationInfoErr = fmt.Errorf("HandleSingleBatch: trying to update confirmation info for blob in status other than confirmed or insufficient signatures: %s", status.String()) + } + if updateConfirmationInfoErr != nil { + b.logger.Error("HandleSingleBatch: error updating blob confirmed metadata", "err", updateConfirmationInfoErr) + blobsToRetry = append(blobsToRetry, batch.BlobMetadata[blobIndex]) + } + requestTime := time.Unix(0, int64(metadata.RequestMetadata.RequestedAt)) + b.Metrics.ObserveLatency("E2E", float64(time.Since(requestTime).Milliseconds())) + } + + return blobsToRetry, nil +} + +func (b *Batcher) ProcessConfirmedBatch(ctx context.Context, receiptOrErr *ReceiptOrErr) error { + if receiptOrErr.Metadata == nil { + return fmt.Errorf("failed to process confirmed batch: no metadata from transaction manager response") + } + confirmationMetadata := receiptOrErr.Metadata.(confirmationMetadata) + batch := confirmationMetadata.batch + if batch == nil { + return fmt.Errorf("failed to process confirmed batch: batch from transaction manager metadata is nil") + } + if receiptOrErr.Err != nil { + _ = b.handleFailure(ctx, batch.BlobMetadata, FailConfirmBatch) + return fmt.Errorf("failed to confirm batch onchain: %w", receiptOrErr.Err) + } + if confirmationMetadata.aggSig == nil { + _ = b.handleFailure(ctx, batch.BlobMetadata, FailNoAggregatedSignature) + return fmt.Errorf("failed to process confirmed batch: aggSig from transaction manager metadata is nil") + } + b.logger.Info("received ConfirmBatch transaction receipt", "blockNumber", receiptOrErr.Receipt.BlockNumber, "txnHash", receiptOrErr.Receipt.TxHash.Hex()) + + // Mark the blobs as complete + stageTimer := time.Now() + blobsToRetry, err := b.updateConfirmationInfo(ctx, batch, confirmationMetadata.aggSig, receiptOrErr.Receipt) + if err != nil { + _ = b.handleFailure(ctx, batch.BlobMetadata, FailUpdateConfirmationInfo) + return fmt.Errorf("failed to update confirmation info: %w", err) + } + if len(blobsToRetry) > 0 { + b.logger.Error("failed to update confirmation info", "failed", len(blobsToRetry), "total", len(batch.BlobMetadata)) + _ = b.handleFailure(ctx, blobsToRetry, FailUpdateConfirmationInfo) + } + b.logger.Trace("[batcher] Update confirmation info took", "duration", time.Since(stageTimer)) + b.Metrics.ObserveLatency("UpdateConfirmationInfo", float64(time.Since(stageTimer).Milliseconds())) + batchSize := int64(0) + for _, blobMeta := range batch.BlobMetadata { + batchSize += int64(blobMeta.RequestMetadata.BlobSize) + } + b.Metrics.IncrementBatchCount(batchSize) + + return nil +} + func (b *Batcher) handleFailure(ctx context.Context, blobMetadatas []*disperser.BlobMetadata, reason FailReason) error { var result *multierror.Error for _, metadata := range blobMetadatas { @@ -193,6 +345,12 @@ func (b *Batcher) handleFailure(ctx context.Context, blobMetadatas []*disperser. // Return the error(s) return result.ErrorOrNil() } + +type confirmationMetadata struct { + batch *batch + aggSig *core.SignatureAggregation +} + func (b *Batcher) HandleSingleBatch(ctx context.Context) error { log := b.logger // start a timer @@ -230,7 +388,6 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error { for quorumID := range batch.State.Operators { quorumIDs = append(quorumIDs, quorumID) } - fmt.Println("quorumIDs", quorumIDs) stageTimer = time.Now() aggSig, err := b.Aggregator.AggregateSignatures(ctx, batch.State, quorumIDs, headerHash, update) @@ -242,7 +399,7 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error { b.Metrics.ObserveLatency("AggregateSignatures", float64(time.Since(stageTimer).Milliseconds())) b.Metrics.UpdateAttestation(len(batch.State.IndexedOperators), len(aggSig.NonSigners)) - passed, numPassed := getBlobQuorumPassStatus(aggSig.QuorumResults, batch.BlobHeaders) + numPassed := numBlobsAttested(aggSig.QuorumResults, batch.BlobHeaders) // TODO(mooselumph): Determine whether to confirm the batch based on the number of successes if numPassed == 0 { _ = b.handleFailure(ctx, batch.BlobMetadata, FailNoSignatures) @@ -251,108 +408,21 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error { // Confirm the batch log.Trace("[batcher] Confirming batch...") - stageTimer = time.Now() - txnReceipt, err := b.Confirmer.ConfirmBatch(ctx, batch.BatchHeader, aggSig.QuorumResults, aggSig) + + txn, err := b.Transactor.BuildConfirmBatchTxn(ctx, batch.BatchHeader, aggSig.QuorumResults, aggSig) if err != nil { _ = b.handleFailure(ctx, batch.BlobMetadata, FailConfirmBatch) - return fmt.Errorf("HandleSingleBatch: error confirming batch: %w", err) + return fmt.Errorf("HandleSingleBatch: error building confirmBatch transaction: %w", err) } - log.Trace("[batcher] ConfirmBatch took", "duration", time.Since(stageTimer)) - log.Info("[batcher] Batch confirmed at block", "blockNumber", txnReceipt.BlockNumber, "txnHash", txnReceipt.TxHash.Hex()) - b.Metrics.ObserveLatency("ConfirmBatch", float64(time.Since(stageTimer).Milliseconds())) - b.Metrics.GasUsed.Set(float64(txnReceipt.GasUsed)) - - batchID, err := b.getBatchID(ctx, txnReceipt) + err = b.TransactionManager.ProcessTransaction(ctx, NewTxnRequest(txn, "confirmBatch", big.NewInt(0), confirmationMetadata{ + batch: batch, + aggSig: aggSig, + })) if err != nil { - _ = b.handleFailure(ctx, batch.BlobMetadata, FailGetBatchID) - return fmt.Errorf("HandleSingleBatch: error fetching batch ID: %w", err) - } - - // Mark the blobs as complete - log.Trace("[batcher] Marking blobs as complete...") - stageTimer = time.Now() - blobsToRetry := make([]*disperser.BlobMetadata, 0) - var updateConfirmationInfoErr error - for blobIndex, metadata := range batch.BlobMetadata { - // Mark the blob failed if it didn't get enough signatures. - status := disperser.Confirmed - if !passed[blobIndex] { - status = disperser.InsufficientSignatures - } - - var blobHeader *core.BlobHeader - var proof []byte - if status == disperser.Confirmed { - // generate inclusion proof - if blobIndex >= len(batch.BlobHeaders) { - return fmt.Errorf("HandleSingleBatch: error confirming blobs: blob header at index %d not found in batch", blobIndex) - } - blobHeader = batch.BlobHeaders[blobIndex] - - blobHeaderHash, err := blobHeader.GetBlobHeaderHash() - if err != nil { - return fmt.Errorf("HandleSingleBatch: failed to get blob header hash: %w", err) - } - merkleProof, err := batch.MerkleTree.GenerateProof(blobHeaderHash[:], 0) - if err != nil { - return fmt.Errorf("HandleSingleBatch: failed to generate blob header inclusion proof: %w", err) - } - proof = serializeProof(merkleProof) - } - - confirmationInfo := &disperser.ConfirmationInfo{ - BatchHeaderHash: headerHash, - BlobIndex: uint32(blobIndex), - SignatoryRecordHash: core.ComputeSignatoryRecordHash(uint32(batch.BatchHeader.ReferenceBlockNumber), aggSig.NonSigners), - ReferenceBlockNumber: uint32(batch.BatchHeader.ReferenceBlockNumber), - BatchRoot: batch.BatchHeader.BatchRoot[:], - BlobInclusionProof: proof, - BlobCommitment: &batch.BlobHeaders[blobIndex].BlobCommitments, - BatchID: uint32(batchID), - ConfirmationTxnHash: txnReceipt.TxHash, - ConfirmationBlockNumber: uint32(txnReceipt.BlockNumber.Uint64()), - Fee: []byte{0}, // No fee - QuorumResults: aggSig.QuorumResults, - BlobQuorumInfos: batch.BlobHeaders[blobIndex].QuorumInfos, - } - - if status == disperser.Confirmed { - if _, updateConfirmationInfoErr = b.Queue.MarkBlobConfirmed(ctx, metadata, confirmationInfo); updateConfirmationInfoErr == nil { - b.Metrics.UpdateCompletedBlob(int(metadata.RequestMetadata.BlobSize), disperser.Confirmed) - // remove encoded blob from storage so we don't disperse it again - b.EncodingStreamer.RemoveEncodedBlob(metadata) - } - } else if status == disperser.InsufficientSignatures { - if _, updateConfirmationInfoErr = b.Queue.MarkBlobInsufficientSignatures(ctx, metadata, confirmationInfo); updateConfirmationInfoErr == nil { - b.Metrics.UpdateCompletedBlob(int(metadata.RequestMetadata.BlobSize), disperser.InsufficientSignatures) - // remove encoded blob from storage so we don't disperse it again - b.EncodingStreamer.RemoveEncodedBlob(metadata) - } - } else { - updateConfirmationInfoErr = fmt.Errorf("HandleSingleBatch: trying to update confirmation info for blob in status other than confirmed or insufficient signatures: %s", status.String()) - } - if updateConfirmationInfoErr != nil { - log.Error("HandleSingleBatch: error updating blob confirmed metadata", "err", updateConfirmationInfoErr) - blobsToRetry = append(blobsToRetry, batch.BlobMetadata[blobIndex]) - } - requestTime := time.Unix(0, int64(metadata.RequestMetadata.RequestedAt)) - b.Metrics.ObserveLatency("E2E", float64(time.Since(requestTime).Milliseconds())) - } - - if len(blobsToRetry) > 0 { - _ = b.handleFailure(ctx, blobsToRetry, FailUpdateConfirmationInfo) - if len(blobsToRetry) == len(batch.BlobMetadata) { - return fmt.Errorf("HandleSingleBatch: failed to update blob confirmed metadata for all blobs in batch: %w", updateConfirmationInfoErr) - } + _ = b.handleFailure(ctx, batch.BlobMetadata, FailConfirmBatch) + return fmt.Errorf("HandleSingleBatch: error sending confirmBatch transaction: %w", err) } - log.Trace("[batcher] Update confirmation info took", "duration", time.Since(stageTimer)) - b.Metrics.ObserveLatency("UpdateConfirmationInfo", float64(time.Since(stageTimer).Milliseconds())) - batchSize := int64(0) - for _, blobMeta := range batch.BlobMetadata { - batchSize += int64(blobMeta.RequestMetadata.BlobSize) - } - b.Metrics.IncrementBatchCount(batchSize) return nil } @@ -440,12 +510,10 @@ func (b *Batcher) getBatchID(ctx context.Context, txReceipt *types.Receipt) (uin return batchID, nil } -// Determine failure status for each blob based on stake signed per quorum. We fail a blob if it received -// insufficient signatures for any quorum -func getBlobQuorumPassStatus(signedQuorums map[core.QuorumID]*core.QuorumResult, headers []*core.BlobHeader) ([]bool, int) { +// numBlobsAttested returns the number of blobs that have been successfully attested by the given quorums +func numBlobsAttested(signedQuorums map[core.QuorumID]*core.QuorumResult, headers []*core.BlobHeader) int { numPassed := 0 - passed := make([]bool, len(headers)) - for ind, blob := range headers { + for _, blob := range headers { thisPassed := true for _, quorum := range blob.QuorumInfos { if signedQuorums[quorum.QuorumID].PercentSigned < quorum.QuorumThreshold { @@ -453,11 +521,19 @@ func getBlobQuorumPassStatus(signedQuorums map[core.QuorumID]*core.QuorumResult, break } } - passed[ind] = thisPassed if thisPassed { numPassed++ } } - return passed, numPassed + return numPassed +} + +func isBlobAttested(signedQuorums map[core.QuorumID]*core.QuorumResult, header *core.BlobHeader) bool { + for _, quorum := range header.QuorumInfos { + if signedQuorums[quorum.QuorumID].PercentSigned < quorum.QuorumThreshold { + return false + } + } + return true } diff --git a/disperser/batcher/batcher_test.go b/disperser/batcher/batcher_test.go index b888a6f3cd..e38e436fec 100644 --- a/disperser/batcher/batcher_test.go +++ b/disperser/batcher/batcher_test.go @@ -17,6 +17,7 @@ import ( coremock "github.com/Layr-Labs/eigenda/core/mock" "github.com/Layr-Labs/eigenda/disperser" bat "github.com/Layr-Labs/eigenda/disperser/batcher" + "github.com/Layr-Labs/eigenda/disperser/batcher/mock" batchermock "github.com/Layr-Labs/eigenda/disperser/batcher/mock" "github.com/Layr-Labs/eigenda/disperser/common/inmem" dmock "github.com/Layr-Labs/eigenda/disperser/mock" @@ -24,7 +25,6 @@ import ( gethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" ) var ( @@ -32,7 +32,8 @@ var ( ) type batcherComponents struct { - confirmer *dmock.MockBatchConfirmer + transactor *coremock.MockTransactor + txnManager *mock.MockTxnManager blobStore disperser.BlobStore encoderClient *disperser.LocalEncoderClient encodingStreamer *bat.EncodingStreamer @@ -83,7 +84,6 @@ func makeBatcher(t *testing.T) (*batcherComponents, *bat.Batcher) { // Disperser Components dispatcher := dmock.NewDispatcher(state) - confirmer := dmock.NewBatchConfirmer() blobStore := inmem.NewBlobStore() pullInterval := 100 * time.Millisecond @@ -107,13 +107,15 @@ func makeBatcher(t *testing.T) (*batcherComponents, *bat.Batcher) { encoderClient := disperser.NewLocalEncoderClient(enc) finalizer := batchermock.NewFinalizer() ethClient := &cmock.MockEthClient{} + txnManager := mock.NewTxnManager() - b, err := bat.NewBatcher(config, timeoutConfig, blobStore, dispatcher, confirmer, cst, asgn, encoderClient, agg, ethClient, finalizer, logger, metrics) + b, err := bat.NewBatcher(config, timeoutConfig, blobStore, dispatcher, cst, asgn, encoderClient, agg, ethClient, finalizer, transactor, txnManager, logger, metrics) assert.NoError(t, err) // Make the batcher return &batcherComponents{ - confirmer: confirmer, + transactor: transactor, + txnManager: txnManager, blobStore: blobStore, encoderClient: encoderClient, encodingStreamer: b.EncodingStreamer, @@ -145,6 +147,8 @@ func TestBatcherIterations(t *testing.T) { logData, err := hex.DecodeString("00000000000000000000000000000000000000000000000000000000000000030000000000000000000000000000000000000000000000000000000000000000") assert.NoError(t, err) + txHash := gethcommon.HexToHash("0x1234") + blockNumber := big.NewInt(123) receipt := &types.Receipt{ Logs: []*types.Log{ { @@ -152,9 +156,9 @@ func TestBatcherIterations(t *testing.T) { Data: logData, }, }, - BlockNumber: big.NewInt(123), + BlockNumber: blockNumber, + TxHash: txHash, } - components.confirmer.On("ConfirmBatch", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(receipt, nil) blobStore := components.blobStore ctx := context.Background() requestedAt1, blobKey1 := queueBlob(t, ctx, &blob1, blobStore) @@ -172,8 +176,19 @@ func TestBatcherIterations(t *testing.T) { assert.Equal(t, 2, count) assert.Equal(t, uint64(197632), size) + txn := types.NewTransaction(0, gethcommon.Address{}, big.NewInt(0), 0, big.NewInt(0), nil) + components.transactor.On("BuildConfirmBatchTxn").Return(txn, nil) + components.txnManager.On("ProcessTransaction").Return(nil) + err = batcher.HandleSingleBatch(ctx) assert.NoError(t, err) + assert.Greater(t, len(components.txnManager.Requests), 0) + err = batcher.ProcessConfirmedBatch(ctx, &bat.ReceiptOrErr{ + Receipt: receipt, + Err: nil, + Metadata: components.txnManager.Requests[len(components.txnManager.Requests)-1].Metadata, + }) + assert.NoError(t, err) // Check that the blob was processed meta1, err := blobStore.GetBlobMetadata(ctx, blobKey1) assert.NoError(t, err) @@ -181,6 +196,8 @@ func TestBatcherIterations(t *testing.T) { assert.Equal(t, requestedAt1, meta1.RequestMetadata.RequestedAt) assert.Equal(t, disperser.Confirmed, meta1.BlobStatus) assert.Equal(t, meta1.ConfirmationInfo.BatchID, uint32(3)) + assert.Equal(t, meta1.ConfirmationInfo.ConfirmationTxnHash, txHash) + assert.Equal(t, meta1.ConfirmationInfo.ConfirmationBlockNumber, uint32(blockNumber.Int64())) meta2, err := blobStore.GetBlobMetadata(ctx, blobKey2) assert.NoError(t, err) @@ -207,7 +224,6 @@ func TestBlobFailures(t *testing.T) { components, batcher := makeBatcher(t) confirmationErr := fmt.Errorf("error") - components.confirmer.On("ConfirmBatch").Return(nil, confirmationErr) blobStore := components.blobStore ctx := context.Background() requestedAt, blobKey := queueBlob(t, ctx, &blob, blobStore) @@ -219,8 +235,21 @@ func TestBlobFailures(t *testing.T) { err = components.encodingStreamer.ProcessEncodedBlobs(ctx, <-out) assert.NoError(t, err) + txn := types.NewTransaction(0, gethcommon.Address{}, big.NewInt(0), 0, big.NewInt(0), nil) + components.transactor.On("BuildConfirmBatchTxn").Return(txn, nil) + components.txnManager.On("ProcessTransaction").Return(nil) + + // Test with receipt response with error err = batcher.HandleSingleBatch(ctx) + assert.NoError(t, err) + assert.Greater(t, len(components.txnManager.Requests), 0) + err = batcher.ProcessConfirmedBatch(ctx, &bat.ReceiptOrErr{ + Receipt: nil, + Err: confirmationErr, + Metadata: components.txnManager.Requests[len(components.txnManager.Requests)-1].Metadata, + }) assert.ErrorIs(t, err, confirmationErr) + meta, err := blobStore.GetBlobMetadata(ctx, blobKey) assert.NoError(t, err) assert.Equal(t, blobKey, meta.GetBlobKey()) @@ -232,9 +261,19 @@ func TestBlobFailures(t *testing.T) { assert.NoError(t, err) assert.Len(t, metadatas, 1) + // Test with receipt response with no block number components.encodingStreamer.ReferenceBlockNumber = 10 err = batcher.HandleSingleBatch(ctx) - assert.ErrorIs(t, err, confirmationErr) + assert.NoError(t, err) + err = batcher.ProcessConfirmedBatch(ctx, &bat.ReceiptOrErr{ + Receipt: &types.Receipt{ + TxHash: gethcommon.HexToHash("0x1234"), + }, + Err: nil, + Metadata: components.txnManager.Requests[len(components.txnManager.Requests)-1].Metadata, + }) + assert.ErrorContains(t, err, "error getting transaction receipt block number") + meta, err = blobStore.GetBlobMetadata(ctx, blobKey) assert.NoError(t, err) @@ -242,9 +281,20 @@ func TestBlobFailures(t *testing.T) { assert.Equal(t, disperser.Processing, meta.BlobStatus) assert.Equal(t, uint(2), meta.NumRetries) + // Try again components.encodingStreamer.ReferenceBlockNumber = 10 err = batcher.HandleSingleBatch(ctx) - assert.ErrorIs(t, err, confirmationErr) + assert.NoError(t, err) + + err = batcher.ProcessConfirmedBatch(ctx, &bat.ReceiptOrErr{ + Receipt: &types.Receipt{ + TxHash: gethcommon.HexToHash("0x1234"), + }, + Err: nil, + Metadata: components.txnManager.Requests[len(components.txnManager.Requests)-1].Metadata, + }) + assert.ErrorContains(t, err, "error getting transaction receipt block number") + meta, err = blobStore.GetBlobMetadata(ctx, blobKey) assert.NoError(t, err) @@ -284,7 +334,6 @@ func TestRetryTxnReceipt(t *testing.T) { BlockNumber: big.NewInt(123), } - components.confirmer.On("ConfirmBatch").Return(invalidReceipt, nil) components.ethClient.On("TransactionReceipt").Return(invalidReceipt, nil).Twice() components.ethClient.On("TransactionReceipt").Return(validReceipt, nil).Once() blobStore := components.blobStore @@ -298,8 +347,18 @@ func TestRetryTxnReceipt(t *testing.T) { err = components.encodingStreamer.ProcessEncodedBlobs(ctx, <-out) assert.NoError(t, err) + txn := types.NewTransaction(0, gethcommon.Address{}, big.NewInt(0), 0, big.NewInt(0), nil) + components.transactor.On("BuildConfirmBatchTxn").Return(txn, nil) + components.txnManager.On("ProcessTransaction").Return(nil) + err = batcher.HandleSingleBatch(ctx) assert.NoError(t, err) + err = batcher.ProcessConfirmedBatch(ctx, &bat.ReceiptOrErr{ + Receipt: invalidReceipt, + Err: nil, + Metadata: components.txnManager.Requests[len(components.txnManager.Requests)-1].Metadata, + }) + assert.NoError(t, err) // Check that the blob was processed meta, err := blobStore.GetBlobMetadata(ctx, blobKey) assert.NoError(t, err) diff --git a/disperser/batcher/eth/confirmer.go b/disperser/batcher/eth/confirmer.go index eff207e198..a7ce5030d7 100644 --- a/disperser/batcher/eth/confirmer.go +++ b/disperser/batcher/eth/confirmer.go @@ -44,7 +44,7 @@ func (c *BatchConfirmer) ConfirmBatch(ctx context.Context, header *core.BatchHea ctxWithTimeout, cancel := context.WithTimeout(ctx, c.timeout) defer cancel() for i := 0; i < maxRetries; i++ { - txReceipt, err = c.Transactor.ConfirmBatch(ctxWithTimeout, *header, quorums, *sigAgg) + txReceipt, err = c.Transactor.ConfirmBatch(ctxWithTimeout, header, quorums, sigAgg) if err == nil { break } diff --git a/disperser/batcher/metrics.go b/disperser/batcher/metrics.go index 8c0b266c16..415dddc315 100644 --- a/disperser/batcher/metrics.go +++ b/disperser/batcher/metrics.go @@ -22,6 +22,7 @@ const ( FailConfirmBatch FailReason = "confirm_batch" FailGetBatchID FailReason = "get_batch_id" FailUpdateConfirmationInfo FailReason = "update_confirmation_info" + FailNoAggregatedSignature FailReason = "no_aggregated_signature" ) type MetricsConfig struct { diff --git a/disperser/batcher/mock/txn_manager.go b/disperser/batcher/mock/txn_manager.go new file mode 100644 index 0000000000..cec8cab703 --- /dev/null +++ b/disperser/batcher/mock/txn_manager.go @@ -0,0 +1,33 @@ +package mock + +import ( + "context" + + "github.com/Layr-Labs/eigenda/disperser/batcher" + "github.com/stretchr/testify/mock" +) + +type MockTxnManager struct { + mock.Mock + + Requests []*batcher.TxnRequest +} + +var _ batcher.TxnManager = (*MockTxnManager)(nil) + +func NewTxnManager() *MockTxnManager { + return &MockTxnManager{} +} + +func (b *MockTxnManager) Start(ctx context.Context) {} + +func (b *MockTxnManager) ProcessTransaction(ctx context.Context, req *batcher.TxnRequest) error { + args := b.Called() + b.Requests = append(b.Requests, req) + return args.Error(0) +} + +func (b *MockTxnManager) ReceiptChan() chan *batcher.ReceiptOrErr { + args := b.Called() + return args.Get(0).(chan *batcher.ReceiptOrErr) +} diff --git a/disperser/batcher/txn_manager.go b/disperser/batcher/txn_manager.go index d3361e010f..ee03c4b279 100644 --- a/disperser/batcher/txn_manager.go +++ b/disperser/batcher/txn_manager.go @@ -19,48 +19,71 @@ var ( hundred = big.NewInt(100) ) +// TxnManager receives transactions from the caller, sends them to the chain, and monitors their status. +// It also handles the case where a transaction is not mined within a certain time. In this case, it will +// resend the transaction with a higher gas price. It is assumed that all transactions originate from the +// same account. +type TxnManager interface { + Start(ctx context.Context) + ProcessTransaction(ctx context.Context, req *TxnRequest) error + ReceiptChan() chan *ReceiptOrErr +} + type TxnRequest struct { - Tx *types.Transaction - Tag string - Value *big.Int + Tx *types.Transaction + Tag string + Value *big.Int + Metadata interface{} + + requestedAt time.Time } // ReceiptOrErr is a wrapper for a transaction receipt or an error. // Receipt should be nil if there is an error, and non-nil if there is no error. +// Metadata is the metadata passed in with the transaction request. type ReceiptOrErr struct { - Receipt *types.Receipt - Err error + Receipt *types.Receipt + Metadata interface{} + Err error } -// TxnManager receives transactions from the caller, sends them to the chain, and monitors their status. -// It also handles the case where a transaction is not mined within a certain time. In this case, it will -// resend the transaction with a higher gas price. It is assumed that all transactions originate from the -// same account. -type TxnManager struct { +type txnManager struct { mu sync.Mutex - ReceiptChan chan *ReceiptOrErr - ethClient common.EthClient requestChan chan *TxnRequest logger common.Logger + receiptChan chan *ReceiptOrErr queueSize int txnRefreshInterval time.Duration } -func NewTxnManager(ethClient common.EthClient, queueSize int, txnRefreshInterval time.Duration, logger common.Logger) *TxnManager { - return &TxnManager{ - ReceiptChan: make(chan *ReceiptOrErr, queueSize), +var _ TxnManager = (*txnManager)(nil) + +func NewTxnManager(ethClient common.EthClient, queueSize int, txnRefreshInterval time.Duration, logger common.Logger) TxnManager { + return &txnManager{ ethClient: ethClient, requestChan: make(chan *TxnRequest, queueSize), logger: logger, + receiptChan: make(chan *ReceiptOrErr, queueSize), queueSize: queueSize, txnRefreshInterval: txnRefreshInterval, } } -func (t *TxnManager) Start(ctx context.Context) { +func NewTxnRequest(tx *types.Transaction, tag string, value *big.Int, metadata interface{}) *TxnRequest { + return &TxnRequest{ + Tx: tx, + Tag: tag, + Value: value, + Metadata: metadata, + + requestedAt: time.Now(), + } +} + +func (t *txnManager) Start(ctx context.Context) { go func() { for { select { @@ -69,14 +92,16 @@ func (t *TxnManager) Start(ctx context.Context) { case req := <-t.requestChan: receipt, err := t.monitorTransaction(ctx, req) if err != nil { - t.ReceiptChan <- &ReceiptOrErr{ - Receipt: nil, - Err: err, + t.receiptChan <- &ReceiptOrErr{ + Receipt: nil, + Metadata: req.Metadata, + Err: err, } } else { - t.ReceiptChan <- &ReceiptOrErr{ - Receipt: receipt, - Err: nil, + t.receiptChan <- &ReceiptOrErr{ + Receipt: receipt, + Metadata: req.Metadata, + Err: nil, } } } @@ -88,7 +113,7 @@ func (t *TxnManager) Start(ctx context.Context) { // ProcessTransaction sends the transaction and queues the transaction for monitoring. // It returns an error if the transaction fails to be sent for reasons other than timeouts. // TxnManager monitors the transaction and resends it with a higher gas price if it is not mined without a timeout. -func (t *TxnManager) ProcessTransaction(ctx context.Context, req *TxnRequest) error { +func (t *txnManager) ProcessTransaction(ctx context.Context, req *TxnRequest) error { t.mu.Lock() defer t.mu.Unlock() t.logger.Debug("[ProcessTransaction] new transaction", "tag", req.Tag, "nonce", req.Tx.Nonce(), "gasFeeCap", req.Tx.GasFeeCap(), "gasTipCap", req.Tx.GasTipCap()) @@ -105,18 +130,24 @@ func (t *TxnManager) ProcessTransaction(ctx context.Context, req *TxnRequest) er if err != nil { return fmt.Errorf("failed to send txn (%s) %s: %w", req.Tag, req.Tx.Hash().Hex(), err) } + req.Tx = txn t.requestChan <- req return nil } +func (t *txnManager) ReceiptChan() chan *ReceiptOrErr { + return t.receiptChan +} + // monitorTransaction monitors the transaction and resends it with a higher gas price if it is not mined without a timeout. // It returns an error if the transaction fails to be sent for reasons other than timeouts. -func (t *TxnManager) monitorTransaction(ctx context.Context, req *TxnRequest) (*types.Receipt, error) { +func (t *txnManager) monitorTransaction(ctx context.Context, req *TxnRequest) (*types.Receipt, error) { for { ctxWithTimeout, cancel := context.WithTimeout(ctx, t.txnRefreshInterval) defer cancel() + t.logger.Debug("[monitorTransaction] monitoring transaction", "tag", req.Tag, "nonce", req.Tx.Nonce()) receipt, err := t.ethClient.EnsureTransactionEvaled( ctxWithTimeout, req.Tx, @@ -151,7 +182,7 @@ func (t *TxnManager) monitorTransaction(ctx context.Context, req *TxnRequest) (* // speedUpTxn increases the gas price of the existing transaction by specified percentage. // It makes sure the new gas price is not lower than the current gas price. -func (t *TxnManager) speedUpTxn(ctx context.Context, tx *types.Transaction, tag string) (*types.Transaction, error) { +func (t *txnManager) speedUpTxn(ctx context.Context, tx *types.Transaction, tag string) (*types.Transaction, error) { prevGasTipCap := tx.GasTipCap() prevGasFeeCap := tx.GasFeeCap() // get the gas tip cap and gas fee cap based on current network condition diff --git a/disperser/batcher/txn_manager_test.go b/disperser/batcher/txn_manager_test.go index 5083e443a4..ca948019a6 100644 --- a/disperser/batcher/txn_manager_test.go +++ b/disperser/batcher/txn_manager_test.go @@ -37,7 +37,7 @@ func TestProcessTransaction(t *testing.T) { Value: nil, }) assert.NoError(t, err) - receiptOrErr := <-txnManager.ReceiptChan + receiptOrErr := <-txnManager.ReceiptChan() assert.NoError(t, receiptOrErr.Err) assert.Equal(t, uint64(1), receiptOrErr.Receipt.BlockNumber.Uint64()) ethClient.AssertNumberOfCalls(t, "GetLatestGasCaps", 1) @@ -55,7 +55,7 @@ func TestProcessTransaction(t *testing.T) { }) <-ctx.Done() assert.NoError(t, err) - receiptOrErr = <-txnManager.ReceiptChan + receiptOrErr = <-txnManager.ReceiptChan() assert.Error(t, receiptOrErr.Err, randomErr) assert.Nil(t, receiptOrErr.Receipt) ethClient.AssertNumberOfCalls(t, "GetLatestGasCaps", 2) diff --git a/disperser/cmd/batcher/main.go b/disperser/cmd/batcher/main.go index b1d908d911..1679d9e14b 100644 --- a/disperser/cmd/batcher/main.go +++ b/disperser/cmd/batcher/main.go @@ -19,7 +19,6 @@ import ( "github.com/Layr-Labs/eigenda/core" coreeth "github.com/Layr-Labs/eigenda/core/eth" "github.com/Layr-Labs/eigenda/disperser/batcher" - "github.com/Layr-Labs/eigenda/disperser/batcher/eth" dispatcher "github.com/Layr-Labs/eigenda/disperser/batcher/grpc" "github.com/Layr-Labs/eigenda/disperser/cmd/batcher/flags" "github.com/Layr-Labs/eigenda/disperser/common/blobstore" @@ -94,11 +93,6 @@ func RunBatcher(ctx *cli.Context) error { if err != nil { return err } - confirmer, err := eth.NewBatchConfirmer(tx, config.TimeoutConfig.ChainWriteTimeout) - if err != nil { - return err - } - blockStaleMeasure, err := tx.GetBlockStaleMeasure(context.Background()) if err != nil { return fmt.Errorf("failed to get BLOCK_STALE_MEASURE: %w", err) @@ -147,7 +141,8 @@ func RunBatcher(ctx *cli.Context) error { return err } finalizer := batcher.NewFinalizer(config.TimeoutConfig.ChainReadTimeout, config.BatcherConfig.FinalizerInterval, queue, client, rpcClient, config.BatcherConfig.MaxNumRetriesPerBlob, logger) - batcher, err := batcher.NewBatcher(config.BatcherConfig, config.TimeoutConfig, queue, dispatcher, confirmer, ics, asgn, encoderClient, agg, client, finalizer, logger, metrics) + txnManager := batcher.NewTxnManager(client, 20, config.TimeoutConfig.ChainWriteTimeout, logger) + batcher, err := batcher.NewBatcher(config.BatcherConfig, config.TimeoutConfig, queue, dispatcher, ics, asgn, encoderClient, agg, client, finalizer, tx, txnManager, logger, metrics) if err != nil { return err } diff --git a/test/integration_test.go b/test/integration_test.go index 11edaa85a6..2d75c10ec6 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -37,7 +37,6 @@ import ( "github.com/Layr-Labs/eigenda/disperser/batcher" batchermock "github.com/Layr-Labs/eigenda/disperser/batcher/mock" "github.com/Layr-Labs/eigenda/disperser/common/inmem" - dispersermock "github.com/Layr-Labs/eigenda/disperser/mock" "github.com/Layr-Labs/eigenda/node" nodegrpc "github.com/Layr-Labs/eigenda/node/grpc" "github.com/Layr-Labs/eigenda/pkg/encoding/kzgEncoder" @@ -51,7 +50,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" ) var ( @@ -112,9 +110,11 @@ func mustMakeTestBlob() core.Blob { } type TestDisperser struct { - Batcher *batcher.Batcher - Server *apiserver.DispersalServer - EncoderServer *encoder.Server + batcher *batcher.Batcher + server *apiserver.DispersalServer + encoderServer *encoder.Server + transactor *coremock.MockTransactor + txnManager *batchermock.MockTxnManager } func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser.BlobStore, logger common.Logger) TestDisperser { @@ -128,24 +128,6 @@ func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser agg, err := core.NewStdSignatureAggregator(logger, transactor) assert.NoError(t, err) - confirmer := dispersermock.NewBatchConfirmer() - // should be encoding 3 and 0 - logData, err := hex.DecodeString("00000000000000000000000000000000000000000000000000000000000000030000000000000000000000000000000000000000000000000000000000000000") - if err != nil { - t.Fatal(err) - } - - receipt := &types.Receipt{ - Logs: []*types.Log{ - { - Topics: []gethcommon.Hash{common.BatchConfirmedEventSigHash, gethcommon.HexToHash("1234")}, - Data: logData, - }, - }, - BlockNumber: big.NewInt(123), - } - confirmer.On("ConfirmBatch", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(receipt, nil) - batcherConfig := batcher.Config{ PullInterval: 5 * time.Second, NumConnections: 1, @@ -176,8 +158,9 @@ func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser disperserMetrics := disperser.NewMetrics("9100", logger) batcherMetrics := batcher.NewMetrics("9100", logger) + txnManager := batchermock.NewTxnManager() - batcher, err := batcher.NewBatcher(batcherConfig, timeoutConfig, store, dispatcher, confirmer, cst, asn, encoderClient, agg, &commonmock.MockEthClient{}, finalizer, logger, batcherMetrics) + batcher, err := batcher.NewBatcher(batcherConfig, timeoutConfig, store, dispatcher, cst, asn, encoderClient, agg, &commonmock.MockEthClient{}, finalizer, transactor, txnManager, logger, batcherMetrics) if err != nil { t.Fatal(err) } @@ -201,9 +184,11 @@ func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser server := apiserver.NewDispersalServer(serverConfig, store, tx, logger, disperserMetrics, ratelimiter, rateConfig) return TestDisperser{ - Batcher: batcher, - Server: server, - EncoderServer: grpcEncoder, + batcher: batcher, + server: server, + encoderServer: grpcEncoder, + transactor: transactor, + txnManager: txnManager, } } @@ -369,10 +354,10 @@ func TestDispersalAndRetrieval(t *testing.T) { store := inmem.NewBlobStore() dis := mustMakeDisperser(t, cst, store, logger) go func() { - _ = dis.EncoderServer.Start() + _ = dis.encoderServer.Start() }() t.Cleanup(func() { - dis.EncoderServer.Close() + dis.encoderServer.Close() }) ops := mustMakeOperators(t, cst, logger) gethClient, _ := mustMakeRetriever(cst, logger) @@ -394,13 +379,39 @@ func TestDispersalAndRetrieval(t *testing.T) { metadataKey, err := store.StoreBlob(ctx, &blob, requestedAt) assert.NoError(t, err) out := make(chan batcher.EncodingResultOrStatus) - err = dis.Batcher.EncodingStreamer.RequestEncoding(context.Background(), out) + err = dis.batcher.EncodingStreamer.RequestEncoding(context.Background(), out) + assert.NoError(t, err) + err = dis.batcher.EncodingStreamer.ProcessEncodedBlobs(context.Background(), <-out) assert.NoError(t, err) - err = dis.Batcher.EncodingStreamer.ProcessEncodedBlobs(context.Background(), <-out) + dis.batcher.EncodingStreamer.Pool.StopWait() + + txn := types.NewTransaction(0, gethcommon.Address{}, big.NewInt(0), 0, big.NewInt(0), nil) + dis.transactor.On("BuildConfirmBatchTxn").Return(txn, nil) + dis.txnManager.On("ProcessTransaction").Return(nil) + + err = dis.batcher.HandleSingleBatch(ctx) assert.NoError(t, err) - dis.Batcher.EncodingStreamer.Pool.StopWait() + assert.Greater(t, len(dis.txnManager.Requests), 0) + // should be encoding 3 and 0 + logData, err := hex.DecodeString("00000000000000000000000000000000000000000000000000000000000000030000000000000000000000000000000000000000000000000000000000000000") + if err != nil { + t.Fatal(err) + } - err = dis.Batcher.HandleSingleBatch(ctx) + receipt := &types.Receipt{ + Logs: []*types.Log{ + { + Topics: []gethcommon.Hash{common.BatchConfirmedEventSigHash, gethcommon.HexToHash("1234")}, + Data: logData, + }, + }, + BlockNumber: big.NewInt(123), + } + err = dis.batcher.ProcessConfirmedBatch(ctx, &batcher.ReceiptOrErr{ + Receipt: receipt, + Err: nil, + Metadata: dis.txnManager.Requests[len(dis.txnManager.Requests)-1].Metadata, + }) assert.NoError(t, err) // Check that the blob was processed