Skip to content

Commit

Permalink
Module id fail (#5261)
Browse files Browse the repository at this point in the history
* push changes

* Add remove whenever using the component module.

* Add additional context and remove dead file.

* Add long changelog comment

* fix linter

* Remove mutex check

* Add changes to support module id removal.

* Remove unneeded line

* Fix merge errors.

* Ensure module is checked on first run.

* rename and add comments

* Add manual removal back in and make test closer to actual usage.

* move changelog comment to correct location

* A different approach by keying off run instead of build.

* Add test for duplicate registration.

* Minor changes

* PR feedback

* add locks around the reads for tests, its a bit hacky.
  • Loading branch information
mattdurham authored Oct 16, 2023
1 parent 4d07963 commit 473938a
Show file tree
Hide file tree
Showing 11 changed files with 243 additions and 51 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ Main (unreleased)

- Fixed an issue where `loki.process` validation for stage `metric.counter` was
allowing invalid combination of configuration options. (@thampiotr)

- Fixed issue where adding a module after initial start, that failed to load then subsequently resolving the issue would cause the module to
permanently fail to load with `id already exists` error. (@mattdurham)

### Enhancements

Expand Down
6 changes: 5 additions & 1 deletion component/module/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"time"

"github.com/go-kit/log/level"
"github.com/grafana/agent/component"
)

Expand Down Expand Up @@ -71,7 +72,10 @@ func (c *ModuleComponent) LoadFlowSource(args map[string]any, contentValue strin

// RunFlowController runs the flow controller that all module components start.
func (c *ModuleComponent) RunFlowController(ctx context.Context) {
c.mod.Run(ctx)
err := c.mod.Run(ctx)
if err != nil {
level.Error(c.opts.Logger).Log("msg", "error running module", "id", c.opts.ID, "err", err)
}
}

// CurrentHealth contains the implementation details for CurrentHealth in a module component.
Expand Down
2 changes: 1 addition & 1 deletion component/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type Module interface {
//
// Run blocks until the provided context is canceled. The ID of a module as defined in
// ModuleController.NewModule will not be released until Run returns.
Run(context.Context)
Run(context.Context) error
}

// ExportFunc is used for onExport of the Module
Expand Down
67 changes: 67 additions & 0 deletions pkg/flow/componenttest/testfailmodule.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package componenttest

import (
"context"
"fmt"

"github.com/grafana/agent/component"
mod "github.com/grafana/agent/component/module"
)

func init() {
component.Register(component.Registration{
Name: "test.fail.module",
Args: TestFailArguments{},
Exports: mod.Exports{},

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
m, err := mod.NewModuleComponent(opts)
if err != nil {
return nil, err
}
if args.(TestFailArguments).Fail {
return nil, fmt.Errorf("module told to fail")
}
err = m.LoadFlowSource(nil, args.(TestFailArguments).Content)
if err != nil {
return nil, err
}
return &TestFailModule{
mc: m,
content: args.(TestFailArguments).Content,
opts: opts,
fail: args.(TestFailArguments).Fail,
ch: make(chan error),
}, nil
},
})
}

type TestFailArguments struct {
Content string `river:"content,attr"`
Fail bool `river:"fail,attr,optional"`
}

type TestFailModule struct {
content string
opts component.Options
ch chan error
mc *mod.ModuleComponent
fail bool
}

func (t *TestFailModule) Run(ctx context.Context) error {
go t.mc.RunFlowController(ctx)
<-ctx.Done()
return nil
}

func (t *TestFailModule) UpdateContent(content string) error {
t.content = content
err := t.mc.LoadFlowSource(nil, t.content)
return err
}

