Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfixes, logging improvements for LLO (and update chainlink-data-streams) #15169

Merged
merged 5 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/mean-dots-move.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Add config var Mercury.Transmitter.TransmitConcurrency #added
6 changes: 6 additions & 0 deletions .changeset/shiny-owls-destroy.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"chainlink": patch
---

Logging improvements for LLO
#internal
6 changes: 3 additions & 3 deletions core/cmd/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,9 +246,9 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G
}

evmFactoryCfg := chainlink.EVMFactoryConfig{
CSAETHKeystore: keyStore,
ChainOpts: legacyevm.ChainOpts{AppConfig: cfg, MailMon: mailMon, DS: ds},
MercuryTransmitter: cfg.Mercury().Transmitter(),
CSAETHKeystore: keyStore,
ChainOpts: legacyevm.ChainOpts{AppConfig: cfg, MailMon: mailMon, DS: ds},
MercuryConfig: cfg.Mercury(),
}
// evm always enabled for backward compatibility
// TODO BCF-2510 this needs to change in order to clear the path for EVM extraction
Expand Down
4 changes: 4 additions & 0 deletions core/config/docs/core.toml
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,10 @@ TransmitQueueMaxSize = 10_000 # Default
# when sending a message to the mercury server, before aborting and considering
# the transmission to be failed.
TransmitTimeout = "5s" # Default
# TransmitConcurrency is the max number of concurrent transmits to each server.
#
# Only has effect with LLO jobs.
TransmitConcurrency = 100 # Default

