Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

flow: always allow running with a partially evaluated graph #4411

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/internal/flowmode/cmd_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,9 @@ func (fr *flowRun) Run(configFile string) error {
// Perform the initial reload. This is done after starting the HTTP server so
// that /metric and pprof endpoints are available while the Flow controller
// is loading.
//
// TODO(rfratto): add a flag to permit the initial load failing, allowing
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For me looks reasonable. But I'd like to hear the opinion of @spartan0x117 and @jcreixell before moving forward.

// Flow to run in a partially-healthy state.
if err := reload(); err != nil {
var diags diag.Diagnostics
if errors.As(err, &diags) {
Expand Down
6 changes: 6 additions & 0 deletions component/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,12 @@ type Registration struct {

// Build should construct a new component from an initial Arguments and set
// of options.
//
// If Build returns an error, downstream components which depend on this
// component will not be evaluated.
//
// If Build returns a non-nil Component instance, that Component instance
// will run regardless of whether Build also returns a non-nil error.
Build func(opts Options, args Arguments) (Component, error)
}

Expand Down
36 changes: 27 additions & 9 deletions pkg/flow/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,28 @@ func (f *Flow) Run(ctx context.Context) {
f.loader.EvaluateDependencies(updated)
}

// If the graph was partially evaluated (i.e., one failed component
// prevented downstream components from initially evaluating), it's
// possible for a subset of components to not be running.
//
// We'll re-synchronize the list of running components to ensure that
// newly evaluated components are started.
//
// TODO(rfratto): this may be expensive with busy graphs; one solution
// could be to check for non-running components before calling
// Synchronize, or to check for a difference in the length of
// current synchronized runnables to the current list of
// f.loader.Components().
components := f.loader.Components()
runnables := make([]controller.RunnableNode, 0, len(components))
for _, uc := range components {
runnables = append(runnables, uc)
}
err := f.sched.Synchronize(runnables)
if err != nil {
level.Error(f.log).Log("msg", "failed to load components", "err", err)
}

case <-f.loadFinished:
level.Info(f.log).Log("msg", "scheduling loaded components")

Expand All @@ -259,19 +281,15 @@ func (f *Flow) Run(ctx context.Context) {
// file. Components in the graph will be marked as unhealthy if there was an
// error encountered during Load.
//
// The controller will only start running components after Load is called once
// without any configuration errors.
// If the Flow controller is running, loaded components will be scheduled for
// running.
func (f *Flow) LoadFile(file *File, args map[string]any) error {
defer f.loadedOnce.Store(true)

f.loadMut.Lock()
defer f.loadMut.Unlock()

diags := f.loader.Apply(args, file.Components, file.ConfigBlocks)
if !f.loadedOnce.Load() && diags.HasErrors() {
// The first call to Load should not run any components if there were
// errors in the configuration file.
return diags
}
f.loadedOnce.Store(true)

select {
case f.loadFinished <- struct{}{}:
Expand All @@ -281,7 +299,7 @@ func (f *Flow) LoadFile(file *File, args map[string]any) error {
return diags.ErrorOrNil()
}

// Ready returns whether the Flow controller has finished its initial load.
// Ready returns whether LoadFile has been invoked at least once.
func (f *Flow) Ready() bool {
return f.loadedOnce.Load()
}
8 changes: 5 additions & 3 deletions pkg/flow/internal/controller/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,12 +323,14 @@ func (cn *ComponentNode) evaluate(scope *vm.Scope) error {
if cn.managed == nil {
// We haven't built the managed component successfully yet.
managed, err := cn.reg.Build(cn.managedOpts, argsCopyValue)
if err != nil {
return fmt.Errorf("building component: %w", err)
}
cn.managed = managed
cn.args = argsCopyValue

// We do the error check at the end to allow the component to still return
// a component instance with an error to signal that it should still run.
if err != nil {
return fmt.Errorf("building component: %w", err)
}
return nil
}

Expand Down