Skip to content

Commit

Permalink
Refactor the optimizer based on containerd
Browse files Browse the repository at this point in the history
Signed-off-by: Kohei Tokunaga <[email protected]>
  • Loading branch information
ktock committed Jan 18, 2021
1 parent 8f544b4 commit 9c2288a
Show file tree
Hide file tree
Showing 34 changed files with 2,220 additions and 3,287 deletions.
347 changes: 347 additions & 0 deletions analyzer/analyzer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,347 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package analyzer

import (
"context"
"fmt"
"io"
"io/ioutil"
"os"
"os/signal"
"sync"
"syscall"
"time"

"github.com/containerd/console"
"github.com/containerd/containerd"
"github.com/containerd/containerd/cio"
"github.com/containerd/containerd/cmd/ctr/commands"
"github.com/containerd/containerd/cmd/ctr/commands/tasks"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/oci"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/snapshots"
"github.com/containerd/stargz-snapshotter/analyzer/fanotify"
"github.com/opencontainers/go-digest"
"github.com/opencontainers/image-spec/identity"
runtimespec "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
"github.com/rs/xid"
)

var defaultPeriod = 10 * time.Second

// Analyze analyzes the passed image then store the record of prioritized files into
// containerd's content store. This function returns the digest of that record file.
// This digest can be used to read record from the content store.
func Analyze(ctx context.Context, client *containerd.Client, ref string, opts ...Option) (digest.Digest, error) {
var aOpts analyzerOpts
for _, o := range opts {
o(&aOpts)
}
if aOpts.terminal && aOpts.waitOnSignal {
return "", fmt.Errorf("wait-on-signal option cannot be used with terminal option")
}

target, err := ioutil.TempDir("", "target")
if err != nil {
return "", err
}
defer os.RemoveAll(target)

cs := client.ContentStore()
is := client.ImageService()
ss := client.SnapshotService(aOpts.snapshotter)

img, err := is.Get(ctx, ref)
if err != nil {
return "", err
}
platformImg := containerd.NewImageWithPlatform(client, img, platforms.Default())

// Mount the target image
// NOTE: We cannot let containerd prepare the rootfs. We create mount namespace *before*
// creating the container so containerd's bundle preparation (mounting snapshots to
// the bundle directory, etc.) is invisible inside the pre-unshared mount namespace.
// This leads to runc using empty directory as the rootfs.
if unpacked, err := platformImg.IsUnpacked(ctx, aOpts.snapshotter); err != nil {
return "", err
} else if !unpacked {
if err := platformImg.Unpack(ctx, aOpts.snapshotter); err != nil {
return "", err
}
}
cleanup, err := mountImage(ctx, ss, platformImg, target)
if err != nil {
return "", err
}
defer cleanup()

// Spawn a fanotifier process in a new mount namespace and setup recorder.
fanotifier, err := fanotify.SpawnFanotifier("/proc/self/exe")
if err != nil {
return "", errors.Wrapf(err, "failed to spawn fanotifier")
}
defer func() {
if err := fanotifier.Close(); err != nil {
log.G(ctx).WithError(err).Warnf("failed to close fanotifier")
}
}()

// Prepare the spec based on the specified image and runtime options.
var sOpts []oci.SpecOpts
if aOpts.specOpts != nil {
gotOpts, done, err := aOpts.specOpts(platformImg, target)
if err != nil {
return "", err
}
defer func() {
if err := done(); err != nil {
log.G(ctx).WithError(err).Warnf("failed to cleanup container")
return
}
}()
sOpts = append(sOpts, gotOpts...)
} else {
sOpts = append(sOpts,
oci.WithDefaultSpec(),
oci.WithDefaultUnixDevices,
oci.WithRootFSPath(target),
oci.WithImageConfig(platformImg),
)
}
sOpts = append(sOpts, oci.WithLinuxNamespace(runtimespec.LinuxNamespace{
Type: runtimespec.MountNamespace,
Path: fanotifier.MountNamespacePath(), // use mount namespace that the fanotifier created
}))

// Create the container and the task
var container containerd.Container
for i := 0; i < 3; i++ {
id := xid.New().String()
var s runtimespec.Spec
container, err = client.NewContainer(ctx, id,
containerd.WithImage(platformImg),
containerd.WithSnapshotter(aOpts.snapshotter),
containerd.WithImageStopSignal(platformImg, "SIGKILL"),

// WithImageConfig depends on WithImage and WithSnapshotter for resolving
// username (accesses to /etc/{passwd,group} files on the rootfs)
containerd.WithSpec(&s, sOpts...),
)
if err != nil {
if errdefs.IsAlreadyExists(err) {
log.G(ctx).WithError(err).Warnf("failed to create container")
continue
}
return "", err
}
break
}
if container == nil {
return "", fmt.Errorf("failed to create container")
}
defer container.Delete(ctx, containerd.WithSnapshotCleanup)
var ioCreator cio.Creator
var con console.Console
if aOpts.terminal {
con = console.Current()
defer con.Reset()
if err := con.SetRaw(); err != nil {
return "", err
}
// On terminal mode, the "stderr" field is unused.
ioCreator = cio.NewCreator(cio.WithStreams(con, con, nil), cio.WithTerminal)
} else {
// TODO: we can support `-i` option (enabling stdin) in the future, if needed.
ioCreator = cio.NewCreator(cio.WithStreams(nil, os.Stdout, os.Stderr))
}
task, err := container.NewTask(ctx, ioCreator)
if err != nil {
return "", err
}

// Start to monitor "/" and run the task.
rc, err := newImageRecorder(ctx, cs, img, platforms.Default())
if err != nil {
return "", err
}
defer rc.close()
if err := fanotifier.Start(); err != nil {
return "", errors.Wrapf(err, "failed to start fanotifier")
}
var fanotifierClosed bool
var fanotifierClosedMu sync.Mutex
go func() {
for {
path, err := fanotifier.GetPath()
if err != nil {
if err == io.EOF {
fanotifierClosedMu.Lock()
isFanotifierClosed := fanotifierClosed
fanotifierClosedMu.Unlock()
if isFanotifierClosed {
break
}
}
log.G(ctx).WithError(err).Error("failed to get notified path")
break
}
if err := rc.record(path); err != nil {
log.G(ctx).WithError(err).Debugf("failed to record %q", path)
}
}
}()
if aOpts.terminal {
if err := tasks.HandleConsoleResize(ctx, task, con); err != nil {
log.G(ctx).WithError(err).Error("failed to resize console")
}
} else {
sigc := commands.ForwardAllSignals(ctx, task)
defer commands.StopCatch(sigc)
}
if err := task.Start(ctx); err != nil {
return "", err
}

// Wait until the task exit
var status containerd.ExitStatus
var killOk bool
if aOpts.waitOnSignal { // NOTE: not functional with `terminal` option
log.G(ctx).Infof("press Ctrl+C to terminate the container")
status, killOk, err = waitOnSignal(ctx, container, task)
if err != nil {
return "", err
}
} else {
if aOpts.period <= 0 {
aOpts.period = defaultPeriod
}
log.G(ctx).Infof("waiting for %v ...", aOpts.period)
status, killOk, err = waitOnTimeout(ctx, container, task, aOpts.period)
if err != nil {
return "", err
}
}
if !killOk {
log.G(ctx).Warnf("failed to exit task %v; manually kill it", task.ID())
} else {
code, _, err := status.Result()
if err != nil {
return "", err
}
log.G(ctx).Infof("container exit with code %v", code)
if _, err := task.Delete(ctx); err != nil {
return "", err
}
}

// ensure no record comes in
fanotifierClosedMu.Lock()
fanotifierClosed = true
fanotifierClosedMu.Unlock()
if err := fanotifier.Close(); err != nil {
log.G(ctx).WithError(err).Warnf("failed to cleanup fanotifier")
}

// Finish recording
return rc.commit(ctx)
}

