diff --git a/cmd/chains.go b/cmd/chains.go index dee2ad811..d6f52631c 100644 --- a/cmd/chains.go +++ b/cmd/chains.go @@ -13,6 +13,7 @@ import ( "github.com/cosmos/relayer/v2/relayer" "github.com/cosmos/relayer/v2/relayer/chains/cosmos" + "github.com/cosmos/relayer/v2/relayer/provider" "github.com/spf13/cobra" registry "github.com/strangelove-ventures/lens/client/chain_registry" "go.uber.org/zap" @@ -476,6 +477,7 @@ func addChainsFromRegistry(ctx context.Context, a *appState, chains []string) er OutputFormat: chainConfig.OutputFormat, SignModeStr: chainConfig.SignModeStr, ExtraCodecs: chainConfig.ExtraCodecs, + Broadcast: provider.BroadcastModeBatch, Slip44: chainConfig.Slip44, } diff --git a/interchaintest/relayer.go b/interchaintest/relayer.go index 6c9be1189..cb93ca948 100644 --- a/interchaintest/relayer.go +++ b/interchaintest/relayer.go @@ -14,6 +14,7 @@ import ( "github.com/cosmos/relayer/v2/internal/relayertest" "github.com/cosmos/relayer/v2/relayer" "github.com/cosmos/relayer/v2/relayer/chains/cosmos" + "github.com/cosmos/relayer/v2/relayer/provider" interchaintestcosmos "github.com/strangelove-ventures/interchaintest/v7/chain/cosmos" "github.com/strangelove-ventures/interchaintest/v7/ibc" "github.com/stretchr/testify/require" @@ -77,6 +78,7 @@ func (r *Relayer) AddChainConfiguration(ctx context.Context, _ ibc.RelayerExecRe Timeout: "10s", OutputFormat: "json", SignModeStr: "direct", + Broadcast: provider.BroadcastModeBatch, }, }) diff --git a/relayer/chains/cosmos/log.go b/relayer/chains/cosmos/log.go index 2a85a1d94..75e43a982 100644 --- a/relayer/chains/cosmos/log.go +++ b/relayer/chains/cosmos/log.go @@ -64,7 +64,10 @@ func (cc *CosmosProvider) LogFailedTx(res *provider.RelayerTxResponse, err error } } - if res.Code != 0 && res.Data != "" { + if res.Code != 0 { + if sdkErr := cc.sdkError(res.Codespace, res.Code); sdkErr != nil { + fields = append(fields, zap.NamedError("sdk_error", sdkErr)) + } fields = append(fields, zap.Object("response", res)) cc.log.Warn( "Sent transaction but received failure response", diff --git a/relayer/chains/cosmos/provider.go b/relayer/chains/cosmos/provider.go index 188df9003..a1ded512d 100644 --- a/relayer/chains/cosmos/provider.go +++ b/relayer/chains/cosmos/provider.go @@ -31,21 +31,22 @@ var ( const tendermintEncodingThreshold = "v0.37.0-alpha" type CosmosProviderConfig struct { - Key string `json:"key" yaml:"key"` - ChainName string `json:"-" yaml:"-"` - ChainID string `json:"chain-id" yaml:"chain-id"` - RPCAddr string `json:"rpc-addr" yaml:"rpc-addr"` - AccountPrefix string `json:"account-prefix" yaml:"account-prefix"` - KeyringBackend string `json:"keyring-backend" yaml:"keyring-backend"` - GasAdjustment float64 `json:"gas-adjustment" yaml:"gas-adjustment"` - GasPrices string `json:"gas-prices" yaml:"gas-prices"` - MinGasAmount uint64 `json:"min-gas-amount" yaml:"min-gas-amount"` - Debug bool `json:"debug" yaml:"debug"` - Timeout string `json:"timeout" yaml:"timeout"` - OutputFormat string `json:"output-format" yaml:"output-format"` - SignModeStr string `json:"sign-mode" yaml:"sign-mode"` - ExtraCodecs []string `json:"extra-codecs" yaml:"extra-codecs"` - Slip44 int `json:"coin-type" yaml:"coin-type"` + Key string `json:"key" yaml:"key"` + ChainName string `json:"-" yaml:"-"` + ChainID string `json:"chain-id" yaml:"chain-id"` + RPCAddr string `json:"rpc-addr" yaml:"rpc-addr"` + AccountPrefix string `json:"account-prefix" yaml:"account-prefix"` +
KeyringBackend string `json:"keyring-backend" yaml:"keyring-backend"` + GasAdjustment float64 `json:"gas-adjustment" yaml:"gas-adjustment"` + GasPrices string `json:"gas-prices" yaml:"gas-prices"` + MinGasAmount uint64 `json:"min-gas-amount" yaml:"min-gas-amount"` + Debug bool `json:"debug" yaml:"debug"` + Timeout string `json:"timeout" yaml:"timeout"` + OutputFormat string `json:"output-format" yaml:"output-format"` + SignModeStr string `json:"sign-mode" yaml:"sign-mode"` + ExtraCodecs []string `json:"extra-codecs" yaml:"extra-codecs"` + Slip44 int `json:"coin-type" yaml:"coin-type"` + Broadcast provider.BroadcastMode `json:"broadcast-mode" yaml:"broadcast-mode"` } func (pc CosmosProviderConfig) Validate() error { @@ -55,6 +56,10 @@ func (pc CosmosProviderConfig) Validate() error { return nil } +func (pc CosmosProviderConfig) BroadcastMode() provider.BroadcastMode { + return pc.Broadcast +} + // NewProvider validates the CosmosProviderConfig, instantiates a ChainClient and then instantiates a CosmosProvider func (pc CosmosProviderConfig) NewProvider(log *zap.Logger, homepath string, debug bool, chainName string) (provider.ChainProvider, error) { if err := pc.Validate(); err != nil { @@ -72,6 +77,10 @@ func (pc CosmosProviderConfig) NewProvider(log *zap.Logger, homepath string, deb } pc.ChainName = chainName + if pc.Broadcast == "" { + pc.Broadcast = provider.BroadcastModeBatch + } + return &CosmosProvider{ log: log, ChainClient: *cc, @@ -136,10 +145,14 @@ func (h CosmosIBCHeader) ConsensusState() ibcexported.ConsensusState { return &tmclient.ConsensusState{ Timestamp: h.SignedHeader.Time, Root: commitmenttypes.NewMerkleRoot(h.SignedHeader.AppHash), - NextValidatorsHash: h.ValidatorSet.Hash(), + NextValidatorsHash: h.SignedHeader.NextValidatorsHash, } } +func (h CosmosIBCHeader) NextValidatorsHash() []byte { + return h.SignedHeader.NextValidatorsHash +} + func (cc *CosmosProvider) ProviderConfig() provider.ProviderConfig { return cc.PCfg } diff --git a/relayer/chains/cosmos/tx.go b/relayer/chains/cosmos/tx.go index aa5fa303f..a33c1b9eb 100644 --- a/relayer/chains/cosmos/tx.go +++ b/relayer/chains/cosmos/tx.go @@ -26,19 +26,23 @@ import ( tmclient "github.com/cosmos/ibc-go/v7/modules/light-clients/07-tendermint" strideicqtypes "github.com/cosmos/relayer/v2/relayer/chains/cosmos/stride" "github.com/cosmos/relayer/v2/relayer/provider" + lensclient "github.com/strangelove-ventures/lens/client" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/light" + coretypes "github.com/tendermint/tendermint/rpc/core/types" tmtypes "github.com/tendermint/tendermint/types" "go.uber.org/zap" ) // Variables used for retries var ( - rtyAttNum = uint(5) - rtyAtt = retry.Attempts(rtyAttNum) - rtyDel = retry.Delay(time.Millisecond * 400) - rtyErr = retry.LastErrorOnly(true) - numRegex = regexp.MustCompile("[0-9]+") + rtyAttNum = uint(5) + rtyAtt = retry.Attempts(rtyAttNum) + rtyDel = retry.Delay(time.Millisecond * 400) + rtyErr = retry.LastErrorOnly(true) + numRegex = regexp.MustCompile("[0-9]+") + defaultBroadcastWaitTimeout = 10 * time.Minute + errUnknown = "unknown" ) // Default IBC settings @@ -188,11 +192,12 @@ func (cc *CosmosProvider) SendMessages(ctx context.Context, msgs []provider.Rela } rlyResp := &provider.RelayerTxResponse{ - Height: resp.Height, - TxHash: resp.TxHash, - Code: resp.Code, - Data: resp.Data, - Events: parseEventsFromTxResponse(resp), + Height: resp.Height, + TxHash: resp.TxHash, + 
Codespace: resp.Codespace, + Code: resp.Code, + Data: resp.Data, + Events: parseEventsFromTxResponse(resp), } // transaction was executed, log the success or failure using the tx response code @@ -210,6 +215,192 @@ func (cc *CosmosProvider) SendMessages(ctx context.Context, msgs []provider.Rela return rlyResp, true, nil } +// SendMessagesToMempool simulates and broadcasts a transaction with the given msgs and memo. +// This method will return once the transaction has entered the mempool. +// In an async goroutine, will wait for the tx to be included in the block unless asyncCtx exits. +// If there is no error broadcasting, the asyncCallback will be called with success/failure of the wait for block inclusion. +func (cc *CosmosProvider) SendMessagesToMempool( + ctx context.Context, + msgs []provider.RelayerMessage, + memo string, + + asyncCtx context.Context, + asyncCallback func(*provider.RelayerTxResponse, error), +) error { + // Guard against account sequence number mismatch errors by locking for the specific wallet for + // the account sequence query all the way through the transaction broadcast success/fail. + cc.txMu.Lock() + defer cc.txMu.Unlock() + + txBytes, sequence, fees, err := cc.buildMessages(ctx, msgs, memo) + if err != nil { + // Account sequence mismatch errors can happen on the simulated transaction also. + if strings.Contains(err.Error(), sdkerrors.ErrWrongSequence.Error()) { + cc.handleAccountSequenceMismatchError(err) + } + + return err + } + + if err := cc.broadcastTx(ctx, txBytes, msgs, fees, asyncCtx, defaultBroadcastWaitTimeout, asyncCallback); err != nil { + if strings.Contains(err.Error(), sdkerrors.ErrWrongSequence.Error()) { + cc.handleAccountSequenceMismatchError(err) + } + + return err + } + + // we had a successful tx broadcast with this sequence, so update it to the next + cc.updateNextAccountSequence(sequence + 1) + + return nil +} + +// sdkError will return the Cosmos SDK registered error for a given codespace/code combo if registered, otherwise nil. +func (cc *CosmosProvider) sdkError(codespace string, code uint32) error { + // ABCIError will return an error other than "unknown" if syncRes.Code is a registered error in syncRes.Codespace + // This catches all of the sdk errors https://github.com/cosmos/cosmos-sdk/blob/f10f5e5974d2ecbf9efc05bc0bfe1c99fdeed4b6/types/errors/errors.go + err := errors.Unwrap(sdkerrors.ABCIError(codespace, code, "error broadcasting transaction")) + if err.Error() != errUnknown { + return err + } + return nil +} + +// broadcastTx broadcasts a transaction with the given raw bytes and then, in an async goroutine, waits for the tx to be included in the block. +// The wait will end after either the asyncTimeout has run out or the asyncCtx exits. +// If there is no error broadcasting, the asyncCallback will be called with success/failure of the wait for block inclusion. 
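Illustrative usage (a minimal sketch, not part of this patch): the new SendMessagesToMempool API returns as soon as the transaction is accepted into the mempool, and success or failure of block inclusion is reported later through the async callback. The package, the helper name broadcastExample, and the 80-second broadcast timeout below are assumptions for illustration only; the method signature and the RelayerTxResponse fields are taken from the diff above.

package example // hypothetical package, for illustration only

import (
	"context"
	"log"
	"time"

	"github.com/cosmos/relayer/v2/relayer/chains/cosmos"
	"github.com/cosmos/relayer/v2/relayer/provider"
)

// broadcastExample (hypothetical) fires a broadcast and returns once the tx is in the mempool;
// block inclusion is reported asynchronously via the callback.
func broadcastExample(ctx context.Context, cc *cosmos.CosmosProvider, msgs []provider.RelayerMessage) error {
	broadcastCtx, cancel := context.WithTimeout(ctx, 80*time.Second) // assumed broadcast timeout
	defer cancel()

	callback := func(resp *provider.RelayerTxResponse, err error) {
		if err != nil {
			log.Printf("tx failed after reaching the mempool: %v", err)
			return
		}
		log.Printf("tx included at height %d: %s", resp.Height, resp.TxHash)
	}

	// ctx (not broadcastCtx) bounds the async wait for block inclusion.
	return cc.SendMessagesToMempool(broadcastCtx, msgs, "", ctx, callback)
}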
+func (cc *CosmosProvider) broadcastTx( + ctx context.Context, // context for tx broadcast + tx []byte, // raw tx to be broadcasted + msgs []provider.RelayerMessage, // used for logging only + fees sdk.Coins, // used for metrics + + asyncCtx context.Context, // context for async wait for block inclusion after successful tx broadcast + asyncTimeout time.Duration, // timeout for waiting for block inclusion + asyncCallback func(*provider.RelayerTxResponse, error), // callback for success/fail of the wait for block inclusion +) error { + res, err := cc.ChainClient.RPCClient.BroadcastTxSync(ctx, tx) + if err != nil { + if res == nil { + // There are some cases where BroadcastTxSync will return an error but the associated + // ResultBroadcastTx will be nil. + return err + } + rlyResp := &provider.RelayerTxResponse{ + TxHash: res.Hash.String(), + Codespace: res.Codespace, + Code: res.Code, + Data: res.Data.String(), + } + cc.LogFailedTx(rlyResp, err, msgs) + return err + } + + cc.UpdateFeesSpent(cc.ChainId(), cc.Key(), fees) + + // TODO: maybe we need to check if the node has tx indexing enabled? + // if not, we need to find a new way to block until inclusion in a block + + go cc.waitForTx(asyncCtx, res.Hash, msgs, asyncTimeout, asyncCallback) + + return nil +} + +// waitForTx waits for a transaction to be included in a block, logs success/fail, then invokes callback. +// This is intended to be called as an async goroutine. +func (cc *CosmosProvider) waitForTx( + ctx context.Context, + txHash []byte, + msgs []provider.RelayerMessage, // used for logging only + waitTimeout time.Duration, + callback func(*provider.RelayerTxResponse, error), +) { + res, err := cc.waitForBlockInclusion(ctx, txHash, waitTimeout) + if err != nil { + cc.log.Error("Failed to wait for block inclusion", zap.Error(err)) + if callback != nil { + callback(nil, err) + } + return + } + + rlyResp := &provider.RelayerTxResponse{ + Height: res.Height, + TxHash: res.TxHash, + Codespace: res.Codespace, + Code: res.Code, + Data: res.Data, + Events: parseEventsFromTxResponse(res), + } + + // transaction was executed, log the success or failure using the tx response code + // NOTE: error is nil, logic should use the returned error to determine if the + // transaction was successfully executed. + + if res.Code != 0 { + // Check for any registered SDK errors + err := cc.sdkError(res.Codespace, res.Code) + if err == nil { + err = fmt.Errorf("transaction failed to execute") + } + if callback != nil { + callback(nil, err) + } + cc.LogFailedTx(rlyResp, nil, msgs) + return + } + + if callback != nil { + callback(rlyResp, nil) + } + cc.LogSuccessTx(res, msgs) +} + +// waitForBlockInclusion will wait for a transaction to be included in a block, up to waitTimeout or context cancellation. +func (cc *CosmosProvider) waitForBlockInclusion( + ctx context.Context, + txHash []byte, + waitTimeout time.Duration, +) (*sdk.TxResponse, error) { + exitAfter := time.After(waitTimeout) + for { + select { + case <-exitAfter: + return nil, fmt.Errorf("timed out after: %d; %w", waitTimeout, lensclient.ErrTimeoutAfterWaitingForTxBroadcast) + // This fixed poll is fine because it's only for logging and updating prometheus metrics currently. 
+ case <-time.After(time.Millisecond * 100): + res, err := cc.ChainClient.RPCClient.Tx(ctx, txHash, false) + if err == nil { + return cc.mkTxResult(res) + } + if strings.Contains(err.Error(), "transaction indexing is disabled") { + return nil, fmt.Errorf("cannot determine success/failure of tx because transaction indexing is disabled on rpc url") + } + case <-ctx.Done(): + return nil, ctx.Err() + } + } +} + +// mkTxResult decodes a tendermint transaction into an SDK TxResponse. +func (cc *CosmosProvider) mkTxResult(resTx *coretypes.ResultTx) (*sdk.TxResponse, error) { + txbz, err := cc.ChainClient.Codec.TxConfig.TxDecoder()(resTx.Tx) + if err != nil { + return nil, err + } + p, ok := txbz.(intoAny) + if !ok { + return nil, fmt.Errorf("expecting a type implementing intoAny, got: %T", txbz) + } + any := p.AsAny() + return sdk.NewResponseResultTx(resTx, any, ""), nil +} + +type intoAny interface { + AsAny() *codectypes.Any +} + func parseEventsFromTxResponse(resp *sdk.TxResponse) []provider.RelayerEvent { var events []provider.RelayerEvent @@ -243,6 +434,13 @@ func (cc *CosmosProvider) buildMessages(ctx context.Context, msgs []provider.Rel txf = txf.WithMemo(memo) } + sequence := txf.Sequence() + cc.updateNextAccountSequence(sequence) + if sequence < cc.nextAccountSeq { + sequence = cc.nextAccountSeq + txf = txf.WithSequence(sequence) + } + // TODO: Make this work with new CalculateGas method // TODO: This is related to GRPC client stuff? // https://github.com/cosmos/cosmos-sdk/blob/5725659684fc93790a63981c653feee33ecf3225/client/tx/tx.go#L297 @@ -296,7 +494,7 @@ func (cc *CosmosProvider) buildMessages(ctx context.Context, msgs []provider.Rel return nil, 0, sdk.Coins{}, err } - return txBytes, txf.Sequence(), fees, nil + return txBytes, sequence, fees, nil } // handleAccountSequenceMismatchError will parse the error string, e.g.: diff --git a/relayer/processor/message_processor.go b/relayer/processor/message_processor.go new file mode 100644 index 000000000..3c5fcc49b --- /dev/null +++ b/relayer/processor/message_processor.go @@ -0,0 +1,457 @@ +package processor + +import ( + "bytes" + "context" + "errors" + "fmt" + "sync" + "time" + + chantypes "github.com/cosmos/ibc-go/v7/modules/core/04-channel/types" + "github.com/cosmos/relayer/v2/relayer/provider" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +// messageProcessor is used for concurrent IBC message assembly and sending +type messageProcessor struct { + log *zap.Logger + metrics *PrometheusMetrics + + memo string + + msgUpdateClient provider.RelayerMessage + clientUpdateThresholdTime time.Duration + + pktMsgs []packetMessageToTrack + connMsgs []connectionMessageToTrack + chanMsgs []channelMessageToTrack + clientICQMsgs []clientICQMessageToTrack +} + +// trackMessage stores the message tracker in the correct slice and index based on the type. +func (mp *messageProcessor) trackMessage(tracker messageToTrack, i int) { + switch t := tracker.(type) { + case packetMessageToTrack: + mp.pktMsgs[i] = t + case channelMessageToTrack: + mp.chanMsgs[i] = t + case connectionMessageToTrack: + mp.connMsgs[i] = t + case clientICQMessageToTrack: + mp.clientICQMsgs[i] = t + } +} + +// trackers returns all of the msg trackers for the current set of messages to be sent. 
+func (mp *messageProcessor) trackers() (trackers []messageToTrack) { + for _, t := range mp.pktMsgs { + trackers = append(trackers, t) + } + for _, t := range mp.chanMsgs { + trackers = append(trackers, t) + } + for _, t := range mp.connMsgs { + trackers = append(trackers, t) + } + for _, t := range mp.clientICQMsgs { + trackers = append(trackers, t) + } + return trackers +} + +func newMessageProcessor( + log *zap.Logger, + metrics *PrometheusMetrics, + memo string, + clientUpdateThresholdTime time.Duration, +) *messageProcessor { + return &messageProcessor{ + log: log, + metrics: metrics, + memo: memo, + clientUpdateThresholdTime: clientUpdateThresholdTime, + } +} + +// processMessages is the entrypoint for the message processor. +// it will assemble and send any pending messages. +func (mp *messageProcessor) processMessages( + ctx context.Context, + messages pathEndMessages, + src, dst *pathEndRuntime, +) error { + needsClientUpdate, err := mp.shouldUpdateClientNow(ctx, src, dst) + if err != nil { + return err + } + + if err := mp.assembleMsgUpdateClient(ctx, src, dst); err != nil { + return err + } + + mp.assembleMessages(ctx, messages, src, dst) + + return mp.trackAndSendMessages(ctx, src, dst, needsClientUpdate) +} + +// shouldUpdateClientNow determines if an update client message should be sent +// even if there are no messages to be sent now. It will not be attempted if +// there has not been enough blocks since the last client update attempt. +// Otherwise, it will be attempted if either 2/3 of the trusting period +// or the configured client update threshold duration has passed. +func (mp *messageProcessor) shouldUpdateClientNow(ctx context.Context, src, dst *pathEndRuntime) (bool, error) { + var consensusHeightTime time.Time + if dst.clientState.ConsensusTime.IsZero() { + h, err := src.chainProvider.QueryIBCHeader(ctx, int64(dst.clientState.ConsensusHeight.RevisionHeight)) + if err != nil { + return false, fmt.Errorf("failed to get header height: %w", err) + } + consensusHeightTime = time.Unix(0, int64(h.ConsensusState().GetTimestamp())) + } else { + consensusHeightTime = dst.clientState.ConsensusTime + } + + clientUpdateThresholdMs := mp.clientUpdateThresholdTime.Milliseconds() + + dst.lastClientUpdateHeightMu.Lock() + enoughBlocksPassed := (dst.latestBlock.Height - blocksToRetrySendAfter) > dst.lastClientUpdateHeight + dst.lastClientUpdateHeightMu.Unlock() + + twoThirdsTrustingPeriodMs := float64(dst.clientState.TrustingPeriod.Milliseconds()) * 2 / 3 + timeSinceLastClientUpdateMs := float64(time.Since(consensusHeightTime).Milliseconds()) + + pastTwoThirdsTrustingPeriod := timeSinceLastClientUpdateMs > twoThirdsTrustingPeriodMs + + pastConfiguredClientUpdateThreshold := clientUpdateThresholdMs > 0 && + time.Since(consensusHeightTime).Milliseconds() > clientUpdateThresholdMs + + shouldUpdateClientNow := enoughBlocksPassed && (pastTwoThirdsTrustingPeriod || pastConfiguredClientUpdateThreshold) + + if shouldUpdateClientNow { + mp.log.Info("Client update threshold condition met", + zap.String("chain_id", dst.info.ChainID), + zap.String("client_id", dst.info.ClientID), + zap.Int64("trusting_period", dst.clientState.TrustingPeriod.Milliseconds()), + zap.Int64("time_since_client_update", time.Since(consensusHeightTime).Milliseconds()), + zap.Int64("client_threshold_time", mp.clientUpdateThresholdTime.Milliseconds()), + ) + } + + return shouldUpdateClientNow, nil +} + +// assembleMessages will assemble all messages in parallel. This typically involves proof queries for each. 
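A short worked example of the update threshold above (the 14-day trusting period is an illustrative assumption, not a default set by this patch): with no client-update-threshold configured, an otherwise idle client is refreshed once two-thirds of the trusting period has elapsed since the consensus state was produced.

package main // illustration only

import (
	"fmt"
	"time"
)

func main() {
	trustingPeriod := 14 * 24 * time.Hour                        // assumed 14-day trusting period
	twoThirds := time.Duration(float64(trustingPeriod) * 2 / 3)  // 2/3 of 336h
	fmt.Println(twoThirds)                                       // 224h0m0s, i.e. roughly 9.3 days
}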
+func (mp *messageProcessor) assembleMessages(ctx context.Context, messages pathEndMessages, src, dst *pathEndRuntime) { + var wg sync.WaitGroup + + mp.connMsgs = make([]connectionMessageToTrack, len(messages.connectionMessages)) + for i, msg := range messages.connectionMessages { + wg.Add(1) + go mp.assembleMessage(ctx, msg, src, dst, i, &wg) + } + + mp.chanMsgs = make([]channelMessageToTrack, len(messages.channelMessages)) + for i, msg := range messages.channelMessages { + wg.Add(1) + go mp.assembleMessage(ctx, msg, src, dst, i, &wg) + } + + mp.clientICQMsgs = make([]clientICQMessageToTrack, len(messages.clientICQMessages)) + for i, msg := range messages.clientICQMessages { + wg.Add(1) + go mp.assembleMessage(ctx, msg, src, dst, i, &wg) + } + + mp.pktMsgs = make([]packetMessageToTrack, len(messages.packetMessages)) + for i, msg := range messages.packetMessages { + wg.Add(1) + go mp.assembleMessage(ctx, msg, src, dst, i, &wg) + } + + wg.Wait() +} + +// assembledCount will return the number of assembled messages. +// This must be called after assembleMessages has completed. +func (mp *messageProcessor) assembledCount() int { + count := 0 + for _, m := range mp.trackers() { + if m.assembledMsg() != nil { + count++ + } + } + + return count +} + +// assembleMessage will assemble a specific message based on it's type. +func (mp *messageProcessor) assembleMessage( + ctx context.Context, + msg ibcMessage, + src, dst *pathEndRuntime, + i int, + wg *sync.WaitGroup, +) { + assembled, err := msg.assemble(ctx, src, dst) + mp.trackMessage(msg.tracker(assembled), i) + wg.Done() + if err != nil { + dst.log.Error(fmt.Sprintf("Error assembling %s message", msg.msgType()), zap.Object("msg", msg)) + return + } + dst.log.Debug(fmt.Sprintf("Assembled %s message", msg.msgType()), zap.Object("msg", msg)) +} + +// assembleMsgUpdateClient uses the ChainProvider from both pathEnds to assemble the client update header +// from the source and then assemble the update client message in the correct format for the destination. +func (mp *messageProcessor) assembleMsgUpdateClient(ctx context.Context, src, dst *pathEndRuntime) error { + clientID := dst.info.ClientID + clientConsensusHeight := dst.clientState.ConsensusHeight + trustedConsensusHeight := dst.clientTrustedState.ClientState.ConsensusHeight + var trustedNextValidatorsHash []byte + if dst.clientTrustedState.IBCHeader != nil { + trustedNextValidatorsHash = dst.clientTrustedState.IBCHeader.NextValidatorsHash() + } + + // If the client state height is not equal to the client trusted state height and the client state height is + // the latest block, we cannot send a MsgUpdateClient until another block is observed on the counterparty. + // If the client state height is in the past, beyond ibcHeadersToCache, then we need to query for it. 
+ if !trustedConsensusHeight.EQ(clientConsensusHeight) { + deltaConsensusHeight := int64(clientConsensusHeight.RevisionHeight) - int64(trustedConsensusHeight.RevisionHeight) + if trustedConsensusHeight.RevisionHeight != 0 && deltaConsensusHeight <= clientConsensusHeightUpdateThresholdBlocks { + return fmt.Errorf("observed client trusted height: %d does not equal latest client state height: %d", + trustedConsensusHeight.RevisionHeight, clientConsensusHeight.RevisionHeight) + } + header, err := src.chainProvider.QueryIBCHeader(ctx, int64(clientConsensusHeight.RevisionHeight+1)) + if err != nil { + return fmt.Errorf("error getting IBC header at height: %d for chain_id: %s, %w", + clientConsensusHeight.RevisionHeight+1, src.info.ChainID, err) + } + mp.log.Debug("Had to query for client trusted IBC header", + zap.String("chain_id", src.info.ChainID), + zap.String("counterparty_chain_id", dst.info.ChainID), + zap.String("counterparty_client_id", clientID), + zap.Uint64("height", clientConsensusHeight.RevisionHeight+1), + zap.Uint64("latest_height", src.latestBlock.Height), + ) + dst.clientTrustedState = provider.ClientTrustedState{ + ClientState: dst.clientState, + IBCHeader: header, + } + trustedConsensusHeight = clientConsensusHeight + trustedNextValidatorsHash = header.NextValidatorsHash() + } + + if src.latestHeader.Height() == trustedConsensusHeight.RevisionHeight && + !bytes.Equal(src.latestHeader.NextValidatorsHash(), trustedNextValidatorsHash) { + return fmt.Errorf("latest header height is equal to the client trusted height: %d, "+ + "need to wait for next block's header before we can assemble and send a new MsgUpdateClient", + trustedConsensusHeight.RevisionHeight) + } + + msgUpdateClientHeader, err := src.chainProvider.MsgUpdateClientHeader( + src.latestHeader, + trustedConsensusHeight, + dst.clientTrustedState.IBCHeader, + ) + if err != nil { + return fmt.Errorf("error assembling new client header: %w", err) + } + + msgUpdateClient, err := dst.chainProvider.MsgUpdateClient(clientID, msgUpdateClientHeader) + if err != nil { + return fmt.Errorf("error assembling MsgUpdateClient: %w", err) + } + + mp.msgUpdateClient = msgUpdateClient + + return nil +} + +// trackAndSendMessages will increment attempt counters for each message and send each message. +// Messages will be batched if the broadcast mode is configured to 'batch' and there was not an error +// in a previous batch. +func (mp *messageProcessor) trackAndSendMessages( + ctx context.Context, + src, dst *pathEndRuntime, + needsClientUpdate bool, +) error { + broadcastBatch := dst.chainProvider.ProviderConfig().BroadcastMode() == provider.BroadcastModeBatch + var batch []messageToTrack + + for _, t := range mp.trackers() { + retries := dst.trackProcessingMessage(t) + if t.assembledMsg() == nil { + continue + } + if broadcastBatch && retries == 0 { + batch = append(batch, t) + continue + } + go mp.sendSingleMessage(ctx, src, dst, t) + } + + if len(batch) > 0 { + go mp.sendBatchMessages(ctx, src, dst, batch) + } + + if mp.assembledCount() > 0 { + return nil + } + + if needsClientUpdate { + go mp.sendClientUpdate(ctx, src, dst) + return nil + } + + // only msgUpdateClient, don't need to send + return errors.New("all messages failed to assemble") +} + +// sendClientUpdate will send an isolated client update message. 
+func (mp *messageProcessor) sendClientUpdate( + ctx context.Context, + src, dst *pathEndRuntime, +) { + broadcastCtx, cancel := context.WithTimeout(ctx, messageSendTimeout) + defer cancel() + + dst.log.Debug("Will relay client update") + + dst.lastClientUpdateHeightMu.Lock() + dst.lastClientUpdateHeight = dst.latestBlock.Height + dst.lastClientUpdateHeightMu.Unlock() + + msgs := []provider.RelayerMessage{mp.msgUpdateClient} + + if err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, msgs, mp.memo, ctx, nil); err != nil { + mp.log.Error("Error sending client update message", + zap.String("src_chain_id", src.info.ChainID), + zap.String("dst_chain_id", dst.info.ChainID), + zap.String("src_client_id", src.info.ClientID), + zap.String("dst_client_id", dst.info.ClientID), + zap.Error(err), + ) + return + } + dst.log.Debug("Client update broadcast completed") +} + +// sendBatchMessages will send a batch of messages, +// then increment metrics counters for successful packet messages. +func (mp *messageProcessor) sendBatchMessages( + ctx context.Context, + src, dst *pathEndRuntime, + batch []messageToTrack, +) { + broadcastCtx, cancel := context.WithTimeout(ctx, messageSendTimeout) + defer cancel() + + // messages are batched into a single tx, with the MsgUpdateClient prepended as the first message + msgs := make([]provider.RelayerMessage, 1+len(batch)) + msgs[0] = mp.msgUpdateClient + fields := []zapcore.Field{} + for i, t := range batch { + msgs[i+1] = t.assembledMsg() + fields = append(fields, zap.Object(fmt.Sprintf("msg_%d", i), t)) + } + + dst.log.Debug("Will relay messages", fields...) + + callback := func(rtr *provider.RelayerTxResponse, err error) { + // only increment metrics counts for successful packets + if err != nil || mp.metrics == nil { + return + } + for _, tracker := range batch { + t, ok := tracker.(packetMessageToTrack) + if !ok { + continue + } + var channel, port string + if t.msg.eventType == chantypes.EventTypeRecvPacket { + channel = t.msg.info.DestChannel + port = t.msg.info.DestPort + } else { + channel = t.msg.info.SourceChannel + port = t.msg.info.SourcePort + } + mp.metrics.IncPacketsRelayed(dst.info.PathName, dst.info.ChainID, channel, port, t.msg.eventType) + } + } + + if err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, msgs, mp.memo, ctx, callback); err != nil { + errFields := []zapcore.Field{ + zap.String("src_chain_id", src.info.ChainID), + zap.String("dst_chain_id", dst.info.ChainID), + zap.String("src_client_id", src.info.ClientID), + zap.String("dst_client_id", dst.info.ClientID), + zap.Error(err), + } + if errors.Is(err, chantypes.ErrRedundantTx) { + mp.log.Debug("Redundant message(s)", errFields...) + return + } + mp.log.Error("Error sending messages", errFields...) + return + } + dst.log.Debug("Message broadcast completed", fields...) +} + +// sendSingleMessage will send an isolated message. +func (mp *messageProcessor) sendSingleMessage( + ctx context.Context, + src, dst *pathEndRuntime, + tracker messageToTrack, +) { + msgs := []provider.RelayerMessage{mp.msgUpdateClient, tracker.assembledMsg()} + + broadcastCtx, cancel := context.WithTimeout(ctx, messageSendTimeout) + defer cancel() + + msgType := tracker.msgType() + + dst.log.Debug(fmt.Sprintf("Will broadcast %s message", msgType), zap.Object("msg", tracker)) + + // Set callback for packet messages so that we increment prometheus metrics on successful relays.
+ var callback func(rtr *provider.RelayerTxResponse, err error) + if t, ok := tracker.(packetMessageToTrack); ok { + callback = func(rtr *provider.RelayerTxResponse, err error) { + // only increment metrics counts for successful packets + if err != nil || mp.metrics == nil { + return + } + var channel, port string + if t.msg.eventType == chantypes.EventTypeRecvPacket { + channel = t.msg.info.DestChannel + port = t.msg.info.DestPort + } else { + channel = t.msg.info.SourceChannel + port = t.msg.info.SourcePort + } + mp.metrics.IncPacketsRelayed(dst.info.PathName, dst.info.ChainID, channel, port, t.msg.eventType) + } + } + + err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, msgs, mp.memo, ctx, callback) + if err != nil { + errFields := []zapcore.Field{ + zap.String("src_chain_id", src.info.ChainID), + zap.String("dst_chain_id", dst.info.ChainID), + zap.String("src_client_id", src.info.ClientID), + zap.String("dst_client_id", dst.info.ClientID), + } + errFields = append(errFields, zap.Object("msg", tracker)) + errFields = append(errFields, zap.Error(err)) + if errors.Is(err, chantypes.ErrRedundantTx) { + mp.log.Debug(fmt.Sprintf("Redundant %s message", msgType), errFields...) + return + } + mp.log.Error(fmt.Sprintf("Error broadcasting %s message", msgType), errFields...) + return + } + + dst.log.Debug(fmt.Sprintf("Successfully broadcasted %s message", msgType), zap.Object("msg", tracker)) +} diff --git a/relayer/processor/path_end_runtime.go b/relayer/processor/path_end_runtime.go index 942576dc9..ed7b77aca 100644 --- a/relayer/processor/path_end_runtime.go +++ b/relayer/processor/path_end_runtime.go @@ -2,6 +2,7 @@ package processor import ( "context" + "sync" "time" conntypes "github.com/cosmos/ibc-go/v7/modules/core/03-connection/types" @@ -46,6 +47,9 @@ type pathEndRuntime struct { // inSync indicates whether queries are in sync with latest height of the chain. 
inSync bool + lastClientUpdateHeight uint64 + lastClientUpdateHeightMu sync.Mutex + metrics *PrometheusMetrics } @@ -287,8 +291,11 @@ func (pathEnd *pathEndRuntime) shouldTerminate(ibcMessagesCache IBCMessagesCache } func (pathEnd *pathEndRuntime) mergeCacheData(ctx context.Context, cancel func(), d ChainProcessorCacheData, counterpartyChainID string, counterpartyInSync bool, messageLifecycle MessageLifecycle, counterParty *pathEndRuntime) { - pathEnd.inSync = d.InSync + pathEnd.lastClientUpdateHeightMu.Lock() pathEnd.latestBlock = d.LatestBlock + pathEnd.lastClientUpdateHeightMu.Unlock() + + pathEnd.inSync = d.InSync pathEnd.latestHeader = d.LatestHeader pathEnd.clientState = d.ClientState if d.ClientState.ConsensusHeight != pathEnd.clientState.ConsensusHeight { @@ -632,99 +639,92 @@ func (pathEnd *pathEndRuntime) shouldSendClientICQMessage(message provider.Clien return true } -func (pathEnd *pathEndRuntime) trackProcessingPacketMessage(t packetMessageToTrack) { - eventType := t.msg.eventType - sequence := t.msg.info.Sequence - channelKey, err := t.msg.channelKey() - if err != nil { - pathEnd.log.Error("Unexpected error tracking processing packet", - zap.Inline(channelKey), - zap.String("event_type", eventType), - zap.Uint64("sequence", sequence), - zap.Error(err), - ) - return - } - msgProcessCache, ok := pathEnd.packetProcessing[channelKey] - if !ok { - msgProcessCache = make(packetChannelMessageCache) - pathEnd.packetProcessing[channelKey] = msgProcessCache - } - channelProcessingCache, ok := msgProcessCache[eventType] - if !ok { - channelProcessingCache = make(packetMessageSendCache) - msgProcessCache[eventType] = channelProcessingCache - } - +func (pathEnd *pathEndRuntime) trackProcessingMessage(tracker messageToTrack) uint64 { retryCount := uint64(0) - if inProgress, ok := channelProcessingCache[sequence]; ok { - retryCount = inProgress.retryCount + 1 - } - - channelProcessingCache[sequence] = processingMessage{ - lastProcessedHeight: pathEnd.latestBlock.Height, - retryCount: retryCount, - assembled: t.assembled, - } -} - -func (pathEnd *pathEndRuntime) trackProcessingConnectionMessage(t connectionMessageToTrack) { - eventType := t.msg.eventType - connectionKey := connectionInfoConnectionKey(t.msg.info).Counterparty() - msgProcessCache, ok := pathEnd.connProcessing[eventType] - if !ok { - msgProcessCache = make(connectionKeySendCache) - pathEnd.connProcessing[eventType] = msgProcessCache - } - - retryCount := uint64(0) - - if inProgress, ok := msgProcessCache[connectionKey]; ok { - retryCount = inProgress.retryCount + 1 - } + switch t := tracker.(type) { + case packetMessageToTrack: + eventType := t.msg.eventType + sequence := t.msg.info.Sequence + channelKey, err := t.msg.channelKey() + if err != nil { + pathEnd.log.Error("Unexpected error tracking processing packet", + zap.Inline(channelKey), + zap.String("event_type", eventType), + zap.Uint64("sequence", sequence), + zap.Error(err), + ) + return 0 + } + msgProcessCache, ok := pathEnd.packetProcessing[channelKey] + if !ok { + msgProcessCache = make(packetChannelMessageCache) + pathEnd.packetProcessing[channelKey] = msgProcessCache + } + channelProcessingCache, ok := msgProcessCache[eventType] + if !ok { + channelProcessingCache = make(packetMessageSendCache) + msgProcessCache[eventType] = channelProcessingCache + } - msgProcessCache[connectionKey] = processingMessage{ - lastProcessedHeight: pathEnd.latestBlock.Height, - retryCount: retryCount, - assembled: t.assembled, - } -} + if inProgress, ok := 
channelProcessingCache[sequence]; ok { + retryCount = inProgress.retryCount + 1 + } -func (pathEnd *pathEndRuntime) trackProcessingChannelMessage(t channelMessageToTrack) { - eventType := t.msg.eventType - channelKey := channelInfoChannelKey(t.msg.info).Counterparty() - msgProcessCache, ok := pathEnd.channelProcessing[eventType] - if !ok { - msgProcessCache = make(channelKeySendCache) - pathEnd.channelProcessing[eventType] = msgProcessCache - } + channelProcessingCache[sequence] = processingMessage{ + lastProcessedHeight: pathEnd.latestBlock.Height, + retryCount: retryCount, + assembled: t.assembled != nil, + } + case channelMessageToTrack: + eventType := t.msg.eventType + channelKey := channelInfoChannelKey(t.msg.info).Counterparty() + msgProcessCache, ok := pathEnd.channelProcessing[eventType] + if !ok { + msgProcessCache = make(channelKeySendCache) + pathEnd.channelProcessing[eventType] = msgProcessCache + } - retryCount := uint64(0) + if inProgress, ok := msgProcessCache[channelKey]; ok { + retryCount = inProgress.retryCount + 1 + } - if inProgress, ok := msgProcessCache[channelKey]; ok { - retryCount = inProgress.retryCount + 1 - } + msgProcessCache[channelKey] = processingMessage{ + lastProcessedHeight: pathEnd.latestBlock.Height, + retryCount: retryCount, + assembled: t.assembled != nil, + } + case connectionMessageToTrack: + eventType := t.msg.eventType + connectionKey := connectionInfoConnectionKey(t.msg.info).Counterparty() + msgProcessCache, ok := pathEnd.connProcessing[eventType] + if !ok { + msgProcessCache = make(connectionKeySendCache) + pathEnd.connProcessing[eventType] = msgProcessCache + } - msgProcessCache[channelKey] = processingMessage{ - lastProcessedHeight: pathEnd.latestBlock.Height, - retryCount: retryCount, - assembled: t.assembled, - } -} + if inProgress, ok := msgProcessCache[connectionKey]; ok { + retryCount = inProgress.retryCount + 1 + } -func (pathEnd *pathEndRuntime) trackProcessingClientICQMessage(t clientICQMessageToTrack) { - retryCount := uint64(0) + msgProcessCache[connectionKey] = processingMessage{ + lastProcessedHeight: pathEnd.latestBlock.Height, + retryCount: retryCount, + assembled: t.assembled != nil, + } + case clientICQMessageToTrack: + queryID := t.msg.info.QueryID - queryID := t.msg.info.QueryID + if inProgress, ok := pathEnd.clientICQProcessing[queryID]; ok { + retryCount = inProgress.retryCount + 1 + } - if inProgress, ok := pathEnd.clientICQProcessing[queryID]; ok { - retryCount = inProgress.retryCount + 1 + pathEnd.clientICQProcessing[queryID] = processingMessage{ + lastProcessedHeight: pathEnd.latestBlock.Height, + retryCount: retryCount, + assembled: t.assembled != nil, + } } - pathEnd.clientICQProcessing[queryID] = processingMessage{ - lastProcessedHeight: pathEnd.latestBlock.Height, - retryCount: retryCount, - assembled: t.assembled, - } + return retryCount } diff --git a/relayer/processor/path_processor_internal.go b/relayer/processor/path_processor_internal.go index c443f2004..1c0d52879 100644 --- a/relayer/processor/path_processor_internal.go +++ b/relayer/processor/path_processor_internal.go @@ -1,12 +1,10 @@ package processor import ( + "bytes" "context" "errors" - "fmt" "sort" - "sync" - "time" conntypes "github.com/cosmos/ibc-go/v7/modules/core/03-connection/types" chantypes "github.com/cosmos/ibc-go/v7/modules/core/04-channel/types" @@ -15,26 +13,6 @@ import ( "golang.org/x/sync/errgroup" ) -// assembleIBCMessage constructs the applicable IBC message using the requested function. 
-// These functions may do things like make queries in order to assemble a complete IBC message. -func (pp *PathProcessor) assemblePacketIBCMessage( - ctx context.Context, - src, dst *pathEndRuntime, - partialMessage packetIBCMessage, - assembleMessage func(ctx context.Context, msgRecvPacket provider.PacketInfo, signer string, latest provider.LatestBlock) (provider.RelayerMessage, error), -) (provider.RelayerMessage, error) { - signer, err := dst.chainProvider.Address() - if err != nil { - return nil, fmt.Errorf("error getting signer address for {%s}: %w", dst.info.ChainID, err) - } - assembled, err := assembleMessage(ctx, partialMessage.info, signer, src.latestBlock) - if err != nil { - return nil, fmt.Errorf("error assembling %s for {%s}: %w", partialMessage.eventType, dst.info.ChainID, err) - } - - return assembled, nil -} - // getMessagesToSend returns only the lowest sequence message (if it should be sent) for ordered channels, // otherwise returns all which should be sent. func (pp *PathProcessor) getMessagesToSend( @@ -422,59 +400,6 @@ ClientICQLoop: return res } -// assembleMsgUpdateClient uses the ChainProvider from both pathEnds to assemble the client update header -// from the source and then assemble the update client message in the correct format for the destination. -func (pp *PathProcessor) assembleMsgUpdateClient(ctx context.Context, src, dst *pathEndRuntime) (provider.RelayerMessage, error) { - clientID := dst.info.ClientID - clientConsensusHeight := dst.clientState.ConsensusHeight - trustedConsensusHeight := dst.clientTrustedState.ClientState.ConsensusHeight - - // If the client state height is not equal to the client trusted state height and the client state height is - // the latest block, we cannot send a MsgUpdateClient until another block is observed on the counterparty. - // If the client state height is in the past, beyond ibcHeadersToCache, then we need to query for it. 
- if !trustedConsensusHeight.EQ(clientConsensusHeight) { - deltaConsensusHeight := int64(clientConsensusHeight.RevisionHeight) - int64(trustedConsensusHeight.RevisionHeight) - if trustedConsensusHeight.RevisionHeight != 0 && deltaConsensusHeight <= clientConsensusHeightUpdateThresholdBlocks { - return nil, fmt.Errorf("observed client trusted height: %d does not equal latest client state height: %d", - trustedConsensusHeight.RevisionHeight, clientConsensusHeight.RevisionHeight) - } - header, err := src.chainProvider.QueryIBCHeader(ctx, int64(clientConsensusHeight.RevisionHeight+1)) - if err != nil { - return nil, fmt.Errorf("error getting IBC header at height: %d for chain_id: %s, %w", clientConsensusHeight.RevisionHeight+1, src.info.ChainID, err) - } - pp.log.Debug("Had to query for client trusted IBC header", - zap.String("chain_id", src.info.ChainID), - zap.String("counterparty_chain_id", dst.info.ChainID), - zap.String("counterparty_client_id", clientID), - zap.Uint64("height", clientConsensusHeight.RevisionHeight+1), - zap.Uint64("latest_height", src.latestBlock.Height), - ) - dst.clientTrustedState = provider.ClientTrustedState{ - ClientState: dst.clientState, - IBCHeader: header, - } - trustedConsensusHeight = clientConsensusHeight - } - - if src.latestHeader.Height() == trustedConsensusHeight.RevisionHeight { - return nil, fmt.Errorf("latest header height is equal to the client trusted height: %d, "+ - "need to wait for next block's header before we can assemble and send a new MsgUpdateClient", - trustedConsensusHeight.RevisionHeight) - } - - msgUpdateClientHeader, err := src.chainProvider.MsgUpdateClientHeader(src.latestHeader, trustedConsensusHeight, dst.clientTrustedState.IBCHeader) - if err != nil { - return nil, fmt.Errorf("error assembling new client header: %w", err) - } - - msgUpdateClient, err := dst.chainProvider.MsgUpdateClient(clientID, msgUpdateClientHeader) - if err != nil { - return nil, fmt.Errorf("error assembling MsgUpdateClient: %w", err) - } - - return msgUpdateClient, nil -} - // updateClientTrustedState combines the counterparty chains trusted IBC header // with the latest client state, which will be used for constructing MsgUpdateClient messages. func (pp *PathProcessor) updateClientTrustedState(src *pathEndRuntime, dst *pathEndRuntime) { @@ -485,12 +410,22 @@ func (pp *PathProcessor) updateClientTrustedState(src *pathEndRuntime, dst *path // need to assemble new trusted state ibcHeader, ok := dst.ibcHeaderCache[src.clientState.ConsensusHeight.RevisionHeight+1] if !ok { + if ibcHeaderCurrent, ok := dst.ibcHeaderCache[src.clientState.ConsensusHeight.RevisionHeight]; ok && + dst.clientTrustedState.IBCHeader != nil && + bytes.Equal(dst.clientTrustedState.IBCHeader.NextValidatorsHash(), ibcHeaderCurrent.NextValidatorsHash()) { + src.clientTrustedState = provider.ClientTrustedState{ + ClientState: src.clientState, + IBCHeader: ibcHeaderCurrent, + } + return + } pp.log.Debug("No cached IBC header for client trusted height", zap.String("chain_id", src.info.ChainID), zap.String("client_id", src.info.ClientID), zap.Uint64("height", src.clientState.ConsensusHeight.RevisionHeight+1), ) return + } src.clientTrustedState = provider.ClientTrustedState{ ClientState: src.clientState, @@ -718,390 +653,16 @@ func (pp *PathProcessor) processLatestMessages(ctx context.Context, messageLifec // if sending messages fails to one pathEnd, we don't need to halt sending to the other pathEnd. 
var eg errgroup.Group eg.Go(func() error { - return pp.assembleAndSendMessages(ctx, pp.pathEnd2, pp.pathEnd1, pathEnd1Messages) + mp := newMessageProcessor(pp.log, pp.metrics, pp.memo, pp.clientUpdateThresholdTime) + return mp.processMessages(ctx, pathEnd1Messages, pp.pathEnd2, pp.pathEnd1) }) eg.Go(func() error { - return pp.assembleAndSendMessages(ctx, pp.pathEnd1, pp.pathEnd2, pathEnd2Messages) + mp := newMessageProcessor(pp.log, pp.metrics, pp.memo, pp.clientUpdateThresholdTime) + return mp.processMessages(ctx, pathEnd2Messages, pp.pathEnd1, pp.pathEnd2) }) return eg.Wait() } -func (pp *PathProcessor) assembleMessage( - ctx context.Context, - msg ibcMessage, - src, dst *pathEndRuntime, - om *outgoingMessages, - i int, - wg *sync.WaitGroup, -) { - defer wg.Done() - var message provider.RelayerMessage - var err error - switch m := msg.(type) { - case packetIBCMessage: - message, err = pp.assemblePacketMessage(ctx, m, src, dst) - om.pktMsgs[i] = packetMessageToTrack{ - msg: m, - assembled: err == nil, - } - if err == nil { - dst.log.Debug("Will send packet message", - zap.String("event_type", m.eventType), - zap.Uint64("sequence", m.info.Sequence), - zap.String("src_channel", m.info.SourceChannel), - zap.String("src_port", m.info.SourcePort), - zap.String("dst_channel", m.info.DestChannel), - zap.String("dst_port", m.info.DestPort), - ) - } - case connectionIBCMessage: - message, err = pp.assembleConnectionMessage(ctx, m, src, dst) - om.connMsgs[i] = connectionMessageToTrack{ - msg: m, - assembled: err == nil, - } - if err == nil { - dst.log.Debug("Will send connection message", - zap.String("event_type", m.eventType), - zap.String("connection_id", m.info.ConnID), - ) - } - case channelIBCMessage: - message, err = pp.assembleChannelMessage(ctx, m, src, dst) - om.chanMsgs[i] = channelMessageToTrack{ - msg: m, - assembled: err == nil, - } - if err == nil { - dst.log.Debug("Will send channel message", - zap.String("event_type", m.eventType), - zap.String("channel_id", m.info.ChannelID), - zap.String("port_id", m.info.PortID), - ) - } - case clientICQMessage: - message, err = pp.assembleClientICQMessage(ctx, m, src, dst) - om.clientICQMsgs[i] = clientICQMessageToTrack{ - msg: m, - assembled: err == nil, - } - if err == nil { - dst.log.Debug("Will send ICQ message", - zap.String("type", m.info.Type), - zap.String("query_id", string(m.info.QueryID)), - ) - } - } - if err != nil { - pp.log.Error("Error assembling channel message", zap.Error(err)) - return - } - om.Append(message) -} - -func (pp *PathProcessor) assembleAndSendMessages( - ctx context.Context, - src, dst *pathEndRuntime, - messages pathEndMessages, -) error { - var needsClientUpdate bool - if len(messages.packetMessages) == 0 && len(messages.connectionMessages) == 0 && len(messages.channelMessages) == 0 && len(messages.clientICQMessages) == 0 { - var consensusHeightTime time.Time - if dst.clientState.ConsensusTime.IsZero() { - h, err := src.chainProvider.QueryIBCHeader(ctx, int64(dst.clientState.ConsensusHeight.RevisionHeight)) - if err != nil { - return fmt.Errorf("failed to get header height: %w", err) - } - consensusHeightTime = time.Unix(0, int64(h.ConsensusState().GetTimestamp())) - } else { - consensusHeightTime = dst.clientState.ConsensusTime - } - clientUpdateThresholdMs := pp.clientUpdateThresholdTime.Milliseconds() - if (float64(dst.clientState.TrustingPeriod.Milliseconds())*2/3 < float64(time.Since(consensusHeightTime).Milliseconds())) || - (clientUpdateThresholdMs > 0 && time.Since(consensusHeightTime).Milliseconds() > 
clientUpdateThresholdMs) { - needsClientUpdate = true - pp.log.Info("Client close to expiration", - zap.String("chain_id:", dst.info.ChainID), - zap.String("client_id:", dst.info.ClientID), - zap.Int64("trusting_period", dst.clientState.TrustingPeriod.Milliseconds()), - zap.Int64("time_since_client_update", time.Since(consensusHeightTime).Milliseconds()), - zap.Int64("client_threshold_time", pp.clientUpdateThresholdTime.Milliseconds()), - ) - } else { - return nil - } - } - om := outgoingMessages{ - msgs: make( - []provider.RelayerMessage, - 0, - len(messages.packetMessages)+len(messages.connectionMessages)+len(messages.channelMessages)+len(messages.clientICQMessages), - ), - } - msgUpdateClient, err := pp.assembleMsgUpdateClient(ctx, src, dst) - if err != nil { - return err - } - om.Append(msgUpdateClient) - - // Each assembleMessage call below will make a query on the source chain, so these operations can run in parallel. - var wg sync.WaitGroup - - // connection messages are highest priority - om.connMsgs = make([]connectionMessageToTrack, len(messages.connectionMessages)) - for i, msg := range messages.connectionMessages { - wg.Add(1) - go pp.assembleMessage(ctx, msg, src, dst, &om, i, &wg) - } - - wg.Wait() - - if len(om.msgs) == 1 { - om.chanMsgs = make([]channelMessageToTrack, len(messages.channelMessages)) - // only assemble and send channel handshake messages if there are no conn handshake messages - // this prioritizes connection handshake messages, useful if a connection handshake needs to occur before a channel handshake - for i, msg := range messages.channelMessages { - wg.Add(1) - go pp.assembleMessage(ctx, msg, src, dst, &om, i, &wg) - } - - wg.Wait() - } - - if len(om.msgs) == 1 { - om.clientICQMsgs = make([]clientICQMessageToTrack, len(messages.clientICQMessages)) - // only assemble and send ICQ messages if there are no conn or chan handshake messages - for i, msg := range messages.clientICQMessages { - wg.Add(1) - go pp.assembleMessage(ctx, msg, src, dst, &om, i, &wg) - } - - wg.Wait() - } - - if len(om.msgs) == 1 { - om.pktMsgs = make([]packetMessageToTrack, len(messages.packetMessages)) - // only assemble and send packet messages if there are no handshake messages - for i, msg := range messages.packetMessages { - wg.Add(1) - go pp.assembleMessage(ctx, msg, src, dst, &om, i, &wg) - } - - wg.Wait() - } - - if len(om.msgs) == 1 && !needsClientUpdate { - // only msgUpdateClient, don't need to send - return errors.New("all messages failed to assemble") - } - - for _, m := range om.connMsgs { - dst.trackProcessingConnectionMessage(m) - } - - for _, m := range om.chanMsgs { - dst.trackProcessingChannelMessage(m) - } - - for _, m := range om.clientICQMsgs { - dst.trackProcessingClientICQMessage(m) - } - - for _, m := range om.pktMsgs { - dst.trackProcessingPacketMessage(m) - } - - go pp.sendMessages(ctx, src, dst, &om, pp.memo) - - return nil -} - -func (pp *PathProcessor) sendMessages(ctx context.Context, src, dst *pathEndRuntime, om *outgoingMessages, memo string) { - ctx, cancel := context.WithTimeout(ctx, messageSendTimeout) - defer cancel() - - _, txSuccess, err := dst.chainProvider.SendMessages(ctx, om.msgs, pp.memo) - if err != nil { - if errors.Is(err, chantypes.ErrRedundantTx) { - pp.log.Debug("Packet(s) already handled by another relayer", - zap.String("src_chain_id", src.info.ChainID), - zap.String("dst_chain_id", dst.info.ChainID), - zap.String("src_client_id", src.info.ClientID), - zap.String("dst_client_id", dst.info.ClientID), - zap.Object("messages", om), - 
zap.Error(err), - ) - return - } - pp.log.Error("Error sending messages", - zap.String("src_chain_id", src.info.ChainID), - zap.String("dst_chain_id", dst.info.ChainID), - zap.String("src_client_id", src.info.ClientID), - zap.String("dst_client_id", dst.info.ClientID), - zap.Object("messages", om), - zap.Error(err), - ) - return - } - if !txSuccess { - dst.log.Error("Error sending messages, transaction was not successful") - return - } - - if pp.metrics == nil { - return - } - for _, m := range om.pktMsgs { - var channel, port string - if m.msg.eventType == chantypes.EventTypeRecvPacket { - channel = m.msg.info.DestChannel - port = m.msg.info.DestPort - } else { - channel = m.msg.info.SourceChannel - port = m.msg.info.SourcePort - } - pp.metrics.IncPacketsRelayed(dst.info.PathName, dst.info.ChainID, channel, port, m.msg.eventType) - } -} - -func (pp *PathProcessor) assemblePacketMessage( - ctx context.Context, - msg packetIBCMessage, - src, dst *pathEndRuntime, -) (provider.RelayerMessage, error) { - var packetProof func(context.Context, provider.PacketInfo, uint64) (provider.PacketProof, error) - var assembleMessage func(provider.PacketInfo, provider.PacketProof) (provider.RelayerMessage, error) - switch msg.eventType { - case chantypes.EventTypeRecvPacket: - packetProof = src.chainProvider.PacketCommitment - assembleMessage = dst.chainProvider.MsgRecvPacket - case chantypes.EventTypeAcknowledgePacket: - packetProof = src.chainProvider.PacketAcknowledgement - assembleMessage = dst.chainProvider.MsgAcknowledgement - case chantypes.EventTypeTimeoutPacket: - if msg.info.ChannelOrder == chantypes.ORDERED.String() { - packetProof = src.chainProvider.NextSeqRecv - } else { - packetProof = src.chainProvider.PacketReceipt - } - - assembleMessage = dst.chainProvider.MsgTimeout - case chantypes.EventTypeTimeoutPacketOnClose: - if msg.info.ChannelOrder == chantypes.ORDERED.String() { - packetProof = src.chainProvider.NextSeqRecv - } else { - packetProof = src.chainProvider.PacketReceipt - } - - assembleMessage = dst.chainProvider.MsgTimeoutOnClose - default: - return nil, fmt.Errorf("unexepected packet message eventType for message assembly: %s", msg.eventType) - } - - ctx, cancel := context.WithTimeout(ctx, packetProofQueryTimeout) - defer cancel() - - var proof provider.PacketProof - var err error - proof, err = packetProof(ctx, msg.info, src.latestBlock.Height) - if err != nil { - return nil, fmt.Errorf("error querying packet proof: %w", err) - } - return assembleMessage(msg.info, proof) -} - -func (pp *PathProcessor) assembleConnectionMessage( - ctx context.Context, - msg connectionIBCMessage, - src, dst *pathEndRuntime, -) (provider.RelayerMessage, error) { - var connProof func(context.Context, provider.ConnectionInfo, uint64) (provider.ConnectionProof, error) - var assembleMessage func(provider.ConnectionInfo, provider.ConnectionProof) (provider.RelayerMessage, error) - switch msg.eventType { - case conntypes.EventTypeConnectionOpenInit: - // don't need proof for this message - msg.info.CounterpartyCommitmentPrefix = src.chainProvider.CommitmentPrefix() - assembleMessage = dst.chainProvider.MsgConnectionOpenInit - case conntypes.EventTypeConnectionOpenTry: - msg.info.CounterpartyCommitmentPrefix = src.chainProvider.CommitmentPrefix() - connProof = src.chainProvider.ConnectionHandshakeProof - assembleMessage = dst.chainProvider.MsgConnectionOpenTry - case conntypes.EventTypeConnectionOpenAck: - connProof = src.chainProvider.ConnectionHandshakeProof - assembleMessage = 
dst.chainProvider.MsgConnectionOpenAck - case conntypes.EventTypeConnectionOpenConfirm: - connProof = src.chainProvider.ConnectionProof - assembleMessage = dst.chainProvider.MsgConnectionOpenConfirm - default: - return nil, fmt.Errorf("unexepected connection message eventType for message assembly: %s", msg.eventType) - } - var proof provider.ConnectionProof - var err error - if connProof != nil { - proof, err = connProof(ctx, msg.info, src.latestBlock.Height) - if err != nil { - return nil, fmt.Errorf("error querying connection proof: %w", err) - } - } - return assembleMessage(msg.info, proof) -} - -func (pp *PathProcessor) assembleChannelMessage( - ctx context.Context, - msg channelIBCMessage, - src, dst *pathEndRuntime, -) (provider.RelayerMessage, error) { - var chanProof func(context.Context, provider.ChannelInfo, uint64) (provider.ChannelProof, error) - var assembleMessage func(provider.ChannelInfo, provider.ChannelProof) (provider.RelayerMessage, error) - switch msg.eventType { - case chantypes.EventTypeChannelOpenInit: - // don't need proof for this message - assembleMessage = dst.chainProvider.MsgChannelOpenInit - case chantypes.EventTypeChannelOpenTry: - chanProof = src.chainProvider.ChannelProof - assembleMessage = dst.chainProvider.MsgChannelOpenTry - case chantypes.EventTypeChannelOpenAck: - chanProof = src.chainProvider.ChannelProof - assembleMessage = dst.chainProvider.MsgChannelOpenAck - case chantypes.EventTypeChannelOpenConfirm: - chanProof = src.chainProvider.ChannelProof - assembleMessage = dst.chainProvider.MsgChannelOpenConfirm - case chantypes.EventTypeChannelCloseInit: - // don't need proof for this message - assembleMessage = dst.chainProvider.MsgChannelCloseInit - case chantypes.EventTypeChannelCloseConfirm: - chanProof = src.chainProvider.ChannelProof - assembleMessage = dst.chainProvider.MsgChannelCloseConfirm - default: - return nil, fmt.Errorf("unexepected channel message eventType for message assembly: %s", msg.eventType) - } - var proof provider.ChannelProof - var err error - if chanProof != nil { - proof, err = chanProof(ctx, msg.info, src.latestBlock.Height) - if err != nil { - return nil, fmt.Errorf("error querying channel proof: %w", err) - } - } - return assembleMessage(msg.info, proof) -} - -func (pp *PathProcessor) assembleClientICQMessage( - ctx context.Context, - msg clientICQMessage, - src, dst *pathEndRuntime, -) (provider.RelayerMessage, error) { - ctx, cancel := context.WithTimeout(ctx, interchainQueryTimeout) - defer cancel() - - proof, err := src.chainProvider.QueryICQWithProof(ctx, msg.info.Type, msg.info.Request, src.latestBlock.Height-1) - if err != nil { - return nil, fmt.Errorf("error during interchain query: %w", err) - } - - return dst.chainProvider.MsgSubmitQueryResponse(msg.info.Chain, msg.info.QueryID, proof) -} - func (pp *PathProcessor) channelMessagesToSend(pathEnd1ChannelHandshakeRes, pathEnd2ChannelHandshakeRes pathEndChannelHandshakeResponse) ([]channelIBCMessage, []channelIBCMessage) { pathEnd1ChannelSrcLen := len(pathEnd1ChannelHandshakeRes.SrcMessages) pathEnd1ChannelDstLen := len(pathEnd1ChannelHandshakeRes.DstMessages) diff --git a/relayer/processor/types_internal.go b/relayer/processor/types_internal.go index a949a6aad..188e3d670 100644 --- a/relayer/processor/types_internal.go +++ b/relayer/processor/types_internal.go @@ -1,10 +1,12 @@ package processor import ( - "strconv" + "context" + "encoding/base64" + "fmt" "strings" - "sync" + conntypes "github.com/cosmos/ibc-go/v7/modules/core/03-connection/types" 
chantypes "github.com/cosmos/ibc-go/v7/modules/core/04-channel/types" "github.com/cosmos/relayer/v2/relayer/provider" "go.uber.org/zap/zapcore" @@ -20,7 +22,18 @@ type pathEndMessages struct { } type ibcMessage interface { - ibcMessageIndicator() + // assemble executes the appropriate proof query function, + // then, if successful, assembles the message for the destination. + assemble(ctx context.Context, src, dst *pathEndRuntime) (provider.RelayerMessage, error) + + // tracker creates a message tracker for message status + tracker(assembled provider.RelayerMessage) messageToTrack + + // msgType returns a human readable string for logging describing the message type. + msgType() string + + // satisfies zapcore.ObjectMarshaler interface for use with zap.Object(). + MarshalLogObject(enc zapcore.ObjectEncoder) error } // packetIBCMessage holds a packet message's eventType and sequence along with it, @@ -30,12 +43,90 @@ type packetIBCMessage struct { eventType string } -func (packetIBCMessage) ibcMessageIndicator() {} +// assemble executes the appropriate proof query function, +// then, if successful, assembles the packet message for the destination. +func (msg packetIBCMessage) assemble( + ctx context.Context, + src, dst *pathEndRuntime, +) (provider.RelayerMessage, error) { + var packetProof func(context.Context, provider.PacketInfo, uint64) (provider.PacketProof, error) + var assembleMessage func(provider.PacketInfo, provider.PacketProof) (provider.RelayerMessage, error) + switch msg.eventType { + case chantypes.EventTypeRecvPacket: + packetProof = src.chainProvider.PacketCommitment + assembleMessage = dst.chainProvider.MsgRecvPacket + case chantypes.EventTypeAcknowledgePacket: + packetProof = src.chainProvider.PacketAcknowledgement + assembleMessage = dst.chainProvider.MsgAcknowledgement + case chantypes.EventTypeTimeoutPacket: + if msg.info.ChannelOrder == chantypes.ORDERED.String() { + packetProof = src.chainProvider.NextSeqRecv + } else { + packetProof = src.chainProvider.PacketReceipt + } + + assembleMessage = dst.chainProvider.MsgTimeout + case chantypes.EventTypeTimeoutPacketOnClose: + if msg.info.ChannelOrder == chantypes.ORDERED.String() { + packetProof = src.chainProvider.NextSeqRecv + } else { + packetProof = src.chainProvider.PacketReceipt + } + + assembleMessage = dst.chainProvider.MsgTimeoutOnClose + default: + return nil, fmt.Errorf("unexepected packet message eventType for message assembly: %s", msg.eventType) + } + + ctx, cancel := context.WithTimeout(ctx, packetProofQueryTimeout) + defer cancel() + + var proof provider.PacketProof + var err error + proof, err = packetProof(ctx, msg.info, src.latestBlock.Height) + if err != nil { + return nil, fmt.Errorf("error querying packet proof: %w", err) + } + return assembleMessage(msg.info, proof) +} + +// tracker creates a message tracker for message status +func (msg packetIBCMessage) tracker(assembled provider.RelayerMessage) messageToTrack { + return packetMessageToTrack{ + msg: msg, + assembled: assembled, + } +} + +func (packetIBCMessage) msgType() string { + return "packet" +} + +// MarshalLogObject satisfies the zapcore.ObjectMarshaler interface +// so that you can use zap.Object("messages", r) when logging. +// This is typically useful when logging details about a partially sent result. 
+func (msg packetIBCMessage) MarshalLogObject(enc zapcore.ObjectEncoder) error {
+	enc.AddString("type", msg.eventType)
+	enc.AddString("src_port", msg.info.SourcePort)
+	enc.AddString("src_channel", msg.info.SourceChannel)
+	enc.AddString("dst_port", msg.info.DestPort)
+	enc.AddString("dst_channel", msg.info.DestChannel)
+	enc.AddUint64("sequence", msg.info.Sequence)
+	enc.AddString("timeout_height", fmt.Sprintf(
+		"%d-%d",
+		msg.info.TimeoutHeight.RevisionNumber,
+		msg.info.TimeoutHeight.RevisionHeight,
+	))
+	enc.AddUint64("timeout_timestamp", msg.info.TimeoutTimestamp)
+	enc.AddString("data", base64.StdEncoding.EncodeToString(msg.info.Data))
+	enc.AddString("ack", base64.StdEncoding.EncodeToString(msg.info.Ack))
+	return nil
+}
 
 // channelKey returns channel key for new message by this eventType
 // based on prior eventType.
-func (p packetIBCMessage) channelKey() (ChannelKey, error) {
-	return PacketInfoChannelKey(p.eventType, p.info)
+func (msg packetIBCMessage) channelKey() (ChannelKey, error) {
+	return PacketInfoChannelKey(msg.eventType, msg.info)
 }
 
 // channelIBCMessage holds a channel handshake message's eventType along with its details,
@@ -45,7 +136,74 @@ type channelIBCMessage struct {
 	info provider.ChannelInfo
 }
 
-func (channelIBCMessage) ibcMessageIndicator() {}
+// assemble executes the appropriate proof query function,
+// then, if successful, assembles the message for the destination.
+func (msg channelIBCMessage) assemble(
+	ctx context.Context,
+	src, dst *pathEndRuntime,
+) (provider.RelayerMessage, error) {
+	var chanProof func(context.Context, provider.ChannelInfo, uint64) (provider.ChannelProof, error)
+	var assembleMessage func(provider.ChannelInfo, provider.ChannelProof) (provider.RelayerMessage, error)
+	switch msg.eventType {
+	case chantypes.EventTypeChannelOpenInit:
+		// don't need proof for this message
+		assembleMessage = dst.chainProvider.MsgChannelOpenInit
+	case chantypes.EventTypeChannelOpenTry:
+		chanProof = src.chainProvider.ChannelProof
+		assembleMessage = dst.chainProvider.MsgChannelOpenTry
+	case chantypes.EventTypeChannelOpenAck:
+		chanProof = src.chainProvider.ChannelProof
+		assembleMessage = dst.chainProvider.MsgChannelOpenAck
+	case chantypes.EventTypeChannelOpenConfirm:
+		chanProof = src.chainProvider.ChannelProof
+		assembleMessage = dst.chainProvider.MsgChannelOpenConfirm
+	case chantypes.EventTypeChannelCloseInit:
+		// don't need proof for this message
+		assembleMessage = dst.chainProvider.MsgChannelCloseInit
+	case chantypes.EventTypeChannelCloseConfirm:
+		chanProof = src.chainProvider.ChannelProof
+		assembleMessage = dst.chainProvider.MsgChannelCloseConfirm
+	default:
+		return nil, fmt.Errorf("unexpected channel message eventType for message assembly: %s", msg.eventType)
+	}
+	var proof provider.ChannelProof
+	var err error
+	if chanProof != nil {
+		proof, err = chanProof(ctx, msg.info, src.latestBlock.Height)
+		if err != nil {
+			return nil, fmt.Errorf("error querying channel proof: %w", err)
+		}
+	}
+	return assembleMessage(msg.info, proof)
+}
+
+// tracker creates a message tracker for message status
+func (msg channelIBCMessage) tracker(assembled provider.RelayerMessage) messageToTrack {
+	return channelMessageToTrack{
+		msg:       msg,
+		assembled: assembled,
+	}
+}
+
+func (channelIBCMessage) msgType() string {
+	return "channel handshake"
+}
+
+// MarshalLogObject satisfies the zapcore.ObjectMarshaler interface
+// so that you can use zap.Object("messages", r) when logging.
+// This is typically useful when logging details about a partially sent result.
+func (msg channelIBCMessage) MarshalLogObject(enc zapcore.ObjectEncoder) error {
+	enc.AddString("type", msg.eventType)
+	enc.AddString("port_id", msg.info.PortID)
+	enc.AddString("channel_id", msg.info.ChannelID)
+	enc.AddString("counterparty_port_id", msg.info.CounterpartyPortID)
+	enc.AddString("counterparty_channel_id", msg.info.CounterpartyChannelID)
+	enc.AddString("connection_id", msg.info.ConnID)
+	enc.AddString("counterparty_connection_id", msg.info.CounterpartyConnID)
+	enc.AddString("order", msg.info.Order.String())
+	enc.AddString("version", msg.info.Version)
+	return nil
+}
 
 // connectionIBCMessage holds a connection handshake message's eventType along with its details,
 // useful for sending messages around internal to the PathProcessor.
@@ -54,7 +212,67 @@ type connectionIBCMessage struct {
 	info provider.ConnectionInfo
 }
 
-func (connectionIBCMessage) ibcMessageIndicator() {}
+// assemble executes the appropriate proof query function,
+// then, if successful, assembles the message for the destination.
+func (msg connectionIBCMessage) assemble(
+	ctx context.Context,
+	src, dst *pathEndRuntime,
+) (provider.RelayerMessage, error) {
+	var connProof func(context.Context, provider.ConnectionInfo, uint64) (provider.ConnectionProof, error)
+	var assembleMessage func(provider.ConnectionInfo, provider.ConnectionProof) (provider.RelayerMessage, error)
+	switch msg.eventType {
+	case conntypes.EventTypeConnectionOpenInit:
+		// don't need proof for this message
+		msg.info.CounterpartyCommitmentPrefix = src.chainProvider.CommitmentPrefix()
+		assembleMessage = dst.chainProvider.MsgConnectionOpenInit
+	case conntypes.EventTypeConnectionOpenTry:
+		msg.info.CounterpartyCommitmentPrefix = src.chainProvider.CommitmentPrefix()
+		connProof = src.chainProvider.ConnectionHandshakeProof
+		assembleMessage = dst.chainProvider.MsgConnectionOpenTry
+	case conntypes.EventTypeConnectionOpenAck:
+		connProof = src.chainProvider.ConnectionHandshakeProof
+		assembleMessage = dst.chainProvider.MsgConnectionOpenAck
+	case conntypes.EventTypeConnectionOpenConfirm:
+		connProof = src.chainProvider.ConnectionProof
+		assembleMessage = dst.chainProvider.MsgConnectionOpenConfirm
+	default:
+		return nil, fmt.Errorf("unexpected connection message eventType for message assembly: %s", msg.eventType)
+	}
+	var proof provider.ConnectionProof
+	var err error
+	if connProof != nil {
+		proof, err = connProof(ctx, msg.info, src.latestBlock.Height)
+		if err != nil {
+			return nil, fmt.Errorf("error querying connection proof: %w", err)
+		}
+	}
+	return assembleMessage(msg.info, proof)
+}
+
+// tracker creates a message tracker for message status
+func (msg connectionIBCMessage) tracker(assembled provider.RelayerMessage) messageToTrack {
+	return connectionMessageToTrack{
+		msg:       msg,
+		assembled: assembled,
+	}
+}
+
+func (connectionIBCMessage) msgType() string {
+	return "connection handshake"
+}
+
+// MarshalLogObject satisfies the zapcore.ObjectMarshaler interface
+// so that you can use zap.Object("messages", r) when logging.
+// This is typically useful when logging details about a partially sent result.
+func (msg connectionIBCMessage) MarshalLogObject(enc zapcore.ObjectEncoder) error { + enc.AddString("type", msg.eventType) + enc.AddString("client_id", msg.info.ClientID) + enc.AddString("cntrprty_client_id", msg.info.CounterpartyClientID) + enc.AddString("conn_id", msg.info.ConnID) + enc.AddString("cntrprty_conn_id", msg.info.CounterpartyConnID) + enc.AddString("cntrprty_commitment_prefix", msg.info.CounterpartyCommitmentPrefix.String()) + return nil +} const ( ClientICQTypeRequest ClientICQType = "query_request" @@ -67,7 +285,44 @@ type clientICQMessage struct { info provider.ClientICQInfo } -func (clientICQMessage) ibcMessageIndicator() {} +// assemble executes the query against the source chain, +// then, if successful, assembles the response message for the destination. +func (msg clientICQMessage) assemble( + ctx context.Context, + src, dst *pathEndRuntime, +) (provider.RelayerMessage, error) { + ctx, cancel := context.WithTimeout(ctx, interchainQueryTimeout) + defer cancel() + + proof, err := src.chainProvider.QueryICQWithProof(ctx, msg.info.Type, msg.info.Request, src.latestBlock.Height-1) + if err != nil { + return nil, fmt.Errorf("error during interchain query: %w", err) + } + + return dst.chainProvider.MsgSubmitQueryResponse(msg.info.Chain, msg.info.QueryID, proof) +} + +// tracker creates a message tracker for message status +func (msg clientICQMessage) tracker(assembled provider.RelayerMessage) messageToTrack { + return clientICQMessageToTrack{ + msg: msg, + assembled: assembled, + } +} + +func (clientICQMessage) msgType() string { + return "client ICQ" +} + +// MarshalLogObject satisfies the zapcore.ObjectMarshaler interface +// so that you can use zap.Object("messages", r) when logging. +// This is typically useful when logging details about a partially sent result. +func (msg clientICQMessage) MarshalLogObject(enc zapcore.ObjectEncoder) error { + enc.AddString("type", msg.info.Type) + enc.AddString("query_id", string(msg.info.QueryID)) + enc.AddString("request", string(msg.info.Request)) + return nil +} // processingMessage tracks the state of a IBC message currently being processed. type processingMessage struct { @@ -204,76 +459,83 @@ func channelInfoChannelKey(c provider.ChannelInfo) ChannelKey { } } -// outgoingMessages is a slice of relayer messages that can be -// appended to concurrently. -type outgoingMessages struct { - mu sync.Mutex - msgs []provider.RelayerMessage - pktMsgs []packetMessageToTrack - connMsgs []connectionMessageToTrack - chanMsgs []channelMessageToTrack - clientICQMsgs []clientICQMessageToTrack -} +type messageToTrack interface { + // assembledMsg returns the assembled message ready to send. + assembledMsg() provider.RelayerMessage -// MarshalLogObject satisfies the zapcore.ObjectMarshaler interface -// so that you can use zap.Object("messages", r) when logging. -// This is typically useful when logging details about a partially sent result. 
-func (om *outgoingMessages) MarshalLogObject(enc zapcore.ObjectEncoder) error { - for i, m := range om.pktMsgs { - pfx := "pkt_" + strconv.FormatInt(int64(i), 10) + "_" - enc.AddString(pfx+"event_type", m.msg.eventType) - enc.AddString(pfx+"src_chan", m.msg.info.SourceChannel) - enc.AddString(pfx+"src_port", m.msg.info.SourcePort) - enc.AddString(pfx+"dst_chan", m.msg.info.DestChannel) - enc.AddString(pfx+"dst_port", m.msg.info.DestPort) - enc.AddString(pfx+"data", string(m.msg.info.Data)) - } - for i, m := range om.connMsgs { - pfx := "conn_" + strconv.FormatInt(int64(i), 10) + "_" - enc.AddString(pfx+"event_type", m.msg.eventType) - enc.AddString(pfx+"client_id", m.msg.info.ClientID) - enc.AddString(pfx+"conn_id", m.msg.info.ConnID) - enc.AddString(pfx+"cntrprty_client_id", m.msg.info.CounterpartyClientID) - enc.AddString(pfx+"cntrprty_conn_id", m.msg.info.CounterpartyConnID) - } - for i, m := range om.chanMsgs { - pfx := "chan_" + strconv.FormatInt(int64(i), 10) + "_" - enc.AddString(pfx+"event_type", m.msg.eventType) - enc.AddString(pfx+"chan_id", m.msg.info.ChannelID) - enc.AddString(pfx+"port_id", m.msg.info.PortID) - enc.AddString(pfx+"cntrprty_chan_id", m.msg.info.CounterpartyChannelID) - enc.AddString(pfx+"cntrprty_port_id", m.msg.info.CounterpartyPortID) - } - return nil -} + // msgType returns a human readable string for logging describing the message type. + msgType() string -// Append acquires a lock on om's mutex and then appends msg. -// When there are no more possible concurrent calls to Append, -// it is safe to directly access om.msgs. -func (om *outgoingMessages) Append(msg provider.RelayerMessage) { - om.mu.Lock() - defer om.mu.Unlock() - om.msgs = append(om.msgs, msg) + // satisfies zapcore.ObjectMarshaler interface for use with zap.Object(). 
+ MarshalLogObject(enc zapcore.ObjectEncoder) error } type packetMessageToTrack struct { msg packetIBCMessage - assembled bool + assembled provider.RelayerMessage +} + +func (t packetMessageToTrack) assembledMsg() provider.RelayerMessage { + return t.assembled +} + +func (t packetMessageToTrack) msgType() string { + return t.msg.msgType() +} + +func (t packetMessageToTrack) MarshalLogObject(enc zapcore.ObjectEncoder) error { + return t.msg.MarshalLogObject(enc) } type connectionMessageToTrack struct { msg connectionIBCMessage - assembled bool + assembled provider.RelayerMessage +} + +func (t connectionMessageToTrack) assembledMsg() provider.RelayerMessage { + return t.assembled +} + +func (t connectionMessageToTrack) msgType() string { + return t.msg.msgType() +} + +func (t connectionMessageToTrack) MarshalLogObject(enc zapcore.ObjectEncoder) error { + return t.msg.MarshalLogObject(enc) } type channelMessageToTrack struct { msg channelIBCMessage - assembled bool + assembled provider.RelayerMessage +} + +func (t channelMessageToTrack) assembledMsg() provider.RelayerMessage { + return t.assembled +} + +func (t channelMessageToTrack) msgType() string { + return t.msg.msgType() +} + +func (t channelMessageToTrack) MarshalLogObject(enc zapcore.ObjectEncoder) error { + return t.msg.MarshalLogObject(enc) } type clientICQMessageToTrack struct { msg clientICQMessage - assembled bool + assembled provider.RelayerMessage +} + +func (t clientICQMessageToTrack) assembledMsg() provider.RelayerMessage { + return t.assembled +} + +func (t clientICQMessageToTrack) msgType() string { + return t.msg.msgType() +} + +func (t clientICQMessageToTrack) MarshalLogObject(enc zapcore.ObjectEncoder) error { + return t.msg.MarshalLogObject(enc) } // orderFromString parses a string into a channel order byte. 
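Note on the types_internal.go changes above: the old per-type assemble helpers in the PathProcessor are replaced by an ibcMessage interface whose implementations assemble themselves and return their own messageToTrack. The standalone Go sketch below illustrates only that dispatch pattern; every identifier in it (message, track, packetMsg, channelMsg) is invented for illustration and is not part of this patch or the relayer codebase. The payoff is that a new message kind only needs to satisfy the interface instead of extending a switch inside the processor.

// Illustrative sketch only (not part of the patch): mirrors, with made-up
// names, how per-message assemble/tracker methods let one loop handle every
// message kind uniformly.
package main

import (
	"context"
	"fmt"
)

// message stands in for the patch's ibcMessage interface.
type message interface {
	assemble(ctx context.Context) (string, error) // a string stands in for the assembled RelayerMessage
	tracker(assembled string) track
}

// track stands in for the patch's messageToTrack interface.
type track struct {
	msgType   string
	assembled string
}

type packetMsg struct{ event string }

func (m packetMsg) assemble(ctx context.Context) (string, error) {
	// A real implementation would query a proof here before building the message.
	return "MsgRecvPacket(" + m.event + ")", nil
}

func (m packetMsg) tracker(assembled string) track {
	return track{msgType: "packet", assembled: assembled}
}

type channelMsg struct{ event string }

func (m channelMsg) assemble(ctx context.Context) (string, error) {
	return "MsgChannelOpenTry(" + m.event + ")", nil
}

func (m channelMsg) tracker(assembled string) track {
	return track{msgType: "channel handshake", assembled: assembled}
}

func main() {
	msgs := []message{packetMsg{event: "recv_packet"}, channelMsg{event: "channel_open_try"}}

	// One loop handles every message kind; a failed assembly leaves an empty
	// assembled message in the tracker, which the caller can log and retry.
	var tracked []track
	for _, m := range msgs {
		assembled, err := m.assemble(context.Background())
		if err != nil {
			assembled = ""
		}
		tracked = append(tracked, m.tracker(assembled))
	}
	for _, t := range tracked {
		fmt.Printf("%s: %s\n", t.msgType, t.assembled)
	}
}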
diff --git a/relayer/processor/types_test.go b/relayer/processor/types_test.go index 28dc697c7..39224dc74 100644 --- a/relayer/processor/types_test.go +++ b/relayer/processor/types_test.go @@ -12,6 +12,7 @@ type mockIBCHeader struct{} func (h mockIBCHeader) Height() uint64 { return 0 } func (h mockIBCHeader) ConsensusState() ibcexported.ConsensusState { return nil } +func (h mockIBCHeader) NextValidatorsHash() []byte { return nil } func TestIBCHeaderCachePrune(t *testing.T) { cache := make(processor.IBCHeaderCache) diff --git a/relayer/provider/provider.go b/relayer/provider/provider.go index 3abf0235d..a49d19a90 100644 --- a/relayer/provider/provider.go +++ b/relayer/provider/provider.go @@ -18,9 +18,17 @@ import ( "go.uber.org/zap/zapcore" ) +type BroadcastMode string + +const ( + BroadcastModeSingle BroadcastMode = "single" + BroadcastModeBatch BroadcastMode = "batch" +) + type ProviderConfig interface { NewProvider(log *zap.Logger, homepath string, debug bool, chainName string) (ChainProvider, error) Validate() error + BroadcastMode() BroadcastMode } type RelayerMessage interface { @@ -29,11 +37,12 @@ type RelayerMessage interface { } type RelayerTxResponse struct { - Height int64 - TxHash string - Code uint32 - Data string - Events []RelayerEvent + Height int64 + TxHash string + Codespace string + Code uint32 + Data string + Events []RelayerEvent } type RelayerEvent struct { @@ -49,7 +58,7 @@ type LatestBlock struct { type IBCHeader interface { Height() uint64 ConsensusState() ibcexported.ConsensusState - // require conversion implementation for third party chains + NextValidatorsHash() []byte } // ClientState holds the current state of a client from a single chain's perspective @@ -193,6 +202,7 @@ func (es loggableEvents) MarshalLogArray(enc zapcore.ArrayEncoder) error { func (r RelayerTxResponse) MarshalLogObject(enc zapcore.ObjectEncoder) error { enc.AddInt64("height", r.Height) enc.AddString("tx_hash", r.TxHash) + enc.AddString("codespace", r.Codespace) enc.AddUint32("code", r.Code) enc.AddString("data", r.Data) enc.AddArray("events", loggableEvents(r.Events)) @@ -367,6 +377,14 @@ type ChainProvider interface { SendMessage(ctx context.Context, msg RelayerMessage, memo string) (*RelayerTxResponse, bool, error) SendMessages(ctx context.Context, msgs []RelayerMessage, memo string) (*RelayerTxResponse, bool, error) + SendMessagesToMempool( + ctx context.Context, + msgs []RelayerMessage, + memo string, + + asyncCtx context.Context, + asyncCallback func(*RelayerTxResponse, error), + ) error ChainName() string ChainId() string