Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

broadcast multiple tx per block from the same wallet #1084

Merged
merged 19 commits into from
Feb 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/chains.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/cosmos/relayer/v2/relayer"
"github.com/cosmos/relayer/v2/relayer/chains/cosmos"
"github.com/cosmos/relayer/v2/relayer/provider"
"github.com/spf13/cobra"
registry "github.com/strangelove-ventures/lens/client/chain_registry"
"go.uber.org/zap"
Expand Down Expand Up @@ -476,6 +477,7 @@ func addChainsFromRegistry(ctx context.Context, a *appState, chains []string) er
OutputFormat: chainConfig.OutputFormat,
SignModeStr: chainConfig.SignModeStr,
ExtraCodecs: chainConfig.ExtraCodecs,
Broadcast: provider.BroadcastModeBatch,
Slip44: chainConfig.Slip44,
}

Expand Down
2 changes: 2 additions & 0 deletions interchaintest/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/cosmos/relayer/v2/internal/relayertest"
"github.com/cosmos/relayer/v2/relayer"
"github.com/cosmos/relayer/v2/relayer/chains/cosmos"
"github.com/cosmos/relayer/v2/relayer/provider"
interchaintestcosmos "github.com/strangelove-ventures/interchaintest/v7/chain/cosmos"
"github.com/strangelove-ventures/interchaintest/v7/ibc"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -77,6 +78,7 @@ func (r *Relayer) AddChainConfiguration(ctx context.Context, _ ibc.RelayerExecRe
Timeout: "10s",
OutputFormat: "json",
SignModeStr: "direct",
Broadcast: provider.BroadcastModeBatch,
},
})

Expand Down
5 changes: 4 additions & 1 deletion relayer/chains/cosmos/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ func (cc *CosmosProvider) LogFailedTx(res *provider.RelayerTxResponse, err error
}
}

