Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Module id fail #5261

Merged
merged 24 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
6db453b
push changes
mattdurham Sep 17, 2023
a2c2755
Add remove whenever using the component module.
mattdurham Sep 20, 2023
13c99ca
Add additional context and remove dead file.
mattdurham Sep 21, 2023
dfa0b77
Add long changelog comment
mattdurham Sep 21, 2023
01552ce
merge main
mattdurham Sep 21, 2023
f8f9930
fix linter
mattdurham Sep 21, 2023
8ceafb2
Remove mutex check
mattdurham Sep 21, 2023
54b238c
Add changes to support module id removal.
mattdurham Oct 5, 2023
91c0abe
Remove unneeded line
mattdurham Oct 5, 2023
f7caffe
main merge
mattdurham Oct 5, 2023
cb8d986
Fix merge errors.
mattdurham Oct 5, 2023
92cb1c1
Ensure module is checked on first run.
mattdurham Oct 10, 2023
f9e89e0
rename and add comments
mattdurham Oct 10, 2023
6d28509
Add manual removal back in and make test closer to actual usage.
mattdurham Oct 10, 2023
a96a32e
Merge branch 'main' into module_id_fail
mattdurham Oct 10, 2023
6c44b21
move changelog comment to correct location
mattdurham Oct 10, 2023
db23876
A different approach by keying off run instead of build.
mattdurham Oct 11, 2023
a9d509d
Add test for duplicate registration.
mattdurham Oct 11, 2023
e81a2ef
Minor changes
mattdurham Oct 12, 2023
e5588be
Merge branch 'main' into module_id_fail
mattdurham Oct 12, 2023
e69db8c
PR feedback
mattdurham Oct 16, 2023
acaae50
Merge remote-tracking branch 'origin/module_id_fail' into module_id_fail
mattdurham Oct 16, 2023
77777b0
add locks around the reads for tests, its a bit hacky.
mattdurham Oct 16, 2023
9b9f7ad
Merge branch 'main' into module_id_fail
mattdurham Oct 16, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ Main (unreleased)

- Fixed an issue where `loki.process` validation for stage `metric.counter` was
allowing invalid combination of configuration options. (@thampiotr)

- Fixed issue where adding a module after initial start, that failed to load then subsequently resolving the issue would cause the module to
permanently fail to load with `id already exists` error. (@mattdurham)

### Enhancements

Expand Down
6 changes: 5 additions & 1 deletion component/module/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"time"

"github.com/go-kit/log/level"
"github.com/grafana/agent/component"
)

Expand Down Expand Up @@ -71,7 +72,10 @@ func (c *ModuleComponent) LoadFlowSource(args map[string]any, contentValue strin

// RunFlowController runs the flow controller that all module components start.
func (c *ModuleComponent) RunFlowController(ctx context.Context) {
c.mod.Run(ctx)
err := c.mod.Run(ctx)
if err != nil {
level.Error(c.opts.Logger).Log("msg", "error running module", "id", c.opts.ID, "err", err)
}
}

// CurrentHealth contains the implementation details for CurrentHealth in a module component.
Expand Down
2 changes: 1 addition & 1 deletion component/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type Module interface {
//
// Run blocks until the provided context is canceled. The ID of a module as defined in
// ModuleController.NewModule will not be released until Run returns.
Run(context.Context)
Run(context.Context) error
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just as an aside: this would be considered a breaking change to the API, which is re-enforcing the idea for me that everything should be moved to internal for 1.0 until we're ready to start exposing parts of our code as stable APIs.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, this is mostly for the tests so we can check specific error conditions.

}

// ExportFunc is used for onExport of the Module
Expand Down
67 changes: 67 additions & 0 deletions pkg/flow/componenttest/testfailmodule.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package componenttest

import (
"context"
"fmt"

"github.com/grafana/agent/component"
mod "github.com/grafana/agent/component/module"
)

func init() {
component.Register(component.Registration{
Name: "test.fail.module",
Args: TestFailArguments{},
Exports: mod.Exports{},

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
m, err := mod.NewModuleComponent(opts)
if err != nil {
return nil, err
}
if args.(TestFailArguments).Fail {
return nil, fmt.Errorf("module told to fail")
}
err = m.LoadFlowSource(nil, args.(TestFailArguments).Content)
if err != nil {
return nil, err
}
return &TestFailModule{
mc: m,
content: args.(TestFailArguments).Content,
opts: opts,
fail: args.(TestFailArguments).Fail,
ch: make(chan error),
}, nil
},
})
}

type TestFailArguments struct {
Content string `river:"content,attr"`
Fail bool `river:"fail,attr,optional"`
}

type TestFailModule struct {
content string
opts component.Options
ch chan error
mc *mod.ModuleComponent
fail bool
}

func (t *TestFailModule) Run(ctx context.Context) error {
go t.mc.RunFlowController(ctx)
<-ctx.Done()
return nil
}

