Skip to content

Commit

Permalink
multi-worker daemon
Browse files Browse the repository at this point in the history
- [X] put multiples workers in a single binary ("-tags containerd standalone")
- [X] add worker selector to LLB vertex metadata
- [X] s/worker/executor/g
- [X] introduce the new "worker" concept #176 (comment)
- [X] fix up CLI
- [X] fix up tests
- allow using multiples workers (requires inter-vertex cache copier, HUGE!) --> will be separate PR

Implementation notes:
- "Workers" are renamed to "executors" now
- The new "worker" instance holds an "executor" instance and its
related stuffs such as the snapshotter
- The default worker is "runc-overlay"

Signed-off-by: Akihiro Suda <[email protected]>
  • Loading branch information
AkihiroSuda committed Dec 12, 2017
1 parent b7664f7 commit 8899776
Show file tree
Hide file tree
Showing 30 changed files with 975 additions and 575 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

BINARIES=bin/buildd-standalone bin/buildd-containerd bin/buildctl bin/buildctl-darwin bin/buildd.exe bin/buildctl.exe
BINARIES=bin/buildd bin/buildd-standalone bin/buildd-containerd bin/buildctl bin/buildctl-darwin bin/buildd.exe bin/buildctl.exe

binaries: $(BINARIES)

Expand Down
83 changes: 82 additions & 1 deletion cmd/buildd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,20 @@ import (
"net"
"os"
"path/filepath"
"sort"
"strings"

"github.com/containerd/containerd/sys"
"github.com/docker/go-connections/sockets"
"github.com/moby/buildkit/control"
"github.com/moby/buildkit/frontend"
"github.com/moby/buildkit/frontend/dockerfile"
"github.com/moby/buildkit/frontend/gateway"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/util/appcontext"
"github.com/moby/buildkit/util/appdefaults"
"github.com/moby/buildkit/util/profiler"
"github.com/moby/buildkit/worker"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
Expand All @@ -20,6 +27,31 @@ import (
"google.golang.org/grpc"
)

type workerInitializerOpt struct {
sessionManager *session.Manager
root string
}

type workerInitializer struct {
fn func(c *cli.Context, common workerInitializerOpt) ([]*worker.Worker, error)
// less priority number, more preferred
priority int
}

var (
appFlags []cli.Flag
workerInitializers []workerInitializer
)

func registerWorkerInitializer(wi workerInitializer, flags ...cli.Flag) {
workerInitializers = append(workerInitializers, wi)
sort.Slice(workerInitializers,
func(i, j int) bool {
return workerInitializers[i].priority < workerInitializers[j].priority
})
appFlags = append(appFlags, flags...)
}

func main() {
app := cli.NewApp()
app.Name = "buildd"
Expand Down Expand Up @@ -47,7 +79,7 @@ func main() {
},
}

app.Flags = appendFlags(app.Flags)
app.Flags = append(app.Flags, appFlags...)

app.Action = func(c *cli.Context) error {
ctx, cancel := context.WithCancel(appcontext.Context())
Expand Down Expand Up @@ -179,3 +211,52 @@ func unaryInterceptor(globalCtx context.Context) grpc.ServerOption {
return
})
}

func newController(c *cli.Context, root string) (*control.Controller, error) {
sessionManager, err := session.NewManager()
if err != nil {
return nil, err
}
wc, err := newWorkerController(c, workerInitializerOpt{
sessionManager: sessionManager,
root: root,
})
if err != nil {
return nil, err
}
frontends := map[string]frontend.Frontend{}
frontends["dockerfile.v0"] = dockerfile.NewDockerfileFrontend()
frontends["gateway.v0"] = gateway.NewGatewayFrontend()
return control.NewController(control.Opt{
SessionManager: sessionManager,
WorkerController: wc,
Frontends: frontends,
})
}

func newWorkerController(c *cli.Context, wiOpt workerInitializerOpt) (*worker.Controller, error) {
wc := &worker.Controller{}
for _, wi := range workerInitializers {
ws, err := wi.fn(c, wiOpt)
if err != nil {
return nil, err
}
for _, w := range ws {
logrus.Infof("Found worker %q", w.Name)
if err = wc.Add(w); err != nil {
return nil, err
}
}
}
nWorkers := len(wc.GetAll())
if nWorkers == 0 {
return nil, errors.New("no worker found, build the buildkit daemon with tags? (e.g. \"standalone\", \"containerd\")")
}
defaultWorker, err := wc.GetDefault()
if err != nil {
return nil, err
}
logrus.Infof("Found %d workers, default=%q", nWorkers, defaultWorker.Name)
logrus.Warn("Currently, only the default worker can be used.")
return wc, nil
}
67 changes: 56 additions & 11 deletions cmd/buildd/main_containerd.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,70 @@
// +build containerd,!standalone
// +build containerd

