diff --git a/ccip/config/evm/fallback.toml b/ccip/config/evm/fallback.toml deleted file mode 100644 index c1f963a33ff..00000000000 --- a/ccip/config/evm/fallback.toml +++ /dev/null @@ -1,95 +0,0 @@ -AutoCreateKey = true -BlockBackfillDepth = 10 -BlockBackfillSkip = false -FinalityDepth = 50 -FinalityTagEnabled = false -LogBackfillBatchSize = 1000 -LogPollInterval = '15s' -LogKeepBlocksDepth = 100000 -# CCIP uses paging when removing logs to avoid pushing too much pressure on the database -LogPrunePageSize = 10000 -BackupLogPollerBlockDelay = 100 -MinContractPayment = '.00001 link' -MinIncomingConfirmations = 3 -NonceAutoSync = true -NoNewHeadsThreshold = '3m' -RPCDefaultBatchSize = 250 -RPCBlockQueryDelay = 1 -FinalizedBlockOffset = 0 -NoNewFinalizedHeadsThreshold = '0' -LogBroadcasterEnabled = true - -[Transactions] -ForwardersEnabled = false -MaxInFlight = 16 -MaxQueued = 250 -ReaperInterval = '1h' -ReaperThreshold = '168h' -ResendAfterThreshold = '1m' - -[Transactions.AutoPurge] -Enabled = false - -[BalanceMonitor] -Enabled = true - -[GasEstimator] -Mode = 'BlockHistory' -PriceDefault = '20 gwei' -PriceMax = '115792089237316195423570985008687907853269984665.640564039457584007913129639935 tether' -PriceMin = '1 gwei' -LimitDefault = 8_000_000 -LimitMax = 8_000_000 -LimitMultiplier = '1' -LimitTransfer = 21_000 -BumpMin = '5 gwei' -BumpPercent = 20 -BumpThreshold = 3 -EIP1559DynamicFees = false -FeeCapDefault = '100 gwei' -TipCapDefault = '1' -TipCapMin = '1' -EstimateLimit = false - -[GasEstimator.BlockHistory] -BatchSize = 25 -BlockHistorySize = 8 -CheckInclusionBlocks = 12 -CheckInclusionPercentile = 90 -TransactionPercentile = 60 - -[GasEstimator.FeeHistory] -CacheTimeout = '10s' - -[HeadTracker] -HistoryDepth = 100 -MaxBufferSize = 3 -SamplingInterval = '1s' -FinalityTagBypass = true -MaxAllowedFinalityDepth = 10000 - -[NodePool] -PollFailureThreshold = 5 -PollInterval = '10s' -SelectionMode = 'HighestHead' -SyncThreshold = 5 -LeaseDuration = '0s' -NodeIsSyncingEnabled = false -FinalizedBlockPollInterval = '5s' -EnforceRepeatableRead = false -DeathDeclarationDelay = '10s' -NewHeadsPollInterval = '0s' - -[OCR] -ContractConfirmations = 4 -ContractTransmitterTransmitTimeout = '10s' -DatabaseTimeout = '10s' -DeltaCOverride = '168h' -DeltaCJitterOverride = '1h' -ObservationGracePeriod = '1s' - -[OCR2.Automation] -GasLimit = 5400000 - -[Workflow] -GasLimitDefault = 400_000 diff --git a/deployment/ccip/changeset/cs_ccip_home.go b/deployment/ccip/changeset/cs_ccip_home.go index 7d3327a31f2..bbd7028a8f3 100644 --- a/deployment/ccip/changeset/cs_ccip_home.go +++ b/deployment/ccip/changeset/cs_ccip_home.go @@ -303,8 +303,10 @@ func (s SetCandidateConfigBase) Validate(e deployment.Environment, state CCIPOnC if !exists { return fmt.Errorf("home chain %d does not exist", s.HomeChainSelector) } - if err := commoncs.ValidateOwnership(e.GetContext(), s.MCMS != nil, e.Chains[s.HomeChainSelector].DeployerKey.From, homeChainState.Timelock.Address(), homeChainState.CapabilityRegistry); err != nil { - return err + if s.MCMS != nil { + if err := commoncs.ValidateOwnership(e.GetContext(), s.MCMS != nil, e.Chains[s.HomeChainSelector].DeployerKey.From, homeChainState.Timelock.Address(), homeChainState.CapabilityRegistry); err != nil { + return err + } } for chainSelector, params := range s.OCRConfigPerRemoteChainSelector { @@ -1024,8 +1026,10 @@ func (c UpdateChainConfigConfig) Validate(e deployment.Environment) error { if !exists { return fmt.Errorf("home chain %d does not exist", c.HomeChainSelector) } - if err := commoncs.ValidateOwnership(e.GetContext(), c.MCMS != nil, e.Chains[c.HomeChainSelector].DeployerKey.From, homeChainState.Timelock.Address(), homeChainState.CCIPHome); err != nil { - return err + if c.MCMS != nil { + if err := commoncs.ValidateOwnership(e.GetContext(), c.MCMS != nil, e.Chains[c.HomeChainSelector].DeployerKey.From, homeChainState.Timelock.Address(), homeChainState.CCIPHome); err != nil { + return err + } } for _, remove := range c.RemoteChainRemoves { if err := deployment.IsValidChainSelector(remove); err != nil { @@ -1103,7 +1107,6 @@ func UpdateChainConfig(e deployment.Environment, cfg UpdateChainConfigConfig) (d if err != nil { return deployment.ChangesetOutput{}, err } - e.Logger.Infof("Updated chain config on chain %d removes %v, adds %v", cfg.HomeChainSelector, cfg.RemoteChainRemoves, cfg.RemoteChainAdds) return deployment.ChangesetOutput{}, nil } diff --git a/deployment/ccip/changeset/cs_prerequisites.go b/deployment/ccip/changeset/cs_prerequisites.go index 2736ecf44bf..f09e7ca24e9 100644 --- a/deployment/ccip/changeset/cs_prerequisites.go +++ b/deployment/ccip/changeset/cs_prerequisites.go @@ -306,7 +306,6 @@ func deployPrerequisiteContracts(e deployment.Environment, ab deployment.Address e.Logger.Errorw("Failed to confirm assign registry module on token admin registry", "chain", chain.String(), "err", err) return fmt.Errorf("failed to confirm assign registry module on token admin registry: %w", err) } - e.Logger.Infow("assigned registry module on token admin registry") } if weth9Contract == nil { weth, err := deployment.DeployContract(lggr, chain, ab, diff --git a/deployment/ccip/changeset/cs_update_rmn_config.go b/deployment/ccip/changeset/cs_update_rmn_config.go index 309f10c8311..155ad43a733 100644 --- a/deployment/ccip/changeset/cs_update_rmn_config.go +++ b/deployment/ccip/changeset/cs_update_rmn_config.go @@ -46,6 +46,7 @@ func (c SetRMNRemoteOnRMNProxyConfig) Validate(state CCIPOnChainState) error { } func SetRMNRemoteOnRMNProxy(e deployment.Environment, cfg SetRMNRemoteOnRMNProxyConfig) (deployment.ChangesetOutput, error) { + e.Logger.Infow("Setting RMNRemote on RMNProxy", "chainSelectors", cfg.ChainSelectors) state, err := LoadOnchainState(e) if err != nil { return deployment.ChangesetOutput{}, fmt.Errorf("failed to load onchain state: %w", err) diff --git a/deployment/ccip/changeset/state.go b/deployment/ccip/changeset/state.go index aa07168a6d2..0be6a1b3ed9 100644 --- a/deployment/ccip/changeset/state.go +++ b/deployment/ccip/changeset/state.go @@ -445,6 +445,8 @@ func LoadChainState(chain deployment.Chain, addresses map[string]deployment.Type return state, err } state.OffRamp = offRamp + case deployment.NewTypeAndVersion(ARMProxy, deployment.Version1_6_0_dev).String(): + fallthrough case deployment.NewTypeAndVersion(ARMProxy, deployment.Version1_0_0).String(): armProxy, err := rmn_proxy_contract.NewRMNProxy(common.HexToAddress(address), chain.Client) if err != nil { diff --git a/deployment/environment/crib/ccip_deployer.go b/deployment/environment/crib/ccip_deployer.go index 639e42b4024..e7668c50082 100644 --- a/deployment/environment/crib/ccip_deployer.go +++ b/deployment/environment/crib/ccip_deployer.go @@ -4,13 +4,14 @@ import ( "context" "errors" "fmt" + "github.com/smartcontractkit/chainlink-ccip/chainconfig" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip/types" "math/big" "github.com/ethereum/go-ethereum/common" "github.com/smartcontractkit/ccip-owner-contracts/pkg/config" - "github.com/smartcontractkit/chainlink-ccip/chainconfig" cciptypes "github.com/smartcontractkit/chainlink-ccip/pkg/types/ccipocr3" "github.com/smartcontractkit/chainlink/deployment" "github.com/smartcontractkit/chainlink/deployment/ccip/changeset" @@ -71,7 +72,7 @@ func DeployHomeChainContracts(ctx context.Context, lggr logger.Logger, envConfig // DeployCCIPAndAddLanes is the actual ccip setup once the nodes are initialized. func DeployCCIPAndAddLanes(ctx context.Context, lggr logger.Logger, envConfig devenv.EnvironmentConfig, homeChainSel, feedChainSel uint64, ab deployment.AddressBook) (DeployCCIPOutput, error) { - e, _, err := devenv.NewEnvironment(func() context.Context { return ctx }, lggr, envConfig) + e, don, err := devenv.NewEnvironment(func() context.Context { return ctx }, lggr, envConfig) if err != nil { return DeployCCIPOutput{}, fmt.Errorf("failed to initiate new environment: %w", err) } @@ -95,6 +96,7 @@ func DeployCCIPAndAddLanes(ctx context.Context, lggr logger.Logger, envConfig de }) } + // todo: parallelize these step across all chains // set up chains chainConfigs := make(map[uint64]changeset.ChainConfig) nodeInfo, err := deployment.NodeInfo(e.NodeIDs, e.Offchain) @@ -143,6 +145,12 @@ func DeployCCIPAndAddLanes(ctx context.Context, lggr logger.Logger, envConfig de HomeChainSelector: homeChainSel, }, }, + { + Changeset: commonchangeset.WrapChangeSet(changeset.SetRMNRemoteOnRMNProxy), + Config: changeset.SetRMNRemoteOnRMNProxyConfig{ + ChainSelectors: chainSelectors, + }, + }, { Changeset: commonchangeset.WrapChangeSet(changeset.CCIPCapabilityJobspec), Config: struct{}{}, @@ -152,20 +160,85 @@ func DeployCCIPAndAddLanes(ctx context.Context, lggr logger.Logger, envConfig de if err != nil { return DeployCCIPOutput{}, fmt.Errorf("failed to load onchain state: %w", err) } + + var ocrConfigPerSelector = make(map[uint64]changeset.CCIPOCRParams) + for selector := range e.Chains { + //tokenConfig := changeset.NewTestTokenConfig(state.Chains[feedChainSel].USDFeeds) + ocrConfigPerSelector[selector] = changeset.DefaultOCRParams( + selector, + //tokenConfig.GetTokenInfo(lggr, state.Chains[selector].LinkToken, state.Chains[selector].Weth9), + nil, + nil, + ) + } + + *e, err = commonchangeset.ApplyChangesets(nil, *e, nil, []commonchangeset.ChangesetApplication{ + { + // Add the DONs and candidate commit OCR instances for the chain. + Changeset: commonchangeset.WrapChangeSet(changeset.AddDonAndSetCandidateChangeset), + Config: changeset.AddDonAndSetCandidateChangesetConfig{ + changeset.SetCandidateConfigBase{ + HomeChainSelector: homeChainSel, + FeedChainSelector: feedChainSel, + OCRConfigPerRemoteChainSelector: ocrConfigPerSelector, + PluginType: types.PluginTypeCCIPCommit, + }, + }, + }, + { + // Add the exec OCR instances for the new chains. + Changeset: commonchangeset.WrapChangeSet(changeset.SetCandidateChangeset), + Config: changeset.SetCandidateChangesetConfig{ + changeset.SetCandidateConfigBase{ + HomeChainSelector: homeChainSel, + FeedChainSelector: feedChainSel, + OCRConfigPerRemoteChainSelector: ocrConfigPerSelector, + PluginType: types.PluginTypeCCIPExec, + }, + }, + }, + { + // Promote everything + Changeset: commonchangeset.WrapChangeSet(changeset.PromoteAllCandidatesChangeset), + Config: changeset.PromoteCandidatesChangesetConfig{ + HomeChainSelector: homeChainSel, + RemoteChainSelectors: chainSelectors, + PluginType: types.PluginTypeCCIPCommit, + }, + }, + { + // Promote everything + Changeset: commonchangeset.WrapChangeSet(changeset.PromoteAllCandidatesChangeset), + Config: changeset.PromoteCandidatesChangesetConfig{ + HomeChainSelector: homeChainSel, + RemoteChainSelectors: chainSelectors, + PluginType: types.PluginTypeCCIPExec, + }, + }, + { + // Enable the OCR config on the remote chains. + Changeset: commonchangeset.WrapChangeSet(changeset.SetOCR3OffRamp), + Config: changeset.SetOCR3OffRampConfig{ + HomeChainSel: homeChainSel, + RemoteChainSels: chainSelectors, + }, + }, + }) + + // todo: parallelize // Add all lanes - for from := range e.Chains { - for to := range e.Chains { - if from != to { - stateChain1 := state.Chains[from] + for src := range e.Chains { + for dst := range e.Chains { + if src != dst { + stateChain1 := state.Chains[src] newEnv, err := commonchangeset.ApplyChangesets(nil, *e, nil, []commonchangeset.ChangesetApplication{ { Changeset: commonchangeset.WrapChangeSet(changeset.UpdateOnRampsDests), Config: changeset.UpdateOnRampDestsConfig{ UpdatesByChain: map[uint64]map[uint64]changeset.OnRampDestinationUpdate{ - from: { - to: { + src: { + dst: { IsEnabled: true, - TestRouter: false, AllowListEnabled: false, }, }, @@ -176,13 +249,13 @@ func DeployCCIPAndAddLanes(ctx context.Context, lggr logger.Logger, envConfig de Changeset: commonchangeset.WrapChangeSet(changeset.UpdateFeeQuoterPricesCS), Config: changeset.UpdateFeeQuoterPricesConfig{ PricesByChain: map[uint64]changeset.FeeQuoterPriceUpdatePerSource{ - from: { + src: { TokenPrices: map[common.Address]*big.Int{ stateChain1.LinkToken.Address(): changeset.DefaultLinkPrice, stateChain1.Weth9.Address(): changeset.DefaultWethPrice, }, GasPrices: map[uint64]*big.Int{ - to: changeset.DefaultGasPrice, + dst: changeset.DefaultGasPrice, }, }, }, @@ -192,8 +265,8 @@ func DeployCCIPAndAddLanes(ctx context.Context, lggr logger.Logger, envConfig de Changeset: commonchangeset.WrapChangeSet(changeset.UpdateFeeQuoterDests), Config: changeset.UpdateFeeQuoterDestsConfig{ UpdatesByChain: map[uint64]map[uint64]fee_quoter.FeeQuoterDestChainConfig{ - from: { - to: changeset.DefaultFeeQuoterDestChainConfig(), + src: { + dst: changeset.DefaultFeeQuoterDestChainConfig(), }, }, }, @@ -202,10 +275,9 @@ func DeployCCIPAndAddLanes(ctx context.Context, lggr logger.Logger, envConfig de Changeset: commonchangeset.WrapChangeSet(changeset.UpdateOffRampSources), Config: changeset.UpdateOffRampSourcesConfig{ UpdatesByChain: map[uint64]map[uint64]changeset.OffRampSourceUpdate{ - to: { - from: { - IsEnabled: true, - TestRouter: true, + dst: { + src: { + IsEnabled: true, }, }, }, @@ -214,18 +286,21 @@ func DeployCCIPAndAddLanes(ctx context.Context, lggr logger.Logger, envConfig de { Changeset: commonchangeset.WrapChangeSet(changeset.UpdateRouterRamps), Config: changeset.UpdateRouterRampsConfig{ - TestRouter: true, UpdatesByChain: map[uint64]changeset.RouterUpdates{ - // onRamp update on source chain - from: { + src: { + OffRampUpdates: map[uint64]bool{ + dst: true, + }, OnRampUpdates: map[uint64]bool{ - to: true, + dst: true, }, }, - // off - from: { + dst: { OffRampUpdates: map[uint64]bool{ - to: true, + src: true, + }, + OnRampUpdates: map[uint64]bool{ + src: true, }, }, }, @@ -240,10 +315,16 @@ func DeployCCIPAndAddLanes(ctx context.Context, lggr logger.Logger, envConfig de } } + // distribute funds to transmitters + // we need to use the nodeinfo from the envConfig here, because multiAddr is not + // populated in the environment variable + distributeFunds(lggr, don.PluginNodes(), *e) + addresses, err := e.ExistingAddresses.Addresses() if err != nil { return DeployCCIPOutput{}, fmt.Errorf("failed to get convert address book to address book map: %w", err) } + return DeployCCIPOutput{ AddressBook: *deployment.NewMemoryAddressBookFromMap(addresses), NodeIDs: e.NodeIDs, diff --git a/deployment/environment/crib/env.go b/deployment/environment/crib/env.go index 3af1acaf754..22cb4ca0aad 100644 --- a/deployment/environment/crib/env.go +++ b/deployment/environment/crib/env.go @@ -16,15 +16,23 @@ func NewDevspaceEnvFromStateDir(envStateDir string) CRIBEnv { } } -func (c CRIBEnv) GetConfig() DeployOutput { +func (c CRIBEnv) GetConfig(key string) (DeployOutput, error) { reader := NewOutputReader(c.envStateDir) nodesDetails := reader.ReadNodesDetails() chainConfigs := reader.ReadChainConfigs() + for i, chain := range chainConfigs { + err := chain.SetDeployerKey(&key) + if err != nil { + return DeployOutput{}, err + } + chainConfigs[i] = chain + } + return DeployOutput{ AddressBook: reader.ReadAddressBook(), NodeIDs: nodesDetails.NodeIDs, Chains: chainConfigs, - } + }, nil } type RPC struct { diff --git a/deployment/environment/crib/helpers.go b/deployment/environment/crib/helpers.go new file mode 100644 index 00000000000..41d2fa7fa28 --- /dev/null +++ b/deployment/environment/crib/helpers.go @@ -0,0 +1,84 @@ +package crib + +import ( + "context" + "math/big" + + "github.com/ethereum/go-ethereum/common" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + gethtypes "github.com/ethereum/go-ethereum/core/types" + chainsel "github.com/smartcontractkit/chain-selectors" + "github.com/smartcontractkit/chainlink-testing-framework/lib/utils/conversions" + "github.com/smartcontractkit/chainlink/deployment" + "github.com/smartcontractkit/chainlink/deployment/environment/devenv" + "github.com/smartcontractkit/chainlink/v2/core/logger" +) + +func distributeFunds(lggr logger.Logger, nodeInfo []devenv.Node, env deployment.Environment) { + transmittersStr := make([]common.Address, 0) + fundingAmount := big.NewInt(500_000_000_000_000_000) // 0.5 ETH + minThreshold := big.NewInt(50_000_000_000_000_000) // 0.05 ETH + + // todo: parallelize this using a waitgroup + for sel, chain := range env.Chains { + for _, n := range nodeInfo { + + chainId, err := chainsel.ChainIdFromSelector(sel) + if err != nil { + lggr.Errorw("could not get chain id from selector", "selector", sel, "err", err) + continue + } + addr := common.HexToAddress(n.AccountAddr[chainId]) + balance, err := chain.Client.BalanceAt(context.Background(), addr, nil) + if err != nil { + lggr.Errorw("error fetching balance for %s: %v\n", n.Name, err) + continue + } + if balance.Cmp(minThreshold) < 0 { + lggr.Infow( + "sending funds to", + "node", n.Name, + "address", addr.String(), + "amount", conversions.WeiToEther(fundingAmount).String(), + ) + transmittersStr = append(transmittersStr, addr) + } + } + latesthdr, err := chain.Client.HeaderByNumber(context.Background(), nil) + if err != nil { + lggr.Errorw("could not get header, skipping chain", "chain", sel, "err", err) + continue + } + block := latesthdr.Number.Uint64() + + nonce, err := chain.Client.NonceAt(context.Background(), chain.DeployerKey.From, big.NewInt(int64(block))) + if err != nil { + lggr.Warnw("could not get latest nonce for deployer key", "err", err) + continue + } + for _, transmitter := range transmittersStr { + tx := gethtypes.NewTransaction(nonce, transmitter, fundingAmount, uint64(1000000), big.NewInt(1000000), nil) + + signedTx, err := chain.DeployerKey.Signer(chain.DeployerKey.From, tx) + if err != nil { + lggr.Errorw("could not sign transaction to transmitter on ", "chain", sel, "transmitter", transmitter, "err", err) + continue + } + + lggr.Infow("sending transaction for ", "transmitter", transmitter.String(), "chain", sel) + err = chain.Client.SendTransaction(context.Background(), signedTx) + if err != nil { + lggr.Errorw("could not send transaction to transmitter on ", "chain", sel, "transmitter", transmitter, "err", err) + continue + } + + _, err = bind.WaitMined(context.Background(), chain.Client, signedTx) + if err != nil { + lggr.Errorw("could not mine transaction to transmitter on ", "chain", sel) + continue + } + nonce++ + } + } +} diff --git a/deployment/environment/devenv/chain.go b/deployment/environment/devenv/chain.go index 265a6647050..c53cb82a7ed 100644 --- a/deployment/environment/devenv/chain.go +++ b/deployment/environment/devenv/chain.go @@ -22,13 +22,18 @@ const ( EVMChainType = "EVM" ) +type CribRPCs struct { + Internal string + External string +} + // ChainConfig holds the configuration for a with a deployer key which can be used to send transactions to the chain. type ChainConfig struct { ChainID uint64 // chain id as per EIP-155, mainly applicable for EVM chains ChainName string // name of the chain populated from chainselector repo ChainType string // should denote the chain family. Acceptable values are EVM, COSMOS, SOLANA, STARKNET, APTOS etc - WSRPCs []string // websocket rpcs to connect to the chain - HTTPRPCs []string // http rpcs to connect to the chain + WSRPCs []CribRPCs // websocket rpcs to connect to the chain + HTTPRPCs []CribRPCs // http rpcs to connect to the chain DeployerKey *bind.TransactOpts // key to deploy and configure contracts on the chain Users []*bind.TransactOpts // map of addresses to their transact opts to interact with the chain as users } @@ -68,7 +73,6 @@ func (c *ChainConfig) SetDeployerKey(pvtKeyStr *string) error { if err != nil { return fmt.Errorf("failed to create transactor: %w", err) } - fmt.Printf("Deployer Address: %s for chain id %d\n", deployer.From.Hex(), c.ChainID) c.DeployerKey = deployer return nil } @@ -98,7 +102,7 @@ func NewChains(logger logger.Logger, configs []ChainConfig) (map[uint64]deployme // TODO : better client handling var ec *ethclient.Client for _, rpc := range chainCfg.WSRPCs { - ec, err = ethclient.Dial(rpc) + ec, err = ethclient.Dial(rpc.External) if err != nil { logger.Warnf("failed to dial ws rpc %s", rpc) continue diff --git a/integration-tests/load/ccip/ccip_test.go b/integration-tests/load/ccip/ccip_test.go new file mode 100644 index 00000000000..9b8dc056fbe --- /dev/null +++ b/integration-tests/load/ccip/ccip_test.go @@ -0,0 +1,216 @@ +package ccip + +import ( + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" + "github.com/smartcontractkit/chainlink-testing-framework/wasp" + ccipchangeset "github.com/smartcontractkit/chainlink/deployment/ccip/changeset" + crib "github.com/smartcontractkit/chainlink/deployment/environment/crib" + tc "github.com/smartcontractkit/chainlink/integration-tests/testconfig" + "github.com/stretchr/testify/require" + "math/big" + "sync" + "testing" + "time" +) + +var ( + CommonTestLabels = map[string]string{ + "branch": "ccip_load_crib", + "commit": "ccip_load_crib", + } + wg sync.WaitGroup + SIM_CHAIN_TEST_KEY = "ac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80" +) + +// step 1: setup +// Parse the test config, initialize CRIB with configurations defined +// step 2: load +// Use wasp to initiate load +// step 3: parse logs +// Parse all events from the simulated chains, send to Loki +// step 4: teardown +// Stop the chains, cleanup the environment +func TestCCIPLoad_RPS(t *testing.T) { + ctx := tests.Context(t) + lggr := logger.Test(t) + + config, err := tc.GetConfig([]string{"Load"}, tc.CCIP) + require.NoError(t, err) + lggr.Infof("loaded ccip test config: %+v", config.CCIP.Load) + userOverrides := config.CCIP.Load + + timeout, err := time.ParseDuration(*userOverrides.LoadDuration) + if err != nil { + require.NoError(t, err) + } + t.Cleanup(func() { + time.AfterFunc(timeout, func() { + t.Fatalf("Test passed timeout after %v", timeout) + }) + }) + + cribEnv := crib.NewDevspaceEnvFromStateDir(*userOverrides.CribEnvDirectory) + + cribDeployOutput, err := cribEnv.GetConfig(SIM_CHAIN_TEST_KEY) + require.NoError(t, err) + env, err := crib.NewDeployEnvironmentFromCribOutput(lggr, cribDeployOutput) + require.NoError(t, err) + require.NotNil(t, env) + + // Need to keep track of the block number for each chain so that event subscription can be done from that block. + startBlocks := make(map[uint64]*uint64) + state, err := ccipchangeset.LoadOnchainState(*env) + require.NoError(t, err) + + // Parse all events from the simulated chains, send to Loki + loki, err := wasp.NewLokiClient(wasp.NewLokiConfig(userOverrides.LokiEndpoint, nil, nil, nil)) + require.NoError(t, err) + defer loki.StopNow() + + gunMap := make(map[uint64]*DestinationGun) + // Based on the config, initialize DestinationGun + p := wasp.NewProfile() + + // Only create a destination gun if we have decided to send traffic to this chain + for _, cs := range *userOverrides.EnabledDestionationChains { + latesthdr, err := env.Chains[cs].Client.HeaderByNumber(ctx, nil) + require.NoError(t, err) + block := latesthdr.Number.Uint64() + startBlocks[cs] = &block + + gunMap[cs], err = NewDestinationGun(env.Logger, cs, *env, state.Chains[cs].Receiver.Address(), userOverrides, loki) + if err != nil { + lggr.Errorw("Failed to initialize DestinationGun for", "chainSelector", cs, "error", err) + t.Fatal(err) + } + } + + loadDuration, err := time.ParseDuration(*userOverrides.LoadDuration) + require.NoError(t, err) + requestFrequency, err := time.ParseDuration(*userOverrides.RequestFrequency) + require.NoError(t, err) + + for _, gun := range gunMap { + p.Add(wasp.NewGenerator(&wasp.Config{ + T: t, + GenName: "ccipLoad", + LoadType: wasp.RPS, + CallTimeout: 5 * time.Second, + // 1 request per second for n seconds + Schedule: wasp.Plain(1, loadDuration), + // limit requests to 1 per duration + RateLimitUnitDuration: requestFrequency, + // will need to be divided by number of chains + // this schedule is per generator + // in this example, it would be 1 request per 5seconds per generator (dest chain) + // so if there are 3 generators, it would be 3 requests per 5 seconds over the network + Gun: gun, + Labels: CommonTestLabels, + LokiConfig: wasp.NewLokiConfig(config.CCIP.Load.LokiEndpoint, nil, nil, nil), + // use the same loki client using `NewLokiClient` with the same config for sending events + })) + } + + _, err = p.Run(true) + + // wait for offchain to complete handling load fully + execExpectedSeqNums := make(map[ccipchangeset.SourceDestPair][]uint64) + commitExepectedSeqNums := make(map[ccipchangeset.SourceDestPair]uint64) + for _, gun := range gunMap { + for csPair := range gun.seqNums { + commitExepectedSeqNums[csPair] = gun.seqNums[csPair].End.Load() + for i := gun.seqNums[csPair].Start.Load(); i <= gun.seqNums[csPair].End.Load(); i++ { + execExpectedSeqNums[csPair] = append(execExpectedSeqNums[csPair], i) + } + } + } + ccipchangeset.ConfirmCommitForAllWithExpectedSeqNums(t, *env, state, commitExepectedSeqNums, startBlocks) + ccipchangeset.ConfirmExecWithSeqNrsForAll(t, *env, state, execExpectedSeqNums, startBlocks) + + // todo: create channels that subscribe to these events beforehand using WatchExecutionStateChanged and WatchCommitReportAccepted + lokiLabels := map[string]string{} + for chainSelector, startBlock := range startBlocks { + filterOpts := &bind.FilterOpts{ + Start: *startBlock, + End: nil, // To the latest block + Context: ctx, + } + + wg.Add(1) + go func(chainSelector uint64, startBlock *uint64, filterOpts *bind.FilterOpts) { + defer wg.Done() + lggr.Infow("Starting to query for events on ", "chainSelector", chainSelector, "startblock", startBlock) + latesthdr, err := env.Chains[chainSelector].Client.HeaderByNumber(ctx, nil) + require.NoError(t, err) + lggr.Infow("Current block number", "chainSelector", chainSelector, "block", latesthdr.Number.Uint64()) + + offRamp := state.Chains[chainSelector].OffRamp + // Filter CommitReportAccepted events + commitIterator, err := offRamp.FilterCommitReportAccepted(filterOpts) + require.NoError(t, err) + + for commitIterator.Next() { + event := commitIterator.Event + + blockNum := commitIterator.Event.Raw.BlockNumber + header, err := env.Chains[chainSelector].Client.HeaderByNumber(ctx, big.NewInt(int64(blockNum))) + require.NoError(t, err) + timestamp := time.Unix(int64(header.Time), 0) + + for _, root := range event.MerkleRoots { + lokiLabels, err = setLokiLabels(root.SourceChainSelector, chainSelector) + require.NoError(t, err) + + for i := root.MinSeqNr; i <= root.MaxSeqNr; i++ { + // todo: push loki calls to channel? + + SendMetricsToLoki(lggr, loki, lokiLabels, &LokiMetric{ + EventType: committed, + Timestamp: timestamp, + SequenceNumber: i, + }) + lggr.Infow("pushed loki commit event for ", "seqNumber", i, "src", root.SourceChainSelector, "dest", chainSelector) + + } + } + } + }(chainSelector, startBlock, filterOpts) + + for sourceCS := range env.Chains { + wg.Add(1) + go func(srcSelector uint64, startBlock *uint64, filterOpts *bind.FilterOpts) { + defer wg.Done() + csPair := ccipchangeset.SourceDestPair{ + SourceChainSelector: srcSelector, + DestChainSelector: chainSelector, + } + // Filter ExecutionStateChanged events + execIterator, err := state.Chains[chainSelector].OffRamp.FilterExecutionStateChanged(filterOpts, []uint64{srcSelector}, execExpectedSeqNums[csPair], nil) + require.NoError(t, err) + + for execIterator.Next() { + blockNum := execIterator.Event.Raw.BlockNumber + header, err := env.Chains[chainSelector].Client.HeaderByNumber(ctx, big.NewInt(int64(blockNum))) + require.NoError(t, err) + timestamp := time.Unix(int64(header.Time), 0) + + // todo: push loki calls to channel? + lokiLabels, err = setLokiLabels(execIterator.Event.SourceChainSelector, chainSelector) + require.NoError(t, err) + + SendMetricsToLoki(lggr, loki, lokiLabels, &LokiMetric{ + EventType: executed, + Timestamp: timestamp, + GasUsed: execIterator.Event.GasUsed.Uint64(), + SequenceNumber: execIterator.Event.SequenceNumber, + }) + lggr.Infow("pushed loki exec event for ", "seqNumber", execIterator.Event.SequenceNumber, "src", execIterator.Event.SourceChainSelector, "dest", chainSelector) + } + }(sourceCS, startBlock, filterOpts) + } + } + + wg.Wait() +} diff --git a/integration-tests/load/ccip/destination_gun.go b/integration-tests/load/ccip/destination_gun.go new file mode 100644 index 00000000000..cb9959e3b1a --- /dev/null +++ b/integration-tests/load/ccip/destination_gun.go @@ -0,0 +1,259 @@ +package ccip + +import ( + "context" + "fmt" + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-testing-framework/wasp" + "github.com/smartcontractkit/chainlink/deployment" + ccipchangeset "github.com/smartcontractkit/chainlink/deployment/ccip/changeset" + "github.com/smartcontractkit/chainlink/integration-tests/testconfig/ccip" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils" + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/ccip/generated/router" + "go.uber.org/atomic" + "math/big" + "math/rand" + "time" +) + +type SeqNumRange struct { + Start *atomic.Uint64 + End *atomic.Uint64 +} + +type DestinationGun struct { + l logger.Logger + env deployment.Environment + seqNums map[ccipchangeset.SourceDestPair]SeqNumRange + roundNum *atomic.Int32 + chainSelector uint64 + receiver common.Address + testConfig *ccip.LoadConfig + loki *wasp.LokiClient +} + +func NewDestinationGun(l logger.Logger, chainSelector uint64, env deployment.Environment, receiver common.Address, overrides *ccip.LoadConfig, loki *wasp.LokiClient) (*DestinationGun, error) { + seqNums := make(map[ccipchangeset.SourceDestPair]SeqNumRange) + for _, cs := range env.AllChainSelectorsExcluding([]uint64{chainSelector}) { + + // query for the actual sequence number + seqNums[ccipchangeset.SourceDestPair{ + SourceChainSelector: cs, + DestChainSelector: chainSelector, + }] = SeqNumRange{ + Start: atomic.NewUint64(0), + End: atomic.NewUint64(0), + } + } + dg := DestinationGun{ + l: l, + env: env, + seqNums: seqNums, + roundNum: &atomic.Int32{}, + chainSelector: chainSelector, + receiver: receiver, + testConfig: overrides, + loki: loki, + } + + err := dg.Validate() + if err != nil { + return nil, err + } + + return &dg, nil +} + +func (m *DestinationGun) Validate() error { + if len(*m.testConfig.MessageTypeWeights) != 3 { + return fmt.Errorf( + "message type must have 3 weights corresponding to message only, token only, token with message") + } + sum := 0 + for _, weight := range *m.testConfig.MessageTypeWeights { + sum += weight + } + if sum != 100 { + return fmt.Errorf("message type weights must sum to 100") + } + return nil +} + +func (m *DestinationGun) Call(_ *wasp.Generator) *wasp.Response { + m.roundNum.Add(1) + requestedRound := m.roundNum.Load() + + waspGroup := fmt.Sprintf("%d-%s", m.chainSelector, "messageOnly") + + state, err := ccipchangeset.LoadOnchainState(m.env) + if err != nil { + return &wasp.Response{Error: err.Error(), Group: waspGroup, Failed: true} + } + + src, err := m.MustSourceChain() + if err != nil { + return &wasp.Response{Error: err.Error(), Group: waspGroup, Failed: true} + } + + lokiLabels, err := setLokiLabels(src, m.chainSelector) + if err != nil { + m.l.Errorw("Failed setting loki labels", "error", err) + } + + csPair := ccipchangeset.SourceDestPair{ + SourceChainSelector: src, + DestChainSelector: m.chainSelector, + } + + r := state.Chains[src].Router + + msg, err := m.GetMessage() + if err != nil { + return &wasp.Response{Error: err.Error(), Group: waspGroup, Failed: true} + } + + fee, err := r.GetFee( + &bind.CallOpts{Context: context.Background()}, m.chainSelector, msg) + if err != nil { + m.l.Errorw("could not get fee ", + "dstChainSelector", m.chainSelector, + "msg", msg, + "fee", fee, + "err", deployment.MaybeDataErr(err)) + return &wasp.Response{Error: err.Error(), Group: waspGroup, Failed: true} + } + if msg.FeeToken == common.HexToAddress("0x0") { + m.env.Chains[src].DeployerKey.Value = fee + defer func() { m.env.Chains[src].DeployerKey.Value = nil }() + } + m.l.Debugw("sending message ", + "srcChain", src, + "dstChain", m.chainSelector, + "round", requestedRound, + "fee", fee, + "msg", msg) + tx, err := r.CcipSend( + m.env.Chains[src].DeployerKey, + m.chainSelector, + msg) + if err != nil { + m.l.Errorw("execution reverted from ", + "sourceChain", src, + "destchain", m.chainSelector, + "err", deployment.MaybeDataErr(err)) + return &wasp.Response{Error: err.Error(), Group: waspGroup, Failed: true} + } + + blockNum, err := m.env.Chains[src].Confirm(tx) + if err != nil { + m.l.Errorw("could not confirm tx on source", "tx", tx, "err", deployment.MaybeDataErr(err)) + return &wasp.Response{Error: err.Error(), Group: waspGroup, Failed: true} + } + + // todo: wasp should not manage confirming the message + // instead, we should manage the sequence number atomically (at a higher level) + it, err := state.Chains[src].OnRamp.FilterCCIPMessageSent(&bind.FilterOpts{ + Start: blockNum, + End: &blockNum, + Context: context.Background(), + }, []uint64{m.chainSelector}, []uint64{}) + if err != nil { + m.l.Errorw("could not find sent message event on src chain", "src", src, "dst", m.chainSelector, "err", err) + return &wasp.Response{Error: err.Error(), Group: waspGroup, Failed: true} + } + if !it.Next() { + m.l.Errorw("Could not find event") + return &wasp.Response{Error: "Could not iterate", Group: waspGroup, Failed: true} + } + + m.l.Infow("Transmitted message with", + "sourceChain", src, + "destChain", m.chainSelector, + "sequence number", it.Event.SequenceNumber) + + SendMetricsToLoki(m.l, m.loki, lokiLabels, &LokiMetric{ + EventType: transmitted, + Timestamp: time.Now(), + SequenceNumber: it.Event.SequenceNumber, + }) + + // if this is the first time we are sending a message, set the start sequence number + // if we ran into a concurrency issue, store the lowest sequence number + if it.Event.SequenceNumber < m.seqNums[csPair].Start.Load() || m.seqNums[csPair].End.Load() == 0 { + m.seqNums[csPair].Start.Store(it.Event.SequenceNumber) + } + + // only store the greatest sequence number we have seen as the maximum + if it.Event.SequenceNumber > m.seqNums[csPair].End.Load() { + m.seqNums[csPair].End.Store(it.Event.SequenceNumber) + } + + return &wasp.Response{Failed: false, Group: waspGroup} +} + +// MustSourceChain will return a chain selector to send a message from +func (m *DestinationGun) MustSourceChain() (uint64, error) { + // TODO: make this smarter by checking if this chain has sent a message recently, if so, switch to the next chain + // Currently performing a round robin + otherCS := m.env.AllChainSelectorsExcluding([]uint64{m.chainSelector}) + if len(otherCS) == 0 { + return 0, fmt.Errorf("no other chains to send from") + } + index := m.roundNum.Load() % int32(len(otherCS)) + return otherCS[index], nil +} + +// GetMessage will return the message to be sent while considering expected load of different messages +func (m *DestinationGun) GetMessage() (router.ClientEVM2AnyMessage, error) { + rcv, err := utils.ABIEncode(`[{"type":"address"}]`, m.receiver) + if err != nil { + m.l.Error("Error encoding receiver address") + return router.ClientEVM2AnyMessage{}, err + } + + messages := []router.ClientEVM2AnyMessage{ + { + Receiver: rcv, + Data: common.Hex2Bytes("0xabcdefabcdef"), + TokenAmounts: nil, + FeeToken: common.HexToAddress("0x0"), + ExtraArgs: nil, + }, + { + Receiver: rcv, + TokenAmounts: []router.ClientEVMTokenAmount{ + { + Token: common.HexToAddress("0x0"), + Amount: big.NewInt(100), + }, + }, + Data: common.Hex2Bytes("0xabcdefabcdef"), + FeeToken: common.HexToAddress("0x0"), + ExtraArgs: nil, + }, + { + Receiver: rcv, + Data: common.Hex2Bytes("message with token"), + TokenAmounts: []router.ClientEVMTokenAmount{ + { + Token: common.HexToAddress("0x0"), + Amount: big.NewInt(100), + }, + }, + FeeToken: common.HexToAddress("0x0"), + ExtraArgs: nil, + }, + } + // Select a random message + randomValue := rand.Intn(100) + switch { + case randomValue < (*m.testConfig.MessageTypeWeights)[0]: + return messages[0], nil + case randomValue < (*m.testConfig.MessageTypeWeights)[0]+(*m.testConfig.MessageTypeWeights)[1]: + return messages[1], nil + default: + return messages[2], nil + } +} diff --git a/integration-tests/load/ccip/helpers.go b/integration-tests/load/ccip/helpers.go new file mode 100644 index 00000000000..d7e1beec552 --- /dev/null +++ b/integration-tests/load/ccip/helpers.go @@ -0,0 +1,49 @@ +package ccip + +import ( + "fmt" + chainselectors "github.com/smartcontractkit/chain-selectors" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-testing-framework/wasp" + "time" +) + +const ( + transmitted = iota + committed + executed + LokiLoadLabel = "ccip_load_test" + ErrLokiClient = "failed to create Loki client for monitoring" + ErrLokiPush = "failed to push metrics to Loki" +) + +// todo: Have a different struct for commit/exec? +type LokiMetric struct { + EventType int `json:"event_type"` + Timestamp time.Time `json:"timestamp"` + GasUsed uint64 `json:"gas_used"` + SequenceNumber uint64 `json:"sequence_number"` +} + +func SendMetricsToLoki(l logger.Logger, lc *wasp.LokiClient, updatedLabels map[string]string, metrics *LokiMetric) { + if err := lc.HandleStruct(wasp.LabelsMapToModel(updatedLabels), time.Now(), metrics); err != nil { + l.Error(ErrLokiPush) + } +} + +func setLokiLabels(src, dst uint64) (map[string]string, error) { + srcChainId, err := chainselectors.GetChainIDFromSelector(src) + if err != nil { + return nil, err + } + dstChainId, err := chainselectors.GetChainIDFromSelector(dst) + if err != nil { + return nil, err + } + return map[string]string{ + "sourceEvmChainId": fmt.Sprintf("%s", srcChainId), + "destEvmChainId": fmt.Sprintf("%s", dstChainId), + "destinationSelector": fmt.Sprintf("%d", dst), + "testType": LokiLoadLabel, + }, nil +} diff --git a/integration-tests/testconfig/ccip/ccip.toml b/integration-tests/testconfig/ccip/ccip.toml index 3f4ba43c48c..174efa452a0 100644 --- a/integration-tests/testconfig/ccip/ccip.toml +++ b/integration-tests/testconfig/ccip/ccip.toml @@ -240,3 +240,16 @@ addresses_to_fund = [ [Seth] # Seth specific configuration, no need for generating ephemeral addresses for ccip-tests. ephemeral_addresses_number = 0 + +[Load.CCIP.Load] +# replace this with the loki endpoint of the crib stack +LokiEndpoint = "http://loki.localhost/loki/api/v1/push" +# MessageTypeWeights corresponds with [data only, token only, message with data] +MessageTypeWeights = [100,0,0] +# each destination chain will receive 1 incoming request per RequestFrequency for the duration of LoadDuration +LoadDuration = "1m" +RequestFrequency = "10s" +# destination chain selectors to send messages to +EnabledDestionationChains = [3379446385462418246, 909606746561742123] +# Directory where we receive environment configuration from crib +CribEnvDirectory = "/Users/austin.wang/ccip-core/repos/crib/deployments/ccip-v2/.tmp" \ No newline at end of file diff --git a/integration-tests/testconfig/ccip/config.go b/integration-tests/testconfig/ccip/config.go index 70c850fd591..374322041ec 100644 --- a/integration-tests/testconfig/ccip/config.go +++ b/integration-tests/testconfig/ccip/config.go @@ -38,6 +38,7 @@ type Config struct { HomeChainSelector *string `toml:",omitempty"` FeedChainSelector *string `toml:",omitempty"` RMNConfig RMNConfig `toml:",omitempty"` + Load *LoadConfig `toml:",omitempty"` } type RMNConfig struct { diff --git a/integration-tests/testconfig/ccip/load.go b/integration-tests/testconfig/ccip/load.go new file mode 100644 index 00000000000..84610132f67 --- /dev/null +++ b/integration-tests/testconfig/ccip/load.go @@ -0,0 +1,11 @@ +package ccip + +type LoadConfig struct { + LoadDuration *string + NoOfNodes *int + LokiEndpoint *string + MessageTypeWeights *[]int + RequestFrequency *string + EnabledDestionationChains *[]uint64 + CribEnvDirectory *string +}