diff --git a/backend/runner/controllerproxy.go b/backend/runner/controllerproxy.go new file mode 100644 index 0000000000..4bbeb405ac --- /dev/null +++ b/backend/runner/controllerproxy.go @@ -0,0 +1,114 @@ +package runner + +import ( + "context" + "fmt" + "net/url" + + "connectrpc.com/connect" + + ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" + "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect" + "github.com/TBD54566975/ftl/internal/log" + "github.com/TBD54566975/ftl/internal/rpc" +) + +var _ ftlv1connect.VerbServiceHandler = &runnerProxy{} +var _ ftlv1connect.ModuleServiceHandler = &runnerProxy{} + +type runnerProxy struct { + controllerVerbService ftlv1connect.VerbServiceClient + controllerModuleService ftlv1connect.ModuleServiceClient + bindAddress *url.URL +} + +func runProxyServer(ctx context.Context, controllerVerbService ftlv1connect.VerbServiceClient, controllerModuleService ftlv1connect.ModuleServiceClient) (*runnerProxy, error) { + proxy := &runnerProxy{ + controllerVerbService: controllerVerbService, + controllerModuleService: controllerModuleService, + } + logger := log.FromContext(ctx) + parse, err := url.Parse("http://127.0.0.1:0") + if err != nil { + return nil, fmt.Errorf("failed to parse url: %w", err) + } + proxyServer, err := rpc.NewServer(ctx, parse, + rpc.GRPC(ftlv1connect.NewVerbServiceHandler, proxy), + rpc.GRPC(ftlv1connect.NewModuleServiceHandler, proxy), + ) + if err != nil { + return nil, fmt.Errorf("failed to create server: %w", err) + } + urls := proxyServer.Bind.Subscribe(nil) + go func() { + err := proxyServer.Serve(ctx) + if err != nil { + logger.Errorf(err, "failed to serve") + return + } + }() + proxy.bindAddress = <-urls + return proxy, nil +} + +func (r *runnerProxy) GetModuleContext(ctx context.Context, c *connect.Request[ftlv1.GetModuleContextRequest], c2 *connect.ServerStream[ftlv1.GetModuleContextResponse]) error { + moduleContext, err := r.controllerModuleService.GetModuleContext(ctx, connect.NewRequest(c.Msg)) + if err != nil { + return fmt.Errorf("failed to get module context: %w", err) + } + for { + rcv := moduleContext.Receive() + if rcv { + err := c2.Send(moduleContext.Msg()) + if err != nil { + return fmt.Errorf("failed to send message: %w", err) + } + } else { + return fmt.Errorf("failed to receive message: %w", moduleContext.Err()) + } + } + +} + +func (r *runnerProxy) AcquireLease(ctx context.Context, c *connect.BidiStream[ftlv1.AcquireLeaseRequest, ftlv1.AcquireLeaseResponse]) error { + lease := r.controllerModuleService.AcquireLease(ctx) + for { + req, err := c.Receive() + if err != nil { + return fmt.Errorf("failed to receive message: %w", err) + } + err = lease.Send(req) + if err != nil { + return fmt.Errorf("failed to send message: %w", err) + } + msg, err := lease.Receive() + if err != nil { + return fmt.Errorf("failed to receive response message: %w", err) + } + err = c.Send(msg) + if err != nil { + return fmt.Errorf("failed to send response message: %w", err) + } + } + +} + +func (r *runnerProxy) PublishEvent(ctx context.Context, c *connect.Request[ftlv1.PublishEventRequest]) (*connect.Response[ftlv1.PublishEventResponse], error) { + event, err := r.controllerModuleService.PublishEvent(ctx, connect.NewRequest(c.Msg)) + if err != nil { + return nil, fmt.Errorf("failed to proxy event: %w", err) + } + return event, nil +} + +func (r *runnerProxy) Ping(ctx context.Context, c *connect.Request[ftlv1.PingRequest]) (*connect.Response[ftlv1.PingResponse], error) { + return connect.NewResponse(&ftlv1.PingResponse{}), nil +} + +func (r *runnerProxy) Call(ctx context.Context, c *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error) { + call, err := r.controllerVerbService.Call(ctx, connect.NewRequest(c.Msg)) + if err != nil { + return nil, fmt.Errorf("failed to proxy verb: %w", err) + } + return call, nil +} diff --git a/backend/runner/runner.go b/backend/runner/runner.go index 39e5f5c527..4ededaefcf 100644 --- a/backend/runner/runner.go +++ b/backend/runner/runner.go @@ -97,6 +97,8 @@ func Start(ctx context.Context, config Config) error { logger.Debugf("Listening on %s", config.Bind) controllerClient := rpc.Dial(ftlv1connect.NewControllerServiceClient, config.ControllerEndpoint.String(), log.Error) + moduleServiceClient := rpc.Dial(ftlv1connect.NewModuleServiceClient, config.ControllerEndpoint.String(), log.Error) + verbServiceClient := rpc.Dial(ftlv1connect.NewVerbServiceClient, config.ControllerEndpoint.String(), log.Error) key := config.Key if key.IsZero() { @@ -123,6 +125,11 @@ func Start(ctx context.Context, config Config) error { } } + proxy, err := runProxyServer(ctx, verbServiceClient, moduleServiceClient) + if err != nil { + observability.Runner.StartupFailed(ctx) + return fmt.Errorf("failed to start proxy: %w", err) + } svc := &Service{ key: key, identity: identityStore, @@ -132,6 +139,7 @@ func Start(ctx context.Context, config Config) error { deploymentLogQueue: make(chan log.Entry, 10000), cancelFunc: doneFunc, devEndpoint: config.DevEndpoint, + proxy: proxy, } module, err := svc.getModule(ctx, config.Deployment) @@ -300,6 +308,7 @@ type Service struct { deploymentLogQueue chan log.Entry cancelFunc func() devEndpoint optional.Option[url.URL] + proxy *runnerProxy } func (s *Service) Call(ctx context.Context, req *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error) { @@ -397,7 +406,8 @@ func (s *Service) deploy(ctx context.Context, key model.DeploymentKey, module *s return fmt.Errorf("failed to download artefacts: %w", err) } - envVars := []string{"FTL_ENDPOINT=" + s.config.ControllerEndpoint.String(), + logger.Infof("Setting FTL_ENDPOINT to %s", s.proxy.bindAddress.String()) + envVars := []string{"FTL_ENDPOINT=" + s.proxy.bindAddress.String(), "FTL_CONFIG=" + strings.Join(s.config.Config, ","), "FTL_OBSERVABILITY_ENDPOINT=" + s.config.ControllerEndpoint.String()} if s.config.DebugPort > 0 {