Increase MaxConsecutiveRequestFailures; allow disablement of cache if LatestReportTTL=0 #11636

Merged
merged 5 commits on Dec 21, 2023
Changes from 4 commits
6 changes: 5 additions & 1 deletion core/services/relay/evm/mercury/wsrpc/cache/cache_set.go
@@ -46,7 +46,7 @@ func newCacheSet(lggr logger.Logger, cfg Config) *cacheSet {
 
 func (cs *cacheSet) Start(context.Context) error {
 	return cs.StartOnce("CacheSet", func() error {
-		cs.lggr.Debugw("CacheSet starting", "config", cs.cfg)
+		cs.lggr.Debugw("CacheSet starting", "config", cs.cfg, "cachingEnabled", cs.cfg.LatestReportTTL > 0)
 		return nil
 	})
 }
@@ -65,6 +65,10 @@ func (cs *cacheSet) Close() error {
 }
 
 func (cs *cacheSet) Get(ctx context.Context, client Client) (f Fetcher, err error) {
+	if cs.cfg.LatestReportTTL == 0 {
+		// caching disabled
+		return client, nil
+	}
 	ok := cs.IfStarted(func() {
 		f, err = cs.get(ctx, client)
 	})
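Illustrative sketch (not part of the PR): the hunk above makes `cacheSet.Get` short-circuit when `LatestReportTTL` is zero, handing the raw client back as the `Fetcher` so reads bypass the cache entirely. The types below (`Fetcher`, `Client`, `cachingFetcher`, `Config`) are simplified stand-ins assumed only for this example, not the real mercury wsrpc types.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Fetcher is the read-side interface callers receive from Get.
type Fetcher interface {
	LatestReport(ctx context.Context, feedID string) (string, error)
}

// Client is a stand-in for the raw wsrpc client; it already satisfies Fetcher.
type Client interface {
	Fetcher
}

type rawClient struct{}

func (rawClient) LatestReport(ctx context.Context, feedID string) (string, error) {
	return "fresh report for " + feedID, nil
}

// cachingFetcher is a stand-in for the per-server cache wrapper.
type cachingFetcher struct {
	client Client
	ttl    time.Duration
}

func (c *cachingFetcher) LatestReport(ctx context.Context, feedID string) (string, error) {
	// the real cache would serve an in-memory entry younger than c.ttl
	return c.client.LatestReport(ctx, feedID)
}

type Config struct {
	LatestReportTTL time.Duration
}

type cacheSet struct {
	cfg Config
}

// Get mirrors the change above: a zero TTL disables caching, so the raw
// client is returned directly instead of a caching wrapper.
func (cs *cacheSet) Get(ctx context.Context, client Client) (Fetcher, error) {
	if cs.cfg.LatestReportTTL == 0 {
		// caching disabled
		return client, nil
	}
	return &cachingFetcher{client: client, ttl: cs.cfg.LatestReportTTL}, nil
}

func main() {
	disabled := &cacheSet{cfg: Config{LatestReportTTL: 0}}
	f, _ := disabled.Get(context.Background(), rawClient{})
	fmt.Printf("%T\n", f) // main.rawClient: reads go straight to the server

	enabled := &cacheSet{cfg: Config{LatestReportTTL: time.Second}}
	f, _ = enabled.Get(context.Background(), rawClient{})
	fmt.Printf("%T\n", f) // *main.cachingFetcher: reads may be served from cache
}
```

Returning the client itself keeps the `Fetcher` interface unchanged for callers, so disabling the cache needs no changes at the call sites.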
45 changes: 25 additions & 20 deletions core/services/relay/evm/mercury/wsrpc/client.go
@@ -24,9 +24,9 @@ import (
 	"github.com/smartcontractkit/chainlink/v2/core/utils"
 )
 
-// MaxConsecutiveTransmitFailures controls how many consecutive requests are
+// MaxConsecutiveRequestFailures controls how many consecutive requests are
 // allowed to time out before we reset the connection
-const MaxConsecutiveTransmitFailures = 5
+const MaxConsecutiveRequestFailures = 10
 
 var (
 	timeoutCount = promauto.NewCounterVec(prometheus.CounterOpts{
@@ -55,7 +55,7 @@ var (
 	)
 	connectionResetCount = promauto.NewCounterVec(prometheus.CounterOpts{
 		Name: "mercury_connection_reset_count",
-		Help: fmt.Sprintf("Running count of times connection to mercury server has been reset (connection reset happens automatically after %d consecutive transmit failures)", MaxConsecutiveTransmitFailures),
+		Help: fmt.Sprintf("Running count of times connection to mercury server has been reset (connection reset happens automatically after %d consecutive request failures)", MaxConsecutiveRequestFailures),
 	},
 		[]string{"serverURL"},
 	)
@@ -256,13 +256,26 @@ func (w *client) Transmit(ctx context.Context, req *pb.TransmitRequest) (resp *p
 		return nil, errors.Wrap(err, "Transmit call failed")
 	}
 	resp, err = w.rawClient.Transmit(ctx, req)
+	w.handleTimeout(err)
+	if err != nil {
+		w.logger.Warnw("Transmit call failed due to networking error", "err", err, "resp", resp)
+		incRequestStatusMetric(statusFailed)
+	} else {
+		w.logger.Tracew("Transmit call succeeded", "resp", resp)
+		incRequestStatusMetric(statusSuccess)
+		setRequestLatencyMetric(float64(time.Since(start).Milliseconds()))
+	}
+	return
+}
+
+func (w *client) handleTimeout(err error) {
 	if errors.Is(err, context.DeadlineExceeded) {
 		w.timeoutCountMetric.Inc()
 		cnt := w.consecutiveTimeoutCnt.Add(1)
-		if cnt == MaxConsecutiveTransmitFailures {
+		if cnt == MaxConsecutiveRequestFailures {
 			w.logger.Errorf("Timed out on %d consecutive transmits, resetting transport", cnt)
-			// NOTE: If we get 5+ request timeouts in a row, close and re-open
-			// the websocket connection.
+			// NOTE: If we get at least MaxConsecutiveRequestFailures request
+			// timeouts in a row, close and re-open the websocket connection.
 			//
 			// This *shouldn't* be necessary in theory (ideally, wsrpc would
 			// handle it for us) but it acts as a "belts and braces" approach
@@ -271,11 +284,11 @@ func (w *client) Transmit(ctx context.Context, req *pb.TransmitRequest) (resp *p
 			select {
 			case w.chResetTransport <- struct{}{}:
 			default:
-				// This can happen if we had 5 consecutive timeouts, already
-				// sent a reset signal, then the connection started working
-				// again (resetting the count) then we got 5 additional
-				// failures before the runloop was able to close the bad
-				// connection.
+				// This can happen if we had MaxConsecutiveRequestFailures
+				// consecutive timeouts, already sent a reset signal, then the
+				// connection started working again (resetting the count) then
+				// we got MaxConsecutiveRequestFailures additional failures
+				// before the runloop was able to close the bad connection.
 				//
 				// It should be safe to just ignore in this case.
 				//
@@ -286,15 +299,6 @@ func (w *client) Transmit(ctx context.Context, req *pb.TransmitRequest) (resp *p
 	} else {
 		w.consecutiveTimeoutCnt.Store(0)
 	}
-	if err != nil {
-		w.logger.Warnw("Transmit call failed due to networking error", "err", err, "resp", resp)
-		incRequestStatusMetric(statusFailed)
-	} else {
-		w.logger.Tracew("Transmit call succeeded", "resp", resp)
-		incRequestStatusMetric(statusSuccess)
-		setRequestLatencyMetric(float64(time.Since(start).Milliseconds()))
-	}
-	return
 }

func (w *client) LatestReport(ctx context.Context, req *pb.LatestReportRequest) (resp *pb.LatestReportResponse, err error) {
@@ -306,6 +310,7 @@ func (w *client) LatestReport(ctx context.Context, req *pb.LatestReportRequest)
 	var cached bool
 	if w.cache == nil {
 		resp, err = w.rawClient.LatestReport(ctx, req)
+		w.handleTimeout(err)
 	} else {
 		cached = true
 		resp, err = w.cache.LatestReport(ctx, req)
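Illustrative sketch (not part of the PR): the renamed MaxConsecutiveRequestFailures constant and the extracted handleTimeout helper implement a simple pattern: count context.DeadlineExceeded errors from any raw request (Transmit, or LatestReport when the cache is bypassed), reset the count on any other outcome, and fire a non-blocking reset signal once the threshold is reached. Below is a minimal, self-contained version of that pattern; the field and channel names are simplified stand-ins assumed only for this example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
)

// MaxConsecutiveRequestFailures mirrors the constant in the diff: after this
// many back-to-back timeouts the transport is reset.
const MaxConsecutiveRequestFailures = 10

type client struct {
	consecutiveTimeoutCnt atomic.Int32
	chResetTransport      chan struct{}
}

// handleTimeout is a simplified version of the helper added in the diff:
// every raw request's error passes through here, whether it came from
// Transmit or from LatestReport with caching disabled.
func (c *client) handleTimeout(err error) {
	if errors.Is(err, context.DeadlineExceeded) {
		cnt := c.consecutiveTimeoutCnt.Add(1)
		if cnt == MaxConsecutiveRequestFailures {
			// Non-blocking send: if a reset is already pending, drop the signal.
			select {
			case c.chResetTransport <- struct{}{}:
			default:
			}
		}
		return
	}
	// Any non-timeout outcome (including success) breaks the streak.
	c.consecutiveTimeoutCnt.Store(0)
}

func main() {
	// Buffered channel so the demo can observe the signal after the loop;
	// a runloop draining the channel would work the same way.
	c := &client{chResetTransport: make(chan struct{}, 1)}

	// Simulate MaxConsecutiveRequestFailures timeouts in a row.
	for i := 0; i < MaxConsecutiveRequestFailures; i++ {
		c.handleTimeout(context.DeadlineExceeded)
	}

	select {
	case <-c.chResetTransport:
		fmt.Println("reset signal fired after", MaxConsecutiveRequestFailures, "consecutive timeouts")
	default:
		fmt.Println("no reset signal")
	}
}
```

Because the counter only fires the signal when it exactly equals the threshold, a single streak produces at most one reset request, and the non-blocking send tolerates the race described in the comment inside the diff.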