func mountImage(ctx context.Context, ss snapshots.Snapshotter, image containerd.Image, mountpoint string) (func(), error) {
diffIDs, err := image.RootFS(ctx)
if err != nil {
return nil, err
}
mounts, err := ss.Prepare(ctx, mountpoint, identity.ChainID(diffIDs).String())
if err != nil {
return nil, err
}
if err := mount.All(mounts, mountpoint); err != nil {
if err := ss.Remove(ctx, mountpoint); err != nil && !errdefs.IsNotFound(err) {
log.G(ctx).WithError(err).Warnf("failed to cleanup snapshot after mount error")
}
return nil, errors.Wrapf(err, "failed to mount rootfs at %q", mountpoint)
}
return func() {
if err := mount.UnmountAll(mountpoint, 0); err != nil {
log.G(ctx).WithError(err).Warnf("failed to unmount snapshot")
}
if err := ss.Remove(ctx, mountpoint); err != nil && !errdefs.IsNotFound(err) {
log.G(ctx).WithError(err).Warnf("failed to cleanup snapshot")
}
}, nil
}

func waitOnSignal(ctx context.Context, container containerd.Container, task containerd.Task) (containerd.ExitStatus, bool, error) {
statusC, err := task.Wait(ctx)
if err != nil {
return containerd.ExitStatus{}, false, err
}
sc := make(chan os.Signal, 1)
signal.Notify(sc, syscall.SIGINT)
defer signal.Stop(sc)
select {
case status := <-statusC:
return status, true, nil
case <-sc:
log.G(ctx).Info("signal detected")
status, err := killTask(ctx, container, task, statusC)
if err != nil {
log.G(ctx).WithError(err).Warnf("failed to kill container")
return containerd.ExitStatus{}, false, nil
}
return status, true, nil
}
}

func waitOnTimeout(ctx context.Context, container containerd.Container, task containerd.Task, period time.Duration) (containerd.ExitStatus, bool, error) {
statusC, err := task.Wait(ctx)
if err != nil {
return containerd.ExitStatus{}, false, err
}
select {
case status := <-statusC:
return status, true, nil
case <-time.After(period):
status, err := killTask(ctx, container, task, statusC)
if err != nil {
log.G(ctx).WithError(err).Warnf("failed to kill container")
return containerd.ExitStatus{}, false, nil
}
return status, true, nil
}
}

func killTask(ctx context.Context, container containerd.Container, task containerd.Task, statusC <-chan containerd.ExitStatus) (containerd.ExitStatus, error) {
sig, err := containerd.GetStopSignal(ctx, container, syscall.SIGKILL)
if err != nil {
return containerd.ExitStatus{}, err
}
if err := task.Kill(ctx, sig, containerd.WithKillAll); err != nil && !errdefs.IsNotFound(err) {
return containerd.ExitStatus{}, errors.Wrapf(err, "forward SIGKILL")
}
select {
case status := <-statusC:
return status, nil
case <-time.After(5 * time.Second):
return containerd.ExitStatus{}, fmt.Errorf("timeout")
}
}
Loading

0 comments on commit 9c2288a

Please sign in to comment.