From 6c020446784ec4af9a1b436cf69691745d72409a Mon Sep 17 00:00:00 2001 From: Robert Fratto Date: Wed, 12 Jul 2023 11:09:43 -0400 Subject: [PATCH] This commit changes two behaviors about the Flow system: * Components can now opt-in to being run even if their initial evaluation fails by returning a component.Component instance alongside their evaluation error. Previously, if the initial evaluation of a component failed, it would not be run until a succesful evaluation of the component occurred. This change allows polling-based components (such as `remote.http`) to rely on their native polling mechanisms to auto-resolve health issues if their initial evaluation fails. Without this change, a `remote.http` component with no dependencies that fails on the first evaluate would not be re-evaluated again until the config file is reloaded. * LoadFile will no longer require the first evaluation to complete without error before scheduling components to be run. Previously, no components would run until all components evaluated successfully at least once. This change allows operating the Flow controller in a partially applied state, where a subset of components are healthy. This is critical in scenarios where a managed agent is being run where a subset of components may not always be immediately healthy on startup. Implementing the second behavior implies that the first behavior should also be implemented, since users may not understand when it is required to reload the config file to fix polling-based components. Callers may emulate the previous behavior by not calling `Run` until `LoadFile` succeeds at least once. For the scope of this PR, this is what `grafana-agent run` has always done, and it hasn't changed here. Supersedes #4395. --- cmd/internal/flowmode/cmd_run.go | 3 ++ component/registry.go | 6 ++++ pkg/flow/flow.go | 36 +++++++++++++++++------ pkg/flow/internal/controller/component.go | 8 +++-- 4 files changed, 41 insertions(+), 12 deletions(-) diff --git a/cmd/internal/flowmode/cmd_run.go b/cmd/internal/flowmode/cmd_run.go index ae2e0fe4afe4..324799301c40 100644 --- a/cmd/internal/flowmode/cmd_run.go +++ b/cmd/internal/flowmode/cmd_run.go @@ -272,6 +272,9 @@ func (fr *flowRun) Run(configFile string) error { // Perform the initial reload. This is done after starting the HTTP server so // that /metric and pprof endpoints are available while the Flow controller // is loading. + // + // TODO(rfratto): add a flag to permit the initial load failing, allowing + // Flow to run in a partially-healthy state. if err := reload(); err != nil { var diags diag.Diagnostics if errors.As(err, &diags) { diff --git a/component/registry.go b/component/registry.go index 623a96b764d9..3656563d2267 100644 --- a/component/registry.go +++ b/component/registry.go @@ -162,6 +162,12 @@ type Registration struct { // Build should construct a new component from an initial Arguments and set // of options. + // + // If Build returns an error, downstream components which depend on this + // component will not be evaluated. + // + // If Build returns a non-nil Component instance, that Component instance + // will run regardless of whether Build also returns a non-nil error. Build func(opts Options, args Arguments) (Component, error) } diff --git a/pkg/flow/flow.go b/pkg/flow/flow.go index 8d5b1c2b6ff3..fb6406158a6f 100644 --- a/pkg/flow/flow.go +++ b/pkg/flow/flow.go @@ -239,6 +239,28 @@ func (f *Flow) Run(ctx context.Context) { f.loader.EvaluateDependencies(updated) } + // If the graph was partially evaluated (i.e., one failed component + // prevented downstream components from initially evaluating), it's + // possible for a subset of components to not be running. + // + // We'll re-synchronize the list of running components to ensure that + // newly evaluated components are started. + // + // TODO(rfratto): this may be expensive with busy graphs; one solution + // could be to check for non-running components before calling + // Synchronize, or to check for a difference in the length of + // current synchronized runnables to the current list of + // f.loader.Components(). + components := f.loader.Components() + runnables := make([]controller.RunnableNode, 0, len(components)) + for _, uc := range components { + runnables = append(runnables, uc) + } + err := f.sched.Synchronize(runnables) + if err != nil { + level.Error(f.log).Log("msg", "failed to load components", "err", err) + } + case <-f.loadFinished: level.Info(f.log).Log("msg", "scheduling loaded components") @@ -259,19 +281,15 @@ func (f *Flow) Run(ctx context.Context) { // file. Components in the graph will be marked as unhealthy if there was an // error encountered during Load. // -// The controller will only start running components after Load is called once -// without any configuration errors. +// If the Flow controller is running, loaded components will be scheduled for +// running. func (f *Flow) LoadFile(file *File, args map[string]any) error { + defer f.loadedOnce.Store(true) + f.loadMut.Lock() defer f.loadMut.Unlock() diags := f.loader.Apply(args, file.Components, file.ConfigBlocks) - if !f.loadedOnce.Load() && diags.HasErrors() { - // The first call to Load should not run any components if there were - // errors in the configuration file. - return diags - } - f.loadedOnce.Store(true) select { case f.loadFinished <- struct{}{}: @@ -281,7 +299,7 @@ func (f *Flow) LoadFile(file *File, args map[string]any) error { return diags.ErrorOrNil() } -// Ready returns whether the Flow controller has finished its initial load. +// Ready returns whether LoadFile has been invoked at least once. func (f *Flow) Ready() bool { return f.loadedOnce.Load() } diff --git a/pkg/flow/internal/controller/component.go b/pkg/flow/internal/controller/component.go index 4f18110f639e..e6aaa669cec7 100644 --- a/pkg/flow/internal/controller/component.go +++ b/pkg/flow/internal/controller/component.go @@ -323,12 +323,14 @@ func (cn *ComponentNode) evaluate(scope *vm.Scope) error { if cn.managed == nil { // We haven't built the managed component successfully yet. managed, err := cn.reg.Build(cn.managedOpts, argsCopyValue) - if err != nil { - return fmt.Errorf("building component: %w", err) - } cn.managed = managed cn.args = argsCopyValue + // We do the error check at the end to allow the component to still return + // a component instance with an error to signal that it should still run. + if err != nil { + return fmt.Errorf("building component: %w", err) + } return nil }