Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[1/N] Batch transactor: wait for n confirmations for a transaction #99

Merged
merged 1 commit into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions churner/churner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func TestProcessChurnRequest(t *testing.T) {
LoggerConfig: logging.DefaultCLIConfig(),
EthClientConfig: geth.EthClientConfig{
PrivateKeyString: churnerPrivateKeyHex,
NumConfirmations: 0,
},
}
metrics := churner.NewMetrics("9001", logger)
Expand Down
1 change: 1 addition & 0 deletions churner/tests/churner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func createTransactorFromScratch(privateKey, operatorStateRetriever, serviceMana
ethClientCfg := geth.EthClientConfig{
RPCURL: rpcURL,
PrivateKeyString: privateKey,
NumConfirmations: 0,
}

gethClient, err := geth.NewClient(ethClientCfg, logger)
Expand Down
15 changes: 13 additions & 2 deletions common/geth/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ import (
)

var (
rpcUrlFlagName = "chain.rpc"
privateKeyFlagName = "chain.private-key"
rpcUrlFlagName = "chain.rpc"
privateKeyFlagName = "chain.private-key"
numConfirmationsFlagName = "chain.num-confirmations"
)

type EthClientConfig struct {
RPCURL string
PrivateKeyString string
NumConfirmations int
}

func EthClientFlags(envPrefix string) []cli.Flag {
Expand All @@ -29,13 +31,21 @@ func EthClientFlags(envPrefix string) []cli.Flag {
Required: true,
EnvVar: common.PrefixEnvVar(envPrefix, "PRIVATE_KEY"),
},
cli.IntFlag{
Name: numConfirmationsFlagName,
Usage: "Number of confirmations to wait for",
Required: false,
Value: 3,
EnvVar: common.PrefixEnvVar(envPrefix, "NUM_CONFIRMATIONS"),
},
}
}

func ReadEthClientConfig(ctx *cli.Context) EthClientConfig {
cfg := EthClientConfig{}
cfg.RPCURL = ctx.GlobalString(rpcUrlFlagName)
cfg.PrivateKeyString = ctx.GlobalString(privateKeyFlagName)
cfg.NumConfirmations = ctx.GlobalInt(numConfirmationsFlagName)
return cfg
}

