diff --git a/churner/churner_test.go b/churner/churner_test.go index 906d7f31a9..235ea489a0 100644 --- a/churner/churner_test.go +++ b/churner/churner_test.go @@ -24,6 +24,7 @@ func TestProcessChurnRequest(t *testing.T) { LoggerConfig: logging.DefaultCLIConfig(), EthClientConfig: geth.EthClientConfig{ PrivateKeyString: churnerPrivateKeyHex, + NumConfirmations: 0, }, } metrics := churner.NewMetrics("9001", logger) diff --git a/churner/tests/churner_test.go b/churner/tests/churner_test.go index 4c4925f554..c69dd749a4 100644 --- a/churner/tests/churner_test.go +++ b/churner/tests/churner_test.go @@ -150,6 +150,7 @@ func createTransactorFromScratch(privateKey, operatorStateRetriever, serviceMana ethClientCfg := geth.EthClientConfig{ RPCURL: rpcURL, PrivateKeyString: privateKey, + NumConfirmations: 0, } gethClient, err := geth.NewClient(ethClientCfg, logger) diff --git a/common/geth/cli.go b/common/geth/cli.go index a876d2bf14..4f5493185f 100644 --- a/common/geth/cli.go +++ b/common/geth/cli.go @@ -6,13 +6,15 @@ import ( ) var ( - rpcUrlFlagName = "chain.rpc" - privateKeyFlagName = "chain.private-key" + rpcUrlFlagName = "chain.rpc" + privateKeyFlagName = "chain.private-key" + numConfirmationsFlagName = "chain.num-confirmations" ) type EthClientConfig struct { RPCURL string PrivateKeyString string + NumConfirmations int } func EthClientFlags(envPrefix string) []cli.Flag { @@ -29,6 +31,13 @@ func EthClientFlags(envPrefix string) []cli.Flag { Required: true, EnvVar: common.PrefixEnvVar(envPrefix, "PRIVATE_KEY"), }, + cli.IntFlag{ + Name: numConfirmationsFlagName, + Usage: "Number of confirmations to wait for", + Required: false, + Value: 3, + EnvVar: common.PrefixEnvVar(envPrefix, "NUM_CONFIRMATIONS"), + }, } } @@ -36,6 +45,7 @@ func ReadEthClientConfig(ctx *cli.Context) EthClientConfig { cfg := EthClientConfig{} cfg.RPCURL = ctx.GlobalString(rpcUrlFlagName) cfg.PrivateKeyString = ctx.GlobalString(privateKeyFlagName) + cfg.NumConfirmations = ctx.GlobalInt(numConfirmationsFlagName) return cfg } @@ -44,5 +54,6 @@ func ReadEthClientConfig(ctx *cli.Context) EthClientConfig { func ReadEthClientConfigRPCOnly(ctx *cli.Context) EthClientConfig { cfg := EthClientConfig{} cfg.RPCURL = ctx.GlobalString(rpcUrlFlagName) + cfg.NumConfirmations = ctx.GlobalInt(numConfirmationsFlagName) return cfg } diff --git a/common/geth/client.go b/common/geth/client.go index ac12241fe2..57467fd07a 100644 --- a/common/geth/client.go +++ b/common/geth/client.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "math/big" + "time" "github.com/Layr-Labs/eigenda/common" "github.com/ethereum/go-ethereum" @@ -25,12 +26,13 @@ var ( type EthClient struct { *ethclient.Client - RPCURL string - privateKey *ecdsa.PrivateKey - chainID *big.Int - AccountAddress gethcommon.Address - Contracts map[gethcommon.Address]*bind.BoundContract - Logger common.Logger + RPCURL string + privateKey *ecdsa.PrivateKey + chainID *big.Int + AccountAddress gethcommon.Address + Contracts map[gethcommon.Address]*bind.BoundContract + Logger common.Logger + numConfirmations int } var _ common.EthClient = (*EthClient)(nil) @@ -64,13 +66,14 @@ func NewClient(config EthClientConfig, logger common.Logger) (*EthClient, error) } c := &EthClient{ - RPCURL: config.RPCURL, - privateKey: privateKey, - chainID: chainIDBigInt, - AccountAddress: accountAddress, - Client: chainClient, - Contracts: make(map[gethcommon.Address]*bind.BoundContract), - Logger: logger, + RPCURL: config.RPCURL, + privateKey: privateKey, + chainID: chainIDBigInt, + AccountAddress: accountAddress, + Client: chainClient, + Contracts: make(map[gethcommon.Address]*bind.BoundContract), + Logger: logger, + numConfirmations: config.NumConfirmations, } return c, err @@ -206,7 +209,7 @@ func (c *EthClient) EstimateGasPriceAndLimitAndSendTx( } func (c *EthClient) EnsureTransactionEvaled(ctx context.Context, tx *types.Transaction, tag string) (*types.Receipt, error) { - receipt, err := bind.WaitMined(ctx, c.Client, tx) + receipt, err := c.waitMined(ctx, tx) if err != nil { return nil, fmt.Errorf("EnsureTransactionEvaled: failed to wait for transaction (%s) to mine: %w", tag, err) } @@ -218,6 +221,43 @@ func (c *EthClient) EnsureTransactionEvaled(ctx context.Context, tx *types.Trans return receipt, nil } +// waitMined waits for tx to be mined on the blockchain. +// Taken from https://github.com/ethereum/go-ethereum/blob/master/accounts/abi/bind/util.go#L32, +// but added a check for number of confirmations. +func (c *EthClient) waitMined(ctx context.Context, tx *types.Transaction) (*types.Receipt, error) { + queryTicker := time.NewTicker(3 * time.Second) + defer queryTicker.Stop() + + for { + receipt, err := c.TransactionReceipt(ctx, tx.Hash()) + if err == nil { + chainTip, err := c.BlockNumber(ctx) + if err == nil { + if receipt.BlockNumber.Uint64()+uint64(c.numConfirmations) > chainTip { + c.Logger.Trace("EnsureTransactionEvaled: transaction has been mined but don't have enough confirmations at current chain tip", "txnBlockNumber", receipt.BlockNumber.Uint64(), "numConfirmations", c.numConfirmations, "chainTip", chainTip) + } else { + return receipt, nil + } + } else { + c.Logger.Trace("EnsureTransactionEvaled: failed to get chain tip while waiting for transaction to mine", "err", err) + } + } + + if errors.Is(err, ethereum.NotFound) { + c.Logger.Trace("Transaction not yet mined") + } else { + c.Logger.Trace("Receipt retrieval failed", "err", err) + } + + // Wait for the next round. + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-queryTicker.C: + } + } +} + // getGasFeeCap returns the gas fee cap for a transaction, calculated as: // gasFeeCap = 2 * baseFee + gasTipCap // Rationale: https://www.blocknative.com/blog/eip-1559-fees diff --git a/core/eth/tx.go b/core/eth/tx.go index e7559cb022..847d733920 100644 --- a/core/eth/tx.go +++ b/core/eth/tx.go @@ -408,9 +408,10 @@ func (t *Transactor) GetOperatorStakesForQuorums(ctx context.Context, quorums [] return state, nil } -// ConfirmBatch confirms a batch header and signature aggregation. The signature aggregation must satisfy the quorum thresholds +// BuildConfirmBatchTxn builds a transaction to confirm a batch header and signature aggregation. The signature aggregation must satisfy the quorum thresholds // specified in the batch header. If the signature aggregation does not satisfy the quorum thresholds, the transaction will fail. -func (t *Transactor) ConfirmBatch(ctx context.Context, batchHeader core.BatchHeader, quorums map[core.QuorumID]*core.QuorumResult, signatureAggregation core.SignatureAggregation) (*types.Receipt, error) { +// Note that this function returns a transaction without publishing it to the blockchain. The caller is responsible for publishing the transaction. +func (t *Transactor) BuildConfirmBatchTxn(ctx context.Context, batchHeader core.BatchHeader, quorums map[core.QuorumID]*core.QuorumResult, signatureAggregation core.SignatureAggregation) (*types.Transaction, error) { quorumNumbers := quorumParamsToQuorumNumbers(quorums) nonSignerOperatorIds := make([][32]byte, len(signatureAggregation.NonSigners)) for i := range signatureAggregation.NonSigners { @@ -470,9 +471,15 @@ func (t *Transactor) ConfirmBatch(ctx context.Context, batchHeader core.BatchHea t.Logger.Error("Failed to generate transact opts", "err", err) return nil, err } - tx, err := t.Bindings.EigenDAServiceManager.ConfirmBatch(opts, batchH, signatureChecker) + return t.Bindings.EigenDAServiceManager.ConfirmBatch(opts, batchH, signatureChecker) +} + +// ConfirmBatch confirms a batch header and signature aggregation. The signature aggregation must satisfy the quorum thresholds +// specified in the batch header. If the signature aggregation does not satisfy the quorum thresholds, the transaction will fail. +func (t *Transactor) ConfirmBatch(ctx context.Context, batchHeader core.BatchHeader, quorums map[core.QuorumID]*core.QuorumResult, signatureAggregation core.SignatureAggregation) (*types.Receipt, error) { + tx, err := t.BuildConfirmBatchTxn(ctx, batchHeader, quorums, signatureAggregation) if err != nil { - t.Logger.Error("Failed to confirm batch", "err", err) + t.Logger.Error("Failed to build a ConfirmBatch txn", "err", err) return nil, err } diff --git a/core/indexer/state_test.go b/core/indexer/state_test.go index 2534bdb431..71df476414 100644 --- a/core/indexer/state_test.go +++ b/core/indexer/state_test.go @@ -67,6 +67,7 @@ func mustMakeOperatorTransactor(env *deploy.Config, op deploy.OperatorVars, logg config := geth.EthClientConfig{ RPCURL: deployer.RPC, PrivateKeyString: op.NODE_PRIVATE_KEY, + NumConfirmations: 0, } c, err := geth.NewClient(config, logger) @@ -86,6 +87,7 @@ func mustMakeTestClients(env *deploy.Config, privateKey string, logger common.Lo config := geth.EthClientConfig{ RPCURL: deployer.RPC, PrivateKeyString: privateKey, + NumConfirmations: 0, } client, err := geth.NewClient(config, logger) diff --git a/core/thegraph/state_integration_test.go b/core/thegraph/state_integration_test.go index 146df6ef95..d44462a346 100644 --- a/core/thegraph/state_integration_test.go +++ b/core/thegraph/state_integration_test.go @@ -111,6 +111,7 @@ func mustMakeTestClient(t *testing.T, env *deploy.Config, privateKey string, log config := geth.EthClientConfig{ RPCURL: deployer.RPC, PrivateKeyString: privateKey, + NumConfirmations: 0, } client, err := geth.NewClient(config, logger) diff --git a/inabox/deploy/config.go b/inabox/deploy/config.go index 332a6cce3c..9194da6ddc 100644 --- a/inabox/deploy/config.go +++ b/inabox/deploy/config.go @@ -171,6 +171,7 @@ func (env *Config) generateDisperserVars(ind int, key, address, logPath, dbPath, DISPERSER_SERVER_METRICS_HTTP_PORT: "9093", DISPERSER_SERVER_CHAIN_RPC: "", DISPERSER_SERVER_PRIVATE_KEY: "123", + DISPERSER_SERVER_NUM_CONFIRMATIONS: "0", DISPERSER_SERVER_REGISTERED_QUORUM_ID: "0", DISPERSER_SERVER_TOTAL_UNAUTH_THROUGHPUT: "10000000", @@ -217,6 +218,7 @@ func (env *Config) generateBatcherVars(ind int, key, graphUrl, logPath string) B BATCHER_AWS_ENDPOINT_URL: "", BATCHER_FINALIZER_INTERVAL: "6m", BATCHER_ENCODING_REQUEST_QUEUE_SIZE: "500", + BATCHER_NUM_CONFIRMATIONS: "0", } env.applyDefaults(&v, "BATCHER", "batcher", ind) @@ -301,6 +303,7 @@ func (env *Config) generateOperatorVars(ind int, name, key, churnerUrl, logPath, NODE_NUM_BATCH_VALIDATORS: "128", NODE_PUBLIC_IP_PROVIDER: "mockip", NODE_PUBLIC_IP_CHECK_INTERVAL: "10s", + NODE_NUM_CONFIRMATIONS: "0", } env.applyDefaults(&v, "NODE", "opr", ind) diff --git a/inabox/deploy/env_vars.go b/inabox/deploy/env_vars.go index 2a6ef4590a..c6d888431c 100644 --- a/inabox/deploy/env_vars.go +++ b/inabox/deploy/env_vars.go @@ -27,6 +27,8 @@ type DisperserVars struct { DISPERSER_SERVER_PRIVATE_KEY string + DISPERSER_SERVER_NUM_CONFIRMATIONS string + DISPERSER_SERVER_STD_LOG_LEVEL string DISPERSER_SERVER_FILE_LOG_LEVEL string @@ -114,6 +116,8 @@ type BatcherVars struct { BATCHER_PRIVATE_KEY string + BATCHER_NUM_CONFIRMATIONS string + BATCHER_STD_LOG_LEVEL string BATCHER_FILE_LOG_LEVEL string @@ -262,6 +266,8 @@ type OperatorVars struct { NODE_PRIVATE_KEY string + NODE_NUM_CONFIRMATIONS string + NODE_STD_LOG_LEVEL string NODE_FILE_LOG_LEVEL string @@ -315,6 +321,8 @@ type RetrieverVars struct { RETRIEVER_PRIVATE_KEY string + RETRIEVER_NUM_CONFIRMATIONS string + RETRIEVER_STD_LOG_LEVEL string RETRIEVER_FILE_LOG_LEVEL string @@ -344,12 +352,18 @@ type ChurnerVars struct { CHURNER_EIGENDA_SERVICE_MANAGER string + CHURNER_ENABLE_METRICS string + CHURNER_PER_PUBLIC_KEY_RATE_LIMIT string + CHURNER_METRICS_HTTP_PORT string + CHURNER_CHAIN_RPC string CHURNER_PRIVATE_KEY string + CHURNER_NUM_CONFIRMATIONS string + CHURNER_STD_LOG_LEVEL string CHURNER_FILE_LOG_LEVEL string @@ -357,10 +371,6 @@ type ChurnerVars struct { CHURNER_LOG_PATH string CHURNER_INDEXER_PULL_INTERVAL string - - CHURNER_ENABLE_METRICS string - - CHURNER_METRICS_HTTP_PORT string } func (vars ChurnerVars) getEnvMap() map[string]string { diff --git a/inabox/tests/integration_suite_test.go b/inabox/tests/integration_suite_test.go index 961504eb59..c56027dcbd 100644 --- a/inabox/tests/integration_suite_test.go +++ b/inabox/tests/integration_suite_test.go @@ -43,8 +43,10 @@ var ( bucketTableName = "test-BucketStore" logger common.Logger ethClient common.EthClient + rpcClient common.RPCEthClient mockRollup *rollupbindings.ContractMockRollup retrievalClient clients.RetrievalClient + numConfirmations int = 3 ) func init() { @@ -115,8 +117,11 @@ var _ = BeforeSuite(func() { ethClient, err = geth.NewClient(geth.EthClientConfig{ RPCURL: testConfig.Deployers[0].RPC, PrivateKeyString: pk, + NumConfirmations: numConfirmations, }, logger) Expect(err).To(BeNil()) + rpcClient, err = ethrpc.Dial(testConfig.Deployers[0].RPC) + Expect(err).To(BeNil()) mockRollup, err = rollupbindings.NewContractMockRollup(gcommon.HexToAddress(testConfig.MockRollup), ethClient) Expect(err).To(BeNil()) err = setupRetrievalClient(testConfig) @@ -127,6 +132,7 @@ func setupRetrievalClient(testConfig *deploy.Config) error { ethClientConfig := geth.EthClientConfig{ RPCURL: testConfig.Deployers[0].RPC, PrivateKeyString: "351b8eca372e64f64d514f90f223c5c4f86a04ff3dcead5c27293c547daab4ca", // just random private key + NumConfirmations: numConfirmations, } client, err := geth.NewClient(ethClientConfig, logger) if err != nil { diff --git a/inabox/tests/integration_test.go b/inabox/tests/integration_test.go index a07d1fa09c..275f92e48d 100644 --- a/inabox/tests/integration_test.go +++ b/inabox/tests/integration_test.go @@ -17,6 +17,13 @@ import ( . "github.com/onsi/gomega" ) +func mineAnvilBlocks(numBlocks int) { + for i := 0; i < numBlocks; i++ { + err := rpcClient.CallContext(context.Background(), nil, "evm_mine") + Expect(err).To(BeNil()) + } +} + var _ = Describe("Inabox Integration", func() { It("test end to end scenario", func() { ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) @@ -27,7 +34,12 @@ var _ = Describe("Inabox Integration", func() { optsWithValue.Value = big.NewInt(1e18) tx, err := mockRollup.RegisterValidator(optsWithValue) Expect(err).To(BeNil()) - _, err = ethClient.EstimateGasPriceAndLimitAndSendTx(ctx, tx, "RegisterValidator", big.NewInt(1e18)) + tx, err = ethClient.UpdateGas(ctx, tx, optsWithValue.Value) + Expect(err).To(BeNil()) + err = ethClient.SendTransaction(ctx, tx) + Expect(err).To(BeNil()) + mineAnvilBlocks(numConfirmations + 1) + _, err = ethClient.EnsureTransactionEvaled(ctx, tx, "RegisterValidator") Expect(err).To(BeNil()) disp := traffic.NewDisperserClient(&traffic.Config{ @@ -79,9 +91,16 @@ var _ = Describe("Inabox Integration", func() { Expect(err).To(BeNil()) tx, err := mockRollup.PostCommitment(opts, blobHeader, verificationProof) Expect(err).To(BeNil()) - _, err = ethClient.EstimateGasPriceAndLimitAndSendTx(ctx, tx, "PostCommitment", nil) + tx, err = ethClient.UpdateGas(ctx, tx, nil) + Expect(err).To(BeNil()) + err = ethClient.SendTransaction(ctx, tx) + Expect(err).To(BeNil()) + mineAnvilBlocks(numConfirmations + 1) + _, err = ethClient.EnsureTransactionEvaled(ctx, tx, "PostCommitment") Expect(err).To(BeNil()) break loop + } else { + mineAnvilBlocks(numConfirmations + 1) } } } @@ -97,7 +116,7 @@ var _ = Describe("Inabox Integration", func() { 0, ) Expect(err).To(BeNil()) - Expect(bytes.TrimRight(retrieved, "\x00")).To(Equal(data)) + Expect(bytes.TrimRight(retrieved, "\x00")).To(Equal(bytes.TrimRight(data, "\x00"))) }) }) diff --git a/node/plugin/cmd/main.go b/node/plugin/cmd/main.go index 0213477071..70ab230fd5 100644 --- a/node/plugin/cmd/main.go +++ b/node/plugin/cmd/main.go @@ -47,6 +47,7 @@ func main() { plugin.BlsOperatorStateRetrieverFlag, plugin.EigenDAServiceManagerFlag, plugin.ChurnerUrlFlag, + plugin.NumConfirmationsFlag, } app.Name = "eigenda-node-plugin" app.Usage = "EigenDA Node Plugin" @@ -98,6 +99,7 @@ func pluginOps(ctx *cli.Context) { ethConfig := geth.EthClientConfig{ RPCURL: config.ChainRpcUrl, PrivateKeyString: *privateKey, + NumConfirmations: config.NumConfirmations, } client, err := geth.NewClient(ethConfig, logger) if err != nil { diff --git a/node/plugin/config.go b/node/plugin/config.go index 3e3582514c..a4d41af7ff 100644 --- a/node/plugin/config.go +++ b/node/plugin/config.go @@ -89,6 +89,13 @@ var ( Required: true, EnvVar: common.PrefixEnvVar(flags.EnvVarPrefix, "CHURNER_URL"), } + NumConfirmationsFlag = cli.IntFlag{ + Name: "num-confirmations", + Usage: "Number of confirmations to wait for", + Required: false, + Value: 3, + EnvVar: common.PrefixEnvVar(flags.EnvVarPrefix, "NUM_CONFIRMATIONS"), + } ) type Config struct { @@ -103,6 +110,7 @@ type Config struct { BLSOperatorStateRetrieverAddr string EigenDAServiceManagerAddr string ChurnerUrl string + NumConfirmations int } func NewConfig(ctx *cli.Context) (*Config, error) { @@ -133,5 +141,6 @@ func NewConfig(ctx *cli.Context) (*Config, error) { BLSOperatorStateRetrieverAddr: ctx.GlobalString(BlsOperatorStateRetrieverFlag.Name), EigenDAServiceManagerAddr: ctx.GlobalString(EigenDAServiceManagerFlag.Name), ChurnerUrl: ctx.GlobalString(ChurnerUrlFlag.Name), + NumConfirmations: ctx.GlobalInt(NumConfirmationsFlag.Name), }, nil }