# Telemetry holds OTEL settings.
# This data includes open telemetry metrics, traces, & logs.
Expand Down
1 change: 1 addition & 0 deletions core/config/mercury_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type MercuryTLS interface {
// MercuryTransmitter exposes the Mercury transmitter settings as typed getters.
// NOTE(review): the per-method descriptions below follow the wording of the
// [Mercury.Transmitter] section in core/config/docs/core.toml — confirm there.
type MercuryTransmitter interface {
// TransmitQueueMaxSize returns the configured TransmitQueueMaxSize value.
TransmitQueueMaxSize() uint32
// TransmitTimeout returns the configured TransmitTimeout: the time allowed
// when sending a message to the mercury server before the transmission is
// aborted and considered failed.
TransmitTimeout() commonconfig.Duration
// TransmitConcurrency returns the max number of concurrent transmits to each
// server. Only has effect with LLO jobs.
TransmitConcurrency() uint32
}

type Mercury interface {
Expand Down
4 changes: 4 additions & 0 deletions core/config/toml/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1330,6 +1330,7 @@ func (m *MercuryTLS) ValidateConfig() (err error) {
// MercuryTransmitter is the TOML representation of the [Mercury.Transmitter]
// config section. Every field is a pointer so that an unset key (nil) can be
// told apart from an explicitly provided value.
type MercuryTransmitter struct {
// TransmitQueueMaxSize corresponds to the TransmitQueueMaxSize TOML key.
TransmitQueueMaxSize *uint32
// TransmitTimeout corresponds to the TransmitTimeout TOML key.
TransmitTimeout *commonconfig.Duration
// TransmitConcurrency corresponds to the TransmitConcurrency TOML key.
TransmitConcurrency *uint32
}

func (m *MercuryTransmitter) setFrom(f *MercuryTransmitter) {
Expand All @@ -1339,6 +1340,9 @@ func (m *MercuryTransmitter) setFrom(f *MercuryTransmitter) {
if v := f.TransmitTimeout; v != nil {
m.TransmitTimeout = v
}
if v := f.TransmitConcurrency; v != nil {
m.TransmitConcurrency = v
}
}

type Mercury struct {
Expand Down
4 changes: 2 additions & 2 deletions core/internal/cltest/cltest.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,8 +418,8 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn
MailMon: mailMon,
DS: ds,
},
CSAETHKeystore: keyStore,
MercuryTransmitter: cfg.Mercury().Transmitter(),
CSAETHKeystore: keyStore,
MercuryConfig: cfg.Mercury(),
}

if cfg.EVMEnabled() {
Expand Down
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ require (
github.com/smartcontractkit/chain-selectors v1.0.29 // indirect
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241118091009-43c2b4804cec // indirect
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f // indirect
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e // indirect
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241114154055-8d29ea018b57 // indirect
github.com/smartcontractkit/chainlink-feeds v0.1.1 // indirect
github.com/smartcontractkit/chainlink-protos/job-distributor v0.6.0 // indirect
github.com/smartcontractkit/chainlink-protos/orchestrator v0.3.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1413,8 +1413,8 @@ github.com/smartcontractkit/chainlink-common v0.3.1-0.20241114134822-aadff98ef06
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241114134822-aadff98ef068/go.mod h1:ny87uTW6hLjCTLiBqBRNFEhETSXhHWevYlPclT5lSco=
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f h1:BwrIaQIx5Iy6eT+DfLhFfK2XqjxRm74mVdlX8gbu4dw=
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f/go.mod h1:wHtwSR3F1CQSJJZDQKuqaqFYnvkT+kMyget7dl8Clvo=
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e h1:JiETqdNM0bktAUGMc62COwXIaw3rR3M77Me6bBLG0Fg=
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e/go.mod h1:iK3BNHKCLgSgkOyiu3iE7sfZ20Qnuk7xwjV/yO/6gnQ=
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241114154055-8d29ea018b57 h1:1BMTG66HnCIz+KMBWGvyzELNM6VHGwv2WKFhN7H49Sg=
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241114154055-8d29ea018b57/go.mod h1:QPiorgpbLv4+Jn4YO6xxU4ftTu4T3QN8HwX3ImP59DE=
github.com/smartcontractkit/chainlink-feeds v0.1.1 h1:JzvUOM/OgGQA1sOqTXXl52R6AnNt+Wg64sVG+XSA49c=
github.com/smartcontractkit/chainlink-feeds v0.1.1/go.mod h1:55EZ94HlKCfAsUiKUTNI7QlE/3d3IwTlsU3YNa/nBb4=
github.com/smartcontractkit/chainlink-protos/job-distributor v0.6.0 h1:0ewLMbAz3rZrovdRUCgd028yOXX8KigB4FndAUdI2kM=
Expand Down
4 changes: 4 additions & 0 deletions core/services/chainlink/config_mercury.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ func (m *mercuryTransmitterConfig) TransmitTimeout() commonconfig.Duration {
return *m.c.TransmitTimeout
}

// TransmitConcurrency returns the configured TransmitConcurrency value.
//
// The underlying setting is stored as a pointer and is dereferenced without a
// nil check, so this panics if the value was never populated.
// NOTE(review): presumably defaults are always applied before this is called —
// confirm against the config loading path.
func (m *mercuryTransmitterConfig) TransmitConcurrency() uint32 {
	concurrency := m.c.TransmitConcurrency
	return *concurrency
}

type mercuryConfig struct {
c toml.Mercury
s toml.MercurySecrets
Expand Down
2 changes: 2 additions & 0 deletions core/services/chainlink/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,6 +838,7 @@ func TestConfig_Marshal(t *testing.T) {
Transmitter: toml.MercuryTransmitter{
TransmitQueueMaxSize: ptr(uint32(123)),
TransmitTimeout: commoncfg.MustNewDuration(234 * time.Second),
TransmitConcurrency: ptr(uint32(456)),
},
VerboseLogging: ptr(true),
}
Expand Down Expand Up @@ -1348,6 +1349,7 @@ CertFile = '/path/to/cert.pem'
[Mercury.Transmitter]
TransmitQueueMaxSize = 123
TransmitTimeout = '3m54s'
TransmitConcurrency = 456
`},
{"full", full, fullTOML},
{"multi-chain", multiChain, multiChainTOML},
Expand Down
4 changes: 2 additions & 2 deletions core/services/chainlink/relayer_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (r *RelayerFactory) NewDummy(config DummyFactoryConfig) (loop.Relayer, erro
type EVMFactoryConfig struct {
legacyevm.ChainOpts
evmrelay.CSAETHKeystore
coreconfig.MercuryTransmitter
MercuryConfig coreconfig.Mercury
}

func (r *RelayerFactory) NewEVM(ctx context.Context, config EVMFactoryConfig) (map[types.RelayID]evmrelay.LOOPRelayAdapter, error) {
Expand Down Expand Up @@ -83,7 +83,7 @@ func (r *RelayerFactory) NewEVM(ctx context.Context, config EVMFactoryConfig) (m
DS: ccOpts.DS,
CSAETHKeystore: config.CSAETHKeystore,
MercuryPool: r.MercuryPool,
TransmitterConfig: config.MercuryTransmitter,
MercuryConfig: config.MercuryConfig,
CapabilitiesRegistry: r.CapabilitiesRegistry,
HTTPClient: r.HTTPClient,
RetirementReportCache: r.RetirementReportCache,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ CertFile = ''
[Mercury.Transmitter]
TransmitQueueMaxSize = 10000
TransmitTimeout = '5s'
TransmitConcurrency = 100

[Capabilities]
[Capabilities.Peering]
Expand Down
1 change: 1 addition & 0 deletions core/services/chainlink/testdata/config-full.toml
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ CertFile = '/path/to/cert.pem'
[Mercury.Transmitter]
TransmitQueueMaxSize = 123
TransmitTimeout = '3m54s'
TransmitConcurrency = 456

[Capabilities]
[Capabilities.Peering]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ CertFile = ''
[Mercury.Transmitter]
TransmitQueueMaxSize = 10000
TransmitTimeout = '5s'
TransmitConcurrency = 100

[Capabilities]
[Capabilities.Peering]
Expand Down
5 changes: 3 additions & 2 deletions core/services/llo/codecs.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
package llo

import (
"github.com/smartcontractkit/chainlink-common/pkg/logger"
llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo"
"github.com/smartcontractkit/chainlink-data-streams/llo"

"github.com/smartcontractkit/chainlink/v2/core/services/llo/evm"
)

// NOTE: All supported codecs must be specified here
func NewReportCodecs() map[llotypes.ReportFormat]llo.ReportCodec {
func NewReportCodecs(lggr logger.Logger) map[llotypes.ReportFormat]llo.ReportCodec {
codecs := make(map[llotypes.ReportFormat]llo.ReportCodec)

codecs[llotypes.ReportFormatJSON] = llo.JSONReportCodec{}
codecs[llotypes.ReportFormatEVMPremiumLegacy] = evm.ReportCodecPremiumLegacy{}
codecs[llotypes.ReportFormatEVMPremiumLegacy] = evm.NewReportCodecPremiumLegacy(lggr)

return codecs
}
3 changes: 2 additions & 1 deletion core/services/llo/codecs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import (
"github.com/stretchr/testify/assert"

llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo"
"github.com/smartcontractkit/chainlink/v2/core/logger"
)

func Test_NewReportCodecs(t *testing.T) {
c := NewReportCodecs()
c := NewReportCodecs(logger.TestLogger(t))

assert.Contains(t, c, llotypes.ReportFormatJSON, "expected JSON to be supported")
assert.Contains(t, c, llotypes.ReportFormatEVMPremiumLegacy, "expected EVMPremiumLegacy to be supported")
Expand Down
61 changes: 36 additions & 25 deletions core/services/llo/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package llo
import (
"context"
"fmt"
"slices"
"sort"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand Down Expand Up @@ -85,11 +87,7 @@ func newDataSource(lggr logger.Logger, registry Registry, t Telemeter) *dataSour

// Observe looks up all streams in the registry and populates a map of stream ID => value
func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, opts llo.DSOpts) error {
var wg sync.WaitGroup
wg.Add(len(streamValues))
var svmu sync.Mutex
var errs []ErrObservationFailed
var errmu sync.Mutex
now := time.Now()

if opts.VerboseLogging() {
streamIDs := make([]streams.StreamID, 0, len(streamValues))
Expand All @@ -100,6 +98,13 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues,
d.lggr.Debugw("Observing streams", "streamIDs", streamIDs, "configDigest", opts.ConfigDigest(), "seqNr", opts.OutCtx().SeqNr)
}

var wg sync.WaitGroup
wg.Add(len(streamValues))

var mu sync.Mutex
successfulStreamIDs := make([]streams.StreamID, 0, len(streamValues))
var errs []ErrObservationFailed

for _, streamID := range maps.Keys(streamValues) {
go func(streamID llotypes.StreamID) {
defer wg.Done()
Expand All @@ -108,17 +113,17 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues,

stream, exists := d.registry.Get(streamID)
if !exists {
errmu.Lock()
mu.Lock()
errs = append(errs, ErrObservationFailed{streamID: streamID, reason: fmt.Sprintf("missing stream: %d", streamID)})
errmu.Unlock()
mu.Unlock()
promMissingStreamCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc()
return
}
run, trrs, err := stream.Run(ctx)
if err != nil {
errmu.Lock()
mu.Lock()
errs = append(errs, ErrObservationFailed{inner: err, run: run, streamID: streamID, reason: "pipeline run failed"})
errmu.Unlock()
mu.Unlock()
promObservationErrorCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc()
// TODO: Consolidate/reduce telemetry. We should send all observation results in a single packet
// https://smartcontract-it.atlassian.net/browse/MERC-6290
Expand All @@ -129,44 +134,50 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues,
// https://smartcontract-it.atlassian.net/browse/MERC-6290
val, err = ExtractStreamValue(trrs)
if err != nil {
errmu.Lock()
mu.Lock()
errs = append(errs, ErrObservationFailed{inner: err, run: run, streamID: streamID, reason: "failed to extract big.Int"})
errmu.Unlock()
mu.Unlock()
return
}

d.t.EnqueueV3PremiumLegacy(run, trrs, streamID, opts, val, nil)

mu.Lock()
defer mu.Unlock()

successfulStreamIDs = append(successfulStreamIDs, streamID)
if val != nil {
svmu.Lock()
defer svmu.Unlock()
streamValues[streamID] = val
}
}(streamID)
}

wg.Wait()
elapsed := time.Since(now)

// Failed observations are always logged at warn level
var failedStreamIDs []streams.StreamID
if len(errs) > 0 {
// Only log on errors or if VerboseLogging is turned on
if len(errs) > 0 || opts.VerboseLogging() {
slices.Sort(successfulStreamIDs)
sort.Slice(errs, func(i, j int) bool { return errs[i].streamID < errs[j].streamID })
failedStreamIDs = make([]streams.StreamID, len(errs))

failedStreamIDs := make([]streams.StreamID, len(errs))
errStrs := make([]string, len(errs))
for i, e := range errs {
errStrs[i] = e.String()
failedStreamIDs[i] = e.streamID
}
d.lggr.Warnw("Observation failed for streams", "failedStreamIDs", failedStreamIDs, "errs", errStrs, "configDigest", opts.ConfigDigest(), "seqNr", opts.OutCtx().SeqNr)
}

if opts.VerboseLogging() {
successes := make([]streams.StreamID, 0, len(streamValues))
for strmID := range streamValues {
successes = append(successes, strmID)
lggr := logger.With(d.lggr, "elapsed", elapsed, "nSuccessfulStreams", len(successfulStreamIDs), "nFailedStreams", len(failedStreamIDs), "successfulStreamIDs", successfulStreamIDs, "failedStreamIDs", failedStreamIDs, "errs", errStrs, "configDigest", opts.ConfigDigest(), "seqNr", opts.OutCtx().SeqNr)

if opts.VerboseLogging() {
lggr = logger.With(lggr, "streamValues", streamValues)
}

if len(errs) == 0 && opts.VerboseLogging() {
lggr.Infow("Observation succeeded for all streams")
} else if len(errs) > 0 {
lggr.Warnw("Observation failed for streams")
}
sort.Slice(successes, func(i, j int) bool { return successes[i] < successes[j] })
d.lggr.Debugw("Observation complete", "successfulStreamIDs", successes, "failedStreamIDs", failedStreamIDs, "configDigest", opts.ConfigDigest(), "values", streamValues, "seqNr", opts.OutCtx().SeqNr)
}

return nil
Expand Down
13 changes: 10 additions & 3 deletions core/services/llo/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/smartcontractkit/chainlink-data-streams/llo"
datastreamsllo "github.com/smartcontractkit/chainlink-data-streams/llo"

corelogger "github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/streams"
)
Expand Down Expand Up @@ -91,7 +92,13 @@ func NewDelegate(cfg DelegateConfig) (job.ServiceCtx, error) {
if cfg.ShouldRetireCache == nil {
return nil, errors.New("ShouldRetireCache must not be nil")
}
reportCodecs := NewReportCodecs()
var codecLggr logger.Logger
if cfg.ReportingPluginConfig.VerboseLogging {
codecLggr = logger.Named(lggr, "ReportCodecs")
} else {
codecLggr = corelogger.NullLogger
}
reportCodecs := NewReportCodecs(codecLggr)

var t TelemeterService
if cfg.CaptureEATelemetry {
Expand Down Expand Up @@ -126,7 +133,7 @@ func (d *delegate) Start(ctx context.Context) error {
case 1:
lggr = logger.With(lggr, "instanceType", "Green")
}
ocrLogger := logger.NewOCRWrapper(lggr, d.cfg.TraceLogging, func(msg string) {
ocrLogger := logger.NewOCRWrapper(NewSuppressedLogger(lggr, d.cfg.ReportingPluginConfig.VerboseLogging), d.cfg.TraceLogging, func(msg string) {
// TODO: do we actually need to DB-persist errors?
// MERC-3524
})
Expand All @@ -144,7 +151,7 @@ func (d *delegate) Start(ctx context.Context) error {
OffchainKeyring: d.cfg.OffchainKeyring,
OnchainKeyring: d.cfg.OnchainKeyring,
ReportingPluginFactory: datastreamsllo.NewPluginFactory(
d.cfg.ReportingPluginConfig, psrrc, d.src, d.cfg.RetirementReportCodec, d.cfg.ChannelDefinitionCache, d.ds, logger.Named(lggr, "LLOReportingPlugin"), llo.EVMOnchainConfigCodec{}, d.reportCodecs,
d.cfg.ReportingPluginConfig, psrrc, d.src, d.cfg.RetirementReportCodec, d.cfg.ChannelDefinitionCache, d.ds, logger.Named(lggr, "ReportingPlugin"), llo.EVMOnchainConfigCodec{}, d.reportCodecs,
),
MetricsRegisterer: prometheus.WrapRegistererWith(map[string]string{"job_name": d.cfg.JobName.ValueOrZero()}, prometheus.DefaultRegisterer),
})
Expand Down
10 changes: 10 additions & 0 deletions core/services/llo/evm/fees_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,14 @@ func Test_Fees(t *testing.T) {
fee := CalculateFee(tokenPriceInUSD, BaseUSDFee)
assert.Equal(t, big.NewInt(0), fee)
})

t.Run("ridiculously high value rounds down fee to zero", func(t *testing.T) {
// 20dp
tokenPriceInUSD, err := decimal.NewFromString("12984833000000000000")
require.NoError(t, err)
BaseUSDFee, err = decimal.NewFromString("0.1")
require.NoError(t, err)
fee := CalculateFee(tokenPriceInUSD, BaseUSDFee)
assert.Equal(t, big.NewInt(0), fee)
})
}
Loading
Loading