Skip to content

Commit

Permalink
Mercury transmitter can use different codecs for native & link price …
Browse files Browse the repository at this point in the history
…reports (#14631)

* Mercury transmitter can use different codecs for native & link price reports

* Fix lint.

* Don't shadow err var

---------

Co-authored-by: Ivaylo Novakov <[email protected]>
  • Loading branch information
martin-cll and ro-tex authored Oct 3, 2024
1 parent 815b2c8 commit ad9398a
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 36 deletions.
5 changes: 5 additions & 0 deletions .changeset/cuddly-colts-speak.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Fix for Mercury transmitter decoding reports of a different codec version #internal
44 changes: 30 additions & 14 deletions core/services/relay/evm/evm.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,6 @@ func (r *Relayer) NewMercuryProvider(rargs commontypes.RelayArgs, pargs commonty
if relayConfig.FeedID == nil {
return nil, pkgerrors.New("FeedID must be specified")
}
feedID := mercuryutils.FeedID(*relayConfig.FeedID)

if relayConfig.ChainID.String() != r.chain.ID().String() {
return nil, fmt.Errorf("internal error: chain id in spec does not match this relayer's chain: have %s expected %s", relayConfig.ChainID.String(), r.chain.ID().String())
Expand Down Expand Up @@ -430,20 +429,37 @@ func (r *Relayer) NewMercuryProvider(rargs commontypes.RelayArgs, pargs commonty
reportCodecV3 := reportcodecv3.NewReportCodec(*relayConfig.FeedID, lggr.Named("ReportCodecV3"))
reportCodecV4 := reportcodecv4.NewReportCodec(*relayConfig.FeedID, lggr.Named("ReportCodecV4"))

var transmitterCodec mercury.TransmitterReportDecoder
switch feedID.Version() {
case 1:
transmitterCodec = reportCodecV1
case 2:
transmitterCodec = reportCodecV2
case 3:
transmitterCodec = reportCodecV3
case 4:
transmitterCodec = reportCodecV4
default:
return nil, fmt.Errorf("invalid feed version %d", feedID.Version())
getCodecForFeed := func(feedID mercuryutils.FeedID) (mercury.TransmitterReportDecoder, error) {
var transmitterCodec mercury.TransmitterReportDecoder
switch feedID.Version() {
case 1:
transmitterCodec = reportCodecV1
case 2:
transmitterCodec = reportCodecV2
case 3:
transmitterCodec = reportCodecV3
case 4:
transmitterCodec = reportCodecV4
default:
return nil, fmt.Errorf("invalid feed version %d", feedID.Version())
}
return transmitterCodec, nil
}

benchmarkPriceDecoder := func(feedID mercuryutils.FeedID, report ocrtypes.Report) (*big.Int, error) {
benchmarkPriceCodec, benchmarkPriceErr := getCodecForFeed(feedID)
if benchmarkPriceErr != nil {
return nil, benchmarkPriceErr
}
return benchmarkPriceCodec.BenchmarkPriceFromReport(report)
}
transmitter := mercury.NewTransmitter(lggr, r.transmitterCfg, clients, privKey.PublicKey, rargs.JobID, *relayConfig.FeedID, r.mercuryORM, transmitterCodec, r.triggerCapability)

transmitterCodec, err := getCodecForFeed(mercuryutils.FeedID(*relayConfig.FeedID))
if err != nil {
return nil, err
}

transmitter := mercury.NewTransmitter(lggr, r.transmitterCfg, clients, privKey.PublicKey, rargs.JobID, *relayConfig.FeedID, r.mercuryORM, transmitterCodec, benchmarkPriceDecoder, r.triggerCapability)

return NewMercuryProvider(cp, r.codec, NewMercuryChainReader(r.chain.HeadTracker()), transmitter, reportCodecV1, reportCodecV2, reportCodecV3, reportCodecV4, lggr), nil
}
Expand Down
12 changes: 8 additions & 4 deletions core/services/relay/evm/mercury/transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ type TransmitterReportDecoder interface {
ObservationTimestampFromReport(report ocrtypes.Report) (uint32, error)
}

type BenchmarkPriceDecoder func(feedID mercuryutils.FeedID, report ocrtypes.Report) (*big.Int, error)

var _ Transmitter = (*mercuryTransmitter)(nil)

type TransmitterConfig interface {
Expand All @@ -116,8 +118,9 @@ type mercuryTransmitter struct {
orm ORM
servers map[string]*server

codec TransmitterReportDecoder
triggerCapability *triggers.MercuryTriggerService
codec TransmitterReportDecoder
benchmarkPriceDecoder BenchmarkPriceDecoder
triggerCapability *triggers.MercuryTriggerService

feedID mercuryutils.FeedID
jobID int32
Expand Down Expand Up @@ -301,7 +304,7 @@ func newServer(lggr logger.Logger, cfg TransmitterConfig, client wsrpc.Client, p
}
}

func NewTransmitter(lggr logger.Logger, cfg TransmitterConfig, clients map[string]wsrpc.Client, fromAccount ed25519.PublicKey, jobID int32, feedID [32]byte, orm ORM, codec TransmitterReportDecoder, triggerCapability *triggers.MercuryTriggerService) *mercuryTransmitter {
func NewTransmitter(lggr logger.Logger, cfg TransmitterConfig, clients map[string]wsrpc.Client, fromAccount ed25519.PublicKey, jobID int32, feedID [32]byte, orm ORM, codec TransmitterReportDecoder, benchmarkPriceDecoder BenchmarkPriceDecoder, triggerCapability *triggers.MercuryTriggerService) *mercuryTransmitter {
sugared := logger.Sugared(lggr)
feedIDHex := fmt.Sprintf("0x%x", feedID[:])
servers := make(map[string]*server, len(clients))
Expand All @@ -317,6 +320,7 @@ func NewTransmitter(lggr logger.Logger, cfg TransmitterConfig, clients map[strin
orm,
servers,
codec,
benchmarkPriceDecoder,
triggerCapability,
feedID,
jobID,
Expand Down Expand Up @@ -513,7 +517,7 @@ func (mt *mercuryTransmitter) LatestPrice(ctx context.Context, feedID [32]byte)
if !is {
return nil, fmt.Errorf("expected report to be []byte, but it was %T", m["report"])
}
return mt.codec.BenchmarkPriceFromReport(report)
return mt.benchmarkPriceDecoder(feedID, report)
}

// LatestTimestamp will return -1, nil if the feed is missing
Expand Down
48 changes: 30 additions & 18 deletions core/services/relay/evm/mercury/transmitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,18 @@ import (

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/pkg/errors"
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities/triggers"
commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
mercurytypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/types"
mercuryutils "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/utils"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/mocks"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/pb"
Expand All @@ -43,6 +43,9 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) {
pgtest.MustExec(t, db, `SET CONSTRAINTS mercury_transmit_requests_job_id_fkey DEFERRED`)
pgtest.MustExec(t, db, `SET CONSTRAINTS feed_latest_reports_job_id_fkey DEFERRED`)
codec := new(mockCodec)
benchmarkPriceDecoder := func(feedID mercuryutils.FeedID, report ocrtypes.Report) (*big.Int, error) {
return codec.BenchmarkPriceFromReport(report)
}
orm := NewORM(db)
clients := map[string]wsrpc.Client{}

Expand All @@ -51,7 +54,7 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) {
report := sampleV1Report
c := &mocks.MockWSRPCClient{}
clients[sURL] = c
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil)
// init the queue since we skipped starting transmitter
mt.servers[sURL].q.Init([]*Transmission{})
err := mt.Transmit(testutils.Context(t), sampleReportContext, report, sampleSigs)
Expand All @@ -65,7 +68,7 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) {
report := sampleV2Report
c := &mocks.MockWSRPCClient{}
clients[sURL] = c
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil)
// init the queue since we skipped starting transmitter
mt.servers[sURL].q.Init([]*Transmission{})
err := mt.Transmit(testutils.Context(t), sampleReportContext, report, sampleSigs)
Expand All @@ -79,7 +82,7 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) {
report := sampleV3Report
c := &mocks.MockWSRPCClient{}
clients[sURL] = c
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil)
// init the queue since we skipped starting transmitter
mt.servers[sURL].q.Init([]*Transmission{})
err := mt.Transmit(testutils.Context(t), sampleReportContext, report, sampleSigs)
Expand All @@ -94,7 +97,7 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) {
c := &mocks.MockWSRPCClient{}
clients[sURL] = c
triggerService := triggers.NewMercuryTriggerService(0, lggr)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, triggerService)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, triggerService)
// init the queue since we skipped starting transmitter
mt.servers[sURL].q.Init([]*Transmission{})
err := mt.Transmit(testutils.Context(t), sampleReportContext, report, sampleSigs)
Expand All @@ -111,7 +114,7 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) {
clients[sURL2] = c
clients[sURL3] = c

mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil)
// init the queue since we skipped starting transmitter
mt.servers[sURL].q.Init([]*Transmission{})
mt.servers[sURL2].q.Init([]*Transmission{})
Expand All @@ -136,6 +139,9 @@ func Test_MercuryTransmitter_LatestTimestamp(t *testing.T) {
db := pgtest.NewSqlxDB(t)
var jobID int32
codec := new(mockCodec)
benchmarkPriceDecoder := func(feedID mercuryutils.FeedID, report ocrtypes.Report) (*big.Int, error) {
return codec.BenchmarkPriceFromReport(report)
}

orm := NewORM(db)
clients := map[string]wsrpc.Client{}
Expand All @@ -153,7 +159,7 @@ func Test_MercuryTransmitter_LatestTimestamp(t *testing.T) {
},
}
clients[sURL] = c
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil)
ts, err := mt.LatestTimestamp(testutils.Context(t))
require.NoError(t, err)

Expand All @@ -169,7 +175,7 @@ func Test_MercuryTransmitter_LatestTimestamp(t *testing.T) {
},
}
clients[sURL] = c
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil)
ts, err := mt.LatestTimestamp(testutils.Context(t))
require.NoError(t, err)

