Skip to content

Commit

Permalink
[WIP, DNM] multi-worker daemon
Browse files Browse the repository at this point in the history
- [X] put multiples workers in a single binary ("-tags containerd standalone")
- [X] add worker selector to LLB vertex metadata
- [ ] allow using multiples workers (requires inter-vertex cache copier, HUGE!)

Implementation notes:
- A new structure called "metaworker" holds a worker instance and its
related stuffs such as the snapshotter
- For containerd, we have separate workers for each of the available
snapshotters: containerd-overlay, containerd-btrfs, ...
- The default worker is "runc-overlay"

Signed-off-by: Akihiro Suda <[email protected]>
  • Loading branch information
AkihiroSuda committed Dec 7, 2017
1 parent d66501e commit e3e3115
Show file tree
Hide file tree
Showing 23 changed files with 704 additions and 537 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

BINARIES=bin/buildd-standalone bin/buildd-containerd bin/buildctl bin/buildctl-darwin bin/buildd.exe bin/buildctl.exe
BINARIES=bin/buildd bin/buildd-standalone bin/buildd-containerd bin/buildctl bin/buildctl-darwin bin/buildd.exe bin/buildctl.exe

binaries: $(BINARIES)

Expand Down
70 changes: 69 additions & 1 deletion cmd/buildd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,16 @@ import (
"net"
"os"
"path/filepath"
"sort"
"strings"

"github.com/containerd/containerd/sys"
"github.com/docker/go-connections/sockets"
"github.com/moby/buildkit/control"
"github.com/moby/buildkit/frontend"
"github.com/moby/buildkit/frontend/dockerfile"
"github.com/moby/buildkit/frontend/gateway"
"github.com/moby/buildkit/metaworker"
"github.com/moby/buildkit/util/appcontext"
"github.com/moby/buildkit/util/appdefaults"
"github.com/moby/buildkit/util/profiler"
Expand All @@ -20,6 +26,12 @@ import (
"google.golang.org/grpc"
)

var (
appFlags []cli.Flag
// key: priority (+: less preferred, -: more preferred)
metaWorkerCtors = make(map[int]func(c *cli.Context, root string) ([]*metaworker.MetaWorker, error), 0)
)

func main() {
app := cli.NewApp()
app.Name = "buildd"
Expand All @@ -45,9 +57,14 @@ func main() {
Usage: "debugging address (eg. 0.0.0.0:6060)",
Value: "",
},
cli.StringFlag{
Name: "default-worker",
Usage: "default worker name (eg. containerd-overlay)",
Value: "",
},
}

app.Flags = appendFlags(app.Flags)
app.Flags = append(app.Flags, appFlags...)

app.Action = func(c *cli.Context) error {
ctx, cancel := context.WithCancel(appcontext.Context())
Expand Down Expand Up @@ -179,3 +196,54 @@ func unaryInterceptor(globalCtx context.Context) grpc.ServerOption {
return
})
}

func newController(c *cli.Context, root string) (*control.Controller, error) {
mws, err := metaWorkers(c, root)
if err != nil {
return nil, err
}
frontends := map[string]frontend.Frontend{}
frontends["dockerfile.v0"] = dockerfile.NewDockerfileFrontend()
frontends["gateway.v0"] = gateway.NewGatewayFrontend()
return control.NewController(control.Opt{
MetaWorkers: mws,
Frontends: frontends,
})
}