func (t *TestFailModule) Update(_ component.Arguments) error {
return nil
}
15 changes: 10 additions & 5 deletions pkg/flow/flow_services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func TestFlow_GetServiceConsumers_Modules(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

componentBuilt := util.NewWaitTrigger()
componentRunning := util.NewWaitTrigger()

var (
svc = &testservices.Fake{
Expand Down Expand Up @@ -218,9 +218,14 @@ func TestFlow_GetServiceConsumers_Modules(t *testing.T) {
Name: "service_consumer",
Args: struct{}{},
NeedsServices: []string{"service"},
Build: func(_ component.Options, _ component.Arguments) (component.Component, error) {
componentBuilt.Trigger()
return &testcomponents.Fake{}, nil
Build: func(o component.Options, _ component.Arguments) (component.Component, error) {
return &testcomponents.Fake{
RunFunc: func(ctx context.Context) error {
componentRunning.Trigger()
<-ctx.Done()
return nil
},
}, nil
},
},
}
Expand All @@ -243,7 +248,7 @@ func TestFlow_GetServiceConsumers_Modules(t *testing.T) {
require.NoError(t, ctrl.LoadSource(f, nil))
go ctrl.Run(ctx)

require.NoError(t, componentBuilt.Wait(5*time.Second), "Component should have been built")
require.NoError(t, componentRunning.Wait(5*time.Second), "Component should have been built")

consumers := ctrl.GetServiceConsumers("service")
require.Len(t, consumers, 2, "There should be a consumer for the module loader and the module's component")
Expand Down
16 changes: 15 additions & 1 deletion pkg/flow/internal/controller/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"strings"
"testing"

"github.com/grafana/agent/component"
"github.com/grafana/agent/pkg/flow/internal/controller"
"github.com/grafana/agent/pkg/flow/internal/dag"
"github.com/grafana/agent/pkg/flow/logging"
Expand Down Expand Up @@ -209,7 +210,7 @@ func TestScopeWithFailingComponent(t *testing.T) {
OnComponentUpdate: func(cn *controller.ComponentNode) { /* no-op */ },
Registerer: prometheus.NewRegistry(),
NewModuleController: func(id string, availableServices []string) controller.ModuleController {
return nil
return fakeModuleController{}
},
},
}
Expand Down Expand Up @@ -304,3 +305,16 @@ func requireGraph(t *testing.T, g *dag.Graph, expect graphDefinition) {
}
require.ElementsMatch(t, expect.OutEdges, actualEdges, "List of edges do not match")
}

type fakeModuleController struct{}

func (f fakeModuleController) NewModule(id string, export component.ExportFunc) (component.Module, error) {
return nil, nil
}

func (f fakeModuleController) ModuleIDs() []string {
return nil
}

func (f fakeModuleController) ClearModuleIDs() {
}
3 changes: 1 addition & 2 deletions pkg/flow/internal/controller/node_component.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,6 @@ func (cn *ComponentNode) Evaluate(scope *vm.Scope) error {
msg := fmt.Sprintf("component evaluation failed: %s", err)
cn.setEvalHealth(component.HealthTypeUnhealthy, msg)
}

return err
}

Expand Down Expand Up @@ -303,7 +302,7 @@ func (cn *ComponentNode) evaluate(scope *vm.Scope) error {
}

// Run runs the managed component in the calling goroutine until ctx is
// canceled. Evaluate must have been called at least once without retuning an
// canceled. Evaluate must have been called at least once without returning an
// error before calling Run.
//
// Run will immediately return ErrUnevaluated if Evaluate has never been called
Expand Down
38 changes: 24 additions & 14 deletions pkg/flow/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"path"
"sync"

"github.com/go-kit/log/level"
"github.com/grafana/agent/component"
"github.com/grafana/agent/pkg/flow/internal/controller"
"github.com/grafana/agent/pkg/flow/internal/worker"
Expand Down Expand Up @@ -46,9 +47,6 @@ func (m *moduleController) NewModule(id string, export component.ExportFunc) (co
if id != "" {
fullPath = path.Join(fullPath, id)
}
if _, found := m.modules[fullPath]; found {
return nil, fmt.Errorf("id %s already exists", id)
}

mod := newModule(&moduleOptions{
ID: fullPath,
Expand All @@ -57,26 +55,33 @@ func (m *moduleController) NewModule(id string, export component.ExportFunc) (co
parent: m,
})

if err := m.o.ModuleRegistry.Register(fullPath, mod); err != nil {
return nil, err
}

m.modules[fullPath] = struct{}{}
return mod, nil
}

func (m *moduleController) removeID(id string) {
func (m *moduleController) removeModule(mod *module) {
m.mut.Lock()
defer m.mut.Unlock()

delete(m.modules, id)
m.o.ModuleRegistry.Unregister(id)
m.o.ModuleRegistry.Unregister(mod.o.ID)
delete(m.modules, mod.o.ID)
}

func (m *moduleController) addModule(mod *module) error {
m.mut.Lock()
defer m.mut.Unlock()
if err := m.o.ModuleRegistry.Register(mod.o.ID, mod); err != nil {
level.Error(m.o.Logger).Log("msg", "error registering module", "id", mod.o.ID, "err", err)
return err
}
m.modules[mod.o.ID] = struct{}{}
return nil
}

// ModuleIDs implements [controller.ModuleController].
func (m *moduleController) ModuleIDs() []string {
m.mut.RLock()
defer m.mut.RUnlock()

return maps.Keys(m.modules)
}

Expand All @@ -86,7 +91,7 @@ type module struct {
}

type moduleOptions struct {
ID string
ID string // ID is the full name including all parents, "module.file.example.prometheus.remote_write.id".
export component.ExportFunc
parent *moduleController
*moduleControllerOptions
Expand Down Expand Up @@ -135,9 +140,14 @@ func (c *module) LoadConfig(config []byte, args map[string]any) error {
// will be run until Run is called.
//
// Run blocks until the provided context is canceled.
func (c *module) Run(ctx context.Context) {
defer c.o.parent.removeID(c.o.ID)
func (c *module) Run(ctx context.Context) error {
if err := c.o.parent.addModule(c); err != nil {
return err
}
defer c.o.parent.removeModule(c)

c.f.Run(ctx)
return nil
}

// moduleControllerOptions holds static options for module controller.
Expand Down
76 changes: 76 additions & 0 deletions pkg/flow/module_fail_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package flow

import (
"context"
"testing"
"time"

"github.com/grafana/agent/pkg/flow/componenttest"
"github.com/stretchr/testify/require"
)

func TestIDRemovalIfFailedToLoad(t *testing.T) {
f := New(testOptions(t))

fullContent := "test.fail.module \"t1\" { content = \"\" }"
fl, err := ParseSource("test", []byte(fullContent))
require.NoError(t, err)
err = f.LoadSource(fl, nil)
require.NoError(t, err)
ctx := context.Background()
ctx, cnc := context.WithTimeout(ctx, 600*time.Second)

go f.Run(ctx)
var t1 *componenttest.TestFailModule
require.Eventually(t, func() bool {
t1 = f.loader.Components()[0].Component().(*componenttest.TestFailModule)
return t1 != nil
}, 10*time.Second, 100*time.Millisecond)
require.Eventually(t, func() bool {
f.loadMut.RLock()
defer f.loadMut.RUnlock()
// This should be one due to t1.
return len(f.modules.List()) == 1
}, 10*time.Second, 100*time.Millisecond)
badContent :=
`test.fail.module "bad" {
content=""
fail=true
}`
err = t1.UpdateContent(badContent)
// Because we have bad content this should fail, but the ids should be removed.
require.Error(t, err)
require.Eventually(t, func() bool {
f.loadMut.RLock()
defer f.loadMut.RUnlock()
// Only one since the bad one never should have been added.
rightLength := len(f.modules.List()) == 1
_, foundT1 := f.modules.Get("test.fail.module.t1")
return rightLength && foundT1
}, 10*time.Second, 100*time.Millisecond)
// fail a second time to ensure the once is done again.
err = t1.UpdateContent(badContent)
require.Error(t, err)

goodContent :=
`test.fail.module "good" {
content=""
fail=false
}`
err = t1.UpdateContent(goodContent)
require.NoError(t, err)
require.Eventually(t, func() bool {
f.loadMut.RLock()
defer f.loadMut.RUnlock()
modT1, foundT1 := f.modules.Get("test.fail.module.t1")
modGood, foundGood := f.modules.Get("test.fail.module.t1/test.fail.module.good")
return modT1 != nil && modGood != nil && foundT1 && foundGood
}, 10*time.Second, 100*time.Millisecond)
cnc()
require.Eventually(t, func() bool {
f.loadMut.RLock()
defer f.loadMut.RUnlock()
// All should be cleaned up.
return len(f.modules.List()) == 0
}, 10*time.Second, 100*time.Millisecond)
}
2 changes: 1 addition & 1 deletion pkg/flow/module_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func newModuleRegistry() *moduleRegistry {
}
}

// Get retrives a module by ID.
// Get retrieves a module by ID.
func (reg *moduleRegistry) Get(id string) (*module, bool) {
reg.mut.RLock()
defer reg.mut.RUnlock()
Expand Down
Loading

0 comments on commit 473938a

Please sign in to comment.