Skip to content

Commit

Permalink
fixes #1959
Browse files Browse the repository at this point in the history
controller startup failure troubleshooting was complicated by a failure to surface error messages from the `GetModuleContext` streaming end-point.

errors encountered from that end-point are now made visible and if those errors are deemed fatal the runner will terminate.
  • Loading branch information
jonathanj-square committed Jul 4, 2024
1 parent 0e17f9f commit 4b1f5b1
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 35 deletions.
2 changes: 1 addition & 1 deletion buildengine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func New(ctx context.Context, client ftlv1connect.ControllerServiceClient, modul
return e, nil
}
schemaSync := e.startSchemaSync(ctx)
go rpc.RetryStreamingServerStream(ctx, backoff.Backoff{Max: time.Second}, &ftlv1.PullSchemaRequest{}, client.PullSchema, schemaSync)
go rpc.RetryStreamingServerStream(ctx, backoff.Backoff{Max: time.Second}, &ftlv1.PullSchemaRequest{}, client.PullSchema, schemaSync, rpc.AlwaysRetry())
return e, nil
}

Expand Down
51 changes: 37 additions & 14 deletions internal/modulecontext/module_context.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package modulecontext

import (
"connectrpc.com/connect"
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"strings"
"sync"
Expand Down Expand Up @@ -180,7 +182,7 @@ func (m ModuleContext) BehaviorForVerb(ref schema.Ref) (optional.Option[VerbBeha
}

type ModuleContextSupplier interface {
Subscribe(ctx context.Context, moduleName string, sink func(ctx context.Context, moduleContext ModuleContext))
Subscribe(ctx context.Context, moduleName string, sink func(ctx context.Context, moduleContext ModuleContext), errorRetryCallback func(err error) bool)
}

type grpcModuleContextSupplier struct {
Expand All @@ -191,7 +193,7 @@ func NewModuleContextSupplier(client ftlv1connect.VerbServiceClient) ModuleConte
return ModuleContextSupplier(grpcModuleContextSupplier{client})
}

func (g grpcModuleContextSupplier) Subscribe(ctx context.Context, moduleName string, sink func(ctx context.Context, moduleContext ModuleContext)) {
func (g grpcModuleContextSupplier) Subscribe(ctx context.Context, moduleName string, sink func(ctx context.Context, moduleContext ModuleContext), errorRetryCallback func(err error) bool) {
request := &ftlv1.ModuleContextRequest{Module: moduleName}
callback := func(_ context.Context, resp *ftlv1.ModuleContextResponse) error {
mc, err := FromProto(resp)
Expand All @@ -201,7 +203,7 @@ func (g grpcModuleContextSupplier) Subscribe(ctx context.Context, moduleName str
sink(ctx, mc)
return nil
}
go rpc.RetryStreamingServerStream(ctx, backoff.Backoff{}, request, g.client.GetModuleContext, callback)
go rpc.RetryStreamingServerStream(ctx, backoff.Backoff{}, request, g.client.GetModuleContext, callback, errorRetryCallback)
}

// NewDynamicContext creates a new DynamicModuleContext. This operation blocks
Expand All @@ -217,22 +219,43 @@ func NewDynamicContext(ctx context.Context, supplier ModuleContextSupplier, modu
await.Add(1)
releaseOnce := sync.Once{}

ctx, cancel := context.WithCancelCause(ctx)
deadline, timeoutCancel := context.WithTimeout(ctx, 5*time.Second)
g, _ := errgroup.WithContext(deadline)
defer timeoutCancel()

// asynchronously consumes a subscription of ModuleContext changes and signals the arrival of the first
supplier.Subscribe(ctx, moduleName, func(ctx context.Context, moduleContext ModuleContext) {
result.current.Store(moduleContext)
releaseOnce.Do(func() {
await.Done()
supplier.Subscribe(
ctx,
moduleName,
func(ctx context.Context, moduleContext ModuleContext) {
result.current.Store(moduleContext)
releaseOnce.Do(func() {
await.Done()
})
},
func(err error) bool {
var connectErr *connect.Error

if errors.As(err, &connectErr) && connectErr.Code() == connect.CodeInternal {
cancel(err)
await.Done()
return false
}

return true
})
})

deadline, cancellation := context.WithTimeout(ctx, 5*time.Second)
g, _ := errgroup.WithContext(deadline)
defer cancellation()

// wait for first ModuleContext to be made available (with the above timeout)
// await the WaitGroup's completion which either signals the availability of the
// first ModuleContext or an error
g.Go(func() error {
await.Wait()
return nil
select {
case <-ctx.Done():
return ctx.Err()
default:
return nil
}
})

if err := g.Wait(); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/modulecontext/module_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestDynamicContextUpdate(t *testing.T) {
assert.Equal(t, mc2, dynamic.CurrentContext())
}

func (mcs *manualContextSupplier) Subscribe(ctx context.Context, _ string, sink func(ctx context.Context, mCtx ModuleContext)) {
func (mcs *manualContextSupplier) Subscribe(ctx context.Context, _ string, sink func(ctx context.Context, mCtx ModuleContext), _ func(error) bool) {
sink(ctx, mcs.initialCtx)
mcs.sink = sink
}
54 changes: 36 additions & 18 deletions internal/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,12 @@ func RetryStreamingClientStream[Req, Resp any](
}
}

// AlwaysRetry instructs RetryStreamingServerStream to always retry the errors it encounters when
// supplied as the errorRetryCallback argument
func AlwaysRetry() func(error) bool {
return func(err error) bool { return true }
}

// RetryStreamingServerStream will repeatedly call handler with responses from
// the stream returned by "rpc" until handler returns an error or the context is
// cancelled.
Expand All @@ -204,43 +210,55 @@ func RetryStreamingServerStream[Req, Resp any](
req *Req,
rpc func(context.Context, *connect.Request[Req]) (*connect.ServerStreamForClient[Resp], error),
handler func(ctx context.Context, resp *Resp) error,
errorRetryCallback func(err error) bool,
) {
logLevel := log.Debug
errored := false
logger := log.FromContext(ctx)
for {
stream, err := rpc(ctx, connect.NewRequest(req))
if err == nil {
for stream.Receive() {
req := stream.Msg()
err = handler(ctx, req)
if err != nil {
for {
if stream.Receive() {
resp := stream.Msg()
err = handler(ctx, resp)

if err != nil {
break
}
if errored {
logger.Debugf("Server stream recovered")
errored = false
}
select {
case <-ctx.Done():
return
default:
}
retry.Reset()
logLevel = log.Warn
} else {
// Stream terminated; check if this was caused by an error
err = stream.Err()
logLevel = log.Warn
break
}
if errored {
logger.Debugf("Server stream recovered")
errored = false
}
select {
case <-ctx.Done():
return
default:
}
retry.Reset()
logLevel = log.Warn
}
}

if err != nil {
err = stream.Err()
}
errored = true
delay := retry.Duration()
if err != nil && !errors.Is(err, context.Canceled) {
if errorRetryCallback != nil && !errorRetryCallback(err) {
logger.Errorf(err, "Stream handler encountered a non-retryable error")
return
}

logger.Logf(logLevel, "Stream handler failed, retrying in %s: %s", delay, err)
} else if err == nil {
logger.Debugf("Stream finished, retrying in %s", delay)
}

select {
case <-ctx.Done():
return
Expand Down
2 changes: 1 addition & 1 deletion testutils/modulecontext/module_context_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@ func MakeDynamic(ctx context.Context, m modulecontext.ModuleContext) *modulecont
return result
}

func (smc SingleContextSupplier) Subscribe(ctx context.Context, _ string, sink func(ctx context.Context, mCtx modulecontext.ModuleContext)) {
func (smc SingleContextSupplier) Subscribe(ctx context.Context, _ string, sink func(ctx context.Context, mCtx modulecontext.ModuleContext), _ func(error) bool) {
sink(ctx, smc.moduleCtx)
}

0 comments on commit 4b1f5b1

Please sign in to comment.