Skip to content

Commit

Permalink
Neutron launch fixes and optimizations (#1185)
Browse files Browse the repository at this point in the history
* pipe max msgs through path processor

* only apply max msgs to packet msgs

* multiple msgs simultaneously on ordered chans

* flush should be more frequent if it fails or does not complete

* fix legacy

* handle feedback
  • Loading branch information
agouin authored May 24, 2023
1 parent debdee7 commit 23d1e5c
Show file tree
Hide file tree
Showing 15 changed files with 336 additions and 163 deletions.
11 changes: 6 additions & 5 deletions cmd/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,12 @@ func urlFlag(v *viper.Viper, cmd *cobra.Command) *cobra.Command {
}

func strategyFlag(v *viper.Viper, cmd *cobra.Command) *cobra.Command {
cmd.Flags().StringP(flagMaxTxSize, "s", "2", "strategy of path to generate of the messages in a relay transaction")
cmd.Flags().StringP(flagMaxMsgLength, "l", "5", "maximum number of messages in a relay transaction")
if err := v.BindPFlag(flagMaxTxSize, cmd.Flags().Lookup(flagMaxTxSize)); err != nil {
panic(err)
}
cmd.Flags().Uint64P(
flagMaxMsgLength,
"l",
relayer.DefaultMaxMsgLength,
"maximum number of messages per transaction",
)
if err := v.BindPFlag(flagMaxMsgLength, cmd.Flags().Lookup(flagMaxMsgLength)); err != nil {
panic(err)
}
Expand Down
16 changes: 2 additions & 14 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,9 @@ import (
"github.com/spf13/viper"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/term"
)

const (
MB = 1024 * 1024 // in bytes
appName = "rly"
)
const appName = "rly"

var defaultHome = filepath.Join(os.Getenv("HOME"), ".relayer")

Expand Down Expand Up @@ -185,18 +181,10 @@ func newRootLogger(format string, debug bool) (*zap.Logger, error) {
switch format {
case "json":
enc = zapcore.NewJSONEncoder(config)
case "console":
case "auto", "console":
enc = zapcore.NewConsoleEncoder(config)
case "logfmt":
enc = zaplogfmt.NewEncoder(config)
case "auto":
if term.IsTerminal(int(os.Stderr.Fd())) {
// When a user runs relayer in the foreground, use easier to read output.
enc = zapcore.NewConsoleEncoder(config)
} else {
// Otherwise, use consistent logfmt format for simplistic machine processing.
enc = zaplogfmt.NewEncoder(config)
}
default:
return nil, fmt.Errorf("unrecognized log format %q", format)
}
Expand Down
30 changes: 2 additions & 28 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"errors"
"fmt"
"net"
"strconv"
"strings"

"github.com/cosmos/relayer/v2/internal/relaydebug"
Expand Down Expand Up @@ -88,7 +87,7 @@ $ %s start demo-path2 --max-tx-size 10`, appName, appName, appName, appName)),
return err
}

maxTxSize, maxMsgLength, err := GetStartOptions(cmd)
maxMsgLength, err := cmd.Flags().GetUint64(flagMaxMsgLength)
if err != nil {
return err
}
Expand Down Expand Up @@ -149,7 +148,7 @@ $ %s start demo-path2 --max-tx-size 10`, appName, appName, appName, appName)),
a.log,
chains,
paths,
maxTxSize, maxMsgLength,
maxMsgLength,
a.config.memo(cmd),
clientUpdateThresholdTime,
flushInterval,
Expand Down Expand Up @@ -182,28 +181,3 @@ $ %s start demo-path2 --max-tx-size 10`, appName, appName, appName, appName)),
cmd = memoFlag(a.viper, cmd)
return cmd
}

// GetStartOptions sets strategy specific fields.
func GetStartOptions(cmd *cobra.Command) (uint64, uint64, error) {
maxTxSize, err := cmd.Flags().GetString(flagMaxTxSize)
if err != nil {
return 0, 0, err
}

txSize, err := strconv.ParseUint(maxTxSize, 10, 64)
if err != nil {
return 0, 0, err
}

maxMsgLength, err := cmd.Flags().GetString(flagMaxMsgLength)
if err != nil {
return txSize * MB, 0, err
}

msgLen, err := strconv.ParseUint(maxMsgLength, 10, 64)
if err != nil {
return txSize * MB, 0, err
}

return txSize * MB, msgLen, nil
}
4 changes: 2 additions & 2 deletions cmd/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,7 @@ $ %s tx flush demo-path channel-0`,
return err
}