if res.Code != 0 && res.Data != "" {
if res.Code != 0 {
if sdkErr := cc.sdkError(res.Codespace, res.Code); err != nil {
fields = append(fields, zap.NamedError("sdk_error", sdkErr))
}
fields = append(fields, zap.Object("response", res))
cc.log.Warn(
"Sent transaction but received failure response",
Expand Down
45 changes: 29 additions & 16 deletions relayer/chains/cosmos/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,22 @@ var (
const tendermintEncodingThreshold = "v0.37.0-alpha"

type CosmosProviderConfig struct {
Key string `json:"key" yaml:"key"`
ChainName string `json:"-" yaml:"-"`
ChainID string `json:"chain-id" yaml:"chain-id"`
RPCAddr string `json:"rpc-addr" yaml:"rpc-addr"`
AccountPrefix string `json:"account-prefix" yaml:"account-prefix"`
KeyringBackend string `json:"keyring-backend" yaml:"keyring-backend"`
GasAdjustment float64 `json:"gas-adjustment" yaml:"gas-adjustment"`
GasPrices string `json:"gas-prices" yaml:"gas-prices"`
MinGasAmount uint64 `json:"min-gas-amount" yaml:"min-gas-amount"`
Debug bool `json:"debug" yaml:"debug"`
Timeout string `json:"timeout" yaml:"timeout"`
OutputFormat string `json:"output-format" yaml:"output-format"`
SignModeStr string `json:"sign-mode" yaml:"sign-mode"`
ExtraCodecs []string `json:"extra-codecs" yaml:"extra-codecs"`
Slip44 int `json:"coin-type" yaml:"coin-type"`
Key string `json:"key" yaml:"key"`
ChainName string `json:"-" yaml:"-"`
ChainID string `json:"chain-id" yaml:"chain-id"`
RPCAddr string `json:"rpc-addr" yaml:"rpc-addr"`
AccountPrefix string `json:"account-prefix" yaml:"account-prefix"`
KeyringBackend string `json:"keyring-backend" yaml:"keyring-backend"`
GasAdjustment float64 `json:"gas-adjustment" yaml:"gas-adjustment"`
GasPrices string `json:"gas-prices" yaml:"gas-prices"`
MinGasAmount uint64 `json:"min-gas-amount" yaml:"min-gas-amount"`
Debug bool `json:"debug" yaml:"debug"`
Timeout string `json:"timeout" yaml:"timeout"`
OutputFormat string `json:"output-format" yaml:"output-format"`
SignModeStr string `json:"sign-mode" yaml:"sign-mode"`
ExtraCodecs []string `json:"extra-codecs" yaml:"extra-codecs"`
Slip44 int `json:"coin-type" yaml:"coin-type"`
Broadcast provider.BroadcastMode `json:"broadcast-mode" yaml:"broadcast-mode"`
}

func (pc CosmosProviderConfig) Validate() error {
Expand All @@ -55,6 +56,10 @@ func (pc CosmosProviderConfig) Validate() error {
return nil
}

func (pc CosmosProviderConfig) BroadcastMode() provider.BroadcastMode {
return pc.Broadcast
}

// NewProvider validates the CosmosProviderConfig, instantiates a ChainClient and then instantiates a CosmosProvider
func (pc CosmosProviderConfig) NewProvider(log *zap.Logger, homepath string, debug bool, chainName string) (provider.ChainProvider, error) {
if err := pc.Validate(); err != nil {
Expand All @@ -72,6 +77,10 @@ func (pc CosmosProviderConfig) NewProvider(log *zap.Logger, homepath string, deb
}
pc.ChainName = chainName

if pc.Broadcast == "" {
pc.Broadcast = provider.BroadcastModeBatch
}

return &CosmosProvider{
log: log,
ChainClient: *cc,
Expand Down Expand Up @@ -136,10 +145,14 @@ func (h CosmosIBCHeader) ConsensusState() ibcexported.ConsensusState {
return &tmclient.ConsensusState{
Timestamp: h.SignedHeader.Time,
Root: commitmenttypes.NewMerkleRoot(h.SignedHeader.AppHash),
NextValidatorsHash: h.ValidatorSet.Hash(),
NextValidatorsHash: h.SignedHeader.NextValidatorsHash,
}
}

func (h CosmosIBCHeader) NextValidatorsHash() []byte {
return h.SignedHeader.NextValidatorsHash
}

func (cc *CosmosProvider) ProviderConfig() provider.ProviderConfig {
return cc.PCfg
}
Expand Down
220 changes: 209 additions & 11 deletions relayer/chains/cosmos/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,23 @@ import (
tmclient "github.com/cosmos/ibc-go/v7/modules/light-clients/07-tendermint"
strideicqtypes "github.com/cosmos/relayer/v2/relayer/chains/cosmos/stride"
"github.com/cosmos/relayer/v2/relayer/provider"
lensclient "github.com/strangelove-ventures/lens/client"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/light"
coretypes "github.com/tendermint/tendermint/rpc/core/types"
tmtypes "github.com/tendermint/tendermint/types"
"go.uber.org/zap"
)

// Variables used for retries
var (
rtyAttNum = uint(5)
rtyAtt = retry.Attempts(rtyAttNum)
rtyDel = retry.Delay(time.Millisecond * 400)
rtyErr = retry.LastErrorOnly(true)
numRegex = regexp.MustCompile("[0-9]+")
rtyAttNum = uint(5)
rtyAtt = retry.Attempts(rtyAttNum)
rtyDel = retry.Delay(time.Millisecond * 400)
rtyErr = retry.LastErrorOnly(true)
numRegex = regexp.MustCompile("[0-9]+")
defaultBroadcastWaitTimeout = 10 * time.Minute
errUnknown = "unknown"
)

// Default IBC settings
Expand Down Expand Up @@ -188,11 +192,12 @@ func (cc *CosmosProvider) SendMessages(ctx context.Context, msgs []provider.Rela
}

rlyResp := &provider.RelayerTxResponse{
Height: resp.Height,
TxHash: resp.TxHash,
Code: resp.Code,
Data: resp.Data,
Events: parseEventsFromTxResponse(resp),
Height: resp.Height,
TxHash: resp.TxHash,
Codespace: resp.Codespace,
Code: resp.Code,
Data: resp.Data,
Events: parseEventsFromTxResponse(resp),
}

// transaction was executed, log the success or failure using the tx response code
Expand All @@ -210,6 +215,192 @@ func (cc *CosmosProvider) SendMessages(ctx context.Context, msgs []provider.Rela
return rlyResp, true, nil
}

// SendMessagesToMempool simulates and broadcasts a transaction with the given msgs and memo.
// This method will return once the transaction has entered the mempool.
// In an async goroutine, will wait for the tx to be included in the block unless asyncCtx exits.
// If there is no error broadcasting, the asyncCallback will be called with success/failure of the wait for block inclusion.
func (cc *CosmosProvider) SendMessagesToMempool(
ctx context.Context,
msgs []provider.RelayerMessage,
memo string,

asyncCtx context.Context,
asyncCallback func(*provider.RelayerTxResponse, error),
) error {
// Guard against account sequence number mismatch errors by locking for the specific wallet for
// the account sequence query all the way through the transaction broadcast success/fail.
cc.txMu.Lock()
defer cc.txMu.Unlock()

txBytes, sequence, fees, err := cc.buildMessages(ctx, msgs, memo)
if err != nil {
// Account sequence mismatch errors can happen on the simulated transaction also.
if strings.Contains(err.Error(), sdkerrors.ErrWrongSequence.Error()) {
cc.handleAccountSequenceMismatchError(err)
}

return err
}

if err := cc.broadcastTx(ctx, txBytes, msgs, fees, asyncCtx, defaultBroadcastWaitTimeout, asyncCallback); err != nil {
if strings.Contains(err.Error(), sdkerrors.ErrWrongSequence.Error()) {
cc.handleAccountSequenceMismatchError(err)
}

return err
}

// we had a successful tx broadcast with this sequence, so update it to the next
cc.updateNextAccountSequence(sequence + 1)

return nil
}

// sdkError will return the Cosmos SDK registered error for a given codespace/code combo if registered, otherwise nil.
func (cc *CosmosProvider) sdkError(codespace string, code uint32) error {
// ABCIError will return an error other than "unknown" if syncRes.Code is a registered error in syncRes.Codespace
// This catches all of the sdk errors https://github.com/cosmos/cosmos-sdk/blob/f10f5e5974d2ecbf9efc05bc0bfe1c99fdeed4b6/types/errors/errors.go
agouin marked this conversation as resolved.
Show resolved Hide resolved
err := errors.Unwrap(sdkerrors.ABCIError(codespace, code, "error broadcasting transaction"))
if err.Error() != errUnknown {
return err
}
return nil
}

// broadcastTx broadcasts a transaction with the given raw bytes and then, in an async goroutine, waits for the tx to be included in the block.
// The wait will end after either the asyncTimeout has run out or the asyncCtx exits.
// If there is no error broadcasting, the asyncCallback will be called with success/failure of the wait for block inclusion.
func (cc *CosmosProvider) broadcastTx(
ctx context.Context, // context for tx broadcast
tx []byte, // raw tx to be broadcasted
msgs []provider.RelayerMessage, // used for logging only
fees sdk.Coins, // used for metrics

asyncCtx context.Context, // context for async wait for block inclusion after successful tx broadcast
asyncTimeout time.Duration, // timeout for waiting for block inclusion
asyncCallback func(*provider.RelayerTxResponse, error), // callback for success/fail of the wait for block inclusion
) error {
res, err := cc.ChainClient.RPCClient.BroadcastTxSync(ctx, tx)
if err != nil {
if res == nil {
// There are some cases where BroadcastTxSync will return an error but the associated
// ResultBroadcastTx will be nil.
return err
}
rlyResp := &provider.RelayerTxResponse{
TxHash: res.Hash.String(),
Codespace: res.Codespace,
Code: res.Code,
Data: res.Data.String(),
}
cc.LogFailedTx(rlyResp, err, msgs)
return err
}

cc.UpdateFeesSpent(cc.ChainId(), cc.Key(), fees)

// TODO: maybe we need to check if the node has tx indexing enabled?
// if not, we need to find a new way to block until inclusion in a block

go cc.waitForTx(asyncCtx, res.Hash, msgs, asyncTimeout, asyncCallback)

return nil
}

// waitForTx waits for a transaction to be included in a block, logs success/fail, then invokes callback.
// This is intended to be called as an async goroutine.
func (cc *CosmosProvider) waitForTx(
ctx context.Context,
txHash []byte,
msgs []provider.RelayerMessage, // used for logging only
waitTimeout time.Duration,
callback func(*provider.RelayerTxResponse, error),
) {
res, err := cc.waitForBlockInclusion(ctx, txHash, waitTimeout)
if err != nil {
cc.log.Error("Failed to wait for block inclusion", zap.Error(err))
if callback != nil {
callback(nil, err)
}
return
}

rlyResp := &provider.RelayerTxResponse{
Height: res.Height,
TxHash: res.TxHash,
Codespace: res.Codespace,
Code: res.Code,
Data: res.Data,
Events: parseEventsFromTxResponse(res),
}

// transaction was executed, log the success or failure using the tx response code
// NOTE: error is nil, logic should use the returned error to determine if the
// transaction was successfully executed.

if res.Code != 0 {
// Check for any registered SDK errors
err := cc.sdkError(res.Codespace, res.Code)
if err == nil {
err = fmt.Errorf("transaction failed to execute")
}
if callback != nil {
callback(nil, err)
}
cc.LogFailedTx(rlyResp, nil, msgs)
return
}

if callback != nil {
callback(rlyResp, nil)
}
cc.LogSuccessTx(res, msgs)
}

// waitForBlockInclusion will wait for a transaction to be included in a block, up to waitTimeout or context cancellation.
func (cc *CosmosProvider) waitForBlockInclusion(
ctx context.Context,
txHash []byte,
waitTimeout time.Duration,
) (*sdk.TxResponse, error) {
exitAfter := time.After(waitTimeout)
for {
select {
case <-exitAfter:
return nil, fmt.Errorf("timed out after: %d; %w", waitTimeout, lensclient.ErrTimeoutAfterWaitingForTxBroadcast)
// This fixed poll is fine because it's only for logging and updating prometheus metrics currently.
case <-time.After(time.Millisecond * 100):
res, err := cc.ChainClient.RPCClient.Tx(ctx, txHash, false)
if err == nil {
return cc.mkTxResult(res)
}
if strings.Contains(err.Error(), "transaction indexing is disabled") {
return nil, fmt.Errorf("cannot determine success/failure of tx because transaction indexing is disabled on rpc url")
}
case <-ctx.Done():
return nil, ctx.Err()
}
}
}

// mkTxResult decodes a tendermint transaction into an SDK TxResponse.
func (cc *CosmosProvider) mkTxResult(resTx *coretypes.ResultTx) (*sdk.TxResponse, error) {
txbz, err := cc.ChainClient.Codec.TxConfig.TxDecoder()(resTx.Tx)
if err != nil {
return nil, err
}
p, ok := txbz.(intoAny)
if !ok {
return nil, fmt.Errorf("expecting a type implementing intoAny, got: %T", txbz)
}
any := p.AsAny()
return sdk.NewResponseResultTx(resTx, any, ""), nil
}

type intoAny interface {
AsAny() *codectypes.Any
}

func parseEventsFromTxResponse(resp *sdk.TxResponse) []provider.RelayerEvent {
var events []provider.RelayerEvent

Expand Down Expand Up @@ -243,6 +434,13 @@ func (cc *CosmosProvider) buildMessages(ctx context.Context, msgs []provider.Rel
txf = txf.WithMemo(memo)
}

sequence := txf.Sequence()
cc.updateNextAccountSequence(sequence)
if sequence < cc.nextAccountSeq {
sequence = cc.nextAccountSeq
txf = txf.WithSequence(sequence)
}

// TODO: Make this work with new CalculateGas method
// TODO: This is related to GRPC client stuff?
// https://github.com/cosmos/cosmos-sdk/blob/5725659684fc93790a63981c653feee33ecf3225/client/tx/tx.go#L297
Expand Down Expand Up @@ -296,7 +494,7 @@ func (cc *CosmosProvider) buildMessages(ctx context.Context, msgs []provider.Rel
return nil, 0, sdk.Coins{}, err
}

return txBytes, txf.Sequence(), fees, nil
return txBytes, sequence, fees, nil
}

// handleAccountSequenceMismatchError will parse the error string, e.g.:
Expand Down
Loading