package main

import (
"github.com/moby/buildkit/control"
"os"
"strings"

ctd "github.com/containerd/containerd"
"github.com/moby/buildkit/worker"
"github.com/moby/buildkit/worker/containerd"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
)

func appendFlags(f []cli.Flag) []cli.Flag {
return append(f, []cli.Flag{
func init() {
registerWorkerInitializer(
workerInitializer{
fn: containerdWorkerInitializer,
// 1 is less preferred than 0 (runcCtor)
priority: 1,
},
cli.StringFlag{
Name: "containerd-worker",
Usage: "enable containerd workers (true/false/auto)",
Value: "auto",
},
cli.StringFlag{
Name: "containerd",
Name: "containerd-worker-addr",
Usage: "containerd socket",
Value: "/run/containerd/containerd.sock",
},
}...)
})
// TODO(AkihiroSuda): allow using multiple snapshotters. should be useful for some applications that does not work with the default overlay snapshotter. e.g. mysql (docker/for-linux#72)",
}

// root must be an absolute path
func newController(c *cli.Context, root string) (*control.Controller, error) {
socket := c.GlobalString("containerd")
func containerdWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([]*worker.Worker, error) {
socket := c.GlobalString("containerd-worker-addr")
boolOrAuto, err := parseBoolOrAuto(c.GlobalString("containerd-worker"))
if err != nil {
return nil, err
}
if (boolOrAuto == nil && !validContainerdSocket(socket)) || (boolOrAuto != nil && !*boolOrAuto) {
return nil, nil
}
opt, err := containerd.NewWorkerOpt(common.root, socket, ctd.DefaultSnapshotter)
if err != nil {
return nil, err
}
opt.SessionManager = common.sessionManager
w, err := worker.NewWorker(opt)
if err != nil {
return nil, err
}
return []*worker.Worker{w}, nil
}

return control.NewContainerd(root, socket)
func validContainerdSocket(socket string) bool {
if strings.HasPrefix(socket, "tcp://") {
// FIXME(AkihiroSuda): prohibit tcp?
return true
}
socketPath := strings.TrimPrefix(socket, "unix://")
if _, err := os.Stat(socketPath); os.IsNotExist(err) {
// FIXME(AkihiroSuda): add more conditions
logrus.Warnf("skipping containerd worker, as %q does not exist", socketPath)
return false
}
// TODO: actually dial and call introspection API
return true
}
54 changes: 47 additions & 7 deletions cmd/buildd/main_standalone.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,57 @@
// +build standalone,!containerd
// +build standalone

// TODO(AkihiroSuda): s/standalone/oci/g

package main

import (
"github.com/moby/buildkit/control"
"os/exec"

"github.com/moby/buildkit/worker"
"github.com/moby/buildkit/worker/runc"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
)

func appendFlags(f []cli.Flag) []cli.Flag {
return f
func init() {
registerWorkerInitializer(
workerInitializer{
fn: ociWorkerInitializer,
priority: 0,
},
cli.StringFlag{
Name: "oci-worker",
Usage: "enable oci workers (true/false/auto)",
Value: "auto",
})
// TODO: allow multiple oci runtimes and snapshotters
}

func ociWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([]*worker.Worker, error) {
boolOrAuto, err := parseBoolOrAuto(c.GlobalString("oci-worker"))
if err != nil {
return nil, err
}
if (boolOrAuto == nil && !validOCIBinary()) || (boolOrAuto != nil && !*boolOrAuto) {
return nil, nil
}
opt, err := runc.NewWorkerOpt(common.root)
if err != nil {
return nil, err
}
opt.SessionManager = common.sessionManager
w, err := worker.NewWorker(opt)
if err != nil {
return nil, err
}
return []*worker.Worker{w}, nil
}

// root must be an absolute path
func newController(c *cli.Context, root string) (*control.Controller, error) {
return control.NewStandalone(root)
func validOCIBinary() bool {
_, err := exec.LookPath("runc")
if err != nil {
logrus.Warnf("skipping oci worker, as runc does not exist")
return false
}
return true
}
18 changes: 0 additions & 18 deletions cmd/buildd/main_unsupported.go

This file was deleted.

15 changes: 15 additions & 0 deletions cmd/buildd/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package main

import (
"strconv"
"strings"
)

// parseBoolOrAuto returns (nil, nil) if s is "auto"
func parseBoolOrAuto(s string) (*bool, error) {
if s == "" || strings.ToLower(s) == "auto" {
return nil, nil
}
b, err := strconv.ParseBool(s)
return &b, err
}
Loading

0 comments on commit 8899776

Please sign in to comment.