maxTxSize, maxMsgLength, err := GetStartOptions(cmd)
maxMsgLength, err := cmd.Flags().GetUint64(flagMaxMsgLength)
if err != nil {
return err
}
Expand All @@ -802,7 +802,7 @@ $ %s tx flush demo-path channel-0`,
a.log,
chains,
paths,
maxTxSize, maxMsgLength,
maxMsgLength,
a.config.memo(cmd),
0,
0,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ require (
go.uber.org/zap v1.24.0
golang.org/x/mod v0.8.0
golang.org/x/sync v0.1.0
golang.org/x/term v0.7.0
golang.org/x/text v0.9.0
google.golang.org/grpc v1.54.0
gopkg.in/yaml.v2 v2.4.0
Expand Down Expand Up @@ -171,6 +170,7 @@ require (
golang.org/x/net v0.9.0 // indirect
golang.org/x/oauth2 v0.5.0 // indirect
golang.org/x/sys v0.7.0 // indirect
golang.org/x/term v0.7.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/api v0.110.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
Expand Down
7 changes: 7 additions & 0 deletions relayer/chains/cosmos/cosmos_chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,13 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu
break
}

ccp.log.Debug(
"Queried block",
zap.Int64("height", i),
zap.Int64("latest", persistence.latestHeight),
zap.Int64("delta", persistence.latestHeight-i),
)

persistence.retriesAtLatestQueriedBlock = 0

latestHeader = ibcHeader.(provider.TendermintIBCHeader)
Expand Down
2 changes: 0 additions & 2 deletions relayer/chains/cosmos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -897,7 +897,6 @@ func (cc *CosmosProvider) QueryUnreceivedPackets(ctx context.Context, height uin
func sendPacketQuery(channelID string, portID string, seq uint64) string {
x := []string{
fmt.Sprintf("%s.packet_src_channel='%s'", spTag, channelID),
fmt.Sprintf("%s.packet_src_port='%s'", spTag, portID),
fmt.Sprintf("%s.packet_sequence='%d'", spTag, seq),
}
return strings.Join(x, " AND ")
Expand All @@ -906,7 +905,6 @@ func sendPacketQuery(channelID string, portID string, seq uint64) string {
func writeAcknowledgementQuery(channelID string, portID string, seq uint64) string {
x := []string{
fmt.Sprintf("%s.packet_dst_channel='%s'", waTag, channelID),
fmt.Sprintf("%s.packet_dst_port='%s'", waTag, portID),
fmt.Sprintf("%s.packet_sequence='%d'", waTag, seq),
}
return strings.Join(x, " AND ")
Expand Down
4 changes: 3 additions & 1 deletion relayer/chains/mock/mock_chain_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

clienttypes "github.com/cosmos/ibc-go/v7/modules/core/02-client/types"
chantypes "github.com/cosmos/ibc-go/v7/modules/core/04-channel/types"
"github.com/cosmos/relayer/v2/relayer"
"github.com/cosmos/relayer/v2/relayer/chains/mock"
"github.com/cosmos/relayer/v2/relayer/processor"
"github.com/prometheus/client_golang/prometheus/testutil"
Expand Down Expand Up @@ -61,7 +62,8 @@ func TestMockChainAndPathProcessors(t *testing.T) {
clientUpdateThresholdTime := 6 * time.Hour
flushInterval := 6 * time.Hour

pathProcessor := processor.NewPathProcessor(log, pathEnd1, pathEnd2, metrics, "", clientUpdateThresholdTime, flushInterval)
pathProcessor := processor.NewPathProcessor(log, pathEnd1, pathEnd2, metrics, "",
clientUpdateThresholdTime, flushInterval, relayer.DefaultMaxMsgLength)

eventProcessor := processor.NewEventProcessor().
WithChainProcessors(
Expand Down
3 changes: 3 additions & 0 deletions relayer/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func (c *Chain) CreateOpenChannels(
memo,
DefaultClientUpdateThreshold,
DefaultFlushInterval,
DefaultMaxMsgLength,
)

c.log.Info("Starting event processor for channel handshake",
Expand Down Expand Up @@ -131,6 +132,7 @@ func (c *Chain) CloseChannel(
memo,
DefaultClientUpdateThreshold,
DefaultFlushInterval,
DefaultMaxMsgLength,
)).
WithInitialBlockHistory(0).
WithMessageLifecycle(&processor.FlushLifecycle{}).
Expand Down Expand Up @@ -168,6 +170,7 @@ func (c *Chain) CloseChannel(
memo,
DefaultClientUpdateThreshold,
DefaultFlushInterval,
DefaultMaxMsgLength,
)).
WithInitialBlockHistory(0).
WithMessageLifecycle(&processor.ChannelCloseLifecycle{
Expand Down
1 change: 1 addition & 0 deletions relayer/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func (c *Chain) CreateOpenConnections(
memo,
DefaultClientUpdateThreshold,
DefaultFlushInterval,
DefaultMaxMsgLength,
)

var connectionSrc, connectionDst string
Expand Down
28 changes: 22 additions & 6 deletions relayer/processor/path_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ const (
// Amount of time to wait for interchain queries.
interchainQueryTimeout = 60 * time.Second

// Amount of time between flushes if the previous flush failed.
flushFailureRetry = 15 * time.Second

// If message assembly fails from either proof query failure on the source
// or assembling the message for the destination, how many blocks should pass
// before retrying.
Expand Down Expand Up @@ -63,14 +66,16 @@ type PathProcessor struct {
messageLifecycle MessageLifecycle

initialFlushComplete bool
flushTicker *time.Ticker
flushTimer *time.Timer
flushInterval time.Duration

// Signals to retry.
retryProcess chan struct{}

sentInitialMsg bool

maxMsgs uint64

metrics *PrometheusMetrics
}

Expand All @@ -94,6 +99,7 @@ func NewPathProcessor(
memo string,
clientUpdateThresholdTime time.Duration,
flushInterval time.Duration,
maxMsgs uint64,
) *PathProcessor {
pp := &PathProcessor{
log: log,
Expand All @@ -104,6 +110,7 @@ func NewPathProcessor(
clientUpdateThresholdTime: clientUpdateThresholdTime,
flushInterval: flushInterval,
metrics: metrics,
maxMsgs: maxMsgs,
}
if flushInterval == 0 {
pp.disablePeriodicFlush()
Expand Down Expand Up @@ -264,6 +271,16 @@ func (pp *PathProcessor) HandleNewData(chainID string, cacheData ChainProcessorC
}
}

func (pp *PathProcessor) handleFlush(ctx context.Context) {
flushTimer := pp.flushInterval
if err := pp.flush(ctx); err != nil {
pp.log.Warn("Flush not complete", zap.Error(err))
flushTimer = flushFailureRetry
}
pp.flushTimer.Stop()
pp.flushTimer = time.NewTimer(flushTimer)
}

// processAvailableSignals will block if signals are not yet available, otherwise it will process one of the available signals.
// It returns whether or not the pathProcessor should quit.
func (pp *PathProcessor) processAvailableSignals(ctx context.Context, cancel func()) bool {
Expand All @@ -287,9 +304,9 @@ func (pp *PathProcessor) processAvailableSignals(ctx context.Context, cancel fun

case <-pp.retryProcess:
// No new data to merge in, just retry handling.
case <-pp.flushTicker.C:
case <-pp.flushTimer.C:
// Periodic flush to clear out any old packets
pp.flush(ctx)
pp.handleFlush(ctx)
}
return false
}
Expand All @@ -298,8 +315,7 @@ func (pp *PathProcessor) processAvailableSignals(ctx context.Context, cancel fun
func (pp *PathProcessor) Run(ctx context.Context, cancel func()) {
var retryTimer *time.Timer

pp.flushTicker = time.NewTicker(pp.flushInterval)
defer pp.flushTicker.Stop()
pp.flushTimer = time.NewTimer(time.Hour)

for {
// block until we have any signals to process
Expand All @@ -319,7 +335,7 @@ func (pp *PathProcessor) Run(ctx context.Context, cancel func()) {
}

if pp.shouldFlush() && !pp.initialFlushComplete {
pp.flush(ctx)
pp.handleFlush(ctx)
pp.initialFlushComplete = true
} else if pp.shouldTerminateForFlushComplete() {
cancel()
Expand Down
Loading

0 comments on commit 23d1e5c

Please sign in to comment.