Skip to content

Commit

Permalink
Pass explicit logger to relayer package functions
Browse files Browse the repository at this point in the history
Instead of relying on the logger associated with a chain, accept a
logger specifically for use in the package-level function.

By convention, ctx is always the first parameter, and then log.

Passing the logger separately also allows more flexibility in the
parameter types to these functions, which will simplify adding new tests
shortly.
  • Loading branch information
mark-rushakoff committed Apr 2, 2022
1 parent 76f5fb8 commit cc1630e
Show file tree
Hide file tree
Showing 10 changed files with 111 additions and 103 deletions.
60 changes: 35 additions & 25 deletions _test/relayer_chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
tmprotoversion "github.com/tendermint/tendermint/proto/tendermint/version"
tmtypes "github.com/tendermint/tendermint/types"
tmversion "github.com/tendermint/tendermint/version"
"go.uber.org/zap/zaptest"
"golang.org/x/sync/errgroup"
)

Expand Down Expand Up @@ -102,27 +103,29 @@ func chainTest(t *testing.T, tcs []testChain) {
srcAddr, err := src.ChainProvider.Address()
require.NoError(t, err)

require.NoError(t, src.SendTransferMsg(ctx, dst, testCoin, dstAddr, 0, 0, channel))
require.NoError(t, src.SendTransferMsg(ctx, dst, testCoin, dstAddr, 0, 0, channel))
log := zaptest.NewLogger(t)

require.NoError(t, src.SendTransferMsg(ctx, log, dst, testCoin, dstAddr, 0, 0, channel))
require.NoError(t, src.SendTransferMsg(ctx, log, dst, testCoin, dstAddr, 0, 0, channel))

// send a couple of transfers to the queue on dst
require.NoError(t, dst.SendTransferMsg(ctx, src, testCoin, srcAddr, 0, 0, channel))
require.NoError(t, dst.SendTransferMsg(ctx, src, testCoin, srcAddr, 0, 0, channel))
require.NoError(t, dst.SendTransferMsg(ctx, log, src, testCoin, srcAddr, 0, 0, channel))
require.NoError(t, dst.SendTransferMsg(ctx, log, src, testCoin, srcAddr, 0, 0, channel))

// Wait for message inclusion in both chains
require.NoError(t, dst.ChainProvider.WaitForNBlocks(ctx, 1))

// start the relayer process in its own goroutine
filter := relayer.ChannelFilter{}
_ = relayer.StartRelayer(ctx, src, dst, filter, 2*cmd.MB, 5)
_ = relayer.StartRelayer(ctx, log, src, dst, filter, 2*cmd.MB, 5)

// Wait for relay message inclusion in both chains
require.NoError(t, src.ChainProvider.WaitForNBlocks(ctx, 2))
require.NoError(t, dst.ChainProvider.WaitForNBlocks(ctx, 2))

// send those tokens from dst back to dst and src back to src
require.NoError(t, src.SendTransferMsg(ctx, dst, twoTestCoin, dstAddr, 0, 0, channel))
require.NoError(t, dst.SendTransferMsg(ctx, src, twoTestCoin, srcAddr, 0, 0, channel))
require.NoError(t, src.SendTransferMsg(ctx, log, dst, twoTestCoin, dstAddr, 0, 0, channel))
require.NoError(t, dst.SendTransferMsg(ctx, log, src, twoTestCoin, srcAddr, 0, 0, channel))

// wait for packet processing
require.NoError(t, src.ChainProvider.WaitForNBlocks(ctx, 6))
Expand Down Expand Up @@ -261,9 +264,10 @@ func TestGaiaMisbehaviourMonitoring(t *testing.T) {
channel := channels[0]
testChannelPair(ctx, t, src, dst, channel.ChannelId, channel.PortId)

log := zaptest.NewLogger(t)
// start the relayer process in its own goroutine
filter := relayer.ChannelFilter{}
_ = relayer.StartRelayer(ctx, src, dst, filter, 2*cmd.MB, 5)
_ = relayer.StartRelayer(ctx, log, src, dst, filter, 2*cmd.MB, 5)

// Wait for relay message inclusion in both chains
require.NoError(t, src.ChainProvider.WaitForNBlocks(ctx, 1))
Expand Down Expand Up @@ -411,39 +415,41 @@ func TestRelayAllChannelsOnConnection(t *testing.T) {
srcAddr, err := src.ChainProvider.Address()
require.NoError(t, err)

require.NoError(t, src.SendTransferMsg(ctx, dst, testCoin, dstAddr, 0, 0, channelOne))
require.NoError(t, src.SendTransferMsg(ctx, dst, testCoin, dstAddr, 0, 0, channelOne))
log := zaptest.NewLogger(t)

require.NoError(t, src.SendTransferMsg(ctx, log, dst, testCoin, dstAddr, 0, 0, channelOne))
require.NoError(t, src.SendTransferMsg(ctx, log, dst, testCoin, dstAddr, 0, 0, channelOne))

// send a couple of transfers to the queue on dst for first channel
require.NoError(t, dst.SendTransferMsg(ctx, src, testCoin, srcAddr, 0, 0, channelOne))
require.NoError(t, dst.SendTransferMsg(ctx, src, testCoin, srcAddr, 0, 0, channelOne))
require.NoError(t, dst.SendTransferMsg(ctx, log, src, testCoin, srcAddr, 0, 0, channelOne))
require.NoError(t, dst.SendTransferMsg(ctx, log, src, testCoin, srcAddr, 0, 0, channelOne))

// send a couple of transfers to the queue on src for second channel
require.NoError(t, src.SendTransferMsg(ctx, dst, testCoin, dstAddr, 0, 0, channelTwo))
require.NoError(t, src.SendTransferMsg(ctx, dst, testCoin, dstAddr, 0, 0, channelTwo))
require.NoError(t, src.SendTransferMsg(ctx, log, dst, testCoin, dstAddr, 0, 0, channelTwo))
require.NoError(t, src.SendTransferMsg(ctx, log, dst, testCoin, dstAddr, 0, 0, channelTwo))

// send a couple of transfers to the queue on dst for second channel
require.NoError(t, dst.SendTransferMsg(ctx, src, testCoin, srcAddr, 0, 0, channelTwo))
require.NoError(t, dst.SendTransferMsg(ctx, src, testCoin, srcAddr, 0, 0, channelTwo))
require.NoError(t, dst.SendTransferMsg(ctx, log, src, testCoin, srcAddr, 0, 0, channelTwo))
require.NoError(t, dst.SendTransferMsg(ctx, log, src, testCoin, srcAddr, 0, 0, channelTwo))

// Wait for message inclusion in both chains
require.NoError(t, dst.ChainProvider.WaitForNBlocks(ctx, 1))

// start the relayer process in its own goroutine
filter := relayer.ChannelFilter{}
_ = relayer.StartRelayer(ctx, src, dst, filter, 2*cmd.MB, 5)
_ = relayer.StartRelayer(ctx, log, src, dst, filter, 2*cmd.MB, 5)

// Wait for relay message inclusion in both chains
require.NoError(t, src.ChainProvider.WaitForNBlocks(ctx, 1))
require.NoError(t, dst.ChainProvider.WaitForNBlocks(ctx, 1))

// send those tokens from dst back to dst and src back to src for first channel
require.NoError(t, src.SendTransferMsg(ctx, dst, twoTestCoin, dstAddr, 0, 0, channelOne))
require.NoError(t, dst.SendTransferMsg(ctx, src, twoTestCoin, srcAddr, 0, 0, channelOne))
require.NoError(t, src.SendTransferMsg(ctx, log, dst, twoTestCoin, dstAddr, 0, 0, channelOne))
require.NoError(t, dst.SendTransferMsg(ctx, log, src, twoTestCoin, srcAddr, 0, 0, channelOne))

// send those tokens from dst back to dst and src back to src for second channel
require.NoError(t, src.SendTransferMsg(ctx, dst, twoTestCoin, dstAddr, 0, 0, channelTwo))
require.NoError(t, dst.SendTransferMsg(ctx, src, twoTestCoin, srcAddr, 0, 0, channelTwo))
require.NoError(t, src.SendTransferMsg(ctx, log, dst, twoTestCoin, dstAddr, 0, 0, channelTwo))
require.NoError(t, dst.SendTransferMsg(ctx, log, src, twoTestCoin, srcAddr, 0, 0, channelTwo))

// wait for packet processing
require.NoError(t, src.ChainProvider.WaitForNBlocks(ctx, 6))
Expand Down Expand Up @@ -596,15 +602,17 @@ func TestUnorderedChannelBlockHeightTimeout(t *testing.T) {
_, err = src.ChainProvider.Address()
require.NoError(t, err)

log := zaptest.NewLogger(t)

// send a packet that should timeout after 10 blocks have passed
require.NoError(t, src.SendTransferMsg(ctx, dst, twoTestCoin, dstAddr, uint64(10), 0, channel))
require.NoError(t, src.SendTransferMsg(ctx, log, dst, twoTestCoin, dstAddr, uint64(10), 0, channel))

// wait for block height timeout offset to be reached
require.NoError(t, src.ChainProvider.WaitForNBlocks(ctx, 11))

// start the relayer process in its own goroutine
filter := relayer.ChannelFilter{}
_ = relayer.StartRelayer(ctx, src, dst, filter, 2*cmd.MB, 5)
_ = relayer.StartRelayer(ctx, log, src, dst, filter, 2*cmd.MB, 5)

require.NoError(t, src.ChainProvider.WaitForNBlocks(ctx, 5))

Expand Down Expand Up @@ -692,15 +700,17 @@ func TestUnorderedChannelTimestampTimeout(t *testing.T) {
_, err = src.ChainProvider.Address()
require.NoError(t, err)

log := zaptest.NewLogger(t)

// send a packet that should timeout after 45 seconds
require.NoError(t, src.SendTransferMsg(ctx, dst, twoTestCoin, dstAddr, 0, time.Second*15, channel))
require.NoError(t, src.SendTransferMsg(ctx, log, dst, twoTestCoin, dstAddr, 0, time.Second*15, channel))

// wait for timestamp timeout offset to be reached
time.Sleep(time.Second * 20)

// start the relayer process in its own goroutine
filter := relayer.ChannelFilter{}
_ = relayer.StartRelayer(ctx, src, dst, filter, 2*cmd.MB, 5)
_ = relayer.StartRelayer(ctx, log, src, dst, filter, 2*cmd.MB, 5)

time.Sleep(time.Second * 5)

Expand Down
2 changes: 1 addition & 1 deletion cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ $ %s start demo-path2 --max-tx-size 10`, appName, appName)),
relaydebug.StartDebugServer(cmd.Context(), log, ln)
}

rlyErrCh := relayer.StartRelayer(cmd.Context(), c[src], c[dst], filter, maxTxSize, maxMsgLength)
rlyErrCh := relayer.StartRelayer(cmd.Context(), a.Log, c[src], c[dst], filter, maxTxSize, maxMsgLength)

// NOTE: This block of code is useful for ensuring that the clients tracking each chain do not expire
// when there are no packets flowing across the channels. It is currently a source of errors that have been
Expand Down
8 changes: 4 additions & 4 deletions cmd/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,7 @@ $ %s tx relay-pkt demo-path channel-1 1`,
return err
}

return relayer.RelayPacket(cmd.Context(), c[src], c[dst], sp, maxTxSize, maxMsgLength, uint64(seqNum), channel)
return relayer.RelayPacket(cmd.Context(), a.Log, c[src], c[dst], sp, maxTxSize, maxMsgLength, uint64(seqNum), channel)
},
}

Expand Down Expand Up @@ -774,7 +774,7 @@ $ %s tx relay-pkts demo-path channel-0`,
return err
}

if err = relayer.RelayPackets(cmd.Context(), c[src], c[dst], sp, maxTxSize, maxMsgLength, channel); err != nil {
if err = relayer.RelayPackets(cmd.Context(), a.Log, c[src], c[dst], sp, maxTxSize, maxMsgLength, channel); err != nil {
return err
}

Expand Down Expand Up @@ -824,7 +824,7 @@ $ %s tx relay-acks demo-path channel-0 -l 3 -s 6`,
return err
}

if err = relayer.RelayAcknowledgements(cmd.Context(), c[src], c[dst], sp, maxTxSize, maxMsgLength, channel); err != nil {
if err = relayer.RelayAcknowledgements(cmd.Context(), a.Log, c[src], c[dst], sp, maxTxSize, maxMsgLength, channel); err != nil {
return err
}

Expand Down Expand Up @@ -1001,7 +1001,7 @@ $ %s tx raw send ibc-0 ibc-1 100000stake cosmos1skjwj5whet0lpe65qaq4rpq03hjxlwd9

}

return c[src].SendTransferMsg(cmd.Context(), c[dst], amount, dstAddr, toHeightOffset, toTimeOffset, srcChannel)
return c[src].SendTransferMsg(cmd.Context(), a.Log, c[dst], amount, dstAddr, toHeightOffset, toTimeOffset, srcChannel)
},
}

