Skip to content

Commit

Permalink
feat: proxy calls through the runner
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas committed Dec 1, 2024
1 parent a98f447 commit 376dcfd
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 1 deletion.
114 changes: 114 additions & 0 deletions backend/runner/controllerproxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package runner

import (
"context"
"fmt"
"net/url"

"connectrpc.com/connect"

ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/rpc"
)

var _ ftlv1connect.VerbServiceHandler = &runnerProxy{}
var _ ftlv1connect.ModuleServiceHandler = &runnerProxy{}

type runnerProxy struct {
controllerVerbService ftlv1connect.VerbServiceClient
controllerModuleService ftlv1connect.ModuleServiceClient
bindAddress *url.URL
}

func runProxyServer(ctx context.Context, controllerVerbService ftlv1connect.VerbServiceClient, controllerModuleService ftlv1connect.ModuleServiceClient) (*runnerProxy, error) {
proxy := &runnerProxy{
controllerVerbService: controllerVerbService,
controllerModuleService: controllerModuleService,
}
logger := log.FromContext(ctx)
parse, err := url.Parse("http://127.0.0.1:0")
if err != nil {
return nil, fmt.Errorf("failed to parse url: %w", err)
}
proxyServer, err := rpc.NewServer(ctx, parse,
rpc.GRPC(ftlv1connect.NewVerbServiceHandler, proxy),
rpc.GRPC(ftlv1connect.NewModuleServiceHandler, proxy),
)
if err != nil {
return nil, fmt.Errorf("failed to create server: %w", err)
}
urls := proxyServer.Bind.Subscribe(nil)
go func() {
err := proxyServer.Serve(ctx)
if err != nil {
logger.Errorf(err, "failed to serve")
return
}
}()
proxy.bindAddress = <-urls
return proxy, nil
}

func (r *runnerProxy) GetModuleContext(ctx context.Context, c *connect.Request[ftlv1.GetModuleContextRequest], c2 *connect.ServerStream[ftlv1.GetModuleContextResponse]) error {
moduleContext, err := r.controllerModuleService.GetModuleContext(ctx, connect.NewRequest(c.Msg))
if err != nil {
return fmt.Errorf("failed to get module context: %w", err)
}
for {
rcv := moduleContext.Receive()
if rcv {
err := c2.Send(moduleContext.Msg())
if err != nil {
return fmt.Errorf("failed to send message: %w", err)
}
} else {
return fmt.Errorf("failed to receive message: %w", moduleContext.Err())
}
}

}

func (r *runnerProxy) AcquireLease(ctx context.Context, c *connect.BidiStream[ftlv1.AcquireLeaseRequest, ftlv1.AcquireLeaseResponse]) error {
lease := r.controllerModuleService.AcquireLease(ctx)
for {
req, err := c.Receive()
if err != nil {
return fmt.Errorf("failed to receive message: %w", err)
}
err = lease.Send(req)
if err != nil {
return fmt.Errorf("failed to send message: %w", err)
}
msg, err := lease.Receive()
if err != nil {
return fmt.Errorf("failed to receive response message: %w", err)
}
err = c.Send(msg)
if err != nil {
return fmt.Errorf("failed to send response message: %w", err)
}
}

}

func (r *runnerProxy) PublishEvent(ctx context.Context, c *connect.Request[ftlv1.PublishEventRequest]) (*connect.Response[ftlv1.PublishEventResponse], error) {
event, err := r.controllerModuleService.PublishEvent(ctx, connect.NewRequest(c.Msg))
if err != nil {
return nil, fmt.Errorf("failed to proxy event: %w", err)
}
return event, nil
}

func (r *runnerProxy) Ping(ctx context.Context, c *connect.Request[ftlv1.PingRequest]) (*connect.Response[ftlv1.PingResponse], error) {
return connect.NewResponse(&ftlv1.PingResponse{}), nil
}

func (r *runnerProxy) Call(ctx context.Context, c *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error) {
call, err := r.controllerVerbService.Call(ctx, connect.NewRequest(c.Msg))
if err != nil {
return nil, fmt.Errorf("failed to proxy verb: %w", err)
}
return call, nil
}
12 changes: 11 additions & 1 deletion backend/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ func Start(ctx context.Context, config Config) error {
logger.Debugf("Listening on %s", config.Bind)

controllerClient := rpc.Dial(ftlv1connect.NewControllerServiceClient, config.ControllerEndpoint.String(), log.Error)
moduleServiceClient := rpc.Dial(ftlv1connect.NewModuleServiceClient, config.ControllerEndpoint.String(), log.Error)
verbServiceClient := rpc.Dial(ftlv1connect.NewVerbServiceClient, config.ControllerEndpoint.String(), log.Error)

key := config.Key
if key.IsZero() {
Expand All @@ -123,6 +125,11 @@ func Start(ctx context.Context, config Config) error {
}
}

proxy, err := runProxyServer(ctx, verbServiceClient, moduleServiceClient)
if err != nil {
observability.Runner.StartupFailed(ctx)
return fmt.Errorf("failed to start proxy: %w", err)
}
svc := &Service{
key: key,
identity: identityStore,
Expand All @@ -132,6 +139,7 @@ func Start(ctx context.Context, config Config) error {
deploymentLogQueue: make(chan log.Entry, 10000),
cancelFunc: doneFunc,
devEndpoint: config.DevEndpoint,
proxy: proxy,
}

module, err := svc.getModule(ctx, config.Deployment)
Expand Down Expand Up @@ -300,6 +308,7 @@ type Service struct {
deploymentLogQueue chan log.Entry
cancelFunc func()
devEndpoint optional.Option[url.URL]
proxy *runnerProxy
}

func (s *Service) Call(ctx context.Context, req *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error) {
Expand Down Expand Up @@ -397,7 +406,8 @@ func (s *Service) deploy(ctx context.Context, key model.DeploymentKey, module *s
return fmt.Errorf("failed to download artefacts: %w", err)
}

envVars := []string{"FTL_ENDPOINT=" + s.config.ControllerEndpoint.String(),
logger.Infof("Setting FTL_ENDPOINT to %s", s.proxy.bindAddress.String())
envVars := []string{"FTL_ENDPOINT=" + s.proxy.bindAddress.String(),
"FTL_CONFIG=" + strings.Join(s.config.Config, ","),
"FTL_OBSERVABILITY_ENDPOINT=" + s.config.ControllerEndpoint.String()}
if s.config.DebugPort > 0 {
Expand Down

0 comments on commit 376dcfd

Please sign in to comment.