Skip to content

Commit

Permalink
isolate fix to mercury; more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
krehermann committed Dec 10, 2024
1 parent 54938d4 commit ee24472
Show file tree
Hide file tree
Showing 4 changed files with 190 additions and 39 deletions.
5 changes: 5 additions & 0 deletions .changeset/big-camels-report.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#bugfix fix non-idempotent loopp registry.Register
89 changes: 60 additions & 29 deletions core/services/ocr2/plugins/mercury/plugin.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package mercury

import (
"context"
"encoding/json"
"fmt"
"os/exec"
Expand Down Expand Up @@ -79,14 +80,13 @@ func NewServices(
return nil, errors.New("expected job to have a non-nil PipelineSpec")
}

var err error
var pluginConfig config.PluginConfig
if len(jb.OCR2OracleSpec.PluginConfig) == 0 {
if !enableTriggerCapability {
return nil, fmt.Errorf("at least one transmission option must be configured")
}
} else {
err = json.Unmarshal(jb.OCR2OracleSpec.PluginConfig.Bytes(), &pluginConfig)
err := json.Unmarshal(jb.OCR2OracleSpec.PluginConfig.Bytes(), &pluginConfig)
if err != nil {
return nil, errors.WithStack(err)
}
Expand All @@ -101,8 +101,8 @@ func NewServices(
// encapsulate all the subservices and ensure we close them all if any fail to start
srvs := []job.ServiceCtx{ocr2Provider}
abort := func() {
if err = services.MultiCloser(srvs).Close(); err != nil {
lggr.Errorw("Error closing unused services", "err", err)
if cerr := services.MultiCloser(srvs).Close(); cerr != nil {
lggr.Errorw("Error closing unused services", "err", cerr)
}
}
saver := ocrcommon.NewResultRunSaver(pipelineRunner, lggr, cfg.MaxSuccessfulRuns(), cfg.ResultWriteQueueDepth())
Expand All @@ -112,6 +112,7 @@ func NewServices(
var (
factory ocr3types.MercuryPluginFactory
factoryServices []job.ServiceCtx
fErr error
)
fCfg := factoryCfg{
orm: orm,
Expand All @@ -127,31 +128,31 @@ func NewServices(
}
switch feedID.Version() {
case 1:
factory, factoryServices, err = newv1factory(fCfg)
if err != nil {
factory, factoryServices, fErr = newv1factory(fCfg)
if fErr != nil {
abort()
return nil, fmt.Errorf("failed to create mercury v1 factory: %w", err)
return nil, fmt.Errorf("failed to create mercury v1 factory: %w", fErr)
}
srvs = append(srvs, factoryServices...)
case 2:
factory, factoryServices, err = newv2factory(fCfg)
if err != nil {
factory, factoryServices, fErr = newv2factory(fCfg)
if fErr != nil {
abort()
return nil, fmt.Errorf("failed to create mercury v2 factory: %w", err)
return nil, fmt.Errorf("failed to create mercury v2 factory: %w", fErr)
}
srvs = append(srvs, factoryServices...)
case 3:
factory, factoryServices, err = newv3factory(fCfg)
if err != nil {
factory, factoryServices, fErr = newv3factory(fCfg)
if fErr != nil {
abort()
return nil, fmt.Errorf("failed to create mercury v3 factory: %w", err)
return nil, fmt.Errorf("failed to create mercury v3 factory: %w", fErr)
}
srvs = append(srvs, factoryServices...)
case 4:
factory, factoryServices, err = newv4factory(fCfg)
if err != nil {
factory, factoryServices, fErr = newv4factory(fCfg)
if fErr != nil {
abort()
return nil, fmt.Errorf("failed to create mercury v4 factory: %w", err)
return nil, fmt.Errorf("failed to create mercury v4 factory: %w", fErr)
}
srvs = append(srvs, factoryServices...)
default:
Expand Down Expand Up @@ -214,13 +215,14 @@ func newv4factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.
loopEnabled := loopCmd != ""

if loopEnabled {
cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
cmdFn, unregisterer, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
if err != nil {
return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err)
}
// in loop mode, the factory is grpc server, and we need to handle the server lifecycle
// and unregistration of the loop
factoryServer := loop.NewMercuryV4Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds)
srvs = append(srvs, factoryServer)
srvs = append(srvs, factoryServer, unregisterer)
// adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle
factory = factoryServer
} else {
Expand Down Expand Up @@ -253,13 +255,14 @@ func newv3factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.
loopEnabled := loopCmd != ""

if loopEnabled {
cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
cmdFn, unregisterer, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
if err != nil {
return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err)
}
// in loopp mode, the factory is grpc server, and we need to handle the server lifecycle
// and unregistration of the loop
factoryServer := loop.NewMercuryV3Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds)
srvs = append(srvs, factoryServer)
srvs = append(srvs, factoryServer, unregisterer)
// adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle
factory = factoryServer
} else {
Expand Down Expand Up @@ -292,13 +295,14 @@ func newv2factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.
loopEnabled := loopCmd != ""

if loopEnabled {
cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
cmdFn, unregisterer, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
if err != nil {
return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err)
}
// in loopp mode, the factory is grpc server, and we need to handle the server lifecycle
// and unregistration of the loop
factoryServer := loop.NewMercuryV2Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds)
srvs = append(srvs, factoryServer)
srvs = append(srvs, factoryServer, unregisterer)
// adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle
factory = factoryServer
} else {
Expand Down Expand Up @@ -329,13 +333,14 @@ func newv1factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.
loopEnabled := loopCmd != ""

if loopEnabled {
cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
cmdFn, unregisterer, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
if err != nil {
return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err)
}
// in loopp mode, the factory is grpc server, and we need to handle the server lifecycle
// and unregistration of the loop
factoryServer := loop.NewMercuryV1Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds)
srvs = append(srvs, factoryServer)
srvs = append(srvs, factoryServer, unregisterer)
// adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle
factory = factoryServer
} else {
Expand All @@ -344,20 +349,46 @@ func newv1factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.
return factory, srvs, nil
}

func initLoop(cmd string, cfg Config, feedID utils.FeedID, lggr logger.Logger) (func() *exec.Cmd, loop.GRPCOpts, logger.Logger, error) {
func initLoop(cmd string, cfg Config, feedID utils.FeedID, lggr logger.Logger) (func() *exec.Cmd, *loopUnregisterCloser, loop.GRPCOpts, logger.Logger, error) {
lggr.Debugw("Initializing Mercury loop", "command", cmd)
mercuryLggr := lggr.Named(fmt.Sprintf("MercuryV%d", feedID.Version())).Named(feedID.String())
envVars, err := plugins.ParseEnvFile(env.MercuryPlugin.Env.Get())
if err != nil {
return nil, loop.GRPCOpts{}, nil, fmt.Errorf("failed to parse mercury env file: %w", err)
return nil, nil, loop.GRPCOpts{}, nil, fmt.Errorf("failed to parse mercury env file: %w", err)
}
loopID := mercuryLggr.Name()
cmdFn, opts, err := cfg.RegisterLOOP(plugins.CmdConfig{
ID: mercuryLggr.Name(),
ID: loopID,
Cmd: cmd,
Env: envVars,
})
if err != nil {
return nil, loop.GRPCOpts{}, nil, fmt.Errorf("failed to register loop: %w", err)
return nil, nil, loop.GRPCOpts{}, nil, fmt.Errorf("failed to register loop: %w", err)
}
return cmdFn, newLoopUnregister(cfg, loopID), opts, mercuryLggr, nil
}

// loopUnregisterCloser is a helper to unregister a loop
// as a service
// TODO BCF-3451 all other jobs that use custom plugin providers that should be refactored to use this pattern
// perhaps it can be implemented in the delegate on job delete.
type loopUnregisterCloser struct {
r plugins.RegistrarConfig
id string
}

func (l *loopUnregisterCloser) Close() error {
l.r.UnregisterLOOP(l.id)
return nil
}

func (l *loopUnregisterCloser) Start(ctx context.Context) error {
return nil
}

func newLoopUnregister(r plugins.RegistrarConfig, id string) *loopUnregisterCloser {
return &loopUnregisterCloser{
r: r,
id: id,
}
return cmdFn, opts, mercuryLggr, nil
}
Loading

0 comments on commit ee24472

Please sign in to comment.