Expand All @@ -44,5 +54,6 @@ func ReadEthClientConfig(ctx *cli.Context) EthClientConfig {
func ReadEthClientConfigRPCOnly(ctx *cli.Context) EthClientConfig {
cfg := EthClientConfig{}
cfg.RPCURL = ctx.GlobalString(rpcUrlFlagName)
cfg.NumConfirmations = ctx.GlobalInt(numConfirmationsFlagName)
return cfg
}
68 changes: 54 additions & 14 deletions common/geth/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"math/big"
"time"

"github.com/Layr-Labs/eigenda/common"
"github.com/ethereum/go-ethereum"
Expand All @@ -25,12 +26,13 @@ var (

type EthClient struct {
*ethclient.Client
RPCURL string
privateKey *ecdsa.PrivateKey
chainID *big.Int
AccountAddress gethcommon.Address
Contracts map[gethcommon.Address]*bind.BoundContract
Logger common.Logger
RPCURL string
privateKey *ecdsa.PrivateKey
chainID *big.Int
AccountAddress gethcommon.Address
Contracts map[gethcommon.Address]*bind.BoundContract
Logger common.Logger
numConfirmations int
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think this is this is generally a global setting or would it be something that would vary between different types of calls?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be something each component sets globally and hidden from callers for now, although i can see there may be a need to override for different types of calls in the future if we have a lot more different types of calls.


var _ common.EthClient = (*EthClient)(nil)
Expand Down Expand Up @@ -64,13 +66,14 @@ func NewClient(config EthClientConfig, logger common.Logger) (*EthClient, error)
}

c := &EthClient{
RPCURL: config.RPCURL,
privateKey: privateKey,
chainID: chainIDBigInt,
AccountAddress: accountAddress,
Client: chainClient,
Contracts: make(map[gethcommon.Address]*bind.BoundContract),
Logger: logger,
RPCURL: config.RPCURL,
privateKey: privateKey,
chainID: chainIDBigInt,
AccountAddress: accountAddress,
Client: chainClient,
Contracts: make(map[gethcommon.Address]*bind.BoundContract),
Logger: logger,
numConfirmations: config.NumConfirmations,
}

return c, err
Expand Down Expand Up @@ -206,7 +209,7 @@ func (c *EthClient) EstimateGasPriceAndLimitAndSendTx(
}

func (c *EthClient) EnsureTransactionEvaled(ctx context.Context, tx *types.Transaction, tag string) (*types.Receipt, error) {
receipt, err := bind.WaitMined(ctx, c.Client, tx)
receipt, err := c.waitMined(ctx, tx)
if err != nil {
return nil, fmt.Errorf("EnsureTransactionEvaled: failed to wait for transaction (%s) to mine: %w", tag, err)
}
Expand All @@ -218,6 +221,43 @@ func (c *EthClient) EnsureTransactionEvaled(ctx context.Context, tx *types.Trans
return receipt, nil
}

// waitMined waits for tx to be mined on the blockchain.
// Taken from https://github.com/ethereum/go-ethereum/blob/master/accounts/abi/bind/util.go#L32,
// but added a check for number of confirmations.
func (c *EthClient) waitMined(ctx context.Context, tx *types.Transaction) (*types.Receipt, error) {
queryTicker := time.NewTicker(3 * time.Second)
defer queryTicker.Stop()

for {
receipt, err := c.TransactionReceipt(ctx, tx.Hash())
if err == nil {
chainTip, err := c.BlockNumber(ctx)
if err == nil {
if receipt.BlockNumber.Uint64()+uint64(c.numConfirmations) > chainTip {
c.Logger.Trace("EnsureTransactionEvaled: transaction has been mined but don't have enough confirmations at current chain tip", "txnBlockNumber", receipt.BlockNumber.Uint64(), "numConfirmations", c.numConfirmations, "chainTip", chainTip)
} else {
return receipt, nil
}
} else {
c.Logger.Trace("EnsureTransactionEvaled: failed to get chain tip while waiting for transaction to mine", "err", err)
}
}

if errors.Is(err, ethereum.NotFound) {
c.Logger.Trace("Transaction not yet mined")
} else {
c.Logger.Trace("Receipt retrieval failed", "err", err)
}

// Wait for the next round.
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-queryTicker.C:
}
}
}

// getGasFeeCap returns the gas fee cap for a transaction, calculated as:
// gasFeeCap = 2 * baseFee + gasTipCap
// Rationale: https://www.blocknative.com/blog/eip-1559-fees
Expand Down
15 changes: 11 additions & 4 deletions core/eth/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,9 +408,10 @@ func (t *Transactor) GetOperatorStakesForQuorums(ctx context.Context, quorums []
return state, nil
}

// ConfirmBatch confirms a batch header and signature aggregation. The signature aggregation must satisfy the quorum thresholds
// BuildConfirmBatchTxn builds a transaction to confirm a batch header and signature aggregation. The signature aggregation must satisfy the quorum thresholds
// specified in the batch header. If the signature aggregation does not satisfy the quorum thresholds, the transaction will fail.
func (t *Transactor) ConfirmBatch(ctx context.Context, batchHeader core.BatchHeader, quorums map[core.QuorumID]*core.QuorumResult, signatureAggregation core.SignatureAggregation) (*types.Receipt, error) {
// Note that this function returns a transaction without publishing it to the blockchain. The caller is responsible for publishing the transaction.
func (t *Transactor) BuildConfirmBatchTxn(ctx context.Context, batchHeader core.BatchHeader, quorums map[core.QuorumID]*core.QuorumResult, signatureAggregation core.SignatureAggregation) (*types.Transaction, error) {
quorumNumbers := quorumParamsToQuorumNumbers(quorums)
nonSignerOperatorIds := make([][32]byte, len(signatureAggregation.NonSigners))
for i := range signatureAggregation.NonSigners {
Expand Down Expand Up @@ -470,9 +471,15 @@ func (t *Transactor) ConfirmBatch(ctx context.Context, batchHeader core.BatchHea
t.Logger.Error("Failed to generate transact opts", "err", err)
return nil, err
}
tx, err := t.Bindings.EigenDAServiceManager.ConfirmBatch(opts, batchH, signatureChecker)
return t.Bindings.EigenDAServiceManager.ConfirmBatch(opts, batchH, signatureChecker)
}

// ConfirmBatch confirms a batch header and signature aggregation. The signature aggregation must satisfy the quorum thresholds
// specified in the batch header. If the signature aggregation does not satisfy the quorum thresholds, the transaction will fail.
func (t *Transactor) ConfirmBatch(ctx context.Context, batchHeader core.BatchHeader, quorums map[core.QuorumID]*core.QuorumResult, signatureAggregation core.SignatureAggregation) (*types.Receipt, error) {
tx, err := t.BuildConfirmBatchTxn(ctx, batchHeader, quorums, signatureAggregation)
if err != nil {
t.Logger.Error("Failed to confirm batch", "err", err)
t.Logger.Error("Failed to build a ConfirmBatch txn", "err", err)
return nil, err
}

Expand Down
2 changes: 2 additions & 0 deletions core/indexer/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func mustMakeOperatorTransactor(env *deploy.Config, op deploy.OperatorVars, logg
config := geth.EthClientConfig{
RPCURL: deployer.RPC,
PrivateKeyString: op.NODE_PRIVATE_KEY,
NumConfirmations: 0,
}

c, err := geth.NewClient(config, logger)
Expand All @@ -86,6 +87,7 @@ func mustMakeTestClients(env *deploy.Config, privateKey string, logger common.Lo
config := geth.EthClientConfig{
RPCURL: deployer.RPC,
PrivateKeyString: privateKey,
NumConfirmations: 0,
}

client, err := geth.NewClient(config, logger)
Expand Down
1 change: 1 addition & 0 deletions core/thegraph/state_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func mustMakeTestClient(t *testing.T, env *deploy.Config, privateKey string, log
config := geth.EthClientConfig{
RPCURL: deployer.RPC,
PrivateKeyString: privateKey,
NumConfirmations: 0,
}

client, err := geth.NewClient(config, logger)
Expand Down
3 changes: 3 additions & 0 deletions inabox/deploy/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ func (env *Config) generateDisperserVars(ind int, key, address, logPath, dbPath,
DISPERSER_SERVER_METRICS_HTTP_PORT: "9093",
DISPERSER_SERVER_CHAIN_RPC: "",
DISPERSER_SERVER_PRIVATE_KEY: "123",
DISPERSER_SERVER_NUM_CONFIRMATIONS: "0",

DISPERSER_SERVER_REGISTERED_QUORUM_ID: "0",
DISPERSER_SERVER_TOTAL_UNAUTH_THROUGHPUT: "10000000",
Expand Down Expand Up @@ -217,6 +218,7 @@ func (env *Config) generateBatcherVars(ind int, key, graphUrl, logPath string) B
BATCHER_AWS_ENDPOINT_URL: "",
BATCHER_FINALIZER_INTERVAL: "6m",
BATCHER_ENCODING_REQUEST_QUEUE_SIZE: "500",
BATCHER_NUM_CONFIRMATIONS: "0",
}

env.applyDefaults(&v, "BATCHER", "batcher", ind)
Expand Down Expand Up @@ -301,6 +303,7 @@ func (env *Config) generateOperatorVars(ind int, name, key, churnerUrl, logPath,
NODE_NUM_BATCH_VALIDATORS: "128",
NODE_PUBLIC_IP_PROVIDER: "mockip",
NODE_PUBLIC_IP_CHECK_INTERVAL: "10s",
NODE_NUM_CONFIRMATIONS: "0",
}

env.applyDefaults(&v, "NODE", "opr", ind)
Expand Down
18 changes: 14 additions & 4 deletions inabox/deploy/env_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type DisperserVars struct {

DISPERSER_SERVER_PRIVATE_KEY string

DISPERSER_SERVER_NUM_CONFIRMATIONS string

DISPERSER_SERVER_STD_LOG_LEVEL string

DISPERSER_SERVER_FILE_LOG_LEVEL string
Expand Down Expand Up @@ -114,6 +116,8 @@ type BatcherVars struct {

BATCHER_PRIVATE_KEY string

BATCHER_NUM_CONFIRMATIONS string

BATCHER_STD_LOG_LEVEL string

BATCHER_FILE_LOG_LEVEL string
Expand Down Expand Up @@ -262,6 +266,8 @@ type OperatorVars struct {

NODE_PRIVATE_KEY string

NODE_NUM_CONFIRMATIONS string

NODE_STD_LOG_LEVEL string

NODE_FILE_LOG_LEVEL string
Expand Down Expand Up @@ -315,6 +321,8 @@ type RetrieverVars struct {

RETRIEVER_PRIVATE_KEY string

RETRIEVER_NUM_CONFIRMATIONS string

RETRIEVER_STD_LOG_LEVEL string

RETRIEVER_FILE_LOG_LEVEL string
Expand Down Expand Up @@ -344,23 +352,25 @@ type ChurnerVars struct {

CHURNER_EIGENDA_SERVICE_MANAGER string

CHURNER_ENABLE_METRICS string

CHURNER_PER_PUBLIC_KEY_RATE_LIMIT string

CHURNER_METRICS_HTTP_PORT string

CHURNER_CHAIN_RPC string

CHURNER_PRIVATE_KEY string

CHURNER_NUM_CONFIRMATIONS string

CHURNER_STD_LOG_LEVEL string

CHURNER_FILE_LOG_LEVEL string

CHURNER_LOG_PATH string

CHURNER_INDEXER_PULL_INTERVAL string

CHURNER_ENABLE_METRICS string

CHURNER_METRICS_HTTP_PORT string
}

func (vars ChurnerVars) getEnvMap() map[string]string {
Expand Down
6 changes: 6 additions & 0 deletions inabox/tests/integration_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@ var (
bucketTableName = "test-BucketStore"
logger common.Logger
ethClient common.EthClient
rpcClient common.RPCEthClient
mockRollup *rollupbindings.ContractMockRollup
retrievalClient clients.RetrievalClient
numConfirmations int = 3
)

func init() {
Expand Down Expand Up @@ -115,8 +117,11 @@ var _ = BeforeSuite(func() {
ethClient, err = geth.NewClient(geth.EthClientConfig{
RPCURL: testConfig.Deployers[0].RPC,
PrivateKeyString: pk,
NumConfirmations: numConfirmations,
}, logger)
Expect(err).To(BeNil())
rpcClient, err = ethrpc.Dial(testConfig.Deployers[0].RPC)
Expect(err).To(BeNil())
mockRollup, err = rollupbindings.NewContractMockRollup(gcommon.HexToAddress(testConfig.MockRollup), ethClient)
Expect(err).To(BeNil())
err = setupRetrievalClient(testConfig)
Expand All @@ -127,6 +132,7 @@ func setupRetrievalClient(testConfig *deploy.Config) error {
ethClientConfig := geth.EthClientConfig{
RPCURL: testConfig.Deployers[0].RPC,
PrivateKeyString: "351b8eca372e64f64d514f90f223c5c4f86a04ff3dcead5c27293c547daab4ca", // just random private key
NumConfirmations: numConfirmations,
}
client, err := geth.NewClient(ethClientConfig, logger)
if err != nil {
Expand Down
25 changes: 22 additions & 3 deletions inabox/tests/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ import (
. "github.com/onsi/gomega"
)

func mineAnvilBlocks(numBlocks int) {
for i := 0; i < numBlocks; i++ {
err := rpcClient.CallContext(context.Background(), nil, "evm_mine")
Expect(err).To(BeNil())
}
}

var _ = Describe("Inabox Integration", func() {
It("test end to end scenario", func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
Expand All @@ -27,7 +34,12 @@ var _ = Describe("Inabox Integration", func() {
optsWithValue.Value = big.NewInt(1e18)
tx, err := mockRollup.RegisterValidator(optsWithValue)
Expect(err).To(BeNil())
_, err = ethClient.EstimateGasPriceAndLimitAndSendTx(ctx, tx, "RegisterValidator", big.NewInt(1e18))
tx, err = ethClient.UpdateGas(ctx, tx, optsWithValue.Value)
Expect(err).To(BeNil())
err = ethClient.SendTransaction(ctx, tx)
Expect(err).To(BeNil())
mineAnvilBlocks(numConfirmations + 1)
_, err = ethClient.EnsureTransactionEvaled(ctx, tx, "RegisterValidator")
Expect(err).To(BeNil())

disp := traffic.NewDisperserClient(&traffic.Config{
Expand Down Expand Up @@ -79,9 +91,16 @@ var _ = Describe("Inabox Integration", func() {
Expect(err).To(BeNil())
tx, err := mockRollup.PostCommitment(opts, blobHeader, verificationProof)
Expect(err).To(BeNil())
_, err = ethClient.EstimateGasPriceAndLimitAndSendTx(ctx, tx, "PostCommitment", nil)
tx, err = ethClient.UpdateGas(ctx, tx, nil)
Expect(err).To(BeNil())
err = ethClient.SendTransaction(ctx, tx)
Expect(err).To(BeNil())
mineAnvilBlocks(numConfirmations + 1)
_, err = ethClient.EnsureTransactionEvaled(ctx, tx, "PostCommitment")
Expect(err).To(BeNil())
break loop
} else {
mineAnvilBlocks(numConfirmations + 1)
}
}
}
Expand All @@ -97,7 +116,7 @@ var _ = Describe("Inabox Integration", func() {
0,
)
Expect(err).To(BeNil())
Expect(bytes.TrimRight(retrieved, "\x00")).To(Equal(data))
Expect(bytes.TrimRight(retrieved, "\x00")).To(Equal(bytes.TrimRight(data, "\x00")))
})
})

Expand Down
Loading
Loading