diff --git a/packages/taiko-client/bindings/encoding/input.go b/packages/taiko-client/bindings/encoding/input.go index 9be7c456a0..0aebf3113e 100644 --- a/packages/taiko-client/bindings/encoding/input.go +++ b/packages/taiko-client/bindings/encoding/input.go @@ -294,6 +294,13 @@ var ( {Name: "TaikoData.Transition", Type: transitionComponentsType}, {Name: "TaikoData.TierProof", Type: tierProofComponentsType}, } + proveBlocksInputArgs = abi.Arguments{ + {Name: "TaikoData.BlockMetadata", Type: blockMetadataV2ComponentsType}, + {Name: "TaikoData.Transition", Type: transitionComponentsType}, + } + proveBlocksBatchProofArgs = abi.Arguments{ + {Name: "TaikoData.TierProof", Type: tierProofComponentsType}, + } ) // Contract ABIs. @@ -423,6 +430,43 @@ func EncodeProveBlockInput( return b, nil } +// EncodeProveBlocksInput performs the solidity `abi.encode` for the given TaikoL1.proveBlocks input. +func EncodeProveBlocksInput( + metas []metadata.TaikoBlockMetaData, + transitions []bindings.TaikoDataTransition, +) ([][]byte, error) { + if len(metas) != len(transitions) { + return nil, fmt.Errorf("both arrays of TaikoBlockMetaData and TaikoDataTransition must be equal in length") + } + b := make([][]byte, 0, len(metas)) + for i := range metas { + input, err := proveBlocksInputArgs.Pack( + metas[i].(*metadata.TaikoDataBlockMetadataOntake).InnerMetadata(), + transitions[i], + ) + if err != nil { + return nil, fmt.Errorf("failed to abi.encode TaikoL1.proveBlocks input item after ontake fork, %w", err) + } + + b = append(b, input) + } + + return b, nil +} + +// EncodeProveBlocksBatchProof performs the solidity `abi.encode` for the given TaikoL1.proveBlocks batchProof. +func EncodeProveBlocksBatchProof( + tierProof *bindings.TaikoDataTierProof, +) ([]byte, error) { + input, err := proveBlocksBatchProofArgs.Pack( + tierProof, + ) + if err != nil { + return nil, fmt.Errorf("failed to abi.encode TaikoL1.proveBlocks input item after ontake fork, %w", err) + } + return input, nil +} + // UnpackTxListBytes unpacks the input data of a TaikoL1.proposeBlock transaction, and returns the txList bytes. func UnpackTxListBytes(txData []byte) ([]byte, error) { method, err := TaikoL1ABI.MethodById(txData) diff --git a/packages/taiko-client/cmd/flags/prover.go b/packages/taiko-client/cmd/flags/prover.go index 0dff953769..d9c13f1ce1 100644 --- a/packages/taiko-client/cmd/flags/prover.go +++ b/packages/taiko-client/cmd/flags/prover.go @@ -195,6 +195,31 @@ var ( Category: proverCategory, EnvVars: []string{"PROVER_BLOCK_CONFIRMATIONS"}, } + // Batch proof related flag + SGXBatchSize = &cli.Uint64Flag{ + Name: "prover.sgx.batchSize", + Usage: "The default size of batch sgx proofs, when it arrives, submit a batch of proof immediately, " + + "this flag only works post Ontake fork", + Value: 1, + Category: proverCategory, + EnvVars: []string{"PROVER_SGX_BATCH_SIZE"}, + } + ZKVMBatchSize = &cli.Uint64Flag{ + Name: "prover.zkvm.batchSize", + Usage: "The size of batch ZKVM proof, when it arrives, submit a batch of proof immediately, " + + "this flag only works post Ontake fork", + Value: 1, + Category: proverCategory, + EnvVars: []string{"PROVER_ZKVM_BATCH_SIZE"}, + } + ForceProveInterval = &cli.DurationFlag{ + Name: "prover.forceBatchProvingInterval", + Usage: "Time interval to prove blocks even the number of pending proof do not exceed prover.batchSize, " + + "this flag only works post Ontake fork", + Category: proverCategory, + Value: 30 * time.Minute, + EnvVars: []string{"PROVER_FORCE_BATCH_PROVING_INTERVAL"}, + } ) // ProverFlags All prover flags. @@ -227,4 +252,7 @@ var ProverFlags = MergeFlags(CommonFlags, []cli.Flag{ BlockConfirmations, RaikoRequestTimeout, RaikoZKVMHostEndpoint, + SGXBatchSize, + ZKVMBatchSize, + ForceProveInterval, }, TxmgrFlags) diff --git a/packages/taiko-client/internal/metrics/metrics.go b/packages/taiko-client/internal/metrics/metrics.go index 6ac55f1186..27f4da0772 100644 --- a/packages/taiko-client/internal/metrics/metrics.go +++ b/packages/taiko-client/internal/metrics/metrics.go @@ -47,15 +47,27 @@ var ( ProverSubmissionErrorCounter = factory.NewCounter(prometheus.CounterOpts{ Name: "prover_proof_submission_error", }) + ProverAggregationSubmissionErrorCounter = factory.NewCounter(prometheus.CounterOpts{ + Name: "prover_proof_aggregation_submission_error", + }) ProverSgxProofGeneratedCounter = factory.NewCounter(prometheus.CounterOpts{ Name: "prover_proof_sgx_generated", }) + ProverSgxProofAggregationGeneratedCounter = factory.NewCounter(prometheus.CounterOpts{ + Name: "prover_proof_sgx_aggregation_generated", + }) ProverR0ProofGeneratedCounter = factory.NewCounter(prometheus.CounterOpts{ Name: "prover_proof_r0_generated", }) + ProverR0ProofAggregationGeneratedCounter = factory.NewCounter(prometheus.CounterOpts{ + Name: "prover_proof_r0_aggregation_generated", + }) ProverSp1ProofGeneratedCounter = factory.NewCounter(prometheus.CounterOpts{ Name: "prover_proof_sp1_generated", }) + ProverSp1ProofAggregationGeneratedCounter = factory.NewCounter(prometheus.CounterOpts{ + Name: "prover_proof_sp1_aggregation_generated", + }) ProverSubmissionRevertedCounter = factory.NewCounter(prometheus.CounterOpts{ Name: "prover_proof_submission_reverted", }) diff --git a/packages/taiko-client/pkg/rpc/ethclient.go b/packages/taiko-client/pkg/rpc/ethclient.go index 02df6da9d6..f16e43d57b 100644 --- a/packages/taiko-client/pkg/rpc/ethclient.go +++ b/packages/taiko-client/pkg/rpc/ethclient.go @@ -2,6 +2,7 @@ package rpc import ( "context" + "errors" "math/big" "time" @@ -14,6 +15,10 @@ import ( "github.com/ethereum/go-ethereum/rpc" ) +var ( + ErrInvalidLenOfParams = errors.New("invalid length of parameters") +) + // gethClient is a wrapper for go-ethereum geth client. type gethClient struct { *gethclient.Client @@ -74,6 +79,34 @@ func (c *EthClient) BlockByHash(ctx context.Context, hash common.Hash) (*types.B return c.ethClient.BlockByHash(ctxWithTimeout, hash) } +func (c *EthClient) BatchBlocksByHashes(ctx context.Context, hashes []common.Hash) ([]*types.Block, error) { + if len(hashes) < 1 { + return nil, ErrInvalidLenOfParams + } + ctxWithTimeout, cancel := CtxWithTimeoutOrDefault(ctx, c.timeout) + defer cancel() + + reqs := make([]rpc.BatchElem, len(hashes)) + results := make([]*types.Block, len(hashes)) + for i, hash := range hashes { + reqs[i] = rpc.BatchElem{ + Method: "eth_getBlockByHash", + Args: []interface{}{hash, true}, + Result: &results[i], + } + } + if err := c.BatchCallContext(ctxWithTimeout, reqs); err != nil { + return nil, err + } + for i := range reqs { + if reqs[i].Error != nil { + return nil, reqs[i].Error + } + } + + return results, nil +} + // BlockByNumber returns a block from the current canonical chain. If number is nil, the // latest known block is returned. // @@ -119,6 +152,34 @@ func (c *EthClient) HeaderByNumber(ctx context.Context, number *big.Int) (*types return c.ethClient.HeaderByNumber(ctxWithTimeout, number) } +func (c *EthClient) BatchHeadersByNumbers(ctx context.Context, numbers []*big.Int) ([]*types.Header, error) { + if len(numbers) < 1 { + return nil, ErrInvalidLenOfParams + } + ctxWithTimeout, cancel := CtxWithTimeoutOrDefault(ctx, c.timeout) + defer cancel() + + reqs := make([]rpc.BatchElem, len(numbers)) + results := make([]*types.Header, len(numbers)) + for i, blockNum := range numbers { + reqs[i] = rpc.BatchElem{ + Method: "eth_getBlockByNumber", + Args: []interface{}{blockNum, false}, + Result: &results[i], + } + } + if err := c.BatchCallContext(ctxWithTimeout, reqs); err != nil { + return nil, err + } + for i := range reqs { + if reqs[i].Error != nil { + return nil, reqs[i].Error + } + } + + return results, nil +} + // TransactionByHash returns the transaction with the given hash. func (c *EthClient) TransactionByHash( ctx context.Context, diff --git a/packages/taiko-client/pkg/rpc/ethclient_test.go b/packages/taiko-client/pkg/rpc/ethclient_test.go index 3739482b63..608b2e9773 100644 --- a/packages/taiko-client/pkg/rpc/ethclient_test.go +++ b/packages/taiko-client/pkg/rpc/ethclient_test.go @@ -2,6 +2,7 @@ package rpc import ( "context" + "math/big" "testing" "github.com/ethereum/go-ethereum" @@ -168,3 +169,28 @@ func TestEstimateGas(t *testing.T) { _, err := client.L1.EstimateGas(context.Background(), ethereum.CallMsg{}) require.Nil(t, err) } + +func TestBatchBlocksByNumbers(t *testing.T) { + client := newTestClientWithTimeout(t) + + headers, err := client.L1.BatchHeadersByNumbers(context.Background(), []*big.Int{big.NewInt(0), big.NewInt(1)}) + require.Nil(t, err) + require.Len(t, headers, 2) +} + +func TestBatchBlocksByHashes(t *testing.T) { + client := newTestClientWithTimeout(t) + + headers, err := client.L1.BatchHeadersByNumbers(context.Background(), []*big.Int{big.NewInt(0), big.NewInt(1)}) + require.Nil(t, err) + require.Len(t, headers, 2) + + hashes := make([]common.Hash, len(headers)) + for i, header := range headers { + hashes[i] = header.Hash() + } + + blocks, err := client.L1.BatchBlocksByHashes(context.Background(), hashes) + require.Nil(t, err) + require.Len(t, blocks, 2) +} diff --git a/packages/taiko-client/pkg/rpc/utils.go b/packages/taiko-client/pkg/rpc/utils.go index 85dac75687..570d72da16 100644 --- a/packages/taiko-client/pkg/rpc/utils.go +++ b/packages/taiko-client/pkg/rpc/utils.go @@ -14,6 +14,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" + "golang.org/x/sync/errgroup" "github.com/taikoxyz/taiko-mono/packages/taiko-client/bindings" "github.com/taikoxyz/taiko-mono/packages/taiko-client/bindings/encoding" @@ -62,7 +63,6 @@ func GetProtocolStateVariables( } opts.Context, cancel = CtxWithTimeoutOrDefault(opts.Context, defaultTimeout) defer cancel() - // Notice: sloB.LastProposedIn and slotB.LastUnpausedAt are always 0 // before upgrading contract, but we can ignore it since we won't use it. slotA, slotB, err := taikoL1Client.GetStateVariables(opts) @@ -248,6 +248,145 @@ func GetBlockProofStatus( }, nil } +// BatchGetBlocksProofStatus checks whether the batch of L2 blocks still need new proofs or new contests. +// Here are the possible status: +// 1. No proof on chain at all. +// 2. A valid proof has been submitted. +// 3. An invalid proof has been submitted, and there is no valid contest. +// 4. An invalid proof has been submitted, and there is a valid contest. +func BatchGetBlocksProofStatus( + ctx context.Context, + cli *Client, + ids []*big.Int, + proverAddress common.Address, + proverSetAddress common.Address, +) ([]*BlockProofStatus, error) { + ctxWithTimeout, cancel := CtxWithTimeoutOrDefault(ctx, defaultTimeout) + defer cancel() + var ( + parentHashes = make([][32]byte, len(ids)) + parents = make([]*types.Header, len(ids)) + blockIDs = make([]uint64, len(ids)) + result = make([]*BlockProofStatus, len(ids)) + highestBlockID = big.NewInt(0) + ) + // Get the local L2 parent header. + g, gCtx := errgroup.WithContext(ctxWithTimeout) + for i, id := range ids { + g.Go(func() error { + parent, err := cli.L2.HeaderByNumber(gCtx, new(big.Int).Sub(id, common.Big1)) + if err != nil { + return err + } + parentHashes[i] = parent.Hash() + parents[i] = parent + blockIDs[i] = id.Uint64() + if id.Cmp(highestBlockID) > 0 { + highestBlockID = id + } + return nil + }) + } + if gErr := g.Wait(); gErr != nil { + return nil, gErr + } + + // Get the transition state from TaikoL1 contract. + transitions, err := cli.TaikoL1.GetTransitions( + &bind.CallOpts{Context: ctxWithTimeout}, + blockIDs, + parentHashes, + ) + if err != nil { + return nil, err + } + highestHeader, err := cli.WaitL2Header(ctxWithTimeout, highestBlockID) + if err != nil { + return nil, err + } + g, gCtx = errgroup.WithContext(ctxWithTimeout) + for i, transition := range transitions { + // No proof on chain + if transition.BlockHash == (common.Hash{}) { + result[i] = &BlockProofStatus{IsSubmitted: false, ParentHeader: parents[i]} + continue + } + g.Go(func() error { + if err != nil { + return err + } + var ( + localBlockHash common.Hash + localStateRoot [32]byte + ) + if i+1 < len(parents) { + localBlockHash = parents[i+1].Hash() + localStateRoot = parents[i+1].Root + } else { + localBlockHash = highestHeader.Hash() + localStateRoot = highestHeader.Root + } + + if localBlockHash != transition.BlockHash || + (transition.StateRoot != (common.Hash{}) && transition.StateRoot != localStateRoot) { + log.Info( + "Different block hash or state root detected, try submitting a contest", + "localBlockHash", localBlockHash, + "protocolTransitionBlockHash", common.BytesToHash(transition.BlockHash[:]), + "localStateRoot", localStateRoot, + "protocolTransitionStateRoot", common.BytesToHash(transition.StateRoot[:]), + ) + result[i] = &BlockProofStatus{ + IsSubmitted: true, + Invalid: true, + CurrentTransitionState: &transitions[i], + ParentHeader: parents[i], + } + return nil + } + + if proverAddress == transition.Prover || + (proverSetAddress != ZeroAddress && transition.Prover == proverSetAddress) { + log.Info( + "📬 Block's proof has already been submitted by current prover", + "blockID", ids[i], + "parent", parents[i].Hash().Hex(), + "hash", common.Bytes2Hex(transition.BlockHash[:]), + "stateRoot", common.Bytes2Hex(transition.StateRoot[:]), + "timestamp", transition.Timestamp, + "contester", transition.Contester, + ) + result[i] = &BlockProofStatus{ + IsSubmitted: true, + Invalid: transition.Contester != ZeroAddress, + ParentHeader: parents[i], + CurrentTransitionState: &transitions[i], + } + return nil + } + log.Info( + "📬 Block's proof has already been submitted by another prover", + "blockID", ids[i], + "prover", transition.Prover, + "parent", parents[i].Hash().Hex(), + "hash", common.Bytes2Hex(transition.BlockHash[:]), + "stateRoot", common.Bytes2Hex(transition.StateRoot[:]), + "timestamp", transition.Timestamp, + "contester", transition.Contester, + ) + + result[i] = &BlockProofStatus{ + IsSubmitted: true, + Invalid: transition.Contester != ZeroAddress, + ParentHeader: parents[i], + CurrentTransitionState: &transitions[i], + } + return nil + }) + } + return result, g.Wait() +} + // SetHead makes a `debug_setHead` RPC call to set the chain's head, should only be used // for testing purpose. func SetHead(ctx context.Context, client *EthClient, headNum *big.Int) error { diff --git a/packages/taiko-client/prover/config.go b/packages/taiko-client/prover/config.go index 2fe3cf7b95..552ecfac23 100644 --- a/packages/taiko-client/prover/config.go +++ b/packages/taiko-client/prover/config.go @@ -60,6 +60,9 @@ type Config struct { BlockConfirmations uint64 TxmgrConfigs *txmgr.CLIConfig PrivateTxmgrConfigs *txmgr.CLIConfig + SGXProofBufferSize uint64 + ZKVMProofBufferSize uint64 + ForceProveInterval time.Duration } // NewConfigFromCliContext creates a new config instance from command line flags. @@ -183,5 +186,8 @@ func NewConfigFromCliContext(c *cli.Context) (*Config, error) { l1ProverPrivKey, c, ), + SGXProofBufferSize: c.Uint64(flags.SGXBatchSize.Name), + ZKVMProofBufferSize: c.Uint64(flags.ZKVMBatchSize.Name), + ForceProveInterval: c.Duration(flags.ForceProveInterval.Name), }, nil } diff --git a/packages/taiko-client/prover/init.go b/packages/taiko-client/prover/init.go index cf640ceca4..dc2e4cb40e 100644 --- a/packages/taiko-client/prover/init.go +++ b/packages/taiko-client/prover/init.go @@ -98,9 +98,10 @@ func (p *Prover) initProofSubmitters( ) error { for _, tier := range p.sharedState.GetTiers() { var ( - producer proofProducer.ProofProducer - submitter proofSubmitter.Submitter - err error + bufferSize = p.cfg.SGXProofBufferSize + producer proofProducer.ProofProducer + submitter proofSubmitter.Submitter + err error ) switch tier.ID { case encoding.TierOptimisticID: @@ -121,6 +122,7 @@ func (p *Prover) initProofSubmitters( Dummy: p.cfg.Dummy, RaikoRequestTimeout: p.cfg.RaikoRequestTimeout, } + bufferSize = p.cfg.ZKVMProofBufferSize case encoding.TierZkVMSp1ID: producer = &proofProducer.ZKvmProofProducer{ ZKProofType: proofProducer.ZKProofTypeSP1, @@ -129,10 +131,13 @@ func (p *Prover) initProofSubmitters( Dummy: p.cfg.Dummy, RaikoRequestTimeout: p.cfg.RaikoRequestTimeout, } + bufferSize = p.cfg.ZKVMProofBufferSize case encoding.TierGuardianMinorityID: producer = proofProducer.NewGuardianProofProducer(encoding.TierGuardianMinorityID, p.cfg.EnableLivenessBondProof) + bufferSize = 0 case encoding.TierGuardianMajorityID: producer = proofProducer.NewGuardianProofProducer(encoding.TierGuardianMajorityID, p.cfg.EnableLivenessBondProof) + bufferSize = 0 default: return fmt.Errorf("unsupported tier: %d", tier.ID) } @@ -141,6 +146,8 @@ func (p *Prover) initProofSubmitters( p.rpc, producer, p.proofGenerationCh, + p.batchProofGenerationCh, + p.aggregationNotify, p.cfg.ProverSetAddress, p.cfg.TaikoL2Address, p.cfg.Graffiti, @@ -151,6 +158,7 @@ func (p *Prover) initProofSubmitters( tiers, p.IsGuardianProver(), p.cfg.GuardianProofSubmissionDelay, + bufferSize, ); err != nil { return err } diff --git a/packages/taiko-client/prover/proof_producer/dummy_producer.go b/packages/taiko-client/prover/proof_producer/dummy_producer.go index 54d10df173..5828979b52 100644 --- a/packages/taiko-client/prover/proof_producer/dummy_producer.go +++ b/packages/taiko-client/prover/proof_producer/dummy_producer.go @@ -31,3 +31,15 @@ func (o *DummyProofProducer) RequestProof( Tier: tier, }, nil } + +// RequestBatchProofs returns a dummy proof aggregation to the result channel. +func (o *DummyProofProducer) RequestBatchProofs( + proofs []*ProofWithHeader, + tier uint16, +) (*BatchProofs, error) { + return &BatchProofs{ + Proofs: proofs, + BatchProof: bytes.Repeat([]byte{0xbb}, 100), + Tier: tier, + }, nil +} diff --git a/packages/taiko-client/prover/proof_producer/guardian_producer.go b/packages/taiko-client/prover/proof_producer/guardian_producer.go index fb3c45865e..39439deb1a 100644 --- a/packages/taiko-client/prover/proof_producer/guardian_producer.go +++ b/packages/taiko-client/prover/proof_producer/guardian_producer.go @@ -61,6 +61,7 @@ func (g *GuardianProofProducer) RequestProof( return g.DummyProofProducer.RequestProof(opts, blockID, meta, header, g.Tier(), requestAt) } +// RequestCancel implements the ProofProducer interface to cancel the proof generating progress. func (g *GuardianProofProducer) RequestCancel( _ context.Context, _ *ProofRequestOptions, @@ -68,6 +69,15 @@ func (g *GuardianProofProducer) RequestCancel( return nil } +// Aggregate implements the ProofProducer interface to aggregate a batch of proofs. +func (g *GuardianProofProducer) Aggregate( + _ context.Context, + _ []*ProofWithHeader, + _ time.Time, +) (*BatchProofs, error) { + return nil, nil +} + // Tier implements the ProofProducer interface. func (g *GuardianProofProducer) Tier() uint16 { return g.tier diff --git a/packages/taiko-client/prover/proof_producer/optimistic_producer.go b/packages/taiko-client/prover/proof_producer/optimistic_producer.go index b97feef14e..8596d518a7 100644 --- a/packages/taiko-client/prover/proof_producer/optimistic_producer.go +++ b/packages/taiko-client/prover/proof_producer/optimistic_producer.go @@ -35,6 +35,31 @@ func (o *OptimisticProofProducer) RequestProof( return o.DummyProofProducer.RequestProof(opts, blockID, meta, header, o.Tier(), requestAt) } +// Aggregate implements the ProofProducer interface to aggregate a batch of proofs. +func (o *OptimisticProofProducer) Aggregate( + _ context.Context, + items []*ProofWithHeader, + _ time.Time, +) (*BatchProofs, error) { + log.Info( + "Aggregate batch optimistic proof", + ) + if len(items) == 0 { + return nil, ErrInvalidLength + } + blockIDs := make([]*big.Int, len(items)) + for i, item := range items { + blockIDs[i] = item.Meta.GetBlockID() + } + batchProof, err := o.DummyProofProducer.RequestBatchProofs(items, o.Tier()) + if err != nil { + return nil, err + } + batchProof.BlockIDs = blockIDs + return batchProof, nil +} + +// RequestCancel implements the ProofProducer interface to cancel the proof generating progress. func (o *OptimisticProofProducer) RequestCancel( _ context.Context, _ *ProofRequestOptions, diff --git a/packages/taiko-client/prover/proof_producer/proof_producer.go b/packages/taiko-client/prover/proof_producer/proof_producer.go index 3987963539..292a90d2d6 100644 --- a/packages/taiko-client/prover/proof_producer/proof_producer.go +++ b/packages/taiko-client/prover/proof_producer/proof_producer.go @@ -15,6 +15,7 @@ import ( var ( errProofGenerating = errors.New("proof is generating") errEmptyProof = errors.New("proof is empty") + ErrInvalidLength = errors.New("invalid items length") ) // ProofRequestBody represents a request body to generate a proof. @@ -46,6 +47,7 @@ type ProofRequestOptions struct { Graffiti string GasUsed uint64 ParentGasUsed uint64 + Compressed bool } type ProofWithHeader struct { @@ -57,6 +59,13 @@ type ProofWithHeader struct { Tier uint16 } +type BatchProofs struct { + Proofs []*ProofWithHeader + BatchProof []byte + Tier uint16 + BlockIDs []*big.Int +} + type ProofProducer interface { RequestProof( ctx context.Context, @@ -66,6 +75,11 @@ type ProofProducer interface { header *types.Header, requestAt time.Time, ) (*ProofWithHeader, error) + Aggregate( + ctx context.Context, + items []*ProofWithHeader, + requestAt time.Time, + ) (*BatchProofs, error) RequestCancel( ctx context.Context, opts *ProofRequestOptions, diff --git a/packages/taiko-client/prover/proof_producer/sgx_producer.go b/packages/taiko-client/prover/proof_producer/sgx_producer.go index 0ff4eb3c24..3f4861fd93 100644 --- a/packages/taiko-client/prover/proof_producer/sgx_producer.go +++ b/packages/taiko-client/prover/proof_producer/sgx_producer.go @@ -47,6 +47,17 @@ type RaikoRequestProofBody struct { SP1 *SP1RequestProofBodyParam `json:"sp1"` } +// RaikoRequestProofBodyV3 represents the JSON body for requesting the proof. +type RaikoRequestProofBodyV3 struct { + Blocks [][2]*big.Int `json:"block_numbers"` + Prover string `json:"prover"` + Graffiti string `json:"graffiti"` + Type string `json:"proof_type"` + SGX *SGXRequestProofBodyParam `json:"sgx"` + RISC0 *RISC0RequestProofBodyParam `json:"risc0"` + SP1 *SP1RequestProofBodyParam `json:"sp1"` +} + // SGXRequestProofBodyParam represents the JSON body of RaikoRequestProofBody's `sgx` field. type SGXRequestProofBodyParam struct { Setup bool `json:"setup"` @@ -66,6 +77,7 @@ type RISC0RequestProofBodyParam struct { type SP1RequestProofBodyParam struct { Recursion string `json:"recursion"` Prover string `json:"prover"` + Verify bool `json:"verify"` } // RaikoRequestProofBodyResponse represents the JSON body of the response of the proof requests. @@ -117,13 +129,207 @@ func (s *SGXProofProducer) RequestProof( }, nil } +// Aggregate implements the ProofProducer interface to aggregate a batch of proofs. +func (s *SGXProofProducer) Aggregate( + ctx context.Context, + items []*ProofWithHeader, + requestAt time.Time, +) (*BatchProofs, error) { + log.Info( + "Aggregate sgx batch proofs from raiko-host service", + ) + if len(items) == 0 { + return nil, ErrInvalidLength + } + + blockIDs := make([]*big.Int, len(items)) + for i, item := range items { + blockIDs[i] = item.Meta.GetBlockID() + } + batchProof, err := s.requestBatchProof( + ctx, + blockIDs, + items[0].Opts.ProverAddress, + items[0].Opts.Graffiti, + requestAt, + ) + if err != nil { + return nil, err + } + + metrics.ProverSgxProofAggregationGeneratedCounter.Add(1) + + return &BatchProofs{ + Proofs: items, + BatchProof: batchProof, + Tier: s.Tier(), + BlockIDs: blockIDs, + }, nil +} + +// RequestCancel implements the ProofProducer interface to cancel the proof generating progress. func (s *SGXProofProducer) RequestCancel( - _ context.Context, - _ *ProofRequestOptions, + ctx context.Context, + opts *ProofRequestOptions, ) error { + reqBody := RaikoRequestProofBody{ + Type: s.ProofType, + Block: opts.BlockID, + Prover: opts.ProverAddress.Hex()[2:], + Graffiti: opts.Graffiti, + SGX: &SGXRequestProofBodyParam{ + Setup: false, + Bootstrap: false, + Prove: true, + }, + } + + client := &http.Client{} + + jsonValue, err := json.Marshal(reqBody) + if err != nil { + return err + } + + req, err := http.NewRequestWithContext( + ctx, + "POST", + s.RaikoHostEndpoint+"/v2/proof/cancel", + bytes.NewBuffer(jsonValue), + ) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + if len(s.JWT) > 0 { + req.Header.Set("Authorization", "Bearer "+base64.StdEncoding.EncodeToString([]byte(s.JWT))) + } + + res, err := client.Do(req) + if err != nil { + return err + } + + defer res.Body.Close() + if res.StatusCode != http.StatusOK { + return fmt.Errorf("failed to cancel requesting proof, statusCode: %d", res.StatusCode) + } + return nil } +// requestBatchProof poll the proof aggregation service to get the aggregated proof. +func (s *SGXProofProducer) requestBatchProof( + ctx context.Context, + blockIDs []*big.Int, + proverAddress common.Address, + graffiti string, + requestAt time.Time, +) ([]byte, error) { + var ( + proof []byte + ) + + ctx, cancel := rpc.CtxWithTimeoutOrDefault(ctx, s.RaikoRequestTimeout) + defer cancel() + + blocks := make([][2]*big.Int, len(blockIDs)) + for i := range blockIDs { + blocks[i][0] = blockIDs[i] + } + reqBody := RaikoRequestProofBodyV3{ + Type: s.ProofType, + Blocks: blocks, + Prover: proverAddress.Hex()[2:], + Graffiti: graffiti, + SGX: &SGXRequestProofBodyParam{ + Setup: false, + Bootstrap: false, + Prove: true, + }, + } + + client := &http.Client{} + + jsonValue, err := json.Marshal(reqBody) + if err != nil { + return nil, err + } + + log.Debug( + "Send batch proof generation request", + "blockIDs", blockIDs, + "proofType", "sgx", + "input", string(jsonValue), + ) + + req, err := http.NewRequestWithContext( + ctx, + "POST", + s.RaikoHostEndpoint+"/v3/proof", + bytes.NewBuffer(jsonValue), + ) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "application/json") + if len(s.JWT) > 0 { + req.Header.Set("Authorization", "Bearer "+base64.StdEncoding.EncodeToString([]byte(s.JWT))) + } + + res, err := client.Do(req) + if err != nil { + return nil, err + } + + defer res.Body.Close() + if res.StatusCode != http.StatusOK { + return nil, fmt.Errorf("failed to request batch proof, ids: %v, statusCode: %d", blockIDs, res.StatusCode) + } + + resBytes, err := io.ReadAll(res.Body) + if err != nil { + return nil, err + } + + log.Debug( + "Batch proof generation output", + "blockIDs", blockIDs, + "proofType", "sgx", + "output", string(resBytes), + ) + + var output RaikoRequestProofBodyResponseV2 + if err := json.Unmarshal(resBytes, &output); err != nil { + return nil, err + } + + if len(output.ErrorMessage) > 0 || len(output.Error) > 0 { + return nil, fmt.Errorf("failed to get batch proof, msg: %s", output.ErrorMessage) + } + + if output.Data.Status == ErrProofInProgress.Error() { + return nil, ErrProofInProgress + } + if output.Data.Status == StatusRegistered { + return nil, ErrRetry + } + + if len(output.Data.Proof.Proof) == 0 { + return nil, errEmptyProof + } + proof = common.Hex2Bytes(output.Data.Proof.Proof[2:]) + + log.Info( + "Batch proof generated", + "blockIDs", blockIDs, + "time", time.Since(requestAt), + "producer", "SGXProofProducer", + ) + + return proof, nil +} + // callProverDaemon keeps polling the proverd service to get the requested proof. func (s *SGXProofProducer) callProverDaemon( ctx context.Context, @@ -153,15 +359,22 @@ func (s *SGXProofProducer) callProverDaemon( return nil, errProofGenerating } + if output.Data.Status == ErrProofInProgress.Error() { + return nil, ErrProofInProgress + } + if output.Data.Status == StatusRegistered { + return nil, ErrRetry + } + // Raiko returns "" as proof when proof type is native, // so we just convert "" to bytes if s.ProofType == ProofTypeCPU { - proof = common.Hex2Bytes(output.Data.Proof) + proof = common.Hex2Bytes(output.Data.Proof.Proof) } else { - if len(output.Data.Proof) == 0 { + if len(output.Data.Proof.Proof) == 0 { return nil, errEmptyProof } - proof = common.Hex2Bytes(output.Data.Proof[2:]) + proof = common.Hex2Bytes(output.Data.Proof.Proof[2:]) } log.Info( @@ -178,7 +391,7 @@ func (s *SGXProofProducer) callProverDaemon( func (s *SGXProofProducer) requestProof( ctx context.Context, opts *ProofRequestOptions, -) (*RaikoRequestProofBodyResponse, error) { +) (*RaikoRequestProofBodyResponseV2, error) { reqBody := RaikoRequestProofBody{ Type: s.ProofType, Block: opts.BlockID, @@ -198,7 +411,7 @@ func (s *SGXProofProducer) requestProof( return nil, err } - req, err := http.NewRequestWithContext(ctx, "POST", s.RaikoHostEndpoint+"/v1/proof", bytes.NewBuffer(jsonValue)) + req, err := http.NewRequestWithContext(ctx, "POST", s.RaikoHostEndpoint+"/v2/proof", bytes.NewBuffer(jsonValue)) if err != nil { return nil, err } @@ -225,17 +438,17 @@ func (s *SGXProofProducer) requestProof( log.Debug( "Proof generation output", "blockID", opts.BlockID, - "zkType", "sgx", + "proofType", "sgx", "output", string(resBytes), ) - var output RaikoRequestProofBodyResponse + var output RaikoRequestProofBodyResponseV2 if err := json.Unmarshal(resBytes, &output); err != nil { return nil, err } - if len(output.ErrorMessage) > 0 { - return nil, fmt.Errorf("failed to get proof, msg: %s", output.ErrorMessage) + if len(output.ErrorMessage) > 0 || len(output.Error) > 0 { + return nil, fmt.Errorf("failed to get proof,err: %s, msg: %s", output.Error, output.ErrorMessage) } return &output, nil diff --git a/packages/taiko-client/prover/proof_producer/zkvm_producer.go b/packages/taiko-client/prover/proof_producer/zkvm_producer.go index 5892b9d506..3e2711a196 100644 --- a/packages/taiko-client/prover/proof_producer/zkvm_producer.go +++ b/packages/taiko-client/prover/proof_producer/zkvm_producer.go @@ -23,8 +23,10 @@ import ( ) const ( - ZKProofTypeR0 = "risc0" - ZKProofTypeSP1 = "sp1" + ZKProofTypeR0 = "risc0" + ZKProofTypeSP1 = "sp1" + RecursionPlonk = "plonk" + RecursionCompressed = "compressed" ) var ( @@ -37,6 +39,7 @@ var ( type RaikoRequestProofBodyResponseV2 struct { Data *RaikoProofDataV2 `json:"data"` ErrorMessage string `json:"message"` + Error string `json:"error"` } type RaikoProofDataV2 struct { @@ -103,6 +106,7 @@ func (s *ZKvmProofProducer) RequestProof( }, nil } +// RequestCancel implements the ProofProducer interface to cancel the proof generating progress. func (s *ZKvmProofProducer) RequestCancel( ctx context.Context, opts *ProofRequestOptions, @@ -110,6 +114,50 @@ func (s *ZKvmProofProducer) RequestCancel( return s.requestCancel(ctx, opts) } +// Aggregate implements the ProofProducer interface to aggregate a batch of proofs. +func (s *ZKvmProofProducer) Aggregate( + ctx context.Context, + items []*ProofWithHeader, + requestAt time.Time, +) (*BatchProofs, error) { + log.Info( + "Aggregate zkvm batch proofs from raiko-host service", + "zkType", s.ZKProofType, + ) + if len(items) == 0 { + return nil, ErrInvalidLength + } + + blockIDs := make([]*big.Int, len(items)) + for i, item := range items { + blockIDs[i] = item.Meta.GetBlockID() + } + batchProof, err := s.requestBatchProof( + ctx, + blockIDs, + items[0].Opts.ProverAddress, + items[0].Opts.Graffiti, + requestAt, + ) + if err != nil { + return nil, err + } + + switch s.ZKProofType { + case ZKProofTypeSP1: + metrics.ProverSp1ProofAggregationGeneratedCounter.Add(1) + default: + metrics.ProverR0ProofAggregationGeneratedCounter.Add(1) + } + + return &BatchProofs{ + Proofs: items, + BatchProof: batchProof, + Tier: s.Tier(), + BlockIDs: blockIDs, + }, nil +} + // callProverDaemon keeps polling the proverd service to get the requested proof. func (s *ZKvmProofProducer) callProverDaemon( ctx context.Context, @@ -136,10 +184,12 @@ func (s *ZKvmProofProducer) callProverDaemon( return nil, ErrRetry } - if len(output.Data.Proof.Proof) == 0 { - return nil, errEmptyProof + if !opts.Compressed { + if len(output.Data.Proof.Proof) == 0 { + return nil, errEmptyProof + } + proof = common.Hex2Bytes(output.Data.Proof.Proof[2:]) } - proof = common.Hex2Bytes(output.Data.Proof.Proof[2:]) log.Info( "Proof generated", "height", opts.BlockID, @@ -155,7 +205,15 @@ func (s *ZKvmProofProducer) requestProof( ctx context.Context, opts *ProofRequestOptions, ) (*RaikoRequestProofBodyResponseV2, error) { - var reqBody RaikoRequestProofBody + var ( + reqBody RaikoRequestProofBody + recursion string + ) + if opts.Compressed { + recursion = RecursionCompressed + } else { + recursion = RecursionPlonk + } switch s.ZKProofType { case ZKProofTypeSP1: reqBody = RaikoRequestProofBody{ @@ -164,8 +222,9 @@ func (s *ZKvmProofProducer) requestProof( Prover: opts.ProverAddress.Hex()[2:], Graffiti: opts.Graffiti, SP1: &SP1RequestProofBodyParam{ - Recursion: "plonk", + Recursion: recursion, Prover: "network", + Verify: true, }, } default: @@ -199,6 +258,13 @@ func (s *ZKvmProofProducer) requestProof( req.Header.Set("Authorization", "Bearer "+base64.StdEncoding.EncodeToString([]byte(s.JWT))) } + log.Debug( + "Send proof generation request", + "blockID", opts.BlockID, + "zkProofType", s.ZKProofType, + "input", string(jsonValue), + ) + res, err := client.Do(req) if err != nil { return nil, err @@ -225,8 +291,8 @@ func (s *ZKvmProofProducer) requestProof( return nil, err } - if len(output.ErrorMessage) > 0 { - return nil, fmt.Errorf("failed to get proof, msg: %s", output.ErrorMessage) + if len(output.ErrorMessage) > 0 || len(output.Error) > 0 { + return nil, fmt.Errorf("failed to get proof,err: %s, msg: %s", output.Error, output.ErrorMessage) } return &output, nil @@ -236,17 +302,41 @@ func (s *ZKvmProofProducer) requestCancel( ctx context.Context, opts *ProofRequestOptions, ) error { - reqBody := RaikoRequestProofBody{ - Type: s.ZKProofType, - Block: opts.BlockID, - Prover: opts.ProverAddress.Hex()[2:], - Graffiti: opts.Graffiti, - RISC0: &RISC0RequestProofBodyParam{ - Bonsai: true, - Snark: true, - Profile: false, - ExecutionPo2: big.NewInt(20), - }, + var ( + reqBody RaikoRequestProofBody + recursion string + ) + if opts.Compressed { + recursion = RecursionCompressed + } else { + recursion = RecursionPlonk + } + switch s.ZKProofType { + case ZKProofTypeSP1: + reqBody = RaikoRequestProofBody{ + Type: s.ZKProofType, + Block: opts.BlockID, + Prover: opts.ProverAddress.Hex()[2:], + Graffiti: opts.Graffiti, + SP1: &SP1RequestProofBodyParam{ + Recursion: recursion, + Prover: "network", + Verify: true, + }, + } + default: + reqBody = RaikoRequestProofBody{ + Type: s.ZKProofType, + Block: opts.BlockID, + Prover: opts.ProverAddress.Hex()[2:], + Graffiti: opts.Graffiti, + RISC0: &RISC0RequestProofBodyParam{ + Bonsai: true, + Snark: true, + Profile: false, + ExecutionPo2: big.NewInt(20), + }, + } } client := &http.Client{} @@ -283,6 +373,134 @@ func (s *ZKvmProofProducer) requestCancel( return nil } +// requestBatchProof poll the proof aggregation service to get the aggregated proof. +func (s *ZKvmProofProducer) requestBatchProof( + ctx context.Context, + blockIDs []*big.Int, + proverAddress common.Address, + graffiti string, + requestAt time.Time, +) ([]byte, error) { + var ( + proof []byte + ) + + ctx, cancel := rpc.CtxWithTimeoutOrDefault(ctx, s.RaikoRequestTimeout) + defer cancel() + + blocks := make([][2]*big.Int, len(blockIDs)) + for i := range blockIDs { + blocks[i][0] = blockIDs[i] + } + var reqBody RaikoRequestProofBodyV3 + switch s.ZKProofType { + case ZKProofTypeSP1: + reqBody = RaikoRequestProofBodyV3{ + Type: s.ZKProofType, + Blocks: blocks, + Prover: proverAddress.Hex()[2:], + Graffiti: graffiti, + SP1: &SP1RequestProofBodyParam{ + Recursion: RecursionCompressed, + Prover: "network", + Verify: true, + }, + } + default: + reqBody = RaikoRequestProofBodyV3{ + Type: s.ZKProofType, + Blocks: blocks, + Prover: proverAddress.Hex()[2:], + Graffiti: graffiti, + RISC0: &RISC0RequestProofBodyParam{ + Bonsai: true, + Snark: true, + Profile: false, + ExecutionPo2: big.NewInt(20), + }, + } + } + + client := &http.Client{} + + jsonValue, err := json.Marshal(reqBody) + if err != nil { + return nil, err + } + + log.Debug( + "Send batch proof generation request", + "blockIDs", blockIDs, + "zkProofType", s.ZKProofType, + "input", string(jsonValue), + ) + + req, err := http.NewRequestWithContext( + ctx, + "POST", + s.RaikoHostEndpoint+"/v3/proof", + bytes.NewBuffer(jsonValue), + ) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "application/json") + if len(s.JWT) > 0 { + req.Header.Set("Authorization", "Bearer "+base64.StdEncoding.EncodeToString([]byte(s.JWT))) + } + + res, err := client.Do(req) + if err != nil { + return nil, err + } + + defer res.Body.Close() + if res.StatusCode != http.StatusOK { + return nil, fmt.Errorf("failed to request batch proof, ids: %v, statusCode: %d", blockIDs, res.StatusCode) + } + + resBytes, err := io.ReadAll(res.Body) + if err != nil { + return nil, err + } + + log.Debug( + "Batch proof generation output", + "blockIDs", blockIDs, + "zkProofType", s.ZKProofType, + "output", string(resBytes), + ) + + var output RaikoRequestProofBodyResponseV2 + if err := json.Unmarshal(resBytes, &output); err != nil { + return nil, err + } + + if len(output.ErrorMessage) > 0 || len(output.Error) > 0 { + return nil, fmt.Errorf("failed to get batch proof, msg: %s", output.ErrorMessage) + } + if output.Data.Status == ErrProofInProgress.Error() { + return nil, ErrProofInProgress + } + if output.Data.Status == StatusRegistered { + return nil, ErrRetry + } + + if len(output.Data.Proof.Proof) == 0 { + return nil, errEmptyProof + } + proof = common.Hex2Bytes(output.Data.Proof.Proof[2:]) + + log.Info( + "Batch proof generated", + "blockIDs", blockIDs, + "time", time.Since(requestAt), + "producer", "ZKvmProofProducer", + ) + + return proof, nil +} + // Tier implements the ProofProducer interface. func (s *ZKvmProofProducer) Tier() uint16 { switch s.ZKProofType { diff --git a/packages/taiko-client/prover/proof_submitter/interface.go b/packages/taiko-client/prover/proof_submitter/interface.go index 7b7ace1390..26971fc858 100644 --- a/packages/taiko-client/prover/proof_submitter/interface.go +++ b/packages/taiko-client/prover/proof_submitter/interface.go @@ -14,8 +14,11 @@ import ( type Submitter interface { RequestProof(ctx context.Context, meta metadata.TaikoBlockMetaData) error SubmitProof(ctx context.Context, proofWithHeader *proofProducer.ProofWithHeader) error + BatchSubmitProofs(ctx context.Context, proofsWithHeaders *proofProducer.BatchProofs) error + AggregateProofs(ctx context.Context) error Producer() proofProducer.ProofProducer Tier() uint16 + BufferSize() uint64 } // Contester is the interface for contesting proofs of the L2 blocks. diff --git a/packages/taiko-client/prover/proof_submitter/proof_buffer.go b/packages/taiko-client/prover/proof_submitter/proof_buffer.go new file mode 100644 index 0000000000..b21e992aa3 --- /dev/null +++ b/packages/taiko-client/prover/proof_submitter/proof_buffer.go @@ -0,0 +1,98 @@ +package submitter + +import ( + "errors" + "sync" + + producer "github.com/taikoxyz/taiko-mono/packages/taiko-client/prover/proof_producer" +) + +var ( + errBufferOverflow = errors.New("proof buffer overflow") + errNotEnoughProof = errors.New("not enough proof") +) + +// ProofBuffer caches all single proof with a fixed size. +type ProofBuffer struct { + MaxLength uint64 + buffer []*producer.ProofWithHeader + mutex sync.RWMutex +} + +// NewProofBuffer creates a new ProofBuffer instance. +func NewProofBuffer(maxLength uint64) *ProofBuffer { + return &ProofBuffer{ + buffer: make([]*producer.ProofWithHeader, 0, maxLength), + MaxLength: maxLength, + } +} + +// Write adds new item to the buffer. +func (pb *ProofBuffer) Write(item *producer.ProofWithHeader) (int, error) { + pb.mutex.Lock() + defer pb.mutex.Unlock() + + if len(pb.buffer)+1 > int(pb.MaxLength) { + return len(pb.buffer), errBufferOverflow + } + + pb.buffer = append(pb.buffer, item) + return len(pb.buffer), nil +} + +// Read returns the content with given length in the buffer. +func (pb *ProofBuffer) Read(length int) ([]*producer.ProofWithHeader, error) { + pb.mutex.RLock() + defer pb.mutex.RUnlock() + if length > len(pb.buffer) { + return nil, errNotEnoughProof + } + + data := make([]*producer.ProofWithHeader, length) + copy(data, pb.buffer[:length]) + return data, nil +} + +// ReadAll returns all the content in the buffer. +func (pb *ProofBuffer) ReadAll() ([]*producer.ProofWithHeader, error) { + return pb.Read(pb.Len()) +} + +// Len returns current length of the buffer. +func (pb *ProofBuffer) Len() int { + pb.mutex.RLock() + defer pb.mutex.RUnlock() + return len(pb.buffer) +} + +// Clear clears all buffer. +func (pb *ProofBuffer) Clear() { + pb.mutex.Lock() + defer pb.mutex.Unlock() + pb.buffer = pb.buffer[:0] +} + +// ClearItems clears items that has given block ids in the buffer. +func (pb *ProofBuffer) ClearItems(blockIDs ...uint64) int { + pb.mutex.Lock() + defer pb.mutex.Unlock() + + clearMap := make(map[uint64]bool) + for _, blockID := range blockIDs { + clearMap[blockID] = true + } + + newBuffer := make([]*producer.ProofWithHeader, 0, len(pb.buffer)) + clearedCount := 0 + + for _, b := range pb.buffer { + if !clearMap[b.Meta.GetBlockID().Uint64()] { + newBuffer = append(newBuffer, b) + } else { + clearedCount++ + } + } + + pb.buffer = newBuffer + return clearedCount +} diff --git a/packages/taiko-client/prover/proof_submitter/proof_submitter.go b/packages/taiko-client/prover/proof_submitter/proof_submitter.go index c4040bcbb0..ba1cb8bad0 100644 --- a/packages/taiko-client/prover/proof_submitter/proof_submitter.go +++ b/packages/taiko-client/prover/proof_submitter/proof_submitter.go @@ -10,6 +10,7 @@ import ( "github.com/cenkalti/backoff/v4" "github.com/ethereum-optimism/optimism/op-service/txmgr" + "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" @@ -29,25 +30,30 @@ var ( submissionDelayRandomBumpRange float64 = 20 proofPollingInterval = 10 * time.Second ProofTimeout = 3 * time.Hour + ErrInvalidProof = errors.New("invalid proof found") ) // ProofSubmitter is responsible requesting proofs for the given L2 // blocks, and submitting the generated proofs to the TaikoL1 smart contract. type ProofSubmitter struct { - rpc *rpc.Client - proofProducer proofProducer.ProofProducer - resultCh chan *proofProducer.ProofWithHeader - anchorValidator *validator.AnchorTxValidator - txBuilder *transaction.ProveBlockTxBuilder - sender *transaction.Sender - proverAddress common.Address - proverSetAddress common.Address - taikoL2Address common.Address - graffiti [32]byte - tiers []*rpc.TierProviderTierWithID + rpc *rpc.Client + proofProducer proofProducer.ProofProducer + resultCh chan *proofProducer.ProofWithHeader + batchResultCh chan *proofProducer.BatchProofs + aggregationNotify chan uint16 + anchorValidator *validator.AnchorTxValidator + txBuilder *transaction.ProveBlockTxBuilder + sender *transaction.Sender + proverAddress common.Address + proverSetAddress common.Address + taikoL2Address common.Address + graffiti [32]byte + tiers []*rpc.TierProviderTierWithID // Guardian prover related. isGuardian bool submissionDelay time.Duration + // Batch proof related + proofBuffer *ProofBuffer } // NewProofSubmitter creates a new ProofSubmitter instance. @@ -55,6 +61,8 @@ func NewProofSubmitter( rpcClient *rpc.Client, proofProducer proofProducer.ProofProducer, resultCh chan *proofProducer.ProofWithHeader, + batchResultCh chan *proofProducer.BatchProofs, + aggregationNotify chan uint16, proverSetAddress common.Address, taikoL2Address common.Address, graffiti string, @@ -65,6 +73,7 @@ func NewProofSubmitter( tiers []*rpc.TierProviderTierWithID, isGuardian bool, submissionDelay time.Duration, + proofBufferSize uint64, ) (*ProofSubmitter, error) { anchorValidator, err := validator.New(taikoL2Address, rpcClient.L2.ChainID, rpcClient) if err != nil { @@ -72,19 +81,22 @@ func NewProofSubmitter( } return &ProofSubmitter{ - rpc: rpcClient, - proofProducer: proofProducer, - resultCh: resultCh, - anchorValidator: anchorValidator, - txBuilder: builder, - sender: transaction.NewSender(rpcClient, txmgr, privateTxmgr, proverSetAddress, gasLimit), - proverAddress: txmgr.From(), - proverSetAddress: proverSetAddress, - taikoL2Address: taikoL2Address, - graffiti: rpc.StringToBytes32(graffiti), - tiers: tiers, - isGuardian: isGuardian, - submissionDelay: submissionDelay, + rpc: rpcClient, + proofProducer: proofProducer, + resultCh: resultCh, + batchResultCh: batchResultCh, + aggregationNotify: aggregationNotify, + anchorValidator: anchorValidator, + txBuilder: builder, + sender: transaction.NewSender(rpcClient, txmgr, privateTxmgr, proverSetAddress, gasLimit), + proverAddress: txmgr.From(), + proverSetAddress: proverSetAddress, + taikoL2Address: taikoL2Address, + graffiti: rpc.StringToBytes32(graffiti), + tiers: tiers, + isGuardian: isGuardian, + submissionDelay: submissionDelay, + proofBuffer: NewProofBuffer(proofBufferSize), }, nil } @@ -131,13 +143,13 @@ func (s *ProofSubmitter) RequestProof(ctx context.Context, meta metadata.TaikoBl Graffiti: common.Bytes2Hex(s.graffiti[:]), GasUsed: header.GasUsed, ParentGasUsed: parent.GasUsed(), + Compressed: s.proofBuffer.MaxLength > 1, } // If the prover set address is provided, we use that address as the prover on chain. if s.proverSetAddress != rpc.ZeroAddress { opts.ProverAddress = s.proverSetAddress } - startTime := time.Now() // Send the generated proof. @@ -147,6 +159,11 @@ func (s *ProofSubmitter) RequestProof(ctx context.Context, meta metadata.TaikoBl log.Error("Failed to request proof, context is canceled", "blockID", opts.BlockID, "error", ctx.Err()) return nil } + // Check if the proof buffer is full + if s.proofBuffer.MaxLength > 1 && s.proofBuffer.MaxLength == uint64(s.proofBuffer.Len()) { + log.Debug("Buffer is full now", "blockID", meta.GetBlockID()) + return errBufferOverflow + } // Check if there is a need to generate proof proofStatus, err := rpc.GetBlockProofStatus( ctx, @@ -181,7 +198,25 @@ func (s *ProofSubmitter) RequestProof(ctx context.Context, meta metadata.TaikoBl } return fmt.Errorf("failed to request proof (id: %d): %w", meta.GetBlockID(), err) } - s.resultCh <- result + if meta.IsOntakeBlock() && s.proofBuffer.MaxLength > 1 { + bufferSize, err := s.proofBuffer.Write(result) + if err != nil { + return fmt.Errorf("failed to add proof into buffer (id: %d)(current buffer size: %d): %w", + meta.GetBlockID(), + bufferSize, + err, + ) + } + log.Debug("Succeed to generate proof", + "blockID", meta.GetBlockID(), + "bufferSize", bufferSize, + ) + if s.proofBuffer.MaxLength == uint64(bufferSize) { + s.aggregationNotify <- s.Tier() + } + } else { + s.resultCh <- result + } metrics.ProverQueuedProofCounter.Add(1) return nil }, @@ -299,6 +334,146 @@ func (s *ProofSubmitter) SubmitProof( return nil } +// BatchSubmitProofs implements the Submitter interface to submit proof aggregation. +func (s *ProofSubmitter) BatchSubmitProofs(ctx context.Context, batchProof *proofProducer.BatchProofs) error { + log.Info( + "Batch submit block proofs", + "proof", common.Bytes2Hex(batchProof.BatchProof), + "blockIds", batchProof.BlockIDs, + "tier", batchProof.Tier, + ) + var ( + invalidBlockIDs []uint64 + latestProvenBlockID = big.NewInt(0) + ) + if len(batchProof.Proofs) == 0 { + return proofProducer.ErrInvalidLength + } + // Check if the proof has already been submitted. + proofStatus, err := rpc.BatchGetBlocksProofStatus( + ctx, + s.rpc, + batchProof.BlockIDs, + batchProof.Proofs[0].Opts.ProverAddress, + s.proverSetAddress, + ) + if err != nil { + return err + } + stateVars, err := s.rpc.GetProtocolStateVariables(&bind.CallOpts{Context: ctx}) + if err != nil { + log.Warn( + "Failed to fetch state variables", + "error", err, + ) + return err + } + for i, proof := range batchProof.Proofs { + // Check if this proof is still needed to be submitted. + ok, err := s.sender.ValidateProof(ctx, proof, new(big.Int).SetUint64(stateVars.B.LastVerifiedBlockId)) + if err != nil { + return err + } + if !ok { + log.Error("a valid proof for block is already submitted", "blockId", proof.BlockID) + invalidBlockIDs = append(invalidBlockIDs, proof.BlockID.Uint64()) + continue + } + + if proofStatus[i].IsSubmitted && !proofStatus[i].Invalid { + log.Error("a valid proof for block is already submitted", "blockId", proof.BlockID) + invalidBlockIDs = append(invalidBlockIDs, proof.BlockID.Uint64()) + continue + } + + // Get the corresponding L2 block. + block, err := s.rpc.L2.BlockByHash(ctx, proof.Header.Hash()) + if err != nil { + log.Error("failed to get L2 block with given hash", + "hash", proof.Header.Hash(), + "error", err, + ) + invalidBlockIDs = append(invalidBlockIDs, proof.BlockID.Uint64()) + continue + } + + if block.Transactions().Len() == 0 { + log.Error("Invalid block without anchor transaction, blockID", "blockId", proof.BlockID) + invalidBlockIDs = append(invalidBlockIDs, proof.BlockID.Uint64()) + continue + } + + // Validate TaikoL2.anchor transaction inside the L2 block. + anchorTx := block.Transactions()[0] + if err = s.anchorValidator.ValidateAnchorTx(anchorTx); err != nil { + log.Error("Invalid anchor transaction", "error", err) + invalidBlockIDs = append(invalidBlockIDs, proof.BlockID.Uint64()) + } + if proof.BlockID.Cmp(latestProvenBlockID) > 0 { + latestProvenBlockID = proof.BlockID + } + } + + if len(invalidBlockIDs) > 0 { + log.Warn("Detected invalid proofs", "blockIds", invalidBlockIDs) + s.proofBuffer.ClearItems(invalidBlockIDs...) + return ErrInvalidProof + } + + // Build the TaikoL1.proveBlocks transaction and send it to the L1 node. + if err := s.sender.SendBatchProof( + ctx, + s.txBuilder.BuildProveBlocks(batchProof, s.graffiti), + batchProof, + ); err != nil { + if err.Error() == transaction.ErrUnretryableSubmission.Error() { + return nil + } + metrics.ProverAggregationSubmissionErrorCounter.Add(1) + return err + } + + metrics.ProverSentProofCounter.Add(float64(len(batchProof.BlockIDs))) + metrics.ProverLatestProvenBlockIDGauge.Set(float64(latestProvenBlockID.Uint64())) + s.proofBuffer.Clear() + + return nil +} + +// AggregateProofs read all data from buffer and aggregate them. +func (s *ProofSubmitter) AggregateProofs(ctx context.Context) error { + startTime := time.Now() + if err := backoff.Retry( + func() error { + buffer, err := s.proofBuffer.ReadAll() + if err != nil { + return fmt.Errorf("failed to read proof from buffer: %w", err) + } + if len(buffer) == 0 { + log.Debug("Buffer is empty now, skip aggregating") + return nil + } + + result, err := s.proofProducer.Aggregate( + ctx, + buffer, + startTime, + ) + if err != nil { + log.Error("Failed to request proof aggregation", "err", err) + return err + } + s.batchResultCh <- result + return nil + }, + backoff.WithContext(backoff.NewConstantBackOff(proofPollingInterval), ctx), + ); err != nil { + log.Error("Aggregate proof error", "error", err) + return err + } + return nil +} + // getRandomBumpedSubmissionDelay returns a random bumped submission delay. func (s *ProofSubmitter) getRandomBumpedSubmissionDelay(expiredAt time.Time) (time.Duration, error) { if s.submissionDelay == 0 { @@ -331,3 +506,8 @@ func (s *ProofSubmitter) Producer() proofProducer.ProofProducer { func (s *ProofSubmitter) Tier() uint16 { return s.proofProducer.Tier() } + +// BufferSize returns the size of the proof buffer. +func (s *ProofSubmitter) BufferSize() uint64 { + return s.proofBuffer.MaxLength +} diff --git a/packages/taiko-client/prover/proof_submitter/proof_submitter_test.go b/packages/taiko-client/prover/proof_submitter/proof_submitter_test.go index 7b96d2f226..3a7cbde8a1 100644 --- a/packages/taiko-client/prover/proof_submitter/proof_submitter_test.go +++ b/packages/taiko-client/prover/proof_submitter/proof_submitter_test.go @@ -31,17 +31,21 @@ import ( type ProofSubmitterTestSuite struct { testutils.ClientTestSuite - submitter *ProofSubmitter - contester *ProofContester - blobSyncer *blob.Syncer - proposer *proposer.Proposer - proofCh chan *producer.ProofWithHeader + submitter *ProofSubmitter + contester *ProofContester + blobSyncer *blob.Syncer + proposer *proposer.Proposer + proofCh chan *producer.ProofWithHeader + batchProofGenerationCh chan *producer.BatchProofs + aggregationNotify chan uint16 } func (s *ProofSubmitterTestSuite) SetupTest() { s.ClientTestSuite.SetupTest() s.proofCh = make(chan *producer.ProofWithHeader, 1024) + s.batchProofGenerationCh = make(chan *producer.BatchProofs, 1024) + s.aggregationNotify = make(chan uint16, 1) builder := transaction.NewProveBlockTxBuilder( s.RPCClient, @@ -83,6 +87,8 @@ func (s *ProofSubmitterTestSuite) SetupTest() { s.RPCClient, &producer.OptimisticProofProducer{}, s.proofCh, + s.batchProofGenerationCh, + s.aggregationNotify, rpc.ZeroAddress, common.HexToAddress(os.Getenv("TAIKO_L2")), "test", @@ -93,6 +99,7 @@ func (s *ProofSubmitterTestSuite) SetupTest() { tiers, false, 0*time.Second, + 0, ) s.Nil(err) s.contester = NewProofContester( @@ -179,6 +186,8 @@ func (s *ProofSubmitterTestSuite) TestGetRandomBumpedSubmissionDelay() { s.RPCClient, &producer.OptimisticProofProducer{}, s.proofCh, + s.batchProofGenerationCh, + s.aggregationNotify, common.Address{}, common.HexToAddress(os.Getenv("TAIKO_L2")), "test", @@ -189,6 +198,7 @@ func (s *ProofSubmitterTestSuite) TestGetRandomBumpedSubmissionDelay() { s.submitter.tiers, false, time.Duration(0), + 0, ) s.Nil(err) @@ -200,6 +210,8 @@ func (s *ProofSubmitterTestSuite) TestGetRandomBumpedSubmissionDelay() { s.RPCClient, &producer.OptimisticProofProducer{}, s.proofCh, + s.batchProofGenerationCh, + s.aggregationNotify, common.Address{}, common.HexToAddress(os.Getenv("TAIKO_L2")), "test", @@ -210,6 +222,7 @@ func (s *ProofSubmitterTestSuite) TestGetRandomBumpedSubmissionDelay() { s.submitter.tiers, false, 1*time.Hour, + 0, ) s.Nil(err) delay, err = submitter2.getRandomBumpedSubmissionDelay(time.Now()) diff --git a/packages/taiko-client/prover/proof_submitter/transaction/builder.go b/packages/taiko-client/prover/proof_submitter/transaction/builder.go index 913bf9fd9d..4ec13d85b2 100644 --- a/packages/taiko-client/prover/proof_submitter/transaction/builder.go +++ b/packages/taiko-client/prover/proof_submitter/transaction/builder.go @@ -14,6 +14,7 @@ import ( "github.com/taikoxyz/taiko-mono/packages/taiko-client/bindings/encoding" "github.com/taikoxyz/taiko-mono/packages/taiko-client/bindings/metadata" "github.com/taikoxyz/taiko-mono/packages/taiko-client/pkg/rpc" + proofProducer "github.com/taikoxyz/taiko-mono/packages/taiko-client/prover/proof_producer" ) var ( @@ -153,3 +154,76 @@ func (a *ProveBlockTxBuilder) Build( }, nil } } + +// BuildProveBlocks creates a new TaikoL1.ProveBlocks transaction. +func (a *ProveBlockTxBuilder) BuildProveBlocks( + batchProof *proofProducer.BatchProofs, + graffiti [32]byte, +) TxBuilder { + return func(txOpts *bind.TransactOpts) (*txmgr.TxCandidate, error) { + var ( + data []byte + to common.Address + err error + metas = make([]metadata.TaikoBlockMetaData, len(batchProof.Proofs)) + transitions = make([]bindings.TaikoDataTransition, len(batchProof.Proofs)) + blockIDs = make([]uint64, len(batchProof.Proofs)) + ) + for i, proof := range batchProof.Proofs { + metas[i] = proof.Meta + transitions[i] = bindings.TaikoDataTransition{ + ParentHash: proof.Header.ParentHash, + BlockHash: proof.Opts.BlockHash, + StateRoot: proof.Opts.StateRoot, + Graffiti: graffiti, + } + blockIDs[i] = proof.BlockID.Uint64() + } + log.Info( + "Build batch proof submission transaction", + "blockIDs", blockIDs, + "gasLimit", txOpts.GasLimit, + ) + input, err := encoding.EncodeProveBlocksInput(metas, transitions) + if err != nil { + return nil, err + } + tierProof, err := encoding.EncodeProveBlocksBatchProof(&bindings.TaikoDataTierProof{ + Tier: batchProof.Tier, + Data: batchProof.BatchProof, + }) + if err != nil { + return nil, err + } + + if a.proverSetAddress != ZeroAddress { + if data, err = encoding.ProverSetABI.Pack( + "proveBlocks", + blockIDs, + input, + tierProof, + ); err != nil { + return nil, err + } + to = a.proverSetAddress + } else { + if data, err = encoding.TaikoL1ABI.Pack( + "proveBlocks", + blockIDs, + input, + tierProof, + ); err != nil { + return nil, err + } + to = a.taikoL1Address + } + + return &txmgr.TxCandidate{ + TxData: data, + To: &to, + Blobs: nil, + GasLimit: txOpts.GasLimit, + Value: txOpts.Value, + }, nil + } +} diff --git a/packages/taiko-client/prover/proof_submitter/transaction/sender.go b/packages/taiko-client/prover/proof_submitter/transaction/sender.go index b9aed4db82..3363390e18 100644 --- a/packages/taiko-client/prover/proof_submitter/transaction/sender.go +++ b/packages/taiko-client/prover/proof_submitter/transaction/sender.go @@ -65,7 +65,7 @@ func (s *Sender) Send( } // Check if this proof is still needed to be submitted. - ok, err := s.validateProof(ctx, proofWithHeader) + ok, err := s.ValidateProof(ctx, proofWithHeader, nil) if err != nil || !ok { return err } @@ -115,9 +115,56 @@ func (s *Sender) Send( return nil } -// validateProof checks if the proof's corresponding L1 block is still in the canonical chain and if the +func (s *Sender) SendBatchProof( + ctx context.Context, + buildTx TxBuilder, + batchProof *producer.BatchProofs, +) error { + // Assemble the TaikoL1.proveBlocks transaction. + txCandidate, err := buildTx(&bind.TransactOpts{GasLimit: s.gasLimit}) + if err != nil { + return err + } + // Send the transaction. + txMgr, isPrivate := s.txmgrSelector.Select() + receipt, err := txMgr.Send(ctx, *txCandidate) + if err != nil { + if isPrivate { + s.txmgrSelector.RecordPrivateTxMgrFailed() + } + return encoding.TryParsingCustomError(err) + } + + if receipt.Status != types.ReceiptStatusSuccessful { + log.Error( + "Failed to submit batch proofs", + "txHash", receipt.TxHash, + "isPrivateMempool", isPrivate, + "error", encoding.TryParsingCustomErrorFromReceipt(ctx, s.rpc.L1, txMgr.From(), receipt), + ) + metrics.ProverSubmissionRevertedCounter.Add(1) + return ErrUnretryableSubmission + } + + log.Info( + "🚚 Your batch proofs were accepted", + "txHash", receipt.TxHash, + "tier", batchProof.Tier, + "blockIDs", batchProof.BlockIDs, + ) + + metrics.ProverSubmissionAcceptedCounter.Add(float64(len(batchProof.BlockIDs))) + + return nil +} + +// ValidateProof checks if the proof's corresponding L1 block is still in the canonical chain and if the // latest verified head is not ahead of this block proof. -func (s *Sender) validateProof(ctx context.Context, proofWithHeader *producer.ProofWithHeader) (bool, error) { +func (s *Sender) ValidateProof( + ctx context.Context, + proofWithHeader *producer.ProofWithHeader, + latestVerifiedID *big.Int, +) (bool, error) { // 1. Check if the corresponding L1 block is still in the canonical chain. l1Header, err := s.rpc.L1.HeaderByNumber(ctx, proofWithHeader.Meta.GetRawBlockHeight()) if err != nil { @@ -140,18 +187,22 @@ func (s *Sender) validateProof(ctx context.Context, proofWithHeader *producer.Pr return false, nil } + var verifiedID = latestVerifiedID // 2. Check if latest verified head is ahead of this block proof. - stateVars, err := s.rpc.GetProtocolStateVariables(&bind.CallOpts{Context: ctx}) - if err != nil { - log.Warn( - "Failed to fetch state variables", - "blockID", proofWithHeader.BlockID, - "error", err, - ) - return false, err + if verifiedID == nil { + stateVars, err := s.rpc.GetProtocolStateVariables(&bind.CallOpts{Context: ctx}) + if err != nil { + log.Warn( + "Failed to fetch state variables", + "blockID", proofWithHeader.BlockID, + "error", err, + ) + return false, err + } + verifiedID = new(big.Int).SetUint64(stateVars.B.LastVerifiedBlockId) } - latestVerifiedID := stateVars.B.LastVerifiedBlockId - if new(big.Int).SetUint64(latestVerifiedID).Cmp(proofWithHeader.BlockID) >= 0 { + + if verifiedID.Cmp(proofWithHeader.BlockID) >= 0 { log.Info( "Block is already verified, skip current proof submission", "blockID", proofWithHeader.BlockID.Uint64(), diff --git a/packages/taiko-client/prover/prover.go b/packages/taiko-client/prover/prover.go index 3edc8f3636..237a7f4493 100644 --- a/packages/taiko-client/prover/prover.go +++ b/packages/taiko-client/prover/prover.go @@ -15,6 +15,7 @@ import ( "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/log" "github.com/urfave/cli/v2" + "golang.org/x/sync/errgroup" "github.com/taikoxyz/taiko-mono/packages/taiko-client/bindings" "github.com/taikoxyz/taiko-mono/packages/taiko-client/bindings/encoding" @@ -62,11 +63,13 @@ type Prover struct { assignmentExpiredCh chan metadata.TaikoBlockMetaData proveNotify chan struct{} + aggregationNotify chan uint16 // Proof related channels - proofSubmissionCh chan *proofProducer.ProofRequestBody - proofContestCh chan *proofProducer.ContestRequestBody - proofGenerationCh chan *proofProducer.ProofWithHeader + proofSubmissionCh chan *proofProducer.ProofRequestBody + proofContestCh chan *proofProducer.ContestRequestBody + proofGenerationCh chan *proofProducer.ProofWithHeader + batchProofGenerationCh chan *proofProducer.BatchProofs // Transactions manager txmgr *txmgr.SimpleTxManager @@ -130,10 +133,12 @@ func InitFromConfig( chBufferSize := p.protocolConfigs.BlockMaxProposals p.proofGenerationCh = make(chan *proofProducer.ProofWithHeader, chBufferSize) + p.batchProofGenerationCh = make(chan *proofProducer.BatchProofs, chBufferSize) p.assignmentExpiredCh = make(chan metadata.TaikoBlockMetaData, chBufferSize) p.proofSubmissionCh = make(chan *proofProducer.ProofRequestBody, p.cfg.Capacity) p.proofContestCh = make(chan *proofProducer.ContestRequestBody, p.cfg.Capacity) p.proveNotify = make(chan struct{}, 1) + p.aggregationNotify = make(chan uint16, 1) if err := p.initL1Current(cfg.StartingBlockID); err != nil { return fmt.Errorf("initialize L1 current cursor error: %w", err) @@ -274,6 +279,15 @@ func (p *Prover) eventLoop() { default: } } + // reqAggregation requests performing a aggregate operation, won't block + // if we are already aggregating. + reqAggregation := func() { + select { + // 0 means aggregating all tier proofs + case p.aggregationNotify <- 0: + default: + } + } // Call reqProving() right away to catch up with the latest state. reqProving() @@ -283,6 +297,9 @@ func (p *Prover) eventLoop() { forceProvingTicker := time.NewTicker(15 * time.Second) defer forceProvingTicker.Stop() + forceAggregatingTicker := time.NewTicker(p.cfg.ForceProveInterval) + defer forceAggregatingTicker.Stop() + // Channels chBufferSize := p.protocolConfigs.BlockMaxProposals blockProposedCh := make(chan *bindings.TaikoL1ClientBlockProposed, chBufferSize) @@ -321,12 +338,18 @@ func (p *Prover) eventLoop() { p.withRetry(func() error { return p.contestProofOp(req) }) case proofWithHeader := <-p.proofGenerationCh: p.withRetry(func() error { return p.submitProofOp(proofWithHeader) }) + case batchProof := <-p.batchProofGenerationCh: + p.withRetry(func() error { return p.submitProofAggregationOp(batchProof) }) case req := <-p.proofSubmissionCh: p.withRetry(func() error { return p.requestProofOp(req.Meta, req.Tier) }) case <-p.proveNotify: if err := p.proveOp(); err != nil { log.Error("Prove new blocks error", "error", err) } + case tier := <-p.aggregationNotify: + p.withRetry(func() error { + return p.aggregateOp(tier) + }) case e := <-blockVerifiedCh: p.blockVerifiedHandler.Handle(encoding.BlockVerifiedEventToV2(e)) case e := <-transitionProvedCh: @@ -366,6 +389,8 @@ func (p *Prover) eventLoop() { reqProving() case <-forceProvingTicker.C: reqProving() + case <-forceAggregatingTicker.C: + reqAggregation() } } } @@ -392,6 +417,35 @@ func (p *Prover) proveOp() error { return iter.Iter() } +// aggregateOp aggregates all proofs in buffer. +func (p *Prover) aggregateOp(tier uint16) error { + g, gCtx := errgroup.WithContext(p.ctx) + for _, submitter := range p.proofSubmitters { + g.Go(func() error { + if submitter.BufferSize() > 1 && + (tier == 0 || submitter.Tier() == tier) { + if err := submitter.AggregateProofs(gCtx); err != nil { + log.Error("Failed to aggregate proofs", + "error", err, + "tier", submitter.Tier(), + ) + return err + } + } else { + log.Debug( + "Skip the current aggregation operation", + "requestTier", tier, + "submitterTier", submitter.Tier(), + "bufferSize", submitter.BufferSize(), + ) + } + return nil + }) + } + + return g.Wait() +} + // contestProofOp performs a proof contest operation. func (p *Prover) contestProofOp(req *proofProducer.ContestRequestBody) error { if err := p.proofContester.SubmitContest( @@ -474,6 +528,36 @@ func (p *Prover) submitProofOp(proofWithHeader *proofProducer.ProofWithHeader) e return nil } +// submitProofsOp performs a batch proof submission operation. +func (p *Prover) submitProofAggregationOp(batchProof *proofProducer.BatchProofs) error { + submitter := p.getSubmitterByTier(batchProof.Tier) + if submitter == nil { + return nil + } + + if err := submitter.BatchSubmitProofs(p.ctx, batchProof); err != nil { + if strings.Contains(err.Error(), vm.ErrExecutionReverted.Error()) || + strings.Contains(err.Error(), proofSubmitter.ErrInvalidProof.Error()) { + log.Error( + "Proof submission reverted", + "blockIDs", batchProof.BlockIDs, + "tier", batchProof.Tier, + "error", err, + ) + return nil + } + log.Error( + "Submit proof error", + "blockIDs", batchProof.BlockIDs, + "tier", batchProof.Tier, + "error", err, + ) + return err + } + + return nil +} + // Name returns the application name. func (p *Prover) Name() string { return "prover" diff --git a/packages/taiko-client/prover/prover_test.go b/packages/taiko-client/prover/prover_test.go index 4ef524d751..638dc9b7ae 100644 --- a/packages/taiko-client/prover/prover_test.go +++ b/packages/taiko-client/prover/prover_test.go @@ -27,6 +27,7 @@ import ( "github.com/taikoxyz/taiko-mono/packages/taiko-client/proposer" guardianProverHeartbeater "github.com/taikoxyz/taiko-mono/packages/taiko-client/prover/guardian_prover_heartbeater" producer "github.com/taikoxyz/taiko-mono/packages/taiko-client/prover/proof_producer" + proofSubmitter "github.com/taikoxyz/taiko-mono/packages/taiko-client/prover/proof_submitter" "github.com/taikoxyz/taiko-mono/packages/taiko-client/prover/proof_submitter/transaction" ) @@ -488,6 +489,119 @@ func (s *ProverTestSuite) TestGetBlockProofStatus() { s.Equal(proofWithHeader.Opts.BlockHash, common.BytesToHash(status.CurrentTransitionState.BlockHash[:])) } +func (s *ProverTestSuite) TestAggregateProofsAlreadyProved() { + batchSize := 2 + // Init batch prover + l1ProverPrivKey, err := crypto.ToECDSA(common.FromHex(os.Getenv("L1_PROVER_PRIVATE_KEY"))) + s.Nil(err) + decimal, err := s.RPCClient.TaikoToken.Decimals(nil) + s.Nil(err) + batchProver := new(Prover) + s.Nil(InitFromConfig(context.Background(), batchProver, &Config{ + L1WsEndpoint: os.Getenv("L1_WS"), + L2WsEndpoint: os.Getenv("L2_WS"), + L2HttpEndpoint: os.Getenv("L2_HTTP"), + TaikoL1Address: common.HexToAddress(os.Getenv("TAIKO_L1")), + TaikoL2Address: common.HexToAddress(os.Getenv("TAIKO_L2")), + TaikoTokenAddress: common.HexToAddress(os.Getenv("TAIKO_TOKEN")), + L1ProverPrivKey: l1ProverPrivKey, + Dummy: true, + ProveUnassignedBlocks: true, + Capacity: 1024, + Allowance: new(big.Int).Exp(big.NewInt(1_000_000_100), new(big.Int).SetUint64(uint64(decimal)), nil), + RPCTimeout: 3 * time.Second, + BackOffRetryInterval: 3 * time.Second, + BackOffMaxRetries: 12, + L1NodeVersion: "1.0.0", + L2NodeVersion: "0.1.0", + SGXProofBufferSize: uint64(batchSize), + }, s.txmgr, s.txmgr)) + + for i := 0; i < batchSize; i++ { + _ = s.ProposeAndInsertValidBlock(s.proposer, s.d.ChainSyncer().BlobSyncer()) + } + + sink2 := make(chan *bindings.TaikoL1ClientTransitionProvedV2, batchSize) + sub2, err := s.p.rpc.TaikoL1.WatchTransitionProvedV2(nil, sink2, nil) + s.Nil(err) + defer func() { + sub2.Unsubscribe() + close(sink2) + }() + + s.Nil(s.p.proveOp()) + s.Nil(batchProver.proveOp()) + for i := 0; i < batchSize; i++ { + req1 := <-s.p.proofSubmissionCh + s.Nil(s.p.requestProofOp(req1.Meta, req1.Tier)) + req2 := <-batchProver.proofSubmissionCh + s.Nil(batchProver.requestProofOp(req2.Meta, req2.Tier)) + s.Nil(s.p.selectSubmitter(req1.Tier).SubmitProof(context.Background(), <-s.p.proofGenerationCh)) + } + tier := <-batchProver.aggregationNotify + s.Nil(batchProver.aggregateOp(tier)) + s.ErrorIs( + batchProver.selectSubmitter(tier).BatchSubmitProofs(context.Background(), <-batchProver.batchProofGenerationCh), + proofSubmitter.ErrInvalidProof, + ) + for i := 0; i < batchSize; i++ { + <-sink2 + } +} + +func (s *ProverTestSuite) TestAggregateProofs() { + batchSize := 2 + // Init batch prover + l1ProverPrivKey, err := crypto.ToECDSA(common.FromHex(os.Getenv("L1_PROVER_PRIVATE_KEY"))) + s.Nil(err) + decimal, err := s.RPCClient.TaikoToken.Decimals(nil) + s.Nil(err) + batchProver := new(Prover) + s.Nil(InitFromConfig(context.Background(), batchProver, &Config{ + L1WsEndpoint: os.Getenv("L1_WS"), + L2WsEndpoint: os.Getenv("L2_WS"), + L2HttpEndpoint: os.Getenv("L2_HTTP"), + TaikoL1Address: common.HexToAddress(os.Getenv("TAIKO_L1")), + TaikoL2Address: common.HexToAddress(os.Getenv("TAIKO_L2")), + TaikoTokenAddress: common.HexToAddress(os.Getenv("TAIKO_TOKEN")), + L1ProverPrivKey: l1ProverPrivKey, + Dummy: true, + ProveUnassignedBlocks: true, + Capacity: 1024, + Allowance: new(big.Int).Exp(big.NewInt(1_000_000_100), new(big.Int).SetUint64(uint64(decimal)), nil), + RPCTimeout: 3 * time.Second, + BackOffRetryInterval: 3 * time.Second, + BackOffMaxRetries: 12, + L1NodeVersion: "1.0.0", + L2NodeVersion: "0.1.0", + SGXProofBufferSize: uint64(batchSize), + }, s.txmgr, s.txmgr)) + + for i := 0; i < batchSize; i++ { + _ = s.ProposeAndInsertValidBlock(s.proposer, s.d.ChainSyncer().BlobSyncer()) + } + + sink2 := make(chan *bindings.TaikoL1ClientTransitionProvedV2, batchSize) + sub2, err := s.p.rpc.TaikoL1.WatchTransitionProvedV2(nil, sink2, nil) + s.Nil(err) + defer func() { + sub2.Unsubscribe() + close(sink2) + }() + + s.Nil(batchProver.proveOp()) + for i := 0; i < batchSize; i++ { + req := <-batchProver.proofSubmissionCh + s.Nil(batchProver.requestProofOp(req.Meta, req.Tier)) + } + tier := <-batchProver.aggregationNotify + s.Nil(batchProver.aggregateOp(tier)) + s.Nil(batchProver.selectSubmitter(tier).BatchSubmitProofs(context.Background(), <-batchProver.batchProofGenerationCh)) + for i := 0; i < batchSize; i++ { + <-sink2 + } +} + func (s *ProverTestSuite) TestSetApprovalAlreadySetHigher() { originalAllowance, err := s.p.rpc.TaikoToken.Allowance(&bind.CallOpts{}, s.p.ProverAddress(), s.p.cfg.TaikoL1Address) s.Nil(err)