Expand Down
2 changes: 1 addition & 1 deletion relayer/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ func (c *Chain) CloseChannel(ctx context.Context, dst *Chain, to time.Duration,
break
}

if closeSteps.Send(ctx, c, dst); closeSteps.Success() && closeSteps.Last {
if closeSteps.Send(ctx, c.log, c, dst); closeSteps.Success() && closeSteps.Last {
srch, dsth, err := QueryLatestHeights(ctx, c, dst)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion relayer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func (c *Chain) UpdateClients(ctx context.Context, dst *Chain) (err error) {

// Send msgs to both chains
if clients.Ready() {
if clients.Send(ctx, c, dst); clients.Success() {
if clients.Send(ctx, c.log, c, dst); clients.Success() {
c.log.Info(
"Clients updated",
zap.String("src_chain_id", c.ChainID()),
Expand Down
55 changes: 31 additions & 24 deletions relayer/log-chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,45 +11,52 @@ import (
chantypes "github.com/cosmos/ibc-go/v3/modules/core/04-channel/types"
)

// LogFailedTx takes the transaction and the messages to create it and logs the appropriate data
func (c *Chain) LogFailedTx(res *provider.RelayerTxResponse, err error, msgs []provider.RelayerMessage) {
if c.debug {
fields := make([]zap.Field, 1+len(msgs), 2+len(msgs))
fields[0] = zap.String("src_chain_id", c.ChainID())
for i, msg := range msgs {
cm, ok := msg.(cosmos.CosmosMessage)
if ok {
fields[i+1] = zap.Object(
fmt.Sprintf("msg-%d", i),
cm,
)
} else {
// TODO: choose another encoding instead of skipping?
fields[i+1] = zap.Skip()
}
func logFailedTx(log *zap.Logger, chainID string, res *provider.RelayerTxResponse, err error, msgs []provider.RelayerMessage) {
fields := make([]zap.Field, 1+len(msgs), 2+len(msgs))
fields[0] = zap.String("chain_id", chainID)
for i, msg := range msgs {
cm, ok := msg.(cosmos.CosmosMessage)
if ok {
fields[i+1] = zap.Object(
fmt.Sprintf("msg-%d", i),
cm,
)
} else {
// TODO: choose another encoding instead of skipping?
fields[i+1] = zap.Skip()
}
if err != nil {
fields = append(fields, zap.Error(err))
}
c.log.Info("Failed sending transaction", fields...)
}
if err != nil {
fields = append(fields, zap.Error(err))
}
log.Info("Failed sending transaction", fields...)

if res != nil && res.Code != 0 && res.Data != "" {
c.log.Info(
msgTypes := make([]string, len(msgs))
for i, msg := range msgs {
msgTypes[i] = msg.Type()
}

log.Info(
"Sent transaction that resulted in error",
zap.String("src_chain_id", c.ChainID()),
zap.String("chain_id", chainID),
zap.Int64("height", res.Height),
zap.String("msg_types", getMsgTypes(msgs)),
zap.Strings("msg_types", msgTypes),
zap.Uint32("error_code", res.Code),
zap.String("error_data", res.Data),
)
}

if res != nil {
c.log.Debug("Transaction response", zap.Object("resp", res))
log.Debug("Transaction response", zap.Object("resp", res))
}
}

// LogFailedTx takes the transaction and the messages to create it and logs the appropriate data
func (c *Chain) LogFailedTx(res *provider.RelayerTxResponse, err error, msgs []provider.RelayerMessage) {
logFailedTx(c.log, c.ChainID(), res, err, msgs)
}

func (c *Chain) logPacketsRelayed(dst *Chain, num int, srcChannel *chantypes.IdentifiedChannel) {
c.log.Info(
"Relayed packets",
Expand Down
22 changes: 11 additions & 11 deletions relayer/naive-strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (rs *RelaySequences) Empty() bool {
}

// RelayAcknowledgements creates transactions to relay acknowledgements from src to dst and from dst to src
func RelayAcknowledgements(ctx context.Context, src, dst *Chain, sp *RelaySequences, maxTxSize, maxMsgLength uint64, srcChannel *chantypes.IdentifiedChannel) error {
func RelayAcknowledgements(ctx context.Context, log *zap.Logger, src, dst *Chain, sp *RelaySequences, maxTxSize, maxMsgLength uint64, srcChannel *chantypes.IdentifiedChannel) error {
// set the maximum relay transaction constraints
msgs := &RelayMsgs{
Src: []provider.RelayerMessage{},
Expand Down Expand Up @@ -332,7 +332,7 @@ func RelayAcknowledgements(ctx context.Context, src, dst *Chain, sp *RelaySequen
}

if !msgs.Ready() {
src.log.Info(
log.Info(
"No acknowledgements to relay",
zap.String("src_chain_id", src.ChainID()),
zap.String("src_port_id", srcChannel.PortId),
Expand All @@ -352,7 +352,7 @@ func RelayAcknowledgements(ctx context.Context, src, dst *Chain, sp *RelaySequen
}

// send messages to their respective chains
if msgs.Send(ctx, src, dst); msgs.Success() {
if msgs.Send(ctx, log, src, dst); msgs.Success() {
if len(msgs.Dst) > 1 {
dst.logPacketsRelayed(src, len(msgs.Dst)-1, srcChannel)
}
Expand All @@ -367,7 +367,7 @@ func RelayAcknowledgements(ctx context.Context, src, dst *Chain, sp *RelaySequen
}

// RelayPackets creates transactions to relay packets from src to dst and from dst to src
func RelayPackets(ctx context.Context, src, dst *Chain, sp *RelaySequences, maxTxSize, maxMsgLength uint64, srcChannel *chantypes.IdentifiedChannel) error {
func RelayPackets(ctx context.Context, log *zap.Logger, src, dst *Chain, sp *RelaySequences, maxTxSize, maxMsgLength uint64, srcChannel *chantypes.IdentifiedChannel) error {
// set the maximum relay transaction constraints
msgs := &RelayMsgs{
Src: []provider.RelayerMessage{},
Expand Down Expand Up @@ -401,7 +401,7 @@ func RelayPackets(ctx context.Context, src, dst *Chain, sp *RelaySequences, maxT
}

if !msgs.Ready() {
src.log.Info(
log.Info(
"No packets to relay",
zap.String("src_chain_id", src.ChainID()),
zap.String("src_port_id", srcChannel.PortId),
Expand All @@ -427,7 +427,7 @@ func RelayPackets(ctx context.Context, src, dst *Chain, sp *RelaySequences, maxT
}

// send messages to their respective chains
if msgs.Send(ctx, src, dst); msgs.Success() {
if msgs.Send(ctx, log, src, dst); msgs.Success() {
if len(msgs.Dst) > 1 {
dst.logPacketsRelayed(src, len(msgs.Dst)-1, srcChannel)
}
Expand Down Expand Up @@ -521,7 +521,7 @@ func PrependUpdateClientMsg(ctx context.Context, msgs *[]provider.RelayerMessage
}

// RelayPacket creates transactions to relay packets from src to dst and from dst to src
func RelayPacket(ctx context.Context, src, dst *Chain, sp *RelaySequences, maxTxSize, maxMsgLength, seqNum uint64, srcChannel *chantypes.IdentifiedChannel) error {
func RelayPacket(ctx context.Context, log *zap.Logger, src, dst *Chain, sp *RelaySequences, maxTxSize, maxMsgLength, seqNum uint64, srcChannel *chantypes.IdentifiedChannel) error {
// set the maximum relay transaction constraints
msgs := &RelayMsgs{
Src: []provider.RelayerMessage{},
Expand All @@ -548,7 +548,7 @@ func RelayPacket(ctx context.Context, src, dst *Chain, sp *RelaySequences, maxTx
if err = retry.Do(func() error {
recvMsg, timeoutMsg, err = src.ChainProvider.RelayPacketFromSequence(ctx, src.ChainProvider, dst.ChainProvider, uint64(srch), uint64(dsth), seq, dstChanID, dstPortID, dst.ClientID(), srcChanID, srcPortID, src.ClientID())
if err != nil {
src.log.Warn(
log.Warn(
"Failed to relay packet from seq on src",
zap.String("src_chain_id", src.ChainID()),
zap.String("dst_chain_id", dst.ChainID()),
Expand Down Expand Up @@ -582,7 +582,7 @@ func RelayPacket(ctx context.Context, src, dst *Chain, sp *RelaySequences, maxTx
if err = retry.Do(func() error {
recvMsg, timeoutMsg, err = dst.ChainProvider.RelayPacketFromSequence(ctx, dst.ChainProvider, src.ChainProvider, uint64(dsth), uint64(srch), seq, srcChanID, srcPortID, src.ClientID(), dstChanID, dstPortID, dst.ClientID())
if err != nil {
dst.log.Warn("Failed to relay packet from seq on dst", zap.Error(err))
log.Warn("Failed to relay packet from seq on dst", zap.Error(err))
}
return nil
}, retry.Context(ctx), RtyAtt, RtyDel, RtyErr, retry.OnRetry(func(n uint, err error) {
Expand All @@ -604,7 +604,7 @@ func RelayPacket(ctx context.Context, src, dst *Chain, sp *RelaySequences, maxTx
}

if !msgs.Ready() {
src.log.Info(
log.Info(
"No packets to relay",
zap.String("src_chain_id", src.ChainID()),
zap.String("src_port_id", srcPortID),
Expand Down Expand Up @@ -642,7 +642,7 @@ func RelayPacket(ctx context.Context, src, dst *Chain, sp *RelaySequences, maxTx
}

// send messages to their respective chains
if msgs.Send(ctx, src, dst); msgs.Success() {
if msgs.Send(ctx, log, src, dst); msgs.Success() {
if len(msgs.Dst) > 1 {
dst.logPacketsRelayed(src, len(msgs.Dst)-1, srcChannel)
}
Expand Down
Loading

0 comments on commit cc1630e

Please sign in to comment.