From 23d1e5c864b35d133cad6a0ef06970a2b1e1b03f Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Wed, 24 May 2023 15:48:39 -0600 Subject: [PATCH] Neutron launch fixes and optimizations (#1185) * pipe max msgs through path processor * only apply max msgs to packet msgs * multiple msgs simultaneously on ordered chans * flush should be more frequent if it fails or does not complete * fix legacy * handle feedback --- cmd/flags.go | 11 +- cmd/root.go | 16 +- cmd/start.go | 30 +- cmd/tx.go | 4 +- go.mod | 2 +- .../chains/cosmos/cosmos_chain_processor.go | 7 + relayer/chains/cosmos/query.go | 2 - .../chains/mock/mock_chain_processor_test.go | 4 +- relayer/channel.go | 3 + relayer/connection.go | 1 + relayer/processor/path_processor.go | 28 +- relayer/processor/path_processor_internal.go | 347 +++++++++++++----- relayer/processor/types.go | 30 ++ relayer/processor/types_internal.go | 5 + relayer/strategies.go | 9 +- 15 files changed, 336 insertions(+), 163 deletions(-) diff --git a/cmd/flags.go b/cmd/flags.go index 427f12d52..82bab138a 100644 --- a/cmd/flags.go +++ b/cmd/flags.go @@ -225,11 +225,12 @@ func urlFlag(v *viper.Viper, cmd *cobra.Command) *cobra.Command { } func strategyFlag(v *viper.Viper, cmd *cobra.Command) *cobra.Command { - cmd.Flags().StringP(flagMaxTxSize, "s", "2", "strategy of path to generate of the messages in a relay transaction") - cmd.Flags().StringP(flagMaxMsgLength, "l", "5", "maximum number of messages in a relay transaction") - if err := v.BindPFlag(flagMaxTxSize, cmd.Flags().Lookup(flagMaxTxSize)); err != nil { - panic(err) - } + cmd.Flags().Uint64P( + flagMaxMsgLength, + "l", + relayer.DefaultMaxMsgLength, + "maximum number of messages per transaction", + ) if err := v.BindPFlag(flagMaxMsgLength, cmd.Flags().Lookup(flagMaxMsgLength)); err != nil { panic(err) } diff --git a/cmd/root.go b/cmd/root.go index d76360e01..eb8367409 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -34,13 +34,9 @@ import ( "github.com/spf13/viper" "go.uber.org/zap" "go.uber.org/zap/zapcore" - "golang.org/x/term" ) -const ( - MB = 1024 * 1024 // in bytes - appName = "rly" -) +const appName = "rly" var defaultHome = filepath.Join(os.Getenv("HOME"), ".relayer") @@ -185,18 +181,10 @@ func newRootLogger(format string, debug bool) (*zap.Logger, error) { switch format { case "json": enc = zapcore.NewJSONEncoder(config) - case "console": + case "auto", "console": enc = zapcore.NewConsoleEncoder(config) case "logfmt": enc = zaplogfmt.NewEncoder(config) - case "auto": - if term.IsTerminal(int(os.Stderr.Fd())) { - // When a user runs relayer in the foreground, use easier to read output. - enc = zapcore.NewConsoleEncoder(config) - } else { - // Otherwise, use consistent logfmt format for simplistic machine processing. - enc = zaplogfmt.NewEncoder(config) - } default: return nil, fmt.Errorf("unrecognized log format %q", format) } diff --git a/cmd/start.go b/cmd/start.go index fa69d2a80..c2cdf9406 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -21,7 +21,6 @@ import ( "errors" "fmt" "net" - "strconv" "strings" "github.com/cosmos/relayer/v2/internal/relaydebug" @@ -88,7 +87,7 @@ $ %s start demo-path2 --max-tx-size 10`, appName, appName, appName, appName)), return err } - maxTxSize, maxMsgLength, err := GetStartOptions(cmd) + maxMsgLength, err := cmd.Flags().GetUint64(flagMaxMsgLength) if err != nil { return err } @@ -149,7 +148,7 @@ $ %s start demo-path2 --max-tx-size 10`, appName, appName, appName, appName)), a.log, chains, paths, - maxTxSize, maxMsgLength, + maxMsgLength, a.config.memo(cmd), clientUpdateThresholdTime, flushInterval, @@ -182,28 +181,3 @@ $ %s start demo-path2 --max-tx-size 10`, appName, appName, appName, appName)), cmd = memoFlag(a.viper, cmd) return cmd } - -// GetStartOptions sets strategy specific fields. -func GetStartOptions(cmd *cobra.Command) (uint64, uint64, error) { - maxTxSize, err := cmd.Flags().GetString(flagMaxTxSize) - if err != nil { - return 0, 0, err - } - - txSize, err := strconv.ParseUint(maxTxSize, 10, 64) - if err != nil { - return 0, 0, err - } - - maxMsgLength, err := cmd.Flags().GetString(flagMaxMsgLength) - if err != nil { - return txSize * MB, 0, err - } - - msgLen, err := strconv.ParseUint(maxMsgLength, 10, 64) - if err != nil { - return txSize * MB, 0, err - } - - return txSize * MB, msgLen, nil -} diff --git a/cmd/tx.go b/cmd/tx.go index 6e9244527..e1b5e124a 100644 --- a/cmd/tx.go +++ b/cmd/tx.go @@ -781,7 +781,7 @@ $ %s tx flush demo-path channel-0`, return err } - maxTxSize, maxMsgLength, err := GetStartOptions(cmd) + maxMsgLength, err := cmd.Flags().GetUint64(flagMaxMsgLength) if err != nil { return err } @@ -802,7 +802,7 @@ $ %s tx flush demo-path channel-0`, a.log, chains, paths, - maxTxSize, maxMsgLength, + maxMsgLength, a.config.memo(cmd), 0, 0, diff --git a/go.mod b/go.mod index fd1091a78..16f05763e 100644 --- a/go.mod +++ b/go.mod @@ -32,7 +32,6 @@ require ( go.uber.org/zap v1.24.0 golang.org/x/mod v0.8.0 golang.org/x/sync v0.1.0 - golang.org/x/term v0.7.0 golang.org/x/text v0.9.0 google.golang.org/grpc v1.54.0 gopkg.in/yaml.v2 v2.4.0 @@ -171,6 +170,7 @@ require ( golang.org/x/net v0.9.0 // indirect golang.org/x/oauth2 v0.5.0 // indirect golang.org/x/sys v0.7.0 // indirect + golang.org/x/term v0.7.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect google.golang.org/api v0.110.0 // indirect google.golang.org/appengine v1.6.7 // indirect diff --git a/relayer/chains/cosmos/cosmos_chain_processor.go b/relayer/chains/cosmos/cosmos_chain_processor.go index e5de5f2bc..0c350f1f4 100644 --- a/relayer/chains/cosmos/cosmos_chain_processor.go +++ b/relayer/chains/cosmos/cosmos_chain_processor.go @@ -398,6 +398,13 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu break } + ccp.log.Debug( + "Queried block", + zap.Int64("height", i), + zap.Int64("latest", persistence.latestHeight), + zap.Int64("delta", persistence.latestHeight-i), + ) + persistence.retriesAtLatestQueriedBlock = 0 latestHeader = ibcHeader.(provider.TendermintIBCHeader) diff --git a/relayer/chains/cosmos/query.go b/relayer/chains/cosmos/query.go index f8025456e..6e438727f 100644 --- a/relayer/chains/cosmos/query.go +++ b/relayer/chains/cosmos/query.go @@ -897,7 +897,6 @@ func (cc *CosmosProvider) QueryUnreceivedPackets(ctx context.Context, height uin func sendPacketQuery(channelID string, portID string, seq uint64) string { x := []string{ fmt.Sprintf("%s.packet_src_channel='%s'", spTag, channelID), - fmt.Sprintf("%s.packet_src_port='%s'", spTag, portID), fmt.Sprintf("%s.packet_sequence='%d'", spTag, seq), } return strings.Join(x, " AND ") @@ -906,7 +905,6 @@ func sendPacketQuery(channelID string, portID string, seq uint64) string { func writeAcknowledgementQuery(channelID string, portID string, seq uint64) string { x := []string{ fmt.Sprintf("%s.packet_dst_channel='%s'", waTag, channelID), - fmt.Sprintf("%s.packet_dst_port='%s'", waTag, portID), fmt.Sprintf("%s.packet_sequence='%d'", waTag, seq), } return strings.Join(x, " AND ") diff --git a/relayer/chains/mock/mock_chain_processor_test.go b/relayer/chains/mock/mock_chain_processor_test.go index 505fa87db..718826917 100644 --- a/relayer/chains/mock/mock_chain_processor_test.go +++ b/relayer/chains/mock/mock_chain_processor_test.go @@ -9,6 +9,7 @@ import ( clienttypes "github.com/cosmos/ibc-go/v7/modules/core/02-client/types" chantypes "github.com/cosmos/ibc-go/v7/modules/core/04-channel/types" + "github.com/cosmos/relayer/v2/relayer" "github.com/cosmos/relayer/v2/relayer/chains/mock" "github.com/cosmos/relayer/v2/relayer/processor" "github.com/prometheus/client_golang/prometheus/testutil" @@ -61,7 +62,8 @@ func TestMockChainAndPathProcessors(t *testing.T) { clientUpdateThresholdTime := 6 * time.Hour flushInterval := 6 * time.Hour - pathProcessor := processor.NewPathProcessor(log, pathEnd1, pathEnd2, metrics, "", clientUpdateThresholdTime, flushInterval) + pathProcessor := processor.NewPathProcessor(log, pathEnd1, pathEnd2, metrics, "", + clientUpdateThresholdTime, flushInterval, relayer.DefaultMaxMsgLength) eventProcessor := processor.NewEventProcessor(). WithChainProcessors( diff --git a/relayer/channel.go b/relayer/channel.go index 5274d405d..2f3f32b64 100644 --- a/relayer/channel.go +++ b/relayer/channel.go @@ -59,6 +59,7 @@ func (c *Chain) CreateOpenChannels( memo, DefaultClientUpdateThreshold, DefaultFlushInterval, + DefaultMaxMsgLength, ) c.log.Info("Starting event processor for channel handshake", @@ -131,6 +132,7 @@ func (c *Chain) CloseChannel( memo, DefaultClientUpdateThreshold, DefaultFlushInterval, + DefaultMaxMsgLength, )). WithInitialBlockHistory(0). WithMessageLifecycle(&processor.FlushLifecycle{}). @@ -168,6 +170,7 @@ func (c *Chain) CloseChannel( memo, DefaultClientUpdateThreshold, DefaultFlushInterval, + DefaultMaxMsgLength, )). WithInitialBlockHistory(0). WithMessageLifecycle(&processor.ChannelCloseLifecycle{ diff --git a/relayer/connection.go b/relayer/connection.go index 1f818eff7..df784d504 100644 --- a/relayer/connection.go +++ b/relayer/connection.go @@ -40,6 +40,7 @@ func (c *Chain) CreateOpenConnections( memo, DefaultClientUpdateThreshold, DefaultFlushInterval, + DefaultMaxMsgLength, ) var connectionSrc, connectionDst string diff --git a/relayer/processor/path_processor.go b/relayer/processor/path_processor.go index 976037ba1..d6df17eea 100644 --- a/relayer/processor/path_processor.go +++ b/relayer/processor/path_processor.go @@ -27,6 +27,9 @@ const ( // Amount of time to wait for interchain queries. interchainQueryTimeout = 60 * time.Second + // Amount of time between flushes if the previous flush failed. + flushFailureRetry = 15 * time.Second + // If message assembly fails from either proof query failure on the source // or assembling the message for the destination, how many blocks should pass // before retrying. @@ -63,7 +66,7 @@ type PathProcessor struct { messageLifecycle MessageLifecycle initialFlushComplete bool - flushTicker *time.Ticker + flushTimer *time.Timer flushInterval time.Duration // Signals to retry. @@ -71,6 +74,8 @@ type PathProcessor struct { sentInitialMsg bool + maxMsgs uint64 + metrics *PrometheusMetrics } @@ -94,6 +99,7 @@ func NewPathProcessor( memo string, clientUpdateThresholdTime time.Duration, flushInterval time.Duration, + maxMsgs uint64, ) *PathProcessor { pp := &PathProcessor{ log: log, @@ -104,6 +110,7 @@ func NewPathProcessor( clientUpdateThresholdTime: clientUpdateThresholdTime, flushInterval: flushInterval, metrics: metrics, + maxMsgs: maxMsgs, } if flushInterval == 0 { pp.disablePeriodicFlush() @@ -264,6 +271,16 @@ func (pp *PathProcessor) HandleNewData(chainID string, cacheData ChainProcessorC } } +func (pp *PathProcessor) handleFlush(ctx context.Context) { + flushTimer := pp.flushInterval + if err := pp.flush(ctx); err != nil { + pp.log.Warn("Flush not complete", zap.Error(err)) + flushTimer = flushFailureRetry + } + pp.flushTimer.Stop() + pp.flushTimer = time.NewTimer(flushTimer) +} + // processAvailableSignals will block if signals are not yet available, otherwise it will process one of the available signals. // It returns whether or not the pathProcessor should quit. func (pp *PathProcessor) processAvailableSignals(ctx context.Context, cancel func()) bool { @@ -287,9 +304,9 @@ func (pp *PathProcessor) processAvailableSignals(ctx context.Context, cancel fun case <-pp.retryProcess: // No new data to merge in, just retry handling. - case <-pp.flushTicker.C: + case <-pp.flushTimer.C: // Periodic flush to clear out any old packets - pp.flush(ctx) + pp.handleFlush(ctx) } return false } @@ -298,8 +315,7 @@ func (pp *PathProcessor) processAvailableSignals(ctx context.Context, cancel fun func (pp *PathProcessor) Run(ctx context.Context, cancel func()) { var retryTimer *time.Timer - pp.flushTicker = time.NewTicker(pp.flushInterval) - defer pp.flushTicker.Stop() + pp.flushTimer = time.NewTimer(time.Hour) for { // block until we have any signals to process @@ -319,7 +335,7 @@ func (pp *PathProcessor) Run(ctx context.Context, cancel func()) { } if pp.shouldFlush() && !pp.initialFlushComplete { - pp.flush(ctx) + pp.handleFlush(ctx) pp.initialFlushComplete = true } else if pp.shouldTerminateForFlushComplete() { cancel() diff --git a/relayer/processor/path_processor_internal.go b/relayer/processor/path_processor_internal.go index 040808103..79d1740c5 100644 --- a/relayer/processor/path_processor_internal.go +++ b/relayer/processor/path_processor_internal.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "errors" + "fmt" "sort" "sync" @@ -18,47 +19,95 @@ import ( // i.e. a MsgConnectionOpenInit or a MsgChannelOpenInit should be broadcasted to start // the handshake if this key exists in the relevant cache. const ( - preInitKey = "pre_init" - preCloseKey = "pre_close" + preInitKey = "pre_init" + preCloseKey = "pre_close" + maxPacketsPerFlush = 10 ) // getMessagesToSend returns only the lowest sequence message (if it should be sent) for ordered channels, // otherwise returns all which should be sent. func (pp *PathProcessor) getMessagesToSend( + ctx context.Context, msgs []packetIBCMessage, src, dst *pathEndRuntime, ) (srcMsgs []packetIBCMessage, dstMsgs []packetIBCMessage) { if len(msgs) == 0 { return } + if msgs[0].info.ChannelOrder == chantypes.ORDERED.String() { - // for packet messages on ordered channels, only handle the lowest sequence number now. - sort.SliceStable(msgs, func(i, j int) bool { - return msgs[i].info.Sequence < msgs[j].info.Sequence - }) - firstMsg := msgs[0] - switch firstMsg.eventType { - case chantypes.EventTypeRecvPacket: - if dst.shouldSendPacketMessage(firstMsg, src) { - dstMsgs = append(dstMsgs, firstMsg) + eventMessages := make(map[string][]packetIBCMessage) + + for _, m := range msgs { + eventMessages[m.eventType] = append(eventMessages[m.eventType], m) + } + + for e, m := range eventMessages { + m := m + sort.SliceStable(m, func(i, j int) bool { + return m[i].info.Sequence < m[j].info.Sequence + }) + + if e == chantypes.EventTypeRecvPacket { + dstChan, dstPort := m[0].info.DestChannel, m[0].info.DestPort + res, err := dst.chainProvider.QueryNextSeqRecv(ctx, 0, dstChan, dstPort) + if err != nil { + dst.log.Error("Failed to query next sequence recv", + zap.String("channel_id", dstChan), + zap.String("port_id", dstPort), + zap.Error(err), + ) + return + } + + if m[0].info.Sequence != res.NextSequenceReceive { + dst.log.Error("Unexpected next sequence recv", + zap.String("channel_id", m[0].info.DestChannel), + zap.String("port_id", m[0].info.DestChannel), + zap.Uint64("expected", res.NextSequenceReceive), + zap.Uint64("actual", m[0].info.Sequence), + ) + return + } } - default: - if src.shouldSendPacketMessage(firstMsg, dst) { - srcMsgs = append(srcMsgs, firstMsg) + + for i, msg := range m { + // only handle consecutive sequences on ordered channels + if i > 0 && msg.info.Sequence-1 != m[i-1].info.Sequence { + dst.log.Error("Packets are not consecutive", + zap.String("channel_id", m[0].info.DestChannel), + zap.String("port_id", m[0].info.DestChannel), + zap.Uint64("seq", msg.info.Sequence), + zap.Uint64("prior_seq", m[i-1].info.Sequence), + ) + break + } + + switch e { + case chantypes.EventTypeRecvPacket: + if uint64(len(dstMsgs)) <= pp.maxMsgs && dst.shouldSendPacketMessage(msg, src) { + dstMsgs = append(dstMsgs, msg) + } + default: + if uint64(len(srcMsgs)) <= pp.maxMsgs && src.shouldSendPacketMessage(msg, dst) { + srcMsgs = append(srcMsgs, msg) + } + } } } + return srcMsgs, dstMsgs } - // for unordered channels, can handle multiple simultaneous packets. + // for unordered channels, don't need to worry about sequence ordering. for _, msg := range msgs { switch msg.eventType { case chantypes.EventTypeRecvPacket: - if dst.shouldSendPacketMessage(msg, src) { + if uint64(len(dstMsgs)) <= pp.maxMsgs && dst.shouldSendPacketMessage(msg, src) { dstMsgs = append(dstMsgs, msg) } default: - if src.shouldSendPacketMessage(msg, dst) { + if uint64(len(srcMsgs)) <= pp.maxMsgs && src.shouldSendPacketMessage(msg, dst) { srcMsgs = append(srcMsgs, msg) } } @@ -211,7 +260,12 @@ func (pp *PathProcessor) unrelayedPacketFlowMessages( msgs = append(msgs, msgTransfer) } - res.SrcMessages, res.DstMessages = pp.getMessagesToSend(msgs, pathEndPacketFlowMessages.Src, pathEndPacketFlowMessages.Dst) + res.SrcMessages, res.DstMessages = pp.getMessagesToSend( + ctx, + msgs, + pathEndPacketFlowMessages.Src, + pathEndPacketFlowMessages.Dst, + ) return res } @@ -1073,7 +1127,8 @@ func queryPacketCommitments( } } -func queuePendingRecvAndAcks( +// queuePendingRecvAndAcks returns whether flush can be considered complete (none skipped). +func (pp *PathProcessor) queuePendingRecvAndAcks( ctx context.Context, src, dst *pathEndRuntime, k ChannelKey, @@ -1082,124 +1137,187 @@ func queuePendingRecvAndAcks( dstCache ChannelPacketMessagesCache, srcMu sync.Locker, dstMu sync.Locker, -) func() error { - return func() error { - if len(seqs) == 0 { - src.log.Debug("Nothing to flush", zap.String("channel", k.ChannelID), zap.String("port", k.PortID)) - return nil - } +) (bool, error) { - dstChan, dstPort := k.CounterpartyChannelID, k.CounterpartyPortID + if len(seqs) == 0 { + src.log.Debug("Nothing to flush", zap.String("channel", k.ChannelID), zap.String("port", k.PortID)) + return true, nil + } - unrecv, err := dst.chainProvider.QueryUnreceivedPackets(ctx, dst.latestBlock.Height, dstChan, dstPort, seqs) + dstChan, dstPort := k.CounterpartyChannelID, k.CounterpartyPortID + + unrecv, err := dst.chainProvider.QueryUnreceivedPackets(ctx, dst.latestBlock.Height, dstChan, dstPort, seqs) + if err != nil { + return false, err + } + + dstHeight := int64(dst.latestBlock.Height) + + if len(unrecv) > 0 { + channel, err := dst.chainProvider.QueryChannel(ctx, dstHeight, dstChan, dstPort) if err != nil { - return err + return false, err } - dstHeight := int64(dst.latestBlock.Height) - - if len(unrecv) > 0 { - channel, err := dst.chainProvider.QueryChannel(ctx, dstHeight, dstChan, dstPort) + if channel.Channel.Ordering == chantypes.ORDERED { + nextSeqRecv, err := dst.chainProvider.QueryNextSeqRecv(ctx, dstHeight, dstChan, dstPort) if err != nil { - return err + return false, err } - if channel.Channel.Ordering == chantypes.ORDERED { - nextSeqRecv, err := dst.chainProvider.QueryNextSeqRecv(ctx, dstHeight, dstChan, dstPort) - if err != nil { - return err + var newUnrecv []uint64 + + for _, seq := range unrecv { + if seq >= nextSeqRecv.NextSequenceReceive { + newUnrecv = append(newUnrecv, seq) } + } - var newUnrecv []uint64 + unrecv = newUnrecv - for _, seq := range unrecv { - if seq >= nextSeqRecv.NextSequenceReceive { - newUnrecv = append(newUnrecv, seq) - break - } - } + sort.SliceStable(unrecv, func(i, j int) bool { + return unrecv[i] < unrecv[j] + }) + } + } - unrecv = newUnrecv - } + var eg errgroup.Group + + skipped := false + + for i, seq := range unrecv { + srcMu.Lock() + if srcCache.IsCached(chantypes.EventTypeSendPacket, k, seq) { + continue // already cached } + srcMu.Unlock() - if len(unrecv) > 0 { - src.log.Debug("Will flush MsgRecvPacket", - zap.String("channel", k.ChannelID), - zap.String("port", k.PortID), - zap.Uint64s("sequences", unrecv), - ) - } else { - src.log.Debug("No MsgRecvPacket to flush", - zap.String("channel", k.ChannelID), - zap.String("port", k.PortID), - ) + if i >= maxPacketsPerFlush { + skipped = true + break } - for _, seq := range unrecv { + src.log.Debug("Querying send packet", + zap.String("channel", k.ChannelID), + zap.String("port", k.PortID), + zap.Uint64("sequence", seq), + ) + + seq := seq + + eg.Go(func() error { sendPacket, err := src.chainProvider.QuerySendPacket(ctx, k.ChannelID, k.PortID, seq) if err != nil { return err } srcMu.Lock() - if _, ok := srcCache[k]; !ok { - srcCache[k] = make(PacketMessagesCache) - } - if _, ok := srcCache[k][chantypes.EventTypeSendPacket]; !ok { - srcCache[k][chantypes.EventTypeSendPacket] = make(PacketSequenceCache) - } - srcCache[k][chantypes.EventTypeSendPacket][seq] = sendPacket + srcCache.Cache(chantypes.EventTypeSendPacket, k, seq, sendPacket) srcMu.Unlock() - } - var unacked []uint64 + src.log.Debug("Cached send packet", + zap.String("channel", k.ChannelID), + zap.String("port", k.PortID), + zap.String("ctrpty_channel", k.CounterpartyChannelID), + zap.String("ctrpty_port", k.CounterpartyPortID), + zap.Uint64("sequence", seq), + ) - SeqLoop: - for _, seq := range seqs { - for _, unrecvSeq := range unrecv { - if seq == unrecvSeq { - continue SeqLoop - } + return nil + }) + } + + if err := eg.Wait(); err != nil { + return false, err + } + + if len(unrecv) > 0 { + src.log.Debug("Will flush MsgRecvPacket", + zap.String("channel", k.ChannelID), + zap.String("port", k.PortID), + zap.Uint64s("sequences", unrecv), + ) + } else { + src.log.Debug("No MsgRecvPacket to flush", + zap.String("channel", k.ChannelID), + zap.String("port", k.PortID), + ) + } + + var unacked []uint64 + +SeqLoop: + for _, seq := range seqs { + for _, unrecvSeq := range unrecv { + if seq == unrecvSeq { + continue SeqLoop } - // does not exist in unrecv, so this is an ack that must be written - unacked = append(unacked, seq) } + // does not exist in unrecv, so this is an ack that must be written + unacked = append(unacked, seq) + } - if len(unacked) > 0 { - src.log.Debug("Will flush MsgAcknowledgement", zap.Object("channel", k), zap.Uint64s("sequences", unacked)) - } else { - src.log.Debug("No MsgAcknowledgement to flush", zap.String("channel", k.ChannelID), zap.String("port", k.PortID)) + for i, seq := range unacked { + dstMu.Lock() + ck := k.Counterparty() + if dstCache.IsCached(chantypes.EventTypeRecvPacket, ck, seq) && + dstCache.IsCached(chantypes.EventTypeWriteAck, ck, seq) { + continue // already cached } + dstMu.Unlock() + + if i >= maxPacketsPerFlush { + skipped = true + break + } + + seq := seq - for _, seq := range unacked { + dst.log.Debug("Querying recv packet", + zap.String("channel", k.CounterpartyChannelID), + zap.String("port", k.CounterpartyPortID), + zap.Uint64("sequence", seq), + ) + + eg.Go(func() error { recvPacket, err := dst.chainProvider.QueryRecvPacket(ctx, k.CounterpartyChannelID, k.CounterpartyPortID, seq) if err != nil { return err } - dstMu.Lock() - ck := k.Counterparty() - if _, ok := dstCache[ck]; !ok { - dstCache[ck] = make(PacketMessagesCache) - } - if _, ok := dstCache[ck][chantypes.EventTypeRecvPacket]; !ok { - dstCache[ck][chantypes.EventTypeRecvPacket] = make(PacketSequenceCache) - } - if _, ok := dstCache[ck][chantypes.EventTypeWriteAck]; !ok { - dstCache[ck][chantypes.EventTypeWriteAck] = make(PacketSequenceCache) - } - dstCache[ck][chantypes.EventTypeRecvPacket][seq] = recvPacket - dstCache[ck][chantypes.EventTypeWriteAck][seq] = recvPacket + dstMu.Lock() + dstCache.Cache(chantypes.EventTypeRecvPacket, ck, seq, recvPacket) + dstCache.Cache(chantypes.EventTypeWriteAck, ck, seq, recvPacket) dstMu.Unlock() - } - return nil + + return nil + }) } + + if err := eg.Wait(); err != nil { + return false, err + } + + if len(unacked) > 0 { + dst.log.Debug( + "Will flush MsgAcknowledgement", + zap.Object("channel", k), + zap.Uint64s("sequences", unacked), + ) + } else { + dst.log.Debug( + "No MsgAcknowledgement to flush", + zap.String("channel", k.CounterpartyChannelID), + zap.String("port", k.CounterpartyPortID), + ) + } + + return !skipped, nil } // flush runs queries to relay any pending messages which may have been // in blocks before the height that the chain processors started querying. -func (pp *PathProcessor) flush(ctx context.Context) { +func (pp *PathProcessor) flush(ctx context.Context) error { var ( commitments1 = make(map[ChannelKey][]uint64) commitments2 = make(map[ChannelKey][]uint64) @@ -1240,27 +1358,56 @@ func (pp *PathProcessor) flush(ctx context.Context) { } if err := eg.Wait(); err != nil { - pp.log.Error("Failed to query packet commitments", zap.Error(err)) + return fmt.Errorf("failed to query packet commitments: %w", err) } // From remaining packet commitments, determine if: // 1. Packet commitment is on source, but MsgRecvPacket has not yet been relayed to destination // 2. Packet commitment is on source, and MsgRecvPacket has been relayed to destination, but MsgAcknowledgement has not been written to source to clear the packet commitment. // Based on above conditions, enqueue MsgRecvPacket and MsgAcknowledgement messages + skipped := false for k, seqs := range commitments1 { - eg.Go(queuePendingRecvAndAcks(ctx, pp.pathEnd1, pp.pathEnd2, k, seqs, pathEnd1Cache.PacketFlow, pathEnd2Cache.PacketFlow, &pathEnd1CacheMu, &pathEnd2CacheMu)) + k := k + seqs := seqs + eg.Go(func() error { + done, err := pp.queuePendingRecvAndAcks(ctx, pp.pathEnd1, pp.pathEnd2, k, seqs, pathEnd1Cache.PacketFlow, pathEnd2Cache.PacketFlow, &pathEnd1CacheMu, &pathEnd2CacheMu) + if err != nil { + return err + } + if !done { + skipped = true + } + return nil + }) } for k, seqs := range commitments2 { - eg.Go(queuePendingRecvAndAcks(ctx, pp.pathEnd2, pp.pathEnd1, k, seqs, pathEnd2Cache.PacketFlow, pathEnd1Cache.PacketFlow, &pathEnd2CacheMu, &pathEnd1CacheMu)) + k := k + seqs := seqs + eg.Go(func() error { + done, err := pp.queuePendingRecvAndAcks(ctx, pp.pathEnd2, pp.pathEnd1, k, seqs, pathEnd2Cache.PacketFlow, pathEnd1Cache.PacketFlow, &pathEnd2CacheMu, &pathEnd1CacheMu) + if err != nil { + return err + } + if !done { + skipped = true + } + return nil + }) } if err := eg.Wait(); err != nil { - pp.log.Error("Failed to enqueue pending messages for flush", zap.Error(err)) + return fmt.Errorf("failed to enqueue pending messages for flush: %w", err) } pp.pathEnd1.mergeMessageCache(pathEnd1Cache, pp.pathEnd2.info.ChainID, pp.pathEnd2.inSync) pp.pathEnd2.mergeMessageCache(pathEnd2Cache, pp.pathEnd1.info.ChainID, pp.pathEnd1.inSync) + + if skipped { + return fmt.Errorf("flush was successful, but more packet sequences are still pending") + } + + return nil } // shouldTerminateForFlushComplete will determine if the relayer should exit diff --git a/relayer/processor/types.go b/relayer/processor/types.go index 3f4059b7b..d01e73205 100644 --- a/relayer/processor/types.go +++ b/relayer/processor/types.go @@ -322,6 +322,36 @@ func (c PacketMessagesCache) DeleteMessages(toDelete ...map[string][]uint64) { } } +// IsCached returns true if a sequence for a channel key and event type is already cached. +func (c ChannelPacketMessagesCache) IsCached(eventType string, k ChannelKey, sequence uint64) bool { + if _, ok := c[k]; !ok { + return false + } + if _, ok := c[k][eventType]; !ok { + return false + } + if _, ok := c[k][eventType][sequence]; !ok { + return false + } + return true +} + +// Cache stores packet info safely, generating intermediate maps along the way if necessary. +func (c ChannelPacketMessagesCache) Cache( + eventType string, + k ChannelKey, + sequence uint64, + packetInfo provider.PacketInfo, +) { + if _, ok := c[k]; !ok { + c[k] = make(PacketMessagesCache) + } + if _, ok := c[k][eventType]; !ok { + c[k][eventType] = make(PacketSequenceCache) + } + c[k][eventType][sequence] = packetInfo +} + // Merge merges another ChannelPacketMessagesCache into this one. func (c ChannelPacketMessagesCache) Merge(other ChannelPacketMessagesCache) { for channelKey, messageCache := range other { diff --git a/relayer/processor/types_internal.go b/relayer/processor/types_internal.go index d526ed70a..5bb403429 100644 --- a/relayer/processor/types_internal.go +++ b/relayer/processor/types_internal.go @@ -12,6 +12,11 @@ import ( "go.uber.org/zap/zapcore" ) +var _ zapcore.ObjectMarshaler = packetIBCMessage{} +var _ zapcore.ObjectMarshaler = channelIBCMessage{} +var _ zapcore.ObjectMarshaler = connectionIBCMessage{} +var _ zapcore.ObjectMarshaler = clientICQMessage{} + // pathEndMessages holds the different IBC messages that // will attempt to be sent to the pathEnd. type pathEndMessages struct { diff --git a/relayer/strategies.go b/relayer/strategies.go index 511f09905..047478938 100644 --- a/relayer/strategies.go +++ b/relayer/strategies.go @@ -27,6 +27,8 @@ const ( ProcessorLegacy = "legacy" DefaultClientUpdateThreshold = 0 * time.Millisecond DefaultFlushInterval = 5 * time.Minute + DefaultMaxMsgLength = 5 + TwoMB = 2 * 1024 * 1024 ) // StartRelayer starts the main relaying loop and returns a channel that will contain any control-flow related errors. @@ -35,7 +37,7 @@ func StartRelayer( log *zap.Logger, chains map[string]*Chain, paths []NamedPath, - maxTxSize, maxMsgLength uint64, + maxMsgLength uint64, memo string, clientUpdateThresholdTime time.Duration, flushInterval time.Duration, @@ -80,7 +82,6 @@ func StartRelayer( chainProcessors, ePaths, initialBlockHistory, - maxTxSize, maxMsgLength, memo, messageLifecycle, @@ -98,7 +99,7 @@ func StartRelayer( src, dst := chains[p.Src.ChainID], chains[p.Dst.ChainID] src.PathEnd = p.Src dst.PathEnd = p.Dst - go relayerStartLegacy(ctx, log, src, dst, p.Filter, maxTxSize, maxMsgLength, memo, errorChan) + go relayerStartLegacy(ctx, log, src, dst, p.Filter, TwoMB, maxMsgLength, memo, errorChan) return errorChan default: panic(fmt.Errorf("unexpected processor type: %s, supports one of: [%s, %s]", processorType, ProcessorEvents, ProcessorLegacy)) @@ -132,7 +133,6 @@ func relayerStartEventProcessor( chainProcessors []processor.ChainProcessor, paths []path, initialBlockHistory uint64, - maxTxSize, maxMsgLength uint64, memo string, messageLifecycle processor.MessageLifecycle, @@ -155,6 +155,7 @@ func relayerStartEventProcessor( memo, clientUpdateThresholdTime, flushInterval, + maxMsgLength, )) }