From 7d1b490caf45493bbd7131a266d85272d4c1ae25 Mon Sep 17 00:00:00 2001 From: Jon Johnson <113393155+jonathanj-square@users.noreply.github.com> Date: Thu, 4 Jul 2024 02:51:36 -0700 Subject: [PATCH] fix: controller startup failure visibility (#1969) fixes #1959 controller startup failure troubleshooting was complicated by a failure to surface error messages from the `GetModuleContext` streaming end-point. errors encountered from that end-point are now made visible and if those errors are deemed fatal the runner will terminate. --- buildengine/engine.go | 2 +- internal/modulecontext/module_context.go | 51 +++++++++++++----- internal/modulecontext/module_context_test.go | 2 +- internal/rpc/rpc.go | 54 ++++++++++++------- .../modulecontext/module_context_utils.go | 2 +- 5 files changed, 76 insertions(+), 35 deletions(-) diff --git a/buildengine/engine.go b/buildengine/engine.go index 75fbc4e691..03894be43d 100644 --- a/buildengine/engine.go +++ b/buildengine/engine.go @@ -132,7 +132,7 @@ func New(ctx context.Context, client ftlv1connect.ControllerServiceClient, modul return e, nil } schemaSync := e.startSchemaSync(ctx) - go rpc.RetryStreamingServerStream(ctx, backoff.Backoff{Max: time.Second}, &ftlv1.PullSchemaRequest{}, client.PullSchema, schemaSync) + go rpc.RetryStreamingServerStream(ctx, backoff.Backoff{Max: time.Second}, &ftlv1.PullSchemaRequest{}, client.PullSchema, schemaSync, rpc.AlwaysRetry()) return e, nil } diff --git a/internal/modulecontext/module_context.go b/internal/modulecontext/module_context.go index e3783c9177..f870de1a0b 100644 --- a/internal/modulecontext/module_context.go +++ b/internal/modulecontext/module_context.go @@ -1,9 +1,11 @@ package modulecontext import ( + "connectrpc.com/connect" "context" "database/sql" "encoding/json" + "errors" "fmt" "strings" "sync" @@ -180,7 +182,7 @@ func (m ModuleContext) BehaviorForVerb(ref schema.Ref) (optional.Option[VerbBeha } type ModuleContextSupplier interface { - Subscribe(ctx context.Context, moduleName string, sink func(ctx context.Context, moduleContext ModuleContext)) + Subscribe(ctx context.Context, moduleName string, sink func(ctx context.Context, moduleContext ModuleContext), errorRetryCallback func(err error) bool) } type grpcModuleContextSupplier struct { @@ -191,7 +193,7 @@ func NewModuleContextSupplier(client ftlv1connect.VerbServiceClient) ModuleConte return ModuleContextSupplier(grpcModuleContextSupplier{client}) } -func (g grpcModuleContextSupplier) Subscribe(ctx context.Context, moduleName string, sink func(ctx context.Context, moduleContext ModuleContext)) { +func (g grpcModuleContextSupplier) Subscribe(ctx context.Context, moduleName string, sink func(ctx context.Context, moduleContext ModuleContext), errorRetryCallback func(err error) bool) { request := &ftlv1.ModuleContextRequest{Module: moduleName} callback := func(_ context.Context, resp *ftlv1.ModuleContextResponse) error { mc, err := FromProto(resp) @@ -201,7 +203,7 @@ func (g grpcModuleContextSupplier) Subscribe(ctx context.Context, moduleName str sink(ctx, mc) return nil } - go rpc.RetryStreamingServerStream(ctx, backoff.Backoff{}, request, g.client.GetModuleContext, callback) + go rpc.RetryStreamingServerStream(ctx, backoff.Backoff{}, request, g.client.GetModuleContext, callback, errorRetryCallback) } // NewDynamicContext creates a new DynamicModuleContext. This operation blocks @@ -217,22 +219,43 @@ func NewDynamicContext(ctx context.Context, supplier ModuleContextSupplier, modu await.Add(1) releaseOnce := sync.Once{} + ctx, cancel := context.WithCancelCause(ctx) + deadline, timeoutCancel := context.WithTimeout(ctx, 5*time.Second) + g, _ := errgroup.WithContext(deadline) + defer timeoutCancel() + // asynchronously consumes a subscription of ModuleContext changes and signals the arrival of the first - supplier.Subscribe(ctx, moduleName, func(ctx context.Context, moduleContext ModuleContext) { - result.current.Store(moduleContext) - releaseOnce.Do(func() { - await.Done() + supplier.Subscribe( + ctx, + moduleName, + func(ctx context.Context, moduleContext ModuleContext) { + result.current.Store(moduleContext) + releaseOnce.Do(func() { + await.Done() + }) + }, + func(err error) bool { + var connectErr *connect.Error + + if errors.As(err, &connectErr) && connectErr.Code() == connect.CodeInternal { + cancel(err) + await.Done() + return false + } + + return true }) - }) - deadline, cancellation := context.WithTimeout(ctx, 5*time.Second) - g, _ := errgroup.WithContext(deadline) - defer cancellation() - - // wait for first ModuleContext to be made available (with the above timeout) + // await the WaitGroup's completion which either signals the availability of the + // first ModuleContext or an error g.Go(func() error { await.Wait() - return nil + select { + case <-ctx.Done(): + return ctx.Err() + default: + return nil + } }) if err := g.Wait(); err != nil { diff --git a/internal/modulecontext/module_context_test.go b/internal/modulecontext/module_context_test.go index acf6900973..eda23dd126 100644 --- a/internal/modulecontext/module_context_test.go +++ b/internal/modulecontext/module_context_test.go @@ -36,7 +36,7 @@ func TestDynamicContextUpdate(t *testing.T) { assert.Equal(t, mc2, dynamic.CurrentContext()) } -func (mcs *manualContextSupplier) Subscribe(ctx context.Context, _ string, sink func(ctx context.Context, mCtx ModuleContext)) { +func (mcs *manualContextSupplier) Subscribe(ctx context.Context, _ string, sink func(ctx context.Context, mCtx ModuleContext), _ func(error) bool) { sink(ctx, mcs.initialCtx) mcs.sink = sink } diff --git a/internal/rpc/rpc.go b/internal/rpc/rpc.go index ac21bdeb98..33f8a9bbda 100644 --- a/internal/rpc/rpc.go +++ b/internal/rpc/rpc.go @@ -195,6 +195,12 @@ func RetryStreamingClientStream[Req, Resp any]( } } +// AlwaysRetry instructs RetryStreamingServerStream to always retry the errors it encounters when +// supplied as the errorRetryCallback argument +func AlwaysRetry() func(error) bool { + return func(err error) bool { return true } +} + // RetryStreamingServerStream will repeatedly call handler with responses from // the stream returned by "rpc" until handler returns an error or the context is // cancelled. @@ -204,6 +210,7 @@ func RetryStreamingServerStream[Req, Resp any]( req *Req, rpc func(context.Context, *connect.Request[Req]) (*connect.ServerStreamForClient[Resp], error), handler func(ctx context.Context, resp *Resp) error, + errorRetryCallback func(err error) bool, ) { logLevel := log.Debug errored := false @@ -211,36 +218,47 @@ func RetryStreamingServerStream[Req, Resp any]( for { stream, err := rpc(ctx, connect.NewRequest(req)) if err == nil { - for stream.Receive() { - req := stream.Msg() - err = handler(ctx, req) - if err != nil { + for { + if stream.Receive() { + resp := stream.Msg() + err = handler(ctx, resp) + + if err != nil { + break + } + if errored { + logger.Debugf("Server stream recovered") + errored = false + } + select { + case <-ctx.Done(): + return + default: + } + retry.Reset() + logLevel = log.Warn + } else { + // Stream terminated; check if this was caused by an error + err = stream.Err() + logLevel = log.Warn break } - if errored { - logger.Debugf("Server stream recovered") - errored = false - } - select { - case <-ctx.Done(): - return - default: - } - retry.Reset() - logLevel = log.Warn } } - if err != nil { - err = stream.Err() - } errored = true delay := retry.Duration() if err != nil && !errors.Is(err, context.Canceled) { + if errorRetryCallback != nil && !errorRetryCallback(err) { + logger.Errorf(err, "Stream handler encountered a non-retryable error") + return + } + logger.Logf(logLevel, "Stream handler failed, retrying in %s: %s", delay, err) } else if err == nil { logger.Debugf("Stream finished, retrying in %s", delay) } + select { case <-ctx.Done(): return diff --git a/testutils/modulecontext/module_context_utils.go b/testutils/modulecontext/module_context_utils.go index bfd7d6a734..acf1a61d9f 100644 --- a/testutils/modulecontext/module_context_utils.go +++ b/testutils/modulecontext/module_context_utils.go @@ -22,6 +22,6 @@ func MakeDynamic(ctx context.Context, m modulecontext.ModuleContext) *modulecont return result } -func (smc SingleContextSupplier) Subscribe(ctx context.Context, _ string, sink func(ctx context.Context, mCtx modulecontext.ModuleContext)) { +func (smc SingleContextSupplier) Subscribe(ctx context.Context, _ string, sink func(ctx context.Context, mCtx modulecontext.ModuleContext), _ func(error) bool) { sink(ctx, smc.moduleCtx) }