func metaWorkers(c *cli.Context, root string) ([]*metaworker.MetaWorker, error) {
defaultWorkerName := c.GlobalString("default-worker")
type ctorEntry struct {
priority int
ctor func(c *cli.Context, root string) ([]*metaworker.MetaWorker, error)
}
var ctors []ctorEntry
for p, ctor := range metaWorkerCtors {
ctors = append(ctors, ctorEntry{priority: p, ctor: ctor})
}
sort.Slice(ctors, func(i, j int) bool { return ctors[i].priority < ctors[j].priority })
var ret []*metaworker.MetaWorker
for _, e := range ctors {
mws, err := e.ctor(c, root)
if err != nil {
return ret, err
}
for _, mw := range mws {
logrus.Infof("Found worker %q", mw.Name)
ret = append(ret, mw)
if defaultWorkerName == mw.Name {
last := len(ret) - 1
ret[0], ret[last] = ret[last], ret[0]
}
}
}
if len(ret) == 0 {
return nil, errors.New("no worker found, build the buildkit daemon with tags? (e.g. \"standalone\", \"containerd\")")
}
if defaultWorkerName != "" && defaultWorkerName != ret[0].Name {
return nil, errors.Errorf("worker %q not found", defaultWorkerName)
}
logrus.Infof("Found %d workers, default=%q", len(ret), ret[0].Name)
logrus.Warn("Currently, only the default worker can be used.")
return ret, nil
}
55 changes: 46 additions & 9 deletions cmd/buildd/main_containerd.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,62 @@
// +build containerd,!standalone
// +build containerd

package main

import (
"github.com/moby/buildkit/control"
"os"
"strings"

"github.com/moby/buildkit/metaworker"
"github.com/moby/buildkit/metaworker/containerd"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
)

func appendFlags(f []cli.Flag) []cli.Flag {
return append(f, []cli.Flag{
func init() {
appFlags = append(appFlags,
cli.BoolFlag{
Name: "disable-containerd",
Usage: "disable containerd workers",
},
cli.StringFlag{
Name: "containerd",
Usage: "containerd socket",
Value: "/run/containerd/containerd.sock",
},
}...)
})
// 1 is less preferred than 0 (runcCtor)
metaWorkerCtors[1] = containerdCtor
}