func (t *TestFailModule) UpdateContent(content string) error {
t.content = content
err := t.mc.LoadFlowSource(nil, t.content)
return err
}

func (t *TestFailModule) Update(_ component.Arguments) error {
return nil
}
15 changes: 10 additions & 5 deletions pkg/flow/flow_services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func TestFlow_GetServiceConsumers_Modules(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

componentBuilt := util.NewWaitTrigger()
componentRunning := util.NewWaitTrigger()

var (
svc = &testservices.Fake{
Expand Down Expand Up @@ -218,9 +218,14 @@ func TestFlow_GetServiceConsumers_Modules(t *testing.T) {
Name: "service_consumer",
Args: struct{}{},
NeedsServices: []string{"service"},
Build: func(_ component.Options, _ component.Arguments) (component.Component, error) {
componentBuilt.Trigger()
return &testcomponents.Fake{}, nil
Build: func(o component.Options, _ component.Arguments) (component.Component, error) {
return &testcomponents.Fake{
RunFunc: func(ctx context.Context) error {
componentRunning.Trigger()
<-ctx.Done()
return nil
},
}, nil
},
},
}
Expand All @@ -243,7 +248,7 @@ func TestFlow_GetServiceConsumers_Modules(t *testing.T) {
require.NoError(t, ctrl.LoadSource(f, nil))
go ctrl.Run(ctx)

require.NoError(t, componentBuilt.Wait(5*time.Second), "Component should have been built")
require.NoError(t, componentRunning.Wait(5*time.Second), "Component should have been built")

consumers := ctrl.GetServiceConsumers("service")
require.Len(t, consumers, 2, "There should be a consumer for the module loader and the module's component")
Expand Down
16 changes: 15 additions & 1 deletion pkg/flow/internal/controller/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"strings"
"testing"

"github.com/grafana/agent/component"
"github.com/grafana/agent/pkg/flow/internal/controller"
"github.com/grafana/agent/pkg/flow/internal/dag"
"github.com/grafana/agent/pkg/flow/logging"
Expand Down Expand Up @@ -209,7 +210,7 @@ func TestScopeWithFailingComponent(t *testing.T) {
OnComponentUpdate: func(cn *controller.ComponentNode) { /* no-op */ },
Registerer: prometheus.NewRegistry(),
NewModuleController: func(id string, availableServices []string) controller.ModuleController {
return nil
return fakeModuleController{}
},
},
}
Expand Down Expand Up @@ -304,3 +305,16 @@ func requireGraph(t *testing.T, g *dag.Graph, expect graphDefinition) {
}
require.ElementsMatch(t, expect.OutEdges, actualEdges, "List of edges do not match")
}

type fakeModuleController struct{}

func (f fakeModuleController) NewModule(id string, export component.ExportFunc) (component.Module, error) {
return nil, nil
}

func (f fakeModuleController) ModuleIDs() []string {
return nil
}

func (f fakeModuleController) ClearModuleIDs() {
}
3 changes: 1 addition & 2 deletions pkg/flow/internal/controller/node_component.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,6 @@ func (cn *ComponentNode) Evaluate(scope *vm.Scope) error {
msg := fmt.Sprintf("component evaluation failed: %s", err)
cn.setEvalHealth(component.HealthTypeUnhealthy, msg)
}

return err
}

Expand Down Expand Up @@ -303,7 +302,7 @@ func (cn *ComponentNode) evaluate(scope *vm.Scope) error {
}

// Run runs the managed component in the calling goroutine until ctx is
// canceled. Evaluate must have been called at least once without retuning an
// canceled. Evaluate must have been called at least once without returning an
// error before calling Run.
//
// Run will immediately return ErrUnevaluated if Evaluate has never been called
Expand Down
38 changes: 24 additions & 14 deletions pkg/flow/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"path"
"sync"

"github.com/go-kit/log/level"
"github.com/grafana/agent/component"
"github.com/grafana/agent/pkg/flow/internal/controller"
"github.com/grafana/agent/pkg/flow/internal/worker"
Expand Down Expand Up @@ -46,9 +47,6 @@ func (m *moduleController) NewModule(id string, export component.ExportFunc) (co
if id != "" {
fullPath = path.Join(fullPath, id)
}
if _, found := m.modules[fullPath]; found {
return nil, fmt.Errorf("id %s already exists", id)
}

mod := newModule(&moduleOptions{
ID: fullPath,
Expand All @@ -57,26 +55,33 @@ func (m *moduleController) NewModule(id string, export component.ExportFunc) (co
parent: m,
})

if err := m.o.ModuleRegistry.Register(fullPath, mod); err != nil {
return nil, err
}

m.modules[fullPath] = struct{}{}
return mod, nil
}

func (m *moduleController) removeID(id string) {
func (m *moduleController) removeModule(mod *module) {
m.mut.Lock()
defer m.mut.Unlock()

delete(m.modules, id)
m.o.ModuleRegistry.Unregister(id)
m.o.ModuleRegistry.Unregister(mod.o.ID)
delete(m.modules, mod.o.ID)
}

func (m *moduleController) addModule(mod *module) error {
m.mut.Lock()
defer m.mut.Unlock()
if err := m.o.ModuleRegistry.Register(mod.o.ID, mod); err != nil {
level.Error(m.o.Logger).Log("msg", "error registering module", "id", mod.o.ID, "err", err)
return err
}
m.modules[mod.o.ID] = struct{}{}
return nil
}

// ModuleIDs implements [controller.ModuleController].
func (m *moduleController) ModuleIDs() []string {
m.mut.RLock()
defer m.mut.RUnlock()

return maps.Keys(m.modules)
}

Expand All @@ -86,7 +91,7 @@ type module struct {
}

type moduleOptions struct {
ID string
ID string // ID is the full name including all parents, "module.file.example.prometheus.remote_write.id".
export component.ExportFunc
parent *moduleController
*moduleControllerOptions
Expand Down Expand Up @@ -135,9 +140,14 @@ func (c *module) LoadConfig(config []byte, args map[string]any) error {
// will be run until Run is called.
//
// Run blocks until the provided context is canceled.
func (c *module) Run(ctx context.Context) {
defer c.o.parent.removeID(c.o.ID)
mattdurham marked this conversation as resolved.
Show resolved Hide resolved
func (c *module) Run(ctx context.Context) error {
if err := c.o.parent.addModule(c); err != nil {
return err
}
defer c.o.parent.removeModule(c)
Comment on lines +144 to +147
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should get this merged, but it's standing out as potentially concerning to me that this changes things such that the lifetime of a component and module are now different: a component exists within the controller whenever it's defined in a file, even if it's not running, but a module doesn't exist until it starts running.

I'm not sure if this will introduce any problems, so let's keep an eye for related issues once this is merged.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO this feels alright, the component is the parent of module(s), so the component life span should be greater than the modules it controls. Def something to keep an eye on.


c.f.Run(ctx)
return nil
}

// moduleControllerOptions holds static options for module controller.
Expand Down
76 changes: 76 additions & 0 deletions pkg/flow/module_fail_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package flow

import (
"context"
"testing"
"time"

"github.com/grafana/agent/pkg/flow/componenttest"
"github.com/stretchr/testify/require"
)

func TestIDRemovalIfFailedToLoad(t *testing.T) {
f := New(testOptions(t))

fullContent := "test.fail.module \"t1\" { content = \"\" }"
fl, err := ParseSource("test", []byte(fullContent))
require.NoError(t, err)
err = f.LoadSource(fl, nil)
require.NoError(t, err)
ctx := context.Background()
ctx, cnc := context.WithTimeout(ctx, 600*time.Second)

go f.Run(ctx)
var t1 *componenttest.TestFailModule
require.Eventually(t, func() bool {
t1 = f.loader.Components()[0].Component().(*componenttest.TestFailModule)
return t1 != nil
}, 10*time.Second, 100*time.Millisecond)
require.Eventually(t, func() bool {
f.loadMut.RLock()
defer f.loadMut.RUnlock()
// This should be one due to t1.
return len(f.modules.List()) == 1
}, 10*time.Second, 100*time.Millisecond)
badContent :=
`test.fail.module "bad" {
content=""
fail=true
}`
err = t1.UpdateContent(badContent)
// Because we have bad content this should fail, but the ids should be removed.
require.Error(t, err)
require.Eventually(t, func() bool {
f.loadMut.RLock()
defer f.loadMut.RUnlock()
// Only one since the bad one never should have been added.
rightLength := len(f.modules.List()) == 1
_, foundT1 := f.modules.Get("test.fail.module.t1")
return rightLength && foundT1
}, 10*time.Second, 100*time.Millisecond)
// fail a second time to ensure the once is done again.
err = t1.UpdateContent(badContent)
require.Error(t, err)

goodContent :=
`test.fail.module "good" {
content=""
fail=false
}`
err = t1.UpdateContent(goodContent)
require.NoError(t, err)
require.Eventually(t, func() bool {
f.loadMut.RLock()
defer f.loadMut.RUnlock()
modT1, foundT1 := f.modules.Get("test.fail.module.t1")
modGood, foundGood := f.modules.Get("test.fail.module.t1/test.fail.module.good")
return modT1 != nil && modGood != nil && foundT1 && foundGood
}, 10*time.Second, 100*time.Millisecond)
cnc()
require.Eventually(t, func() bool {
f.loadMut.RLock()
defer f.loadMut.RUnlock()
// All should be cleaned up.
return len(f.modules.List()) == 0
}, 10*time.Second, 100*time.Millisecond)
}
2 changes: 1 addition & 1 deletion pkg/flow/module_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func newModuleRegistry() *moduleRegistry {
}
}

// Get retrives a module by ID.
// Get retrieves a module by ID.
func (reg *moduleRegistry) Get(id string) (*module, bool) {
reg.mut.RLock()
defer reg.mut.RUnlock()
Expand Down
Loading