Expand All @@ -183,7 +189,7 @@ func Test_MercuryTransmitter_LatestTimestamp(t *testing.T) {
},
}
clients[sURL] = c
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil)
_, err := mt.LatestTimestamp(testutils.Context(t))
require.Error(t, err)
assert.Contains(t, err.Error(), "something exploded")
Expand Down Expand Up @@ -213,7 +219,7 @@ func Test_MercuryTransmitter_LatestTimestamp(t *testing.T) {
return out, nil
},
}
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil)
ts, err := mt.LatestTimestamp(testutils.Context(t))
require.NoError(t, err)

Expand Down Expand Up @@ -243,6 +249,9 @@ func Test_MercuryTransmitter_LatestPrice(t *testing.T) {
var jobID int32

codec := new(mockCodec)
benchmarkPriceDecoder := func(feedID mercuryutils.FeedID, report ocrtypes.Report) (*big.Int, error) {
return codec.BenchmarkPriceFromReport(report)
}
orm := NewORM(db)
clients := map[string]wsrpc.Client{}

Expand All @@ -260,7 +269,7 @@ func Test_MercuryTransmitter_LatestPrice(t *testing.T) {
},
}
clients[sURL] = c
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil)

t.Run("BenchmarkPriceFromReport succeeds", func(t *testing.T) {
codec.val = originalPrice
Expand Down Expand Up @@ -291,7 +300,7 @@ func Test_MercuryTransmitter_LatestPrice(t *testing.T) {
},
}
clients[sURL] = c
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil)
price, err := mt.LatestPrice(testutils.Context(t), sampleFeedID)
require.NoError(t, err)

