Skip to content
This repository has been archived by the owner on Dec 16, 2024. It is now read-only.

Commit

Permalink
feat: add RunnerService + Go implementation (#30)
Browse files Browse the repository at this point in the history
This is not wired up to the Backplane yet.
  • Loading branch information
alecthomas authored May 22, 2023
1 parent bad5468 commit bdde132
Show file tree
Hide file tree
Showing 28 changed files with 1,509 additions and 298 deletions.
38 changes: 21 additions & 17 deletions ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

The actors in the diagrams are as follows:

| Actor | Description |
| ---------- | ---------------------------------------------------------------------------------------------------------- |
| Backplane | The coordination layer of FTL. This creates and manages Runner instances, routing, resource creation, etc. |
| Platform | The platform FTL is running on, eg. Kubernetes, VMs, etc. |
| Runner | The component of FTL that coordinates with the Backplane to spawn and route to user code. |
| Deployment | User code serving VerbService for a module written in a particular language. |
| Actor | Description |
| ---------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Backplane | The coordination layer of FTL. This creates and manages Runner instances, routing, resource creation, etc. |
| Platform | The platform FTL is running on, eg. Kubernetes, VMs, etc. |
| Runner | The component of FTL that coordinates with the Backplane to spawn and route to user code. |
| Deployment | Optional process containing user code serving VerbService for a module written in a particular language. This code may be dynamically loaded and served by the Runner. |

## System initialisation

Expand All @@ -30,7 +30,7 @@ sequenceDiagram
participant B as Backplane
box Module
participant R as Runner
participant M as Deployment
participant D as Deployment
end
C ->> B: GetArtefactDiffs()
Expand All @@ -47,12 +47,16 @@ sequenceDiagram
B -->> R: schema
R ->> B: GetDeploymentArtefacts(id)
B -->> R: artefacts
R ->> M: Start()
loop Until alive
R ->> D: Ping()
end
R ->> B: Deployed(id)
B ->> B: UpdateRoutingTable()
```

## Routing

This diagram shows a routing example of a client calling verb V0 which calls
This diagram shows a routing example for a client calling verb V0 which calls
verb V1.

```mermaid
Expand All @@ -61,21 +65,21 @@ sequenceDiagram
participant C as Client
participant B as Backplane
box LightYellow Module0
box Module0
participant R0 as Runner0
participant M0 as Deployment0
end
box LightGreen Module1
box Module1
participant R1 as Runner1
participant M1 as Deployment1
end
C ->> B: Call(V0)
B ->> R0: Call(V0)
R0 ->> M0: Call(V0)
M0 ->> B: Call(V1)
B ->> R1: Call(V1)
R1 ->> M1: Call(V1)
C ->> B: Call(Module0.V0)
B ->> R0: Call(Module0.V0)
R0 ->> M0: Call(Module0.V0)
M0 ->> B: Call(Module1.V1)
B ->> R1: Call(Module1.V1)
R1 ->> M1: Call(Module1.V1)
M1 -->> R1: R1
R1 -->> B: R1
B -->> M0: R1
Expand Down
56 changes: 44 additions & 12 deletions backplane/backplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import (
"github.com/alecthomas/errors"
"github.com/bufbuild/connect-go"
grpcreflect "github.com/bufbuild/connect-grpcreflect-go"
mapset "github.com/deckarep/golang-set/v2"
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"google.golang.org/protobuf/proto"

"github.com/TBD54566975/ftl/backplane/internal/dao"
"github.com/TBD54566975/ftl/common/log"
Expand All @@ -20,9 +21,13 @@ import (
"github.com/TBD54566975/ftl/console"
ftlv1 "github.com/TBD54566975/ftl/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/protos/xyz/block/ftl/v1/ftlv1connect"
pschema "github.com/TBD54566975/ftl/protos/xyz/block/ftl/v1/schema"
"github.com/TBD54566975/ftl/schema"
)

// ArtefactChunkSize is size of each chunk streamed to the client.
const ArtefactChunkSize = 1024 * 1024

type Config struct {
Bind socket.Socket `help:"Socket to bind to." default:"tcp://localhost:8892"`
DSN string `help:"Postgres DSN." default:"postgres://localhost/ftl?sslmode=disable&user=postgres&password=secret"`
Expand Down Expand Up @@ -62,20 +67,33 @@ type Service struct {
dao *dao.DAO
}

func (s *Service) GetDeploymentArtefacts(ctx context.Context, req *connect.Request[ftlv1.GetDeploymentArtefactsRequest], resp *connect.ServerStream[ftlv1.GetDeploymentArtefactsResponse]) error {
dkey, err := uuid.Parse(req.Msg.DeploymentKey)
func (s *Service) RegisterRunner(ctx context.Context, req *connect.Request[ftlv1.RegisterRunnerRequest]) (*connect.Response[ftlv1.RegisterRunnerResponse], error) {
panic("unimplemented")
}

func (s *Service) GetDeployment(ctx context.Context, req *connect.Request[ftlv1.GetDeploymentRequest]) (*connect.Response[ftlv1.GetDeploymentResponse], error) {
deployment, err := s.getDeployment(ctx, req.Msg.DeploymentKey)
if err != nil {
return connect.NewError(connect.CodeInvalidArgument, errors.WithStack(err))
return nil, err
}
deployment, err := s.dao.GetDeployment(ctx, dkey)
return connect.NewResponse(&ftlv1.GetDeploymentResponse{
Schema: deployment.Schema.ToProto().(*pschema.Module), //nolint:forcetypeassert
Artefacts: slices.Map(deployment.Artefacts, func(artefact *dao.Artefact) *ftlv1.DeploymentArtefact { return artefact.ToProto() }),
}), nil
}

func (s *Service) GetDeploymentArtefacts(ctx context.Context, req *connect.Request[ftlv1.GetDeploymentArtefactsRequest], resp *connect.ServerStream[ftlv1.GetDeploymentArtefactsResponse]) error {
deployment, err := s.getDeployment(ctx, req.Msg.DeploymentKey)
if err != nil {
return connect.NewError(connect.CodeNotFound, errors.WithStack(err))
return err
}
haveDigests := mapset.NewSet(req.Msg.HaveDigests...)
chunk := make([]byte, 1024*1024)
chunk := make([]byte, ArtefactChunkSize)
nextArtefact:
for _, artefact := range deployment.Artefacts {
if haveDigests.Contains(artefact.Digest.String()) {
continue
for _, clientArtefact := range req.Msg.HaveArtefacts {
if proto.Equal(artefact.ToProto(), clientArtefact) {
continue nextArtefact
}
}
for {
n, err := artefact.Content.Read(chunk)
Expand All @@ -84,13 +102,13 @@ func (s *Service) GetDeploymentArtefacts(ctx context.Context, req *connect.Reque
Artefact: artefact.ToProto(),
Chunk: chunk[:n],
}); err != nil {
return errors.WithStack(err)
return errors.Wrap(err, "could not send artefact chunk")
}
}
if err == io.EOF {
break
} else if err != nil {
return errors.WithStack(err)
return errors.Wrap(err, "could not read artefact chunk")
}
}
}
Expand Down Expand Up @@ -156,3 +174,17 @@ func (s *Service) CreateDeployment(ctx context.Context, req *connect.Request[ftl
logger.Infof("Created deployment %s", key)
return connect.NewResponse(&ftlv1.CreateDeploymentResponse{DeploymentKey: key.String()}), nil
}

func (s *Service) getDeployment(ctx context.Context, key string) (*dao.Deployment, error) {
dkey, err := uuid.Parse(key)
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, errors.Wrap(err, "invalid deployment key"))
}
deployment, err := s.dao.GetDeployment(ctx, dkey)
if errors.Is(err, pgx.ErrNoRows) {
return nil, connect.NewError(connect.CodeNotFound, errors.New("deployment not found"))
} else if err != nil {
return nil, connect.NewError(connect.CodeInternal, errors.Wrap(err, "could not retrieve deployment"))
}
return deployment, nil
}
23 changes: 22 additions & 1 deletion backplane/internal/dao/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,26 @@ type DeploymentArtefact struct {
Path string
}

func (d *DeploymentArtefact) ToProto() *ftlv1.DeploymentArtefact {
return &ftlv1.DeploymentArtefact{
Digest: d.Digest.String(),
Executable: d.Executable,
Path: d.Path,
}
}

func DeploymentArtefactFromProto(in *ftlv1.DeploymentArtefact) (DeploymentArtefact, error) {
digest, err := sha256.ParseSHA256(in.Digest)
if err != nil {
return DeploymentArtefact{}, errors.WithStack(err)
}
return DeploymentArtefact{
Digest: digest,
Executable: in.Executable,
Path: in.Path,
}, nil
}

// CreateDeployment (possibly) creates a new deployment and associates
// previously created artefacts with it.
//
Expand Down Expand Up @@ -155,7 +175,8 @@ type Artefact struct {
Path string
Executable bool
Digest sha256.SHA256
Content io.Reader
// ~Zero-cost on-demand reader.
Content io.Reader
}

func (a *Artefact) ToProto() *ftlv1.DeploymentArtefact {
Expand Down
2 changes: 1 addition & 1 deletion backplane/internal/sql/schema/001_init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,4 @@ CREATE TABLE deployment_artefacts (
path VARCHAR(128) NOT NULL
);

CREATE INDEX deployment_artefacts_deployment_artefact_idx ON deployment_artefacts (artefact_id, deployment_id);
CREATE INDEX deployment_artefacts_deployment_id_idx ON deployment_artefacts (deployment_id);
File renamed without changes.
2 changes: 1 addition & 1 deletion bin/go
2 changes: 1 addition & 1 deletion bin/gofmt
37 changes: 37 additions & 0 deletions cmd/ftl-runner-go/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package main

import (
"context"
"os"
"path/filepath"

"github.com/alecthomas/kong"

_ "github.com/TBD54566975/ftl/common/automaxprocs" // Set GOMAXPROCS to match Linux container CPU quota.
"github.com/TBD54566975/ftl/common/log"
runner "github.com/TBD54566975/ftl/runner-go"
)

var config struct {
LogConfig log.Config `prefix:"log-" embed:""`
RunnerConfig runner.Config `embed:""`
}

func main() {
cacheDir, err := os.UserCacheDir()
if err != nil {
panic(err)
}
kctx := kong.Parse(&config, kong.Description(`
FTL - Towards a 𝝺-calculus for large-scale systems
The Runner is the component of FTL that coordinates with the Backplane to spawn
and route to user code.
`), kong.Vars{
"deploymentdir": filepath.Join(cacheDir, "ftl-runner-go", "deployments"),
})
logger := log.Configure(os.Stderr, config.LogConfig)
ctx := log.ContextWithLogger(context.Background(), logger)
err = runner.Start(ctx, config.RunnerConfig)
kctx.FatalIfErrorf(err)
}
76 changes: 22 additions & 54 deletions cmd/ftl/cmd_download.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@ import (
"path/filepath"

"github.com/alecthomas/errors"
"github.com/bufbuild/connect-go"
mapset "github.com/deckarep/golang-set/v2"
"github.com/google/uuid"

"github.com/TBD54566975/ftl/common/log"
"github.com/TBD54566975/ftl/common/download"
"github.com/TBD54566975/ftl/common/sha256"
ftlv1 "github.com/TBD54566975/ftl/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/protos/xyz/block/ftl/v1/ftlv1connect"
Expand All @@ -22,11 +20,16 @@ type downloadCmd struct {
}

func (d *downloadCmd) Run(ctx context.Context, client ftlv1connect.BackplaneServiceClient) error {
logger := log.FromContext(ctx)

haveDigests := mapset.NewSet[string]()
return download.Artefacts(ctx, client, d.Deployment, d.Dest)
}

err := filepath.Walk(d.Dest, func(path string, info os.FileInfo, err error) error {
func (d *downloadCmd) getLocalArtefacts() ([]*ftlv1.DeploymentArtefact, error) {
haveArtefacts := []*ftlv1.DeploymentArtefact{}
dest, err := filepath.Abs(d.Dest)
if err != nil {
return nil, errors.WithStack(err)
}
err = filepath.Walk(dest, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
Expand All @@ -37,55 +40,20 @@ func (d *downloadCmd) Run(ctx context.Context, client ftlv1connect.BackplaneServ
if err != nil {
return errors.WithStack(err)
}
haveDigests.Add(sum.String())
return nil
})
if err != nil {
return errors.WithStack(err)
}

stream, err := client.GetDeploymentArtefacts(ctx, connect.NewRequest(&ftlv1.GetDeploymentArtefactsRequest{
DeploymentKey: d.Deployment.String(),
HaveDigests: haveDigests.ToSlice(),
}))
if err != nil {
return errors.WithStack(err)
}
var digest string
var w *os.File
for stream.Receive() {
msg := stream.Msg()
artefact := msg.Artefact
if digest != artefact.Digest {
if w != nil {
w.Close()
}
if !filepath.IsLocal(artefact.Path) {
return errors.Errorf("path %q is not local", artefact.Path)
}
logger.Infof("Downloading %s", filepath.Join(d.Dest, artefact.Path))
err = os.MkdirAll(filepath.Join(d.Dest, filepath.Dir(artefact.Path)), 0700)
if err != nil {
return errors.WithStack(err)
}
var mode os.FileMode = 0600
if artefact.Executable {
mode = 0700
}
w, err = os.OpenFile(filepath.Join(d.Dest, artefact.Path), os.O_CREATE|os.O_WRONLY, mode)
if err != nil {
return errors.WithStack(err)
}
digest = artefact.Digest
}

if _, err := w.Write(msg.Chunk); err != nil {
_ = w.Close()
relPath, err := filepath.Rel(dest, path)
if err != nil {
return errors.WithStack(err)
}
haveArtefacts = append(haveArtefacts, &ftlv1.DeploymentArtefact{
Path: relPath,
Digest: sum.String(),
Executable: info.Mode()&0111 != 0,
})
return nil
})
if err != nil {
return nil, errors.WithStack(err)
}
if w != nil {
w.Close()
}
return errors.WithStack(stream.Err())
return haveArtefacts, nil
}
3 changes: 2 additions & 1 deletion cmd/ftl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/alecthomas/kong"
"github.com/bufbuild/connect-go"

_ "github.com/TBD54566975/ftl/common/automaxprocs" // Set GOMAXPROCS to match Linux container CPU quota.
"github.com/TBD54566975/ftl/common/log"
"github.com/TBD54566975/ftl/common/rpc"
"github.com/TBD54566975/ftl/common/socket"
Expand Down Expand Up @@ -83,6 +84,6 @@ func main() {

func makeDialer[Client rpc.Pingable](newClient func(connect.HTTPClient, string, ...connect.ClientOption) Client) func() (Client, error) {
return func() (Client, error) {
return rpc.Dial(newClient, cli.Endpoint.URL()), nil
return rpc.Dial(newClient, cli.Endpoint.URL().String()), nil
}
}
Loading

0 comments on commit bdde132

Please sign in to comment.