// root must be an absolute path
func newController(c *cli.Context, root string) (*control.Controller, error) {
func containerdCtor(c *cli.Context, root string) ([]*metaworker.MetaWorker, error) {
socket := c.GlobalString("containerd")
if c.GlobalBool("disable-containerd") || skipContainerd(socket) {
return nil, nil
}
opts, err := containerd.NewMetaWorkerOpts(root, socket)
if err != nil {
return nil, err
}
var mws []*metaworker.MetaWorker
for _, opt := range opts {
mw, err := metaworker.NewMetaWorker(opt)
if err != nil {
return mws, err
}
mws = append(mws, mw)
}
return mws, nil
}

return control.NewContainerd(root, socket)
func skipContainerd(socket string) bool {
if strings.HasPrefix(socket, "tcp://") {
// FIXME(AkihiroSuda): prohibit tcp?
return false
}
socketPath := strings.TrimPrefix(socket, "unix://")
if _, err := os.Stat(socketPath); os.IsNotExist(err) {
// FIXME(AkihiroSuda): add more conditions
logrus.Warnf("skipping containerd, as %q does not exist", socketPath)
return true
}
return false
}
36 changes: 29 additions & 7 deletions cmd/buildd/main_standalone.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,39 @@
// +build standalone,!containerd
// +build standalone

// TODO(AkihiroSuda): s/standalone/runc/g

package main

import (
"github.com/moby/buildkit/control"
"github.com/moby/buildkit/metaworker"
"github.com/moby/buildkit/metaworker/runc"
"github.com/urfave/cli"
)

func appendFlags(f []cli.Flag) []cli.Flag {
return f
func init() {
appFlags = append(appFlags,
cli.BoolFlag{
Name: "disable-runc",
Usage: "disable runc workers",
})
metaWorkerCtors[0] = runcCtor
}

// root must be an absolute path
func newController(c *cli.Context, root string) (*control.Controller, error) {
return control.NewStandalone(root)
func runcCtor(c *cli.Context, root string) ([]*metaworker.MetaWorker, error) {
if c.GlobalBool("disable-runc") {
return nil, nil
}
opts, err := runc.NewMetaWorkerOpts(root)
if err != nil {
return nil, err
}
var mws []*metaworker.MetaWorker
for _, opt := range opts {
mw, err := metaworker.NewMetaWorker(opt)
if err != nil {
return mws, err
}
mws = append(mws, mw)
}
return mws, nil
}
18 changes: 0 additions & 18 deletions cmd/buildd/main_unsupported.go

This file was deleted.

42 changes: 15 additions & 27 deletions control/control.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,15 @@
package control

import (
"github.com/containerd/containerd/snapshots"
"github.com/docker/distribution/reference"
controlapi "github.com/moby/buildkit/api/services/control"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/cache/cacheimport"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/exporter"
"github.com/moby/buildkit/frontend"
"github.com/moby/buildkit/metaworker"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/session/grpchijack"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/source"
"github.com/moby/buildkit/worker"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
Expand All @@ -22,17 +18,9 @@ import (
)

type Opt struct {
Snapshotter snapshots.Snapshotter
CacheManager cache.Manager
Worker worker.Worker
SourceManager *source.Manager
InstructionCache solver.InstructionCache
Exporters map[string]exporter.Exporter
SessionManager *session.Manager
Frontends map[string]frontend.Frontend
ImageSource source.Source
CacheExporter *cacheimport.CacheExporter
CacheImporter *cacheimport.CacheImporter
MetaWorkers []*metaworker.MetaWorker
Frontends map[string]frontend.Frontend
// TODO: split more from metaworkers
}

type Controller struct { // TODO: ControlService
Expand All @@ -44,14 +32,8 @@ func NewController(opt Opt) (*Controller, error) {
c := &Controller{
opt: opt,
solver: solver.NewLLBSolver(solver.LLBOpt{
SourceManager: opt.SourceManager,
CacheManager: opt.CacheManager,
Worker: opt.Worker,
InstructionCache: opt.InstructionCache,
ImageSource: opt.ImageSource,
Frontends: opt.Frontends,
CacheExporter: opt.CacheExporter,
CacheImporter: opt.CacheImporter,
MetaWorkers: opt.MetaWorkers,
Frontends: opt.Frontends,
}),
}
return c, nil
Expand All @@ -63,7 +45,9 @@ func (c *Controller) Register(server *grpc.Server) error {
}

func (c *Controller) DiskUsage(ctx context.Context, r *controlapi.DiskUsageRequest) (*controlapi.DiskUsageResponse, error) {
du, err := c.opt.CacheManager.DiskUsage(ctx, client.DiskUsageInfo{
// FIXME mw0
mw0 := c.opt.MetaWorkers[0]
du, err := mw0.CacheManager.DiskUsage(ctx, client.DiskUsageInfo{
Filter: r.Filter,
})
if err != nil {
Expand Down Expand Up @@ -101,8 +85,10 @@ func (c *Controller) Solve(ctx context.Context, req *controlapi.SolveRequest) (*

var expi exporter.ExporterInstance
var err error
// FIXME mw0
mw0 := c.opt.MetaWorkers[0]
if req.Exporter != "" {
exp, ok := c.opt.Exporters[req.Exporter]
exp, ok := mw0.Exporters[req.Exporter]
if !ok {
return nil, errors.Errorf("exporter %q could not be found", req.Exporter)
}
Expand Down Expand Up @@ -202,7 +188,9 @@ func (c *Controller) Session(stream controlapi.Control_SessionServer) error {
logrus.Debugf("session started")
conn, opts := grpchijack.Hijack(stream)
defer conn.Close()
err := c.opt.SessionManager.HandleConn(stream.Context(), conn, opts)
// FIXME mw0
mw0 := c.opt.MetaWorkers[0]
err := mw0.SessionManager.HandleConn(stream.Context(), conn, opts)
logrus.Debugf("session finished: %v", err)
return err
}
Loading

0 comments on commit e3e3115

Please sign in to comment.