Expand All @@ -305,7 +314,7 @@ func Test_MercuryTransmitter_LatestPrice(t *testing.T) {
},
}
clients[sURL] = c
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil)
_, err := mt.LatestPrice(testutils.Context(t), sampleFeedID)
require.Error(t, err)
assert.Contains(t, err.Error(), "something exploded")
Expand All @@ -319,6 +328,9 @@ func Test_MercuryTransmitter_FetchInitialMaxFinalizedBlockNumber(t *testing.T) {
db := pgtest.NewSqlxDB(t)
var jobID int32
codec := new(mockCodec)
benchmarkPriceDecoder := func(feedID mercuryutils.FeedID, report ocrtypes.Report) (*big.Int, error) {
return codec.BenchmarkPriceFromReport(report)
}
orm := NewORM(db)
clients := map[string]wsrpc.Client{}

Expand All @@ -335,7 +347,7 @@ func Test_MercuryTransmitter_FetchInitialMaxFinalizedBlockNumber(t *testing.T) {
},
}
clients[sURL] = c
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil)
bn, err := mt.FetchInitialMaxFinalizedBlockNumber(testutils.Context(t))
require.NoError(t, err)

Expand All @@ -351,7 +363,7 @@ func Test_MercuryTransmitter_FetchInitialMaxFinalizedBlockNumber(t *testing.T) {
},
}
clients[sURL] = c
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil)
bn, err := mt.FetchInitialMaxFinalizedBlockNumber(testutils.Context(t))
require.NoError(t, err)

Expand All @@ -364,7 +376,7 @@ func Test_MercuryTransmitter_FetchInitialMaxFinalizedBlockNumber(t *testing.T) {
},
}
clients[sURL] = c
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil)
_, err := mt.FetchInitialMaxFinalizedBlockNumber(testutils.Context(t))
require.Error(t, err)
assert.Contains(t, err.Error(), "something exploded")
Expand All @@ -382,7 +394,7 @@ func Test_MercuryTransmitter_FetchInitialMaxFinalizedBlockNumber(t *testing.T) {
},
}
clients[sURL] = c
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil)
_, err := mt.FetchInitialMaxFinalizedBlockNumber(testutils.Context(t))
require.Error(t, err)
assert.Contains(t, err.Error(), "latestReport failed; mismatched feed IDs, expected: 0x1c916b4aa7e57ca7b68ae1bf45653f56b656fd3aa335ef7fae696b663f1b8472, got: 0x")
Expand Down

0 comments on commit ad9398a

Please sign in to comment.