From 88997760e3fee91b4fdc8837b8c86742e9ee5d1a Mon Sep 17 00:00:00 2001 From: Akihiro Suda Date: Tue, 21 Nov 2017 08:08:36 +0000 Subject: [PATCH] multi-worker daemon - [X] put multiples workers in a single binary ("-tags containerd standalone") - [X] add worker selector to LLB vertex metadata - [X] s/worker/executor/g - [X] introduce the new "worker" concept https://github.com/moby/buildkit/pull/176#discussion_r153693928 - [X] fix up CLI - [X] fix up tests - allow using multiples workers (requires inter-vertex cache copier, HUGE!) --> will be separate PR Implementation notes: - "Workers" are renamed to "executors" now - The new "worker" instance holds an "executor" instance and its related stuffs such as the snapshotter - The default worker is "runc-overlay" Signed-off-by: Akihiro Suda --- Makefile | 2 +- cmd/buildd/main.go | 83 ++++- cmd/buildd/main_containerd.go | 67 +++- cmd/buildd/main_standalone.go | 54 +++- cmd/buildd/main_unsupported.go | 18 -- cmd/buildd/util.go | 15 + control/control.go | 75 ++--- control/control_containerd.go | 86 ----- control/control_default.go | 174 ---------- .../containerdexecutor/executor.go | 14 +- executor/executor.go | 29 ++ {worker => executor}/oci/hosts.go | 0 {worker => executor}/oci/resolvconf.go | 0 {worker => executor}/oci/spec_unix.go | 4 +- .../runcexecutor/executor.go | 14 +- frontend/dockerfile/dockerfile_test.go | 2 +- frontend/frontend.go | 4 +- frontend/gateway/gateway.go | 4 +- hack/dockerfiles/test.Dockerfile | 17 +- solver/exec.go | 18 +- solver/pb/ops.pb.go | 298 ++++++++++++++---- solver/pb/ops.proto | 7 + solver/solver.go | 91 ++++-- solver/vertex.go | 13 +- util/testutil/integration/containerd.go | 2 +- worker/containerd/containerd.go | 83 +++++ .../runc/runc.go | 77 ++--- .../runc/runc_test.go | 59 +--- worker/worker.go | 193 +++++++++++- worker/workercontroller.go | 47 +++ 30 files changed, 975 insertions(+), 575 deletions(-) delete mode 100644 cmd/buildd/main_unsupported.go create mode 100644 cmd/buildd/util.go delete mode 100644 control/control_containerd.go delete mode 100644 control/control_default.go rename worker/containerdworker/worker.go => executor/containerdexecutor/executor.go (83%) create mode 100644 executor/executor.go rename {worker => executor}/oci/hosts.go (100%) rename {worker => executor}/oci/resolvconf.go (100%) rename {worker => executor}/oci/spec_unix.go (94%) rename worker/runcworker/worker.go => executor/runcexecutor/executor.go (89%) create mode 100644 worker/containerd/containerd.go rename control/control_standalone.go => worker/runc/runc.go (80%) rename control/control_standalone_test.go => worker/runc/runc_test.go (64%) create mode 100644 worker/workercontroller.go diff --git a/Makefile b/Makefile index d7dc14b4e7e28..453f32eed9b38 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ -BINARIES=bin/buildd-standalone bin/buildd-containerd bin/buildctl bin/buildctl-darwin bin/buildd.exe bin/buildctl.exe +BINARIES=bin/buildd bin/buildd-standalone bin/buildd-containerd bin/buildctl bin/buildctl-darwin bin/buildd.exe bin/buildctl.exe binaries: $(BINARIES) diff --git a/cmd/buildd/main.go b/cmd/buildd/main.go index 48f0fd6be7d21..117eadc259306 100644 --- a/cmd/buildd/main.go +++ b/cmd/buildd/main.go @@ -5,13 +5,20 @@ import ( "net" "os" "path/filepath" + "sort" "strings" "github.com/containerd/containerd/sys" "github.com/docker/go-connections/sockets" + "github.com/moby/buildkit/control" + "github.com/moby/buildkit/frontend" + "github.com/moby/buildkit/frontend/dockerfile" + "github.com/moby/buildkit/frontend/gateway" + "github.com/moby/buildkit/session" "github.com/moby/buildkit/util/appcontext" "github.com/moby/buildkit/util/appdefaults" "github.com/moby/buildkit/util/profiler" + "github.com/moby/buildkit/worker" "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/urfave/cli" @@ -20,6 +27,31 @@ import ( "google.golang.org/grpc" ) +type workerInitializerOpt struct { + sessionManager *session.Manager + root string +} + +type workerInitializer struct { + fn func(c *cli.Context, common workerInitializerOpt) ([]*worker.Worker, error) + // less priority number, more preferred + priority int +} + +var ( + appFlags []cli.Flag + workerInitializers []workerInitializer +) + +func registerWorkerInitializer(wi workerInitializer, flags ...cli.Flag) { + workerInitializers = append(workerInitializers, wi) + sort.Slice(workerInitializers, + func(i, j int) bool { + return workerInitializers[i].priority < workerInitializers[j].priority + }) + appFlags = append(appFlags, flags...) +} + func main() { app := cli.NewApp() app.Name = "buildd" @@ -47,7 +79,7 @@ func main() { }, } - app.Flags = appendFlags(app.Flags) + app.Flags = append(app.Flags, appFlags...) app.Action = func(c *cli.Context) error { ctx, cancel := context.WithCancel(appcontext.Context()) @@ -179,3 +211,52 @@ func unaryInterceptor(globalCtx context.Context) grpc.ServerOption { return }) } + +func newController(c *cli.Context, root string) (*control.Controller, error) { + sessionManager, err := session.NewManager() + if err != nil { + return nil, err + } + wc, err := newWorkerController(c, workerInitializerOpt{ + sessionManager: sessionManager, + root: root, + }) + if err != nil { + return nil, err + } + frontends := map[string]frontend.Frontend{} + frontends["dockerfile.v0"] = dockerfile.NewDockerfileFrontend() + frontends["gateway.v0"] = gateway.NewGatewayFrontend() + return control.NewController(control.Opt{ + SessionManager: sessionManager, + WorkerController: wc, + Frontends: frontends, + }) +} + +func newWorkerController(c *cli.Context, wiOpt workerInitializerOpt) (*worker.Controller, error) { + wc := &worker.Controller{} + for _, wi := range workerInitializers { + ws, err := wi.fn(c, wiOpt) + if err != nil { + return nil, err + } + for _, w := range ws { + logrus.Infof("Found worker %q", w.Name) + if err = wc.Add(w); err != nil { + return nil, err + } + } + } + nWorkers := len(wc.GetAll()) + if nWorkers == 0 { + return nil, errors.New("no worker found, build the buildkit daemon with tags? (e.g. \"standalone\", \"containerd\")") + } + defaultWorker, err := wc.GetDefault() + if err != nil { + return nil, err + } + logrus.Infof("Found %d workers, default=%q", nWorkers, defaultWorker.Name) + logrus.Warn("Currently, only the default worker can be used.") + return wc, nil +} diff --git a/cmd/buildd/main_containerd.go b/cmd/buildd/main_containerd.go index e711f5bbdb164..88ab7b063b427 100644 --- a/cmd/buildd/main_containerd.go +++ b/cmd/buildd/main_containerd.go @@ -1,25 +1,70 @@ -// +build containerd,!standalone +// +build containerd package main import ( - "github.com/moby/buildkit/control" + "os" + "strings" + + ctd "github.com/containerd/containerd" + "github.com/moby/buildkit/worker" + "github.com/moby/buildkit/worker/containerd" + "github.com/sirupsen/logrus" "github.com/urfave/cli" ) -func appendFlags(f []cli.Flag) []cli.Flag { - return append(f, []cli.Flag{ +func init() { + registerWorkerInitializer( + workerInitializer{ + fn: containerdWorkerInitializer, + // 1 is less preferred than 0 (runcCtor) + priority: 1, + }, + cli.StringFlag{ + Name: "containerd-worker", + Usage: "enable containerd workers (true/false/auto)", + Value: "auto", + }, cli.StringFlag{ - Name: "containerd", + Name: "containerd-worker-addr", Usage: "containerd socket", Value: "/run/containerd/containerd.sock", - }, - }...) + }) + // TODO(AkihiroSuda): allow using multiple snapshotters. should be useful for some applications that does not work with the default overlay snapshotter. e.g. mysql (docker/for-linux#72)", } -// root must be an absolute path -func newController(c *cli.Context, root string) (*control.Controller, error) { - socket := c.GlobalString("containerd") +func containerdWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([]*worker.Worker, error) { + socket := c.GlobalString("containerd-worker-addr") + boolOrAuto, err := parseBoolOrAuto(c.GlobalString("containerd-worker")) + if err != nil { + return nil, err + } + if (boolOrAuto == nil && !validContainerdSocket(socket)) || (boolOrAuto != nil && !*boolOrAuto) { + return nil, nil + } + opt, err := containerd.NewWorkerOpt(common.root, socket, ctd.DefaultSnapshotter) + if err != nil { + return nil, err + } + opt.SessionManager = common.sessionManager + w, err := worker.NewWorker(opt) + if err != nil { + return nil, err + } + return []*worker.Worker{w}, nil +} - return control.NewContainerd(root, socket) +func validContainerdSocket(socket string) bool { + if strings.HasPrefix(socket, "tcp://") { + // FIXME(AkihiroSuda): prohibit tcp? + return true + } + socketPath := strings.TrimPrefix(socket, "unix://") + if _, err := os.Stat(socketPath); os.IsNotExist(err) { + // FIXME(AkihiroSuda): add more conditions + logrus.Warnf("skipping containerd worker, as %q does not exist", socketPath) + return false + } + // TODO: actually dial and call introspection API + return true } diff --git a/cmd/buildd/main_standalone.go b/cmd/buildd/main_standalone.go index 83a3818fa2717..6c34d34459d9c 100644 --- a/cmd/buildd/main_standalone.go +++ b/cmd/buildd/main_standalone.go @@ -1,17 +1,57 @@ -// +build standalone,!containerd +// +build standalone + +// TODO(AkihiroSuda): s/standalone/oci/g package main import ( - "github.com/moby/buildkit/control" + "os/exec" + + "github.com/moby/buildkit/worker" + "github.com/moby/buildkit/worker/runc" + "github.com/sirupsen/logrus" "github.com/urfave/cli" ) -func appendFlags(f []cli.Flag) []cli.Flag { - return f +func init() { + registerWorkerInitializer( + workerInitializer{ + fn: ociWorkerInitializer, + priority: 0, + }, + cli.StringFlag{ + Name: "oci-worker", + Usage: "enable oci workers (true/false/auto)", + Value: "auto", + }) + // TODO: allow multiple oci runtimes and snapshotters +} + +func ociWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([]*worker.Worker, error) { + boolOrAuto, err := parseBoolOrAuto(c.GlobalString("oci-worker")) + if err != nil { + return nil, err + } + if (boolOrAuto == nil && !validOCIBinary()) || (boolOrAuto != nil && !*boolOrAuto) { + return nil, nil + } + opt, err := runc.NewWorkerOpt(common.root) + if err != nil { + return nil, err + } + opt.SessionManager = common.sessionManager + w, err := worker.NewWorker(opt) + if err != nil { + return nil, err + } + return []*worker.Worker{w}, nil } -// root must be an absolute path -func newController(c *cli.Context, root string) (*control.Controller, error) { - return control.NewStandalone(root) +func validOCIBinary() bool { + _, err := exec.LookPath("runc") + if err != nil { + logrus.Warnf("skipping oci worker, as runc does not exist") + return false + } + return true } diff --git a/cmd/buildd/main_unsupported.go b/cmd/buildd/main_unsupported.go deleted file mode 100644 index ca53dcd51ca1a..0000000000000 --- a/cmd/buildd/main_unsupported.go +++ /dev/null @@ -1,18 +0,0 @@ -// +build !standalone,!containerd standalone,containerd - -package main - -import ( - "errors" - - "github.com/moby/buildkit/control" - "github.com/urfave/cli" -) - -func appendFlags(f []cli.Flag) []cli.Flag { - return f -} - -func newController(c *cli.Context, root string) (*control.Controller, error) { - return nil, errors.New("need to build daemon with either standalone or containerd build tag") -} diff --git a/cmd/buildd/util.go b/cmd/buildd/util.go new file mode 100644 index 0000000000000..5d462bd232036 --- /dev/null +++ b/cmd/buildd/util.go @@ -0,0 +1,15 @@ +package main + +import ( + "strconv" + "strings" +) + +// parseBoolOrAuto returns (nil, nil) if s is "auto" +func parseBoolOrAuto(s string) (*bool, error) { + if s == "" || strings.ToLower(s) == "auto" { + return nil, nil + } + b, err := strconv.ParseBool(s) + return &b, err +} diff --git a/control/control.go b/control/control.go index b62fae0128ce3..175bf8b0af3e3 100644 --- a/control/control.go +++ b/control/control.go @@ -1,18 +1,14 @@ package control import ( - "github.com/containerd/containerd/snapshots" "github.com/docker/distribution/reference" controlapi "github.com/moby/buildkit/api/services/control" - "github.com/moby/buildkit/cache" - "github.com/moby/buildkit/cache/cacheimport" "github.com/moby/buildkit/client" "github.com/moby/buildkit/exporter" "github.com/moby/buildkit/frontend" "github.com/moby/buildkit/session" "github.com/moby/buildkit/session/grpchijack" "github.com/moby/buildkit/solver" - "github.com/moby/buildkit/source" "github.com/moby/buildkit/worker" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -22,17 +18,9 @@ import ( ) type Opt struct { - Snapshotter snapshots.Snapshotter - CacheManager cache.Manager - Worker worker.Worker - SourceManager *source.Manager - InstructionCache solver.InstructionCache - Exporters map[string]exporter.Exporter SessionManager *session.Manager + WorkerController *worker.Controller Frontends map[string]frontend.Frontend - ImageSource source.Source - CacheExporter *cacheimport.CacheExporter - CacheImporter *cacheimport.CacheImporter } type Controller struct { // TODO: ControlService @@ -42,17 +30,8 @@ type Controller struct { // TODO: ControlService func NewController(opt Opt) (*Controller, error) { c := &Controller{ - opt: opt, - solver: solver.NewLLBSolver(solver.LLBOpt{ - SourceManager: opt.SourceManager, - CacheManager: opt.CacheManager, - Worker: opt.Worker, - InstructionCache: opt.InstructionCache, - ImageSource: opt.ImageSource, - Frontends: opt.Frontends, - CacheExporter: opt.CacheExporter, - CacheImporter: opt.CacheImporter, - }), + opt: opt, + solver: solver.NewLLBSolver(opt.WorkerController, opt.Frontends), } return c, nil } @@ -63,26 +42,29 @@ func (c *Controller) Register(server *grpc.Server) error { } func (c *Controller) DiskUsage(ctx context.Context, r *controlapi.DiskUsageRequest) (*controlapi.DiskUsageResponse, error) { - du, err := c.opt.CacheManager.DiskUsage(ctx, client.DiskUsageInfo{ - Filter: r.Filter, - }) - if err != nil { - return nil, err - } - resp := &controlapi.DiskUsageResponse{} - for _, r := range du { - resp.Record = append(resp.Record, &controlapi.UsageRecord{ - ID: r.ID, - Mutable: r.Mutable, - InUse: r.InUse, - Size_: r.Size, - Parent: r.Parent, - UsageCount: int64(r.UsageCount), - Description: r.Description, - CreatedAt: r.CreatedAt, - LastUsedAt: r.LastUsedAt, + for _, w := range c.opt.WorkerController.GetAll() { + du, err := w.CacheManager.DiskUsage(ctx, client.DiskUsageInfo{ + Filter: r.Filter, }) + if err != nil { + return nil, err + } + + for _, r := range du { + resp.Record = append(resp.Record, &controlapi.UsageRecord{ + // TODO: add worker info + ID: r.ID, + Mutable: r.Mutable, + InUse: r.InUse, + Size_: r.Size, + Parent: r.Parent, + UsageCount: int64(r.UsageCount), + Description: r.Description, + CreatedAt: r.CreatedAt, + LastUsedAt: r.LastUsedAt, + }) + } } return resp, nil } @@ -100,9 +82,14 @@ func (c *Controller) Solve(ctx context.Context, req *controlapi.SolveRequest) (* ctx = session.NewContext(ctx, req.Session) var expi exporter.ExporterInstance - var err error + // TODO: multiworker + // This is actually tricky, as the exporter should come from the worker that has the returned reference. We may need to delay this so that the solver loads this. + w, err := c.opt.WorkerController.GetDefault() + if err != nil { + return nil, err + } if req.Exporter != "" { - exp, ok := c.opt.Exporters[req.Exporter] + exp, ok := w.Exporters[req.Exporter] if !ok { return nil, errors.Errorf("exporter %q could not be found", req.Exporter) } diff --git a/control/control_containerd.go b/control/control_containerd.go deleted file mode 100644 index e24cfd58459c5..0000000000000 --- a/control/control_containerd.go +++ /dev/null @@ -1,86 +0,0 @@ -// +build containerd - -package control - -import ( - "context" - "fmt" - "net" - "os" - "strings" - "time" - - "github.com/containerd/containerd" - "github.com/containerd/containerd/content" - "github.com/moby/buildkit/worker/containerdworker" - digest "github.com/opencontainers/go-digest" - "github.com/pkg/errors" -) - -func NewContainerd(root, address string) (*Controller, error) { - if err := os.MkdirAll(root, 0700); err != nil { - return nil, errors.Wrapf(err, "failed to create %s", root) - } - - // TODO: take lock to make sure there are no duplicates - client, err := containerd.New(address, containerd.WithDefaultNamespace("buildkit")) - if err != nil { - return nil, errors.Wrapf(err, "failed to connect client to %q . make sure containerd is running", address) - } - - pd := newContainerdPullDeps(client) - - opt, err := defaultControllerOpts(root, *pd) - if err != nil { - return nil, err - } - - opt.Worker = containerdworker.New(client, root) - - return NewController(*opt) -} - -func newContainerdPullDeps(client *containerd.Client) *pullDeps { - diff := client.DiffService() - return &pullDeps{ - Snapshotter: client.SnapshotService(containerd.DefaultSnapshotter), - ContentStore: &noGCContentStore{client.ContentStore()}, - Applier: diff, - Differ: diff, - Images: client.ImageService(), - } -} - -func dialer(address string, timeout time.Duration) (net.Conn, error) { - address = strings.TrimPrefix(address, "unix://") - return net.DialTimeout("unix", address, timeout) -} - -func dialAddress(address string) string { - return fmt.Sprintf("unix://%s", address) -} - -// TODO: Replace this with leases - -type noGCContentStore struct { - content.Store -} -type noGCWriter struct { - content.Writer -} - -func (cs *noGCContentStore) Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (content.Writer, error) { - w, err := cs.Store.Writer(ctx, ref, size, expected) - return &noGCWriter{w}, err -} - -func (w *noGCWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { - opts = append(opts, func(info *content.Info) error { - if info.Labels == nil { - info.Labels = map[string]string{} - } - info.Labels["containerd.io/gc.root"] = time.Now().UTC().Format(time.RFC3339Nano) - return nil - }) - return w.Writer.Commit(ctx, size, expected, opts...) -} diff --git a/control/control_default.go b/control/control_default.go deleted file mode 100644 index cee931a667c0a..0000000000000 --- a/control/control_default.go +++ /dev/null @@ -1,174 +0,0 @@ -// +build standalone containerd - -package control - -import ( - "path/filepath" - - "github.com/containerd/containerd/content" - "github.com/containerd/containerd/diff" - "github.com/containerd/containerd/images" - ctdsnapshot "github.com/containerd/containerd/snapshots" - "github.com/moby/buildkit/cache" - "github.com/moby/buildkit/cache/cacheimport" - "github.com/moby/buildkit/cache/instructioncache" - "github.com/moby/buildkit/cache/metadata" - "github.com/moby/buildkit/client" - "github.com/moby/buildkit/exporter" - imageexporter "github.com/moby/buildkit/exporter/containerimage" - localexporter "github.com/moby/buildkit/exporter/local" - "github.com/moby/buildkit/frontend" - "github.com/moby/buildkit/frontend/dockerfile" - "github.com/moby/buildkit/frontend/gateway" - "github.com/moby/buildkit/session" - "github.com/moby/buildkit/snapshot/blobmapping" - "github.com/moby/buildkit/source" - "github.com/moby/buildkit/source/containerimage" - "github.com/moby/buildkit/source/git" - "github.com/moby/buildkit/source/http" - "github.com/moby/buildkit/source/local" -) - -type pullDeps struct { - Snapshotter ctdsnapshot.Snapshotter - ContentStore content.Store - Applier diff.Differ - Differ diff.Differ - Images images.Store -} - -func defaultControllerOpts(root string, pd pullDeps) (*Opt, error) { - md, err := metadata.NewStore(filepath.Join(root, "metadata.db")) - if err != nil { - return nil, err - } - - snapshotter, err := blobmapping.NewSnapshotter(blobmapping.Opt{ - Content: pd.ContentStore, - Snapshotter: pd.Snapshotter, - MetadataStore: md, - }) - if err != nil { - return nil, err - } - - cm, err := cache.NewManager(cache.ManagerOpt{ - Snapshotter: snapshotter, - MetadataStore: md, - }) - if err != nil { - return nil, err - } - - ic := &instructioncache.LocalStore{ - MetadataStore: md, - Cache: cm, - } - - sm, err := source.NewManager() - if err != nil { - return nil, err - } - - sessm, err := session.NewManager() - if err != nil { - return nil, err - } - - is, err := containerimage.NewSource(containerimage.SourceOpt{ - Snapshotter: snapshotter, - ContentStore: pd.ContentStore, - SessionManager: sessm, - Applier: pd.Applier, - CacheAccessor: cm, - }) - if err != nil { - return nil, err - } - - sm.Register(is) - - gs, err := git.NewSource(git.Opt{ - CacheAccessor: cm, - MetadataStore: md, - }) - if err != nil { - return nil, err - } - - sm.Register(gs) - - hs, err := http.NewSource(http.Opt{ - CacheAccessor: cm, - MetadataStore: md, - }) - if err != nil { - return nil, err - } - - sm.Register(hs) - - ss, err := local.NewSource(local.Opt{ - SessionManager: sessm, - CacheAccessor: cm, - MetadataStore: md, - }) - if err != nil { - return nil, err - } - sm.Register(ss) - - exporters := map[string]exporter.Exporter{} - - imageExporter, err := imageexporter.New(imageexporter.Opt{ - Snapshotter: snapshotter, - ContentStore: pd.ContentStore, - Differ: pd.Differ, - Images: pd.Images, - SessionManager: sessm, - }) - if err != nil { - return nil, err - } - exporters[client.ExporterImage] = imageExporter - - localExporter, err := localexporter.New(localexporter.Opt{ - SessionManager: sessm, - }) - if err != nil { - return nil, err - } - exporters[client.ExporterLocal] = localExporter - - frontends := map[string]frontend.Frontend{} - frontends["dockerfile.v0"] = dockerfile.NewDockerfileFrontend() - frontends["gateway.v0"] = gateway.NewGatewayFrontend() - - ce := cacheimport.NewCacheExporter(cacheimport.ExporterOpt{ - Snapshotter: snapshotter, - ContentStore: pd.ContentStore, - SessionManager: sessm, - Differ: pd.Differ, - }) - - ci := cacheimport.NewCacheImporter(cacheimport.ImportOpt{ - Snapshotter: snapshotter, - ContentStore: pd.ContentStore, - Applier: pd.Applier, - CacheAccessor: cm, - SessionManager: sessm, - }) - - return &Opt{ - Snapshotter: snapshotter, - CacheManager: cm, - SourceManager: sm, - InstructionCache: ic, - Exporters: exporters, - SessionManager: sessm, - Frontends: frontends, - ImageSource: is, - CacheExporter: ce, - CacheImporter: ci, - }, nil -} diff --git a/worker/containerdworker/worker.go b/executor/containerdexecutor/executor.go similarity index 83% rename from worker/containerdworker/worker.go rename to executor/containerdexecutor/executor.go index c8be401686fdf..aaa1c0919328e 100644 --- a/worker/containerdworker/worker.go +++ b/executor/containerdexecutor/executor.go @@ -1,4 +1,4 @@ -package containerdworker +package containerdexecutor import ( "io" @@ -8,26 +8,26 @@ import ( "github.com/containerd/containerd" "github.com/containerd/containerd/cio" "github.com/moby/buildkit/cache" + "github.com/moby/buildkit/executor" + "github.com/moby/buildkit/executor/oci" "github.com/moby/buildkit/identity" - "github.com/moby/buildkit/worker" - "github.com/moby/buildkit/worker/oci" "github.com/pkg/errors" "golang.org/x/net/context" ) -type containerdWorker struct { +type containerdExecutor struct { client *containerd.Client root string } -func New(client *containerd.Client, root string) worker.Worker { - return containerdWorker{ +func New(client *containerd.Client, root string) executor.Executor { + return containerdExecutor{ client: client, root: root, } } -func (w containerdWorker) Exec(ctx context.Context, meta worker.Meta, root cache.Mountable, mounts []worker.Mount, stdin io.ReadCloser, stdout, stderr io.WriteCloser) (err error) { +func (w containerdExecutor) Exec(ctx context.Context, meta executor.Meta, root cache.Mountable, mounts []executor.Mount, stdin io.ReadCloser, stdout, stderr io.WriteCloser) (err error) { id := identity.NewID() resolvConf, err := oci.GetResolvConf(ctx, w.root) diff --git a/executor/executor.go b/executor/executor.go new file mode 100644 index 0000000000000..31c6ba8390374 --- /dev/null +++ b/executor/executor.go @@ -0,0 +1,29 @@ +package executor + +import ( + "io" + + "github.com/moby/buildkit/cache" + "golang.org/x/net/context" +) + +type Meta struct { + Args []string + Env []string + User string + Cwd string + Tty bool + // DisableNetworking bool +} + +type Mount struct { + Src cache.Mountable + Selector string + Dest string + Readonly bool +} + +type Executor interface { + // TODO: add stdout/err + Exec(ctx context.Context, meta Meta, rootfs cache.Mountable, mounts []Mount, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error +} diff --git a/worker/oci/hosts.go b/executor/oci/hosts.go similarity index 100% rename from worker/oci/hosts.go rename to executor/oci/hosts.go diff --git a/worker/oci/resolvconf.go b/executor/oci/resolvconf.go similarity index 100% rename from worker/oci/resolvconf.go rename to executor/oci/resolvconf.go diff --git a/worker/oci/spec_unix.go b/executor/oci/spec_unix.go similarity index 94% rename from worker/oci/spec_unix.go rename to executor/oci/spec_unix.go index 4edc1ad2b2bfc..7d03f4ceebd81 100644 --- a/worker/oci/spec_unix.go +++ b/executor/oci/spec_unix.go @@ -12,8 +12,8 @@ import ( "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/oci" "github.com/mitchellh/hashstructure" + "github.com/moby/buildkit/executor" "github.com/moby/buildkit/snapshot" - "github.com/moby/buildkit/worker" specs "github.com/opencontainers/runtime-spec/specs-go" "github.com/pkg/errors" ) @@ -21,7 +21,7 @@ import ( // Ideally we don't have to import whole containerd just for the default spec // GenerateSpec generates spec using containerd functionality. -func GenerateSpec(ctx context.Context, meta worker.Meta, mounts []worker.Mount, id, resolvConf, hostsFile string) (*specs.Spec, func(), error) { +func GenerateSpec(ctx context.Context, meta executor.Meta, mounts []executor.Mount, id string, resolvConf, hostsFile string) (*specs.Spec, func(), error) { c := &containers.Container{ ID: id, } diff --git a/worker/runcworker/worker.go b/executor/runcexecutor/executor.go similarity index 89% rename from worker/runcworker/worker.go rename to executor/runcexecutor/executor.go index 03220afc245e2..0234464ae369b 100644 --- a/worker/runcworker/worker.go +++ b/executor/runcexecutor/executor.go @@ -1,4 +1,4 @@ -package runcworker +package runcexecutor import ( "encoding/json" @@ -12,20 +12,20 @@ import ( runc "github.com/containerd/go-runc" "github.com/docker/docker/pkg/symlink" "github.com/moby/buildkit/cache" + "github.com/moby/buildkit/executor" + "github.com/moby/buildkit/executor/oci" "github.com/moby/buildkit/identity" - "github.com/moby/buildkit/worker" - "github.com/moby/buildkit/worker/oci" "github.com/pkg/errors" "github.com/sirupsen/logrus" "golang.org/x/net/context" ) -type runcworker struct { +type runcExecutor struct { runc *runc.Runc root string } -func New(root string) (worker.Worker, error) { +func New(root string) (executor.Executor, error) { if err := exec.Command("runc", "--version").Run(); err != nil { return nil, errors.Wrap(err, "failed to find runc binary") } @@ -47,14 +47,14 @@ func New(root string) (worker.Worker, error) { Setpgid: true, } - w := &runcworker{ + w := &runcExecutor{ runc: runtime, root: root, } return w, nil } -func (w *runcworker) Exec(ctx context.Context, meta worker.Meta, root cache.Mountable, mounts []worker.Mount, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error { +func (w *runcExecutor) Exec(ctx context.Context, meta executor.Meta, root cache.Mountable, mounts []executor.Mount, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error { resolvConf, err := oci.GetResolvConf(ctx, w.root) if err != nil { diff --git a/frontend/dockerfile/dockerfile_test.go b/frontend/dockerfile/dockerfile_test.go index b1487f3df7f5e..5f223115b0530 100644 --- a/frontend/dockerfile/dockerfile_test.go +++ b/frontend/dockerfile/dockerfile_test.go @@ -126,7 +126,7 @@ func testDockerfileInvalidCommand(t *testing.T, sb integration.Sandbox) { err = cmd.Run() require.Error(t, err) require.Contains(t, stdout.String(), "/bin/sh -c invalidcmd") - require.Contains(t, stdout.String(), "worker failed running") + require.Contains(t, stdout.String(), "executor failed running") } func testDockerfileADDFromURL(t *testing.T, sb integration.Sandbox) { diff --git a/frontend/frontend.go b/frontend/frontend.go index 29765617d7aa2..60ffa4b18dd55 100644 --- a/frontend/frontend.go +++ b/frontend/frontend.go @@ -4,8 +4,8 @@ import ( "io" "github.com/moby/buildkit/cache" + "github.com/moby/buildkit/executor" "github.com/moby/buildkit/solver/pb" - "github.com/moby/buildkit/worker" digest "github.com/opencontainers/go-digest" "golang.org/x/net/context" ) @@ -17,7 +17,7 @@ type Frontend interface { type FrontendLLBBridge interface { Solve(ctx context.Context, req SolveRequest) (cache.ImmutableRef, map[string][]byte, error) ResolveImageConfig(ctx context.Context, ref string) (digest.Digest, []byte, error) - Exec(ctx context.Context, meta worker.Meta, rootfs cache.ImmutableRef, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error + Exec(ctx context.Context, meta executor.Meta, rootfs cache.ImmutableRef, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error } type SolveRequest struct { diff --git a/frontend/gateway/gateway.go b/frontend/gateway/gateway.go index 5b1f877e5ec03..dc38af8d7e14c 100644 --- a/frontend/gateway/gateway.go +++ b/frontend/gateway/gateway.go @@ -14,12 +14,12 @@ import ( "github.com/docker/distribution/reference" "github.com/moby/buildkit/cache" "github.com/moby/buildkit/client/llb" + "github.com/moby/buildkit/executor" "github.com/moby/buildkit/frontend" pb "github.com/moby/buildkit/frontend/gateway/pb" "github.com/moby/buildkit/identity" "github.com/moby/buildkit/session" "github.com/moby/buildkit/snapshot" - "github.com/moby/buildkit/worker" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -144,7 +144,7 @@ func (gf *gatewayFrontend) Solve(ctx context.Context, llbBridge frontend.Fronten env = append(env, "BUILDKIT_SESSION_ID="+sid) - err = llbBridge.Exec(ctx, worker.Meta{ + err = llbBridge.Exec(ctx, executor.Meta{ Env: env, Args: args, Cwd: cwd, diff --git a/hack/dockerfiles/test.Dockerfile b/hack/dockerfiles/test.Dockerfile index b0d826c6cd119..73b6445649f7e 100644 --- a/hack/dockerfiles/test.Dockerfile +++ b/hack/dockerfiles/test.Dockerfile @@ -1,6 +1,7 @@ ARG RUNC_VERSION=74a17296470088de3805e138d3d87c62e613dfc4 ARG CONTAINERD_VERSION=v1.0.0 -ARG BUILDKIT_TARGET=standalone +# available targets: buildd (standalone+containerd), buildd-standalone, buildd-containerd +ARG BUILDKIT_TARGET=buildd ARG REGISTRY_VERSION=2.6 FROM golang:1.9-alpine AS gobuild-base @@ -47,6 +48,10 @@ RUN go build -ldflags '-d' -o /usr/bin/buildd-containerd -tags containerd ./cmd FROM registry:$REGISTRY_VERSION AS registry +FROM buildkit-base AS buildd +ENV CGO_ENABLED=0 +RUN go build -ldflags '-d' -o /usr/bin/buildd -tags "standalone containerd" ./cmd/buildd + FROM unit-tests AS integration-tests COPY --from=buildctl /usr/bin/buildctl /usr/bin/ COPY --from=buildd-containerd /usr/bin/buildd-containerd /usr/bin @@ -69,18 +74,24 @@ RUN apk add --no-cache git VOLUME /var/lib/buildkit # Copy together all binaries needed for standalone mode -FROM buildkit-export AS buildkit-standalone +FROM buildkit-export AS buildkit-buildd-standalone COPY --from=buildd-standalone /usr/bin/buildd-standalone /usr/bin/ COPY --from=buildctl /usr/bin/buildctl /usr/bin/ ENTRYPOINT ["buildd-standalone"] # Copy together all binaries for containerd mode -FROM buildkit-export AS buildkit-containerd +FROM buildkit-export AS buildkit-buildd-containerd COPY --from=runc /usr/bin/runc /usr/bin/ COPY --from=buildd-containerd /usr/bin/buildd-containerd /usr/bin/ COPY --from=buildctl /usr/bin/buildctl /usr/bin/ ENTRYPOINT ["buildd-containerd"] +# Copy together all binaries for standalone+containerd mode +FROM buildkit-export AS buildkit-buildd +COPY --from=runc /usr/bin/runc /usr/bin/ +COPY --from=buildd /usr/bin/buildd /usr/bin/ +COPY --from=buildctl /usr/bin/buildctl /usr/bin/ + FROM alpine AS containerd-runtime COPY --from=runc /usr/bin/runc /usr/bin/ COPY --from=containerd /go/src/github.com/containerd/containerd/bin/containerd* /usr/bin/ diff --git a/solver/exec.go b/solver/exec.go index 7b460013c2bfe..92992014e56c7 100644 --- a/solver/exec.go +++ b/solver/exec.go @@ -9,9 +9,9 @@ import ( "strings" "github.com/moby/buildkit/cache" + "github.com/moby/buildkit/executor" "github.com/moby/buildkit/solver/pb" "github.com/moby/buildkit/util/progress/logs" - "github.com/moby/buildkit/worker" digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" "golang.org/x/net/context" @@ -22,15 +22,15 @@ const execCacheType = "buildkit.exec.v0" type execOp struct { op *pb.ExecOp cm cache.Manager - w worker.Worker + exec executor.Executor numInputs int } -func newExecOp(v Vertex, op *pb.Op_Exec, cm cache.Manager, w worker.Worker) (Op, error) { +func newExecOp(v Vertex, op *pb.Op_Exec, cm cache.Manager, exec executor.Executor) (Op, error) { return &execOp{ op: op.Exec, cm: cm, - w: w, + exec: exec, numInputs: len(v.Inputs()), }, nil } @@ -55,7 +55,7 @@ func (e *execOp) CacheKey(ctx context.Context) (digest.Digest, error) { } func (e *execOp) Run(ctx context.Context, inputs []Reference) ([]Reference, error) { - var mounts []worker.Mount + var mounts []executor.Mount var outputs []Reference var root cache.Mountable @@ -97,7 +97,7 @@ func (e *execOp) Run(ctx context.Context, inputs []Reference) ([]Reference, erro if m.Dest == pb.RootMount { root = mountable } else { - mounts = append(mounts, worker.Mount{Src: mountable, Dest: m.Dest, Readonly: m.Readonly, Selector: m.Selector}) + mounts = append(mounts, executor.Mount{Src: mountable, Dest: m.Dest, Readonly: m.Readonly, Selector: m.Selector}) } } @@ -105,7 +105,7 @@ func (e *execOp) Run(ctx context.Context, inputs []Reference) ([]Reference, erro return mounts[i].Dest < mounts[j].Dest }) - meta := worker.Meta{ + meta := executor.Meta{ Args: e.op.Meta.Args, Env: e.op.Meta.Env, Cwd: e.op.Meta.Cwd, @@ -115,8 +115,8 @@ func (e *execOp) Run(ctx context.Context, inputs []Reference) ([]Reference, erro defer stdout.Close() defer stderr.Close() - if err := e.w.Exec(ctx, meta, root, mounts, nil, stdout, stderr); err != nil { - return nil, errors.Wrapf(err, "worker failed running %v", meta.Args) + if err := e.exec.Exec(ctx, meta, root, mounts, nil, stdout, stderr); err != nil { + return nil, errors.Wrapf(err, "executor failed running %v", meta.Args) } refs := []Reference{} diff --git a/solver/pb/ops.pb.go b/solver/pb/ops.pb.go index ca236d4fa16cb..1de63115a3ec0 100644 --- a/solver/pb/ops.pb.go +++ b/solver/pb/ops.pb.go @@ -19,6 +19,7 @@ BuildOp BuildInput OpMetadata + WorkerConstraint Definition */ package pb @@ -451,6 +452,8 @@ type OpMetadata struct { IgnoreCache bool `protobuf:"varint,1,opt,name=ignore_cache,json=ignoreCache,proto3" json:"ignore_cache,omitempty"` // Description can be used for keeping any text fields that builder doesn't parse Description map[string]string `protobuf:"bytes,2,rep,name=description" json:"description,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + // TODO: add worker constraint, etc. + WorkerConstraint *WorkerConstraint `protobuf:"bytes,3,opt,name=worker_constraint,json=workerConstraint" json:"worker_constraint,omitempty"` } func (m *OpMetadata) Reset() { *m = OpMetadata{} } @@ -472,6 +475,30 @@ func (m *OpMetadata) GetDescription() map[string]string { return nil } +func (m *OpMetadata) GetWorkerConstraint() *WorkerConstraint { + if m != nil { + return m.WorkerConstraint + } + return nil +} + +// WorkerConstraint is WIP. Will be changed. +type WorkerConstraint struct { + WorkerType string `protobuf:"bytes,1,opt,name=worker_type,json=workerType,proto3" json:"worker_type,omitempty"` +} + +func (m *WorkerConstraint) Reset() { *m = WorkerConstraint{} } +func (m *WorkerConstraint) String() string { return proto.CompactTextString(m) } +func (*WorkerConstraint) ProtoMessage() {} +func (*WorkerConstraint) Descriptor() ([]byte, []int) { return fileDescriptorOps, []int{11} } + +func (m *WorkerConstraint) GetWorkerType() string { + if m != nil { + return m.WorkerType + } + return "" +} + // Definition is the LLB definition structure with per-vertex metadata entries type Definition struct { Def [][]byte `protobuf:"bytes,1,rep,name=def" json:"def,omitempty"` @@ -482,7 +509,7 @@ type Definition struct { func (m *Definition) Reset() { *m = Definition{} } func (m *Definition) String() string { return proto.CompactTextString(m) } func (*Definition) ProtoMessage() {} -func (*Definition) Descriptor() ([]byte, []int) { return fileDescriptorOps, []int{11} } +func (*Definition) Descriptor() ([]byte, []int) { return fileDescriptorOps, []int{12} } func (m *Definition) GetDef() [][]byte { if m != nil { @@ -510,6 +537,7 @@ func init() { proto.RegisterType((*BuildOp)(nil), "pb.BuildOp") proto.RegisterType((*BuildInput)(nil), "pb.BuildInput") proto.RegisterType((*OpMetadata)(nil), "pb.OpMetadata") + proto.RegisterType((*WorkerConstraint)(nil), "pb.WorkerConstraint") proto.RegisterType((*Definition)(nil), "pb.Definition") } func (m *Op) Marshal() (dAtA []byte, err error) { @@ -1027,6 +1055,40 @@ func (m *OpMetadata) MarshalTo(dAtA []byte) (int, error) { i += copy(dAtA[i:], v) } } + if m.WorkerConstraint != nil { + dAtA[i] = 0x1a + i++ + i = encodeVarintOps(dAtA, i, uint64(m.WorkerConstraint.Size())) + n9, err := m.WorkerConstraint.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n9 + } + return i, nil +} + +func (m *WorkerConstraint) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *WorkerConstraint) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.WorkerType) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintOps(dAtA, i, uint64(len(m.WorkerType))) + i += copy(dAtA[i:], m.WorkerType) + } return i, nil } @@ -1072,11 +1134,11 @@ func (m *Definition) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x12 i++ i = encodeVarintOps(dAtA, i, uint64((&v).Size())) - n9, err := (&v).MarshalTo(dAtA[i:]) + n10, err := (&v).MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n9 + i += n10 } } return i, nil @@ -1338,6 +1400,20 @@ func (m *OpMetadata) Size() (n int) { n += mapEntrySize + 1 + sovOps(uint64(mapEntrySize)) } } + if m.WorkerConstraint != nil { + l = m.WorkerConstraint.Size() + n += 1 + l + sovOps(uint64(l)) + } + return n +} + +func (m *WorkerConstraint) Size() (n int) { + var l int + _ = l + l = len(m.WorkerType) + if l > 0 { + n += 1 + l + sovOps(uint64(l)) + } return n } @@ -3075,6 +3151,118 @@ func (m *OpMetadata) Unmarshal(dAtA []byte) error { m.Description[mapkey] = mapvalue } iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field WorkerConstraint", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowOps + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthOps + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.WorkerConstraint == nil { + m.WorkerConstraint = &WorkerConstraint{} + } + if err := m.WorkerConstraint.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipOps(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthOps + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *WorkerConstraint) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowOps + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: WorkerConstraint: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: WorkerConstraint: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field WorkerType", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowOps + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthOps + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.WorkerType = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipOps(dAtA[iNdEx:]) @@ -3404,54 +3592,58 @@ var ( func init() { proto.RegisterFile("ops.proto", fileDescriptorOps) } var fileDescriptorOps = []byte{ - // 783 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x55, 0xcd, 0x6e, 0xdb, 0x46, - 0x10, 0x36, 0xa9, 0x1f, 0x4b, 0x43, 0xb7, 0x30, 0xb6, 0x45, 0x2b, 0x08, 0x86, 0xac, 0xb2, 0x45, - 0xa1, 0xb6, 0x36, 0x05, 0xa8, 0x40, 0x61, 0xf4, 0x60, 0xc0, 0xb2, 0x0d, 0x54, 0x2d, 0x0c, 0x01, - 0xdb, 0x07, 0x28, 0x28, 0x72, 0x25, 0x13, 0x95, 0xb8, 0x04, 0xb9, 0x74, 0xad, 0x4b, 0x0f, 0x7d, - 0x82, 0x02, 0x79, 0x8b, 0x1c, 0x73, 0x4f, 0xce, 0x3e, 0xe6, 0x9a, 0x1c, 0x9c, 0xc0, 0x79, 0x91, - 0x60, 0x66, 0x57, 0x22, 0x9d, 0x38, 0x80, 0x8d, 0xe4, 0xa4, 0xdd, 0x99, 0x6f, 0xbf, 0xfd, 0xe6, - 0x9b, 0x59, 0x0a, 0x9a, 0x32, 0xc9, 0xbc, 0x24, 0x95, 0x4a, 0x32, 0x3b, 0x99, 0xb4, 0xf7, 0x67, - 0x91, 0x3a, 0xcf, 0x27, 0x5e, 0x20, 0x17, 0xfd, 0x99, 0x9c, 0xc9, 0x3e, 0xa5, 0x26, 0xf9, 0x94, - 0x76, 0xb4, 0xa1, 0x95, 0x3e, 0xe2, 0x3e, 0xb3, 0xc0, 0x1e, 0x27, 0xec, 0x1b, 0xa8, 0x47, 0x71, - 0x92, 0xab, 0xac, 0x65, 0x75, 0x2b, 0x3d, 0x67, 0xd0, 0xf4, 0x92, 0x89, 0x37, 0xc2, 0x08, 0x37, - 0x09, 0xd6, 0x85, 0xaa, 0xb8, 0x14, 0x41, 0xcb, 0xee, 0x5a, 0x3d, 0x67, 0x00, 0x08, 0x38, 0xbd, - 0x14, 0xc1, 0x38, 0xf9, 0x6d, 0x83, 0x53, 0x86, 0x7d, 0x0f, 0xf5, 0x4c, 0xe6, 0x69, 0x20, 0x5a, - 0x15, 0xc2, 0x6c, 0x21, 0xe6, 0x4f, 0x8a, 0x10, 0xca, 0x64, 0x91, 0x29, 0x90, 0xc9, 0xb2, 0x55, - 0x2d, 0x98, 0x8e, 0x65, 0xb2, 0xd4, 0x4c, 0x98, 0x61, 0xdf, 0x42, 0x6d, 0x92, 0x47, 0xf3, 0xb0, - 0x55, 0x23, 0x88, 0x83, 0x90, 0x21, 0x06, 0x08, 0xa3, 0x73, 0xc3, 0x2a, 0xd8, 0x32, 0x71, 0xff, - 0x85, 0x1a, 0xe9, 0x64, 0xbf, 0x43, 0x3d, 0x8c, 0x66, 0x22, 0x53, 0x2d, 0xab, 0x6b, 0xf5, 0x9a, - 0xc3, 0xc1, 0xd5, 0xf5, 0xee, 0xc6, 0xcb, 0xeb, 0xdd, 0x1f, 0x4b, 0x86, 0xc8, 0x44, 0xc4, 0x81, - 0x8c, 0x95, 0x1f, 0xc5, 0x22, 0xcd, 0xfa, 0x33, 0xb9, 0xaf, 0x8f, 0x78, 0x27, 0xf4, 0xc3, 0x0d, - 0x03, 0xfb, 0x01, 0x6a, 0x51, 0x1c, 0x8a, 0x4b, 0x2a, 0xb6, 0x32, 0xfc, 0xc2, 0x50, 0x39, 0xe3, - 0x5c, 0x25, 0xb9, 0x1a, 0x61, 0x8a, 0x6b, 0x84, 0x3b, 0x82, 0xba, 0xb6, 0x81, 0xed, 0x40, 0x75, - 0x21, 0x94, 0x4f, 0xd7, 0x3b, 0x83, 0x06, 0x6a, 0x3e, 0x13, 0xca, 0xe7, 0x14, 0x45, 0x87, 0x17, - 0x32, 0x8f, 0x55, 0xd6, 0xb2, 0x0b, 0x87, 0xcf, 0x30, 0xc2, 0x4d, 0xc2, 0x3d, 0x84, 0x2a, 0x1e, - 0x60, 0x0c, 0xaa, 0x7e, 0x3a, 0xd3, 0xad, 0x68, 0x72, 0x5a, 0xb3, 0x6d, 0xa8, 0x88, 0xf8, 0x82, - 0xce, 0x36, 0x39, 0x2e, 0x31, 0x12, 0xfc, 0x13, 0x92, 0xd5, 0x4d, 0x8e, 0x4b, 0xf7, 0xb1, 0x05, - 0x35, 0x62, 0x64, 0x3d, 0xd4, 0x9f, 0xe4, 0xda, 0x8a, 0xca, 0x90, 0x19, 0xfd, 0x40, 0x4e, 0xad, - 0xe5, 0xa3, 0x6b, 0x6d, 0x68, 0x64, 0x62, 0x2e, 0x02, 0x25, 0x53, 0x2a, 0xb6, 0xc9, 0xd7, 0x7b, - 0xd4, 0x11, 0xa2, 0x9f, 0xfa, 0x0a, 0x5a, 0xb3, 0x9f, 0xa0, 0x2e, 0xc9, 0x04, 0xea, 0xde, 0x07, - 0xac, 0x31, 0x10, 0x24, 0x4f, 0x85, 0x1f, 0xca, 0x78, 0xbe, 0xa4, 0x4e, 0x36, 0xf8, 0x7a, 0xef, - 0x1e, 0x42, 0x5d, 0x37, 0x9d, 0x75, 0xa1, 0x92, 0xa5, 0x81, 0x19, 0xbc, 0xcf, 0x57, 0xd3, 0xa0, - 0xe7, 0x86, 0x63, 0x6a, 0x2d, 0xc4, 0x2e, 0x84, 0xb8, 0x1c, 0xa0, 0x80, 0x7d, 0x9a, 0x82, 0xdd, - 0x47, 0x16, 0x34, 0x56, 0xf3, 0xca, 0x3a, 0x00, 0x51, 0x28, 0x62, 0x15, 0x4d, 0x23, 0x91, 0xea, - 0x99, 0xe2, 0xa5, 0x08, 0xdb, 0x87, 0x9a, 0xaf, 0x54, 0xba, 0xea, 0xe7, 0xd7, 0xe5, 0x61, 0xf7, - 0x8e, 0x30, 0x73, 0x1a, 0xab, 0x74, 0xc9, 0x35, 0xaa, 0x7d, 0x00, 0x50, 0x04, 0xb1, 0x79, 0x7f, - 0x8b, 0xa5, 0x61, 0xc5, 0x25, 0xfb, 0x12, 0x6a, 0x17, 0xfe, 0x3c, 0x17, 0x46, 0x94, 0xde, 0xfc, - 0x6a, 0x1f, 0x58, 0xee, 0x53, 0x1b, 0x36, 0xcd, 0xf0, 0xb3, 0x3d, 0xd8, 0xa4, 0xe1, 0x37, 0x8a, - 0xee, 0xae, 0x74, 0x05, 0x61, 0xfd, 0xf5, 0xab, 0x2e, 0x69, 0x34, 0x54, 0xfa, 0x75, 0x1b, 0x8d, - 0xc5, 0x1b, 0xaf, 0x84, 0x62, 0x6a, 0x9e, 0x2f, 0xb5, 0xe2, 0x44, 0x4c, 0xa3, 0x38, 0x52, 0x91, - 0x8c, 0x39, 0xa6, 0xd8, 0xde, 0xaa, 0xea, 0x2a, 0x31, 0x7e, 0x55, 0x66, 0x7c, 0xbf, 0xe8, 0x11, - 0x38, 0xa5, 0x6b, 0xee, 0xa8, 0xfa, 0xbb, 0x72, 0xd5, 0xe6, 0x4a, 0xa2, 0xd3, 0xdf, 0x9e, 0xc2, - 0x85, 0x8f, 0xf0, 0xef, 0x17, 0x80, 0x82, 0xf2, 0xfe, 0x93, 0xe2, 0x3e, 0xb1, 0x00, 0xc6, 0x09, - 0xbe, 0xc8, 0xd0, 0xa7, 0x07, 0xbc, 0x15, 0xcd, 0x62, 0x99, 0x8a, 0xbf, 0x02, 0x3f, 0x38, 0x17, - 0x74, 0xbe, 0xc1, 0x1d, 0x1d, 0x3b, 0xc6, 0x10, 0x3b, 0x02, 0x27, 0x14, 0x59, 0x90, 0x46, 0x09, - 0x1a, 0x66, 0x4c, 0xdf, 0xc5, 0x9a, 0x0a, 0x1e, 0xef, 0xa4, 0x40, 0x68, 0xaf, 0xca, 0x67, 0xda, - 0x87, 0xb0, 0xfd, 0x2e, 0xe0, 0x41, 0xc5, 0xbe, 0xb0, 0x00, 0x8a, 0x9e, 0xe1, 0x51, 0x6c, 0x28, - 0xbe, 0xad, 0x2d, 0xdd, 0xc0, 0x39, 0x34, 0x16, 0x46, 0x8a, 0x11, 0xb8, 0x73, 0xbb, 0xcf, 0xde, - 0x4a, 0x29, 0x5d, 0xae, 0x3f, 0xa3, 0xff, 0xbd, 0x7a, 0xd0, 0x67, 0x74, 0x7d, 0x43, 0xfb, 0x0f, - 0xf8, 0xec, 0x16, 0xdd, 0x3d, 0x47, 0xa0, 0xb0, 0xab, 0x54, 0xdb, 0x70, 0xfb, 0xea, 0xa6, 0x63, - 0x3d, 0xbf, 0xe9, 0x58, 0xaf, 0x6f, 0x3a, 0xd6, 0xff, 0x6f, 0x3a, 0x1b, 0x93, 0x3a, 0xfd, 0x89, - 0xfd, 0xfc, 0x36, 0x00, 0x00, 0xff, 0xff, 0x3c, 0xbb, 0xc0, 0x75, 0x04, 0x07, 0x00, 0x00, + // 837 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x55, 0xcd, 0x8e, 0xe3, 0x44, + 0x10, 0x1e, 0x3b, 0x3f, 0x9b, 0x94, 0x07, 0x14, 0x9a, 0x15, 0x44, 0xd1, 0x2a, 0x09, 0x06, 0xa1, + 0x00, 0x3b, 0x8e, 0x94, 0x95, 0xd0, 0x8a, 0xc3, 0x48, 0x93, 0x99, 0x95, 0x08, 0x68, 0x15, 0xa9, + 0x41, 0xe2, 0xb8, 0x72, 0xec, 0x4e, 0xd6, 0xda, 0xc4, 0xdd, 0xb2, 0xdb, 0x3b, 0x93, 0x0b, 0x07, + 0x9e, 0x00, 0x89, 0xb7, 0xe0, 0x1d, 0xe0, 0xbc, 0x47, 0xae, 0x70, 0x58, 0xd0, 0xf0, 0x0c, 0xdc, + 0x51, 0x55, 0x77, 0x62, 0xcf, 0x30, 0x48, 0x33, 0x82, 0x53, 0xba, 0xab, 0xaa, 0xbf, 0xaa, 0xfa, + 0xea, 0x2b, 0x07, 0xda, 0x52, 0xe5, 0x81, 0xca, 0xa4, 0x96, 0xcc, 0x55, 0x8b, 0xde, 0xd1, 0x2a, + 0xd1, 0xcf, 0x8b, 0x45, 0x10, 0xc9, 0xcd, 0x78, 0x25, 0x57, 0x72, 0x4c, 0xae, 0x45, 0xb1, 0xa4, + 0x1b, 0x5d, 0xe8, 0x64, 0x9e, 0xf8, 0x3f, 0x3b, 0xe0, 0xce, 0x15, 0x7b, 0x0f, 0x9a, 0x49, 0xaa, + 0x0a, 0x9d, 0x77, 0x9d, 0x61, 0x6d, 0xe4, 0x4d, 0xda, 0x81, 0x5a, 0x04, 0x33, 0xb4, 0x70, 0xeb, + 0x60, 0x43, 0xa8, 0x8b, 0x0b, 0x11, 0x75, 0xdd, 0xa1, 0x33, 0xf2, 0x26, 0x80, 0x01, 0x4f, 0x2e, + 0x44, 0x34, 0x57, 0x9f, 0x1f, 0x70, 0xf2, 0xb0, 0x0f, 0xa1, 0x99, 0xcb, 0x22, 0x8b, 0x44, 0xb7, + 0x46, 0x31, 0x87, 0x18, 0xf3, 0x15, 0x59, 0x28, 0xca, 0x7a, 0x11, 0x29, 0x92, 0x6a, 0xdb, 0xad, + 0x97, 0x48, 0xa7, 0x52, 0x6d, 0x0d, 0x12, 0x7a, 0xd8, 0xfb, 0xd0, 0x58, 0x14, 0xc9, 0x3a, 0xee, + 0x36, 0x28, 0xc4, 0xc3, 0x90, 0x29, 0x1a, 0x28, 0xc6, 0xf8, 0xa6, 0x75, 0x70, 0xa5, 0xf2, 0xbf, + 0x85, 0x06, 0xd5, 0xc9, 0xbe, 0x80, 0x66, 0x9c, 0xac, 0x44, 0xae, 0xbb, 0xce, 0xd0, 0x19, 0xb5, + 0xa7, 0x93, 0x57, 0xaf, 0x07, 0x07, 0xbf, 0xbd, 0x1e, 0x7c, 0x5c, 0x21, 0x44, 0x2a, 0x91, 0x46, + 0x32, 0xd5, 0x61, 0x92, 0x8a, 0x2c, 0x1f, 0xaf, 0xe4, 0x91, 0x79, 0x12, 0x9c, 0xd1, 0x0f, 0xb7, + 0x08, 0xec, 0x23, 0x68, 0x24, 0x69, 0x2c, 0x2e, 0xa8, 0xd9, 0xda, 0xf4, 0x6d, 0x0b, 0xe5, 0xcd, + 0x0b, 0xad, 0x0a, 0x3d, 0x43, 0x17, 0x37, 0x11, 0xfe, 0x0c, 0x9a, 0x86, 0x06, 0xf6, 0x00, 0xea, + 0x1b, 0xa1, 0x43, 0x4a, 0xef, 0x4d, 0x5a, 0x58, 0xf3, 0x53, 0xa1, 0x43, 0x4e, 0x56, 0x64, 0x78, + 0x23, 0x8b, 0x54, 0xe7, 0x5d, 0xb7, 0x64, 0xf8, 0x29, 0x5a, 0xb8, 0x75, 0xf8, 0xc7, 0x50, 0xc7, + 0x07, 0x8c, 0x41, 0x3d, 0xcc, 0x56, 0x66, 0x14, 0x6d, 0x4e, 0x67, 0xd6, 0x81, 0x9a, 0x48, 0x5f, + 0xd2, 0xdb, 0x36, 0xc7, 0x23, 0x5a, 0xa2, 0xf3, 0x98, 0xa8, 0x6e, 0x73, 0x3c, 0xfa, 0x3f, 0x3a, + 0xd0, 0x20, 0x44, 0x36, 0xc2, 0xfa, 0x55, 0x61, 0xa8, 0xa8, 0x4d, 0x99, 0xad, 0x1f, 0x88, 0xa9, + 0x7d, 0xf9, 0xc8, 0x5a, 0x0f, 0x5a, 0xb9, 0x58, 0x8b, 0x48, 0xcb, 0x8c, 0x9a, 0x6d, 0xf3, 0xfd, + 0x1d, 0xeb, 0x88, 0x91, 0x4f, 0x93, 0x82, 0xce, 0xec, 0x13, 0x68, 0x4a, 0x22, 0x81, 0xa6, 0xf7, + 0x2f, 0xd4, 0xd8, 0x10, 0x04, 0xcf, 0x44, 0x18, 0xcb, 0x74, 0xbd, 0xa5, 0x49, 0xb6, 0xf8, 0xfe, + 0xee, 0x1f, 0x43, 0xd3, 0x0c, 0x9d, 0x0d, 0xa1, 0x96, 0x67, 0x91, 0x15, 0xde, 0x9b, 0x3b, 0x35, + 0x18, 0xdd, 0x70, 0x74, 0xed, 0x0b, 0x71, 0xcb, 0x42, 0x7c, 0x0e, 0x50, 0x86, 0xfd, 0x3f, 0x0d, + 0xfb, 0x3f, 0x38, 0xd0, 0xda, 0xe9, 0x95, 0xf5, 0x01, 0x92, 0x58, 0xa4, 0x3a, 0x59, 0x26, 0x22, + 0x33, 0x9a, 0xe2, 0x15, 0x0b, 0x3b, 0x82, 0x46, 0xa8, 0x75, 0xb6, 0x9b, 0xe7, 0xbb, 0x55, 0xb1, + 0x07, 0x27, 0xe8, 0x79, 0x92, 0xea, 0x6c, 0xcb, 0x4d, 0x54, 0xef, 0x31, 0x40, 0x69, 0xc4, 0xe1, + 0xbd, 0x10, 0x5b, 0x8b, 0x8a, 0x47, 0x76, 0x1f, 0x1a, 0x2f, 0xc3, 0x75, 0x21, 0x6c, 0x51, 0xe6, + 0xf2, 0x99, 0xfb, 0xd8, 0xf1, 0x7f, 0x72, 0xe1, 0x9e, 0x15, 0x3f, 0x7b, 0x08, 0xf7, 0x48, 0xfc, + 0xb6, 0xa2, 0x9b, 0x3b, 0xdd, 0x85, 0xb0, 0xf1, 0x7e, 0xab, 0x2b, 0x35, 0x5a, 0x28, 0xb3, 0xdd, + 0xb6, 0xc6, 0x72, 0xc7, 0x6b, 0xb1, 0x58, 0xda, 0xf5, 0xa5, 0x51, 0x9c, 0x89, 0x65, 0x92, 0x26, + 0x3a, 0x91, 0x29, 0x47, 0x17, 0x7b, 0xb8, 0xeb, 0xba, 0x4e, 0x88, 0xef, 0x54, 0x11, 0xff, 0xd9, + 0xf4, 0x0c, 0xbc, 0x4a, 0x9a, 0x1b, 0xba, 0xfe, 0xa0, 0xda, 0xb5, 0x4d, 0x49, 0x70, 0xe6, 0xdb, + 0x53, 0xb2, 0xf0, 0x1f, 0xf8, 0xfb, 0x14, 0xa0, 0x84, 0xbc, 0xbd, 0x52, 0xfc, 0xbf, 0x1c, 0x80, + 0xb9, 0xc2, 0x8d, 0x8c, 0x43, 0x5a, 0xe0, 0xc3, 0x64, 0x95, 0xca, 0x4c, 0x3c, 0x8b, 0xc2, 0xe8, + 0xb9, 0xa0, 0xf7, 0x2d, 0xee, 0x19, 0xdb, 0x29, 0x9a, 0xd8, 0x09, 0x78, 0xb1, 0xc8, 0xa3, 0x2c, + 0x51, 0x48, 0x98, 0x25, 0x7d, 0x80, 0x3d, 0x95, 0x38, 0xc1, 0x59, 0x19, 0x61, 0xb8, 0xaa, 0xbe, + 0x61, 0x27, 0xf0, 0xd6, 0xb9, 0xcc, 0x5e, 0x88, 0xec, 0x59, 0x24, 0xd3, 0x5c, 0x67, 0x61, 0x92, + 0x6a, 0x3b, 0x8f, 0xfb, 0x08, 0xf4, 0x0d, 0x39, 0x4f, 0xf7, 0x3e, 0xde, 0x39, 0xbf, 0x66, 0xe9, + 0x1d, 0x43, 0xe7, 0x7a, 0x8e, 0x3b, 0xf1, 0xf5, 0x08, 0x3a, 0xd7, 0xb3, 0xb0, 0x01, 0x78, 0xb6, + 0x2c, 0xbd, 0x55, 0x62, 0xb7, 0x0d, 0xc6, 0xf4, 0xf5, 0x56, 0x09, 0xff, 0x57, 0x07, 0xa0, 0xd4, + 0x0a, 0xe6, 0x43, 0x21, 0xe1, 0x4e, 0x1f, 0x1a, 0xe1, 0xac, 0xa1, 0xb5, 0xb1, 0x14, 0x58, 0x62, + 0x1e, 0x5c, 0xd5, 0x57, 0xb0, 0x63, 0x88, 0x2a, 0x36, 0x9f, 0xef, 0xef, 0x7e, 0xbf, 0xd3, 0xe7, + 0x7b, 0x9f, 0xa1, 0xf7, 0x25, 0xbc, 0x71, 0x05, 0xee, 0x96, 0xd2, 0x2b, 0xc7, 0x54, 0x21, 0x64, + 0xda, 0x79, 0x75, 0xd9, 0x77, 0x7e, 0xb9, 0xec, 0x3b, 0x7f, 0x5c, 0xf6, 0x9d, 0xef, 0xff, 0xec, + 0x1f, 0x2c, 0x9a, 0xf4, 0xe7, 0xf9, 0xe8, 0xef, 0x00, 0x00, 0x00, 0xff, 0xff, 0x4c, 0x43, 0xb9, + 0x14, 0x7c, 0x07, 0x00, 0x00, } diff --git a/solver/pb/ops.proto b/solver/pb/ops.proto index ada3b65a60803..3b2bc6bfc9058 100644 --- a/solver/pb/ops.proto +++ b/solver/pb/ops.proto @@ -74,6 +74,13 @@ message OpMetadata { // Description can be used for keeping any text fields that builder doesn't parse map description = 2; // TODO: add worker constraint, etc. + WorkerConstraint worker_constraint = 3; +} + +// WorkerConstraint is WIP. Will be changed. +message WorkerConstraint { + string worker_type = 1; // e.g. "runc", "containerd"...? also we will have worker_brand, arch, os, labels... +// TODO: distributed mode } // Definition is the LLB definition structure with per-vertex metadata entries diff --git a/solver/solver.go b/solver/solver.go index 6bec60a80d7d1..8aadf6f975615 100644 --- a/solver/solver.go +++ b/solver/solver.go @@ -11,10 +11,10 @@ import ( "github.com/moby/buildkit/cache/cacheimport" "github.com/moby/buildkit/cache/contenthash" "github.com/moby/buildkit/client" + "github.com/moby/buildkit/executor" "github.com/moby/buildkit/exporter" "github.com/moby/buildkit/frontend" "github.com/moby/buildkit/solver/pb" - "github.com/moby/buildkit/source" "github.com/moby/buildkit/util/bgfunc" "github.com/moby/buildkit/util/progress" "github.com/moby/buildkit/worker" @@ -25,31 +25,32 @@ import ( "golang.org/x/sync/errgroup" ) -type LLBOpt struct { - SourceManager *source.Manager - CacheManager cache.Manager - Worker worker.Worker - InstructionCache InstructionCache - ImageSource source.Source - Frontends map[string]frontend.Frontend // used by nested invocations - CacheExporter *cacheimport.CacheExporter - CacheImporter *cacheimport.CacheImporter +// DetermineVertexWorker determines worker for a vertex. +// Currently, constraint is just ignored. +// Also we need to track the workers of the inputs. +func DetermineVertexWorker(wc *worker.Controller, v Vertex) (*worker.Worker, error) { + // TODO: multiworker + return wc.GetDefault() } -func NewLLBSolver(opt LLBOpt) *Solver { +func NewLLBSolver(wc *worker.Controller, frontends map[string]frontend.Frontend) *Solver { var s *Solver s = New(func(v Vertex) (Op, error) { + w, err := DetermineVertexWorker(wc, v) + if err != nil { + return nil, err + } switch op := v.Sys().(type) { case *pb.Op_Source: - return newSourceOp(v, op, opt.SourceManager) + return newSourceOp(v, op, w.SourceManager) case *pb.Op_Exec: - return newExecOp(v, op, opt.CacheManager, opt.Worker) + return newExecOp(v, op, w.CacheManager, w.Executor) case *pb.Op_Build: return newBuildOp(v, op, s) default: return nil, nil } - }, opt.InstructionCache, opt.ImageSource, opt.Worker, opt.CacheManager, opt.Frontends, opt.CacheExporter, opt.CacheImporter) + }, wc, frontends) return s } @@ -84,19 +85,14 @@ type InstructionCache interface { } type Solver struct { - resolve ResolveOpFunc - jobs *jobList - cache InstructionCache - imageSource source.Source - worker worker.Worker - cm cache.Manager // TODO: remove with immutableRef.New() - frontends map[string]frontend.Frontend - ce *cacheimport.CacheExporter - ci *cacheimport.CacheImporter + resolve ResolveOpFunc + jobs *jobList + workerController *worker.Controller + frontends map[string]frontend.Frontend } -func New(resolve ResolveOpFunc, cache InstructionCache, imageSource source.Source, worker worker.Worker, cm cache.Manager, f map[string]frontend.Frontend, ce *cacheimport.CacheExporter, ci *cacheimport.CacheImporter) *Solver { - return &Solver{resolve: resolve, jobs: newJobList(), cache: cache, imageSource: imageSource, worker: worker, cm: cm, frontends: f, ce: ce, ci: ci} +func New(resolve ResolveOpFunc, wc *worker.Controller, f map[string]frontend.Frontend) *Solver { + return &Solver{resolve: resolve, jobs: newJobList(), workerController: wc, frontends: f} } type SolveRequest struct { @@ -125,7 +121,12 @@ func (s *Solver) solve(ctx context.Context, j *job, req SolveRequest) (Reference } func (s *Solver) llbBridge(j *job) *llbBridge { - return &llbBridge{job: j, Solver: s, resolveImageConfig: s.imageSource.(resolveImageConfig)} + // FIXME(AkihiroSuda): make sure worker implements interfaces required by llbBridge + worker, err := s.workerController.GetDefault() + if err != nil { + panic(err) + } + return &llbBridge{job: j, Solver: s, worker: worker} } func (s *Solver) Solve(ctx context.Context, id string, req SolveRequest) error { @@ -135,16 +136,21 @@ func (s *Solver) Solve(ctx context.Context, id string, req SolveRequest) error { pr, ctx, closeProgressWriter := progress.NewContext(ctx) defer closeProgressWriter() + // TODO: multiworker + defaultWorker, err := s.workerController.GetDefault() + if err != nil { + return err + } if importRef := req.ImportCacheRef; importRef != "" { - cache, err := s.ci.Import(ctx, importRef) + cache, err := defaultWorker.CacheImporter.Import(ctx, importRef) if err != nil { return err } - s.cache = mergeRemoteCache(s.cache, cache) + defaultWorker.InstructionCache = mergeRemoteCache(defaultWorker.InstructionCache, cache) } // register a build job. vertex needs to be loaded to a job to run - ctx, j, err := s.jobs.new(ctx, id, pr, s.cache) + ctx, j, err := s.jobs.new(ctx, id, pr, defaultWorker.InstructionCache) if err != nil { return err } @@ -193,7 +199,8 @@ func (s *Solver) Solve(ctx context.Context, id string, req SolveRequest) error { return err } - return s.ce.Export(ctx, records, exportName) + // TODO: multiworker + return defaultWorker.CacheExporter.Export(ctx, records, exportName) }); err != nil { return err } @@ -232,7 +239,11 @@ func (s *Solver) subBuild(ctx context.Context, dgst digest.Digest, req SolveRequ st = jl.actives[inp.Vertex.Digest()] jl.mu.Unlock() - return getRef(ctx, st.solver, inp.Vertex.(*vertex), inp.Index, s.cache) // TODO: combine to pass single input // TODO: export cache for subbuilds + w, err := DetermineVertexWorker(s.workerController, inp.Vertex) + if err != nil { + return nil, err + } + return getRef(ctx, st.solver, inp.Vertex.(*vertex), inp.Index, w.InstructionCache) // TODO: combine to pass single input // TODO: export cache for subbuilds } type VertexSolver interface { @@ -779,7 +790,8 @@ type VertexResult struct { type llbBridge struct { *Solver job *job - resolveImageConfig + // this worker is used for running containerized frontend, not vertices + worker *worker.Worker } type resolveImageConfig interface { @@ -814,13 +826,22 @@ func (s *llbBridge) Solve(ctx context.Context, req frontend.SolveRequest) (cache return immutable, exp, nil } -func (s *llbBridge) Exec(ctx context.Context, meta worker.Meta, rootFS cache.ImmutableRef, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error { - active, err := s.cm.New(ctx, rootFS) +func (s *llbBridge) ResolveImageConfig(ctx context.Context, ref string) (digest.Digest, []byte, error) { + // ImageSource is typically source/containerimage + resolveImageConfig, ok := s.worker.ImageSource.(resolveImageConfig) + if !ok { + return "", nil, errors.Errorf("worker %q does not implement ResolveImageConfig", s.worker.Name) + } + return resolveImageConfig.ResolveImageConfig(ctx, ref) +} + +func (s *llbBridge) Exec(ctx context.Context, meta executor.Meta, rootFS cache.ImmutableRef, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error { + active, err := s.worker.CacheManager.New(ctx, rootFS) if err != nil { return err } defer active.Release(context.TODO()) - return s.worker.Exec(ctx, meta, active, nil, stdin, stdout, stderr) + return s.worker.Executor.Exec(ctx, meta, active, nil, stdin, stdout, stderr) } func cacheKeyForIndex(dgst digest.Digest, index Index) digest.Digest { diff --git a/solver/vertex.go b/solver/vertex.go index fcd0a73b11dda..71cebc8fe0992 100644 --- a/solver/vertex.go +++ b/solver/vertex.go @@ -6,6 +6,7 @@ import ( "github.com/moby/buildkit/client" "github.com/moby/buildkit/identity" + "github.com/moby/buildkit/solver/pb" "github.com/moby/buildkit/util/progress" digest "github.com/opencontainers/go-digest" "golang.org/x/net/context" @@ -18,17 +19,13 @@ type Vertex interface { // Sys returns an internal value that is used to execute the vertex. Usually // this is capured by the operation resolver method during solve. Sys() interface{} - Metadata() Metadata + // FIXME(AkihiroSuda): we should not import pb pkg here. + Metadata() *pb.OpMetadata // Array of vertexes current vertex depends on. Inputs() []Input Name() string // change this to general metadata } -// Metadata is per-vertex metadata, implemented by *pb.OpMetadata -type Metadata interface { - GetIgnoreCache() bool -} - type Index int // Input is an pointer to a single reference from a vertex by an index. @@ -45,7 +42,7 @@ type input struct { type vertex struct { mu sync.Mutex sys interface{} - metadata Metadata + metadata *pb.OpMetadata inputs []*input err error digest digest.Digest @@ -74,7 +71,7 @@ func (v *vertex) Sys() interface{} { return v.sys } -func (v *vertex) Metadata() Metadata { +func (v *vertex) Metadata() *pb.OpMetadata { return v.metadata } diff --git a/util/testutil/integration/containerd.go b/util/testutil/integration/containerd.go index c5980c134b756..5941fc214e451 100644 --- a/util/testutil/integration/containerd.go +++ b/util/testutil/integration/containerd.go @@ -64,7 +64,7 @@ func (c *containerd) New() (sb Sandbox, cl func() error, err error) { return nil, nil, err } - builddSock, stop, err := runBuildd([]string{"buildd-containerd", "--containerd", address}, logs) + builddSock, stop, err := runBuildd([]string{"buildd-containerd", "--containerd-worker-addr", address}, logs) if err != nil { return nil, nil, err } diff --git a/worker/containerd/containerd.go b/worker/containerd/containerd.go new file mode 100644 index 0000000000000..4abc156120713 --- /dev/null +++ b/worker/containerd/containerd.go @@ -0,0 +1,83 @@ +package containerd + +import ( + "context" + "os" + "path/filepath" + "strings" + "time" + + "github.com/containerd/containerd" + "github.com/containerd/containerd/content" + "github.com/moby/buildkit/cache/metadata" + "github.com/moby/buildkit/executor/containerdexecutor" + "github.com/moby/buildkit/worker" + digest "github.com/opencontainers/go-digest" + "github.com/pkg/errors" +) + +// NewWorkerOpt creates a WorkerOpt. +// But it does not set the following fields: +// - SessionManager +func NewWorkerOpt(root string, address, snapshotterName string, opts ...containerd.ClientOpt) (worker.WorkerOpt, error) { + // TODO: take lock to make sure there are no duplicates + opts = append([]containerd.ClientOpt{containerd.WithDefaultNamespace("buildkit")}, opts...) + client, err := containerd.New(address, opts...) + if err != nil { + return worker.WorkerOpt{}, errors.Wrapf(err, "failed to connect client to %q . make sure containerd is running", address) + } + return newContainerd(root, client, snapshotterName) +} + +func newContainerd(root string, client *containerd.Client, snapshotterName string) (worker.WorkerOpt, error) { + if strings.Contains(snapshotterName, "/") { + return worker.WorkerOpt{}, errors.Errorf("bad snapshotter name: %q", snapshotterName) + } + name := "containerd-" + snapshotterName + root = filepath.Join(name, snapshotterName) + if err := os.MkdirAll(root, 0700); err != nil { + return worker.WorkerOpt{}, errors.Wrapf(err, "failed to create %s", root) + } + + md, err := metadata.NewStore(filepath.Join(root, "metadata.db")) + if err != nil { + return worker.WorkerOpt{}, err + } + df := client.DiffService() + opt := worker.WorkerOpt{ + Name: name, + MetadataStore: md, + Executor: containerdexecutor.New(client, root), + BaseSnapshotter: client.SnapshotService(snapshotterName), + ContentStore: &noGCContentStore{client.ContentStore()}, + Applier: df, + Differ: df, + ImageStore: client.ImageService(), + } + return opt, nil +} + +// TODO: Replace this with leases + +type noGCContentStore struct { + content.Store +} +type noGCWriter struct { + content.Writer +} + +func (cs *noGCContentStore) Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (content.Writer, error) { + w, err := cs.Store.Writer(ctx, ref, size, expected) + return &noGCWriter{w}, err +} + +func (w *noGCWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { + opts = append(opts, func(info *content.Info) error { + if info.Labels == nil { + info.Labels = map[string]string{} + } + info.Labels["containerd.io/gc.root"] = time.Now().UTC().Format(time.RFC3339Nano) + return nil + }) + return w.Writer.Commit(ctx, size, expected, opts...) +} diff --git a/control/control_standalone.go b/worker/runc/runc.go similarity index 80% rename from control/control_standalone.go rename to worker/runc/runc.go index ea5983d94639a..ab84de8ac4fb9 100644 --- a/control/control_standalone.go +++ b/worker/runc/runc.go @@ -1,6 +1,4 @@ -// +build standalone - -package control +package runc import ( "context" @@ -11,81 +9,76 @@ import ( "github.com/containerd/containerd/content" "github.com/containerd/containerd/content/local" "github.com/containerd/containerd/diff/walking" - "github.com/containerd/containerd/metadata" + ctdmetadata "github.com/containerd/containerd/metadata" "github.com/containerd/containerd/mount" "github.com/containerd/containerd/namespaces" ctdsnapshot "github.com/containerd/containerd/snapshots" "github.com/containerd/containerd/snapshots/overlay" - "github.com/moby/buildkit/worker/runcworker" - digest "github.com/opencontainers/go-digest" - "github.com/pkg/errors" + "github.com/moby/buildkit/cache/metadata" + "github.com/moby/buildkit/executor/runcexecutor" + "github.com/moby/buildkit/worker" + "github.com/opencontainers/go-digest" ) -func NewStandalone(root string) (*Controller, error) { +// NewWorkerOpt creates a WorkerOpt. +// But it does not set the following fields: +// - SessionManager +func NewWorkerOpt(root string) (worker.WorkerOpt, error) { + var opt worker.WorkerOpt + name := "runc-overlay" + root = filepath.Join(root, name) if err := os.MkdirAll(root, 0700); err != nil { - return nil, errors.Wrapf(err, "failed to create %s", root) + return opt, err } - - // TODO: take lock to make sure there are no duplicates - - pd, err := newStandalonePullDeps(root) + md, err := metadata.NewStore(filepath.Join(root, name+"-metadata.db")) if err != nil { - return nil, err + return opt, err } - - opt, err := defaultControllerOpts(root, *pd) - if err != nil { - return nil, err - } - - w, err := runcworker.New(filepath.Join(root, "runc")) + exe, err := runcexecutor.New(filepath.Join(root, "executor")) if err != nil { - return nil, err + return opt, err } - - opt.Worker = w - - return NewController(*opt) -} - -func newStandalonePullDeps(root string) (*pullDeps, error) { s, err := overlay.NewSnapshotter(filepath.Join(root, "snapshots")) if err != nil { - return nil, err + return opt, err } c, err := local.NewStore(filepath.Join(root, "content")) if err != nil { - return nil, err + return opt, err } db, err := bolt.Open(filepath.Join(root, "containerdmeta.db"), 0644, nil) if err != nil { - return nil, err + return opt, err } - mdb := metadata.NewDB(db, c, map[string]ctdsnapshot.Snapshotter{ + mdb := ctdmetadata.NewDB(db, c, map[string]ctdsnapshot.Snapshotter{ "overlay": s, }) if err := mdb.Init(context.TODO()); err != nil { - return nil, err + return opt, err } c = &nsContent{mdb.ContentStore()} - df, err := walking.NewWalkingDiff(c) if err != nil { - return nil, err + return opt, err } // TODO: call mdb.GarbageCollect . maybe just inject it into nsSnapshotter.Remove and csContent.Delete - return &pullDeps{ - Snapshotter: &nsSnapshotter{mdb.Snapshotter("overlay")}, - ContentStore: c, - Applier: df, - Differ: df, - }, nil + opt = worker.WorkerOpt{ + Name: name, + MetadataStore: md, + Executor: exe, + BaseSnapshotter: &nsSnapshotter{mdb.Snapshotter("overlay")}, + ContentStore: c, + Applier: df, + Differ: df, + ImageStore: nil, // explicitly + } + return opt, nil } // this should be supported by containerd. currently packages are unusable without wrapping diff --git a/control/control_standalone_test.go b/worker/runc/runc_test.go similarity index 64% rename from control/control_standalone_test.go rename to worker/runc/runc_test.go index fa11d13378fa6..2105899c4e8f0 100644 --- a/control/control_standalone_test.go +++ b/worker/runc/runc_test.go @@ -1,6 +1,6 @@ // +build linux,standalone -package control +package runc import ( "bytes" @@ -12,20 +12,17 @@ import ( "testing" "github.com/containerd/containerd/namespaces" - "github.com/moby/buildkit/cache" - "github.com/moby/buildkit/cache/metadata" "github.com/moby/buildkit/client" + "github.com/moby/buildkit/executor" + "github.com/moby/buildkit/session" "github.com/moby/buildkit/snapshot" - "github.com/moby/buildkit/snapshot/blobmapping" "github.com/moby/buildkit/source" - "github.com/moby/buildkit/source/containerimage" "github.com/moby/buildkit/worker" - "github.com/moby/buildkit/worker/runcworker" "github.com/stretchr/testify/assert" "golang.org/x/net/context" ) -func TestControlStandalone(t *testing.T) { +func TestRuncWorker(t *testing.T) { t.Parallel() if os.Getuid() != 0 { t.Skip("requires root") @@ -37,46 +34,23 @@ func TestControlStandalone(t *testing.T) { ctx := namespaces.WithNamespace(context.Background(), "buildkit-test") // this should be an example or e2e test - tmpdir, err := ioutil.TempDir("", "controltest") + tmpdir, err := ioutil.TempDir("", "workertest") assert.NoError(t, err) defer os.RemoveAll(tmpdir) - cd, err := newStandalonePullDeps(tmpdir) + workerOpt, err := NewWorkerOpt(tmpdir) assert.NoError(t, err) - md, err := metadata.NewStore(filepath.Join(tmpdir, "metadata.db")) + workerOpt.SessionManager, err = session.NewManager() assert.NoError(t, err) - snapshotter, err := blobmapping.NewSnapshotter(blobmapping.Opt{ - Content: cd.ContentStore, - Snapshotter: cd.Snapshotter, - MetadataStore: md, - }) + w, err := worker.NewWorker(workerOpt) assert.NoError(t, err) - cm, err := cache.NewManager(cache.ManagerOpt{ - Snapshotter: snapshotter, - MetadataStore: md, - }) - assert.NoError(t, err) - - sm, err := source.NewManager() - assert.NoError(t, err) - - is, err := containerimage.NewSource(containerimage.SourceOpt{ - Snapshotter: snapshotter, - ContentStore: cd.ContentStore, - Applier: cd.Applier, - CacheAccessor: cm, - }) - assert.NoError(t, err) - - sm.Register(is) - img, err := source.NewImageIdentifier("docker.io/library/busybox:latest") assert.NoError(t, err) - src, err := sm.Resolve(ctx, img) + src, err := w.SourceManager.Resolve(ctx, img) assert.NoError(t, err) snap, err := src.Snapshot(ctx) @@ -103,7 +77,7 @@ func TestControlStandalone(t *testing.T) { lm.Unmount() assert.NoError(t, err) - du, err := cm.DiskUsage(ctx, client.DiskUsageInfo{}) + du, err := w.CacheManager.DiskUsage(ctx, client.DiskUsageInfo{}) assert.NoError(t, err) // for _, d := range du { @@ -114,26 +88,23 @@ func TestControlStandalone(t *testing.T) { assert.True(t, d.Size >= 8192) } - w, err := runcworker.New(tmpdir) - assert.NoError(t, err) - - meta := worker.Meta{ + meta := executor.Meta{ Args: []string{"/bin/sh", "-c", "echo \"foo\" > /bar"}, Cwd: "/", } stderr := bytes.NewBuffer(nil) - err = w.Exec(ctx, meta, snap, nil, nil, nil, &nopCloser{stderr}) + err = w.Executor.Exec(ctx, meta, snap, nil, nil, nil, &nopCloser{stderr}) assert.Error(t, err) // Read-only root // typical error is like `mkdir /.../rootfs/proc: read-only file system`. // make sure the error is caused before running `echo foo > /bar`. assert.Contains(t, stderr.String(), "read-only file system") - root, err := cm.New(ctx, snap) + root, err := w.CacheManager.New(ctx, snap) assert.NoError(t, err) - err = w.Exec(ctx, meta, root, nil, nil, nil, nil) + err = w.Executor.Exec(ctx, meta, root, nil, nil, nil, nil) assert.NoError(t, err) rf, err := root.Commit(ctx) @@ -160,7 +131,7 @@ func TestControlStandalone(t *testing.T) { err = snap.Release(ctx) assert.NoError(t, err) - du2, err := cm.DiskUsage(ctx, client.DiskUsageInfo{}) + du2, err := w.CacheManager.DiskUsage(ctx, client.DiskUsageInfo{}) assert.NoError(t, err) assert.Equal(t, 1, len(du2)-len(du)) diff --git a/worker/worker.go b/worker/worker.go index 909a768396592..d7c540fbe2ee2 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -1,29 +1,188 @@ package worker import ( - "io" + "golang.org/x/net/context" + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/diff" + "github.com/containerd/containerd/images" + ctdsnapshot "github.com/containerd/containerd/snapshots" "github.com/moby/buildkit/cache" - "golang.org/x/net/context" + "github.com/moby/buildkit/cache/cacheimport" + "github.com/moby/buildkit/cache/instructioncache" + "github.com/moby/buildkit/cache/metadata" + "github.com/moby/buildkit/client" + "github.com/moby/buildkit/executor" + "github.com/moby/buildkit/exporter" + imageexporter "github.com/moby/buildkit/exporter/containerimage" + localexporter "github.com/moby/buildkit/exporter/local" + "github.com/moby/buildkit/session" + "github.com/moby/buildkit/snapshot/blobmapping" + "github.com/moby/buildkit/source" + "github.com/moby/buildkit/source/containerimage" + "github.com/moby/buildkit/source/git" + "github.com/moby/buildkit/source/http" + "github.com/moby/buildkit/source/local" + digest "github.com/opencontainers/go-digest" ) -type Meta struct { - Args []string - Env []string - User string - Cwd string - Tty bool - // DisableNetworking bool +// WorkerOpt is specific to a worker. +// See also CommonOpt. +type WorkerOpt struct { + Name string + SessionManager *session.Manager + MetadataStore *metadata.Store + Executor executor.Executor + BaseSnapshotter ctdsnapshot.Snapshotter // not blobmapping one (FIXME: just require blobmapping snapshotter?) + ContentStore content.Store + Applier diff.Differ + Differ diff.Differ + ImageStore images.Store } -type Mount struct { - Src cache.Mountable - Selector string - Dest string - Readonly bool +// Worker is a local worker instance with dedicated snapshotter, cache, and so on. +// TODO: s/Worker/OpWorker/g ? +// FIXME: Worker should be rather an interface +type Worker struct { + WorkerOpt + Snapshotter ctdsnapshot.Snapshotter // blobmapping snapshotter + CacheManager cache.Manager + SourceManager *source.Manager + InstructionCache InstructionCache + Exporters map[string]exporter.Exporter + ImageSource source.Source + CacheExporter *cacheimport.CacheExporter + CacheImporter *cacheimport.CacheImporter + // no frontend here } -type Worker interface { - // TODO: add stdout/err - Exec(ctx context.Context, meta Meta, rootfs cache.Mountable, mounts []Mount, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error +// FIXME: deduplicate interface definitions? +type InstructionCache interface { + Probe(ctx context.Context, key digest.Digest) (bool, error) + Lookup(ctx context.Context, key digest.Digest, msg string) (interface{}, error) // TODO: regular ref + Set(key digest.Digest, ref interface{}) error + SetContentMapping(contentKey, key digest.Digest) error + GetContentMapping(dgst digest.Digest) ([]digest.Digest, error) +} + +// NewWorker instantiates a local worker +func NewWorker(opt WorkerOpt) (*Worker, error) { + bmSnapshotter, err := blobmapping.NewSnapshotter(blobmapping.Opt{ + Content: opt.ContentStore, + Snapshotter: opt.BaseSnapshotter, + MetadataStore: opt.MetadataStore, + }) + if err != nil { + return nil, err + } + + cm, err := cache.NewManager(cache.ManagerOpt{ + Snapshotter: bmSnapshotter, + MetadataStore: opt.MetadataStore, + }) + if err != nil { + return nil, err + } + + ic := &instructioncache.LocalStore{ + MetadataStore: opt.MetadataStore, + Cache: cm, + } + + sm, err := source.NewManager() + if err != nil { + return nil, err + } + + is, err := containerimage.NewSource(containerimage.SourceOpt{ + Snapshotter: bmSnapshotter, + ContentStore: opt.ContentStore, + SessionManager: opt.SessionManager, + Applier: opt.Applier, + CacheAccessor: cm, + }) + if err != nil { + return nil, err + } + + sm.Register(is) + + gs, err := git.NewSource(git.Opt{ + CacheAccessor: cm, + MetadataStore: opt.MetadataStore, + }) + if err != nil { + return nil, err + } + + sm.Register(gs) + + hs, err := http.NewSource(http.Opt{ + CacheAccessor: cm, + MetadataStore: opt.MetadataStore, + }) + if err != nil { + return nil, err + } + + sm.Register(hs) + + ss, err := local.NewSource(local.Opt{ + SessionManager: opt.SessionManager, + CacheAccessor: cm, + MetadataStore: opt.MetadataStore, + }) + if err != nil { + return nil, err + } + sm.Register(ss) + + exporters := map[string]exporter.Exporter{} + + imageExporter, err := imageexporter.New(imageexporter.Opt{ + Snapshotter: bmSnapshotter, + ContentStore: opt.ContentStore, + Differ: opt.Differ, + Images: opt.ImageStore, + SessionManager: opt.SessionManager, + }) + if err != nil { + return nil, err + } + exporters[client.ExporterImage] = imageExporter + + localExporter, err := localexporter.New(localexporter.Opt{ + SessionManager: opt.SessionManager, + }) + if err != nil { + return nil, err + } + exporters[client.ExporterLocal] = localExporter + + ce := cacheimport.NewCacheExporter(cacheimport.ExporterOpt{ + Snapshotter: bmSnapshotter, + ContentStore: opt.ContentStore, + SessionManager: opt.SessionManager, + Differ: opt.Differ, + }) + + ci := cacheimport.NewCacheImporter(cacheimport.ImportOpt{ + Snapshotter: bmSnapshotter, + ContentStore: opt.ContentStore, + Applier: opt.Applier, + CacheAccessor: cm, + SessionManager: opt.SessionManager, + }) + + return &Worker{ + WorkerOpt: opt, + Snapshotter: bmSnapshotter, + CacheManager: cm, + SourceManager: sm, + InstructionCache: ic, + Exporters: exporters, + ImageSource: is, + CacheExporter: ce, + CacheImporter: ci, + }, nil } diff --git a/worker/workercontroller.go b/worker/workercontroller.go new file mode 100644 index 0000000000000..b05c854dc4afd --- /dev/null +++ b/worker/workercontroller.go @@ -0,0 +1,47 @@ +package worker + +import ( + "sync" + + "github.com/pkg/errors" +) + +// Controller holds worker instances. +// Currently, only local workers are supported. +type Controller struct { + mu sync.Mutex + // TODO: define worker interface and support remote ones + workers []*Worker +} + +// Add adds a local worker +func (c *Controller) Add(w *Worker) error { + c.mu.Lock() + c.workers = append(c.workers, w) + c.mu.Unlock() + return nil +} + +// GetAll returns all local workers +func (c *Controller) GetAll() []*Worker { + c.mu.Lock() + workers := c.workers + c.mu.Unlock() + return workers +} + +// GetDefault returns the default local worker +func (c *Controller) GetDefault() (*Worker, error) { + var w *Worker + c.mu.Lock() + if len(c.workers) > 0 { + w = c.workers[0] + } + c.mu.Unlock() + if w == nil { + return nil, errors.New("no worker registered") + } + return w, nil +} + +// TODO: add Get(Constraint) (*Worker, error)