vcsim: use docker events for efficient monitoring
Makes use of the docker events stream to trigger inspect operations
against containers when waiting for things such as IPs.

Corrects prior failure to stop the async container watch when the
container was removed.

Updates locking to avoid race warnings.

Updates vcsim.bats to look for a volume with a `--dmi` suffix instead
of a volume with the plain container name.
hickeng committed Aug 2, 2023
1 parent 548bdde commit a47a84f
Showing 4 changed files with 201 additions and 33 deletions.
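The substance of the change is the eventWatcher added to simulator/container.go below: a single long-lived `docker events` subprocess now wakes per-container watch loops, with interval polling kept only as a fallback. The following is a minimal, self-contained sketch of that pattern, not the commit's code — the helper name streamContainerEvents is hypothetical, and it emits bare IDs rather than the quoted '{{.ID}}' format the diff uses:

// Illustrative sketch only: stream container IDs from `docker events`
// and invoke a callback per event.
package main

import (
	"bufio"
	"log"
	"os/exec"
	"strings"
)

func streamContainerEvents(notify func(id string)) error {
	// one container ID per line; restrict the stream to container events
	cmd := exec.Command("docker", "events", "--format", "{{.ID}}", "--filter", "type=container")
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		return err
	}
	if err := cmd.Start(); err != nil {
		return err
	}

	scanner := bufio.NewScanner(stdout)
	for scanner.Scan() {
		if id := strings.TrimSpace(scanner.Text()); id != "" {
			// a targeted `docker inspect` here replaces interval polling
			notify(id)
		}
	}
	return cmd.Wait()
}

func main() {
	err := streamContainerEvents(func(id string) {
		log.Printf("container %s changed; re-inspect", id)
	})
	if err != nil {
		log.Fatal(err)
	}
}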
2 changes: 1 addition & 1 deletion govc/test/vcsim.bats
@@ -340,7 +340,7 @@ EOF
 run docker inspect -f '{{.State.Status}}' "$name"
 assert_success "running"

-run docker volume inspect "$name"
+run docker volume inspect "$name--dmi"
 assert_success

 run govc vm.destroy $vm
211 changes: 186 additions & 25 deletions simulator/container.go
@@ -18,7 +18,9 @@ package simulator

 import (
 	"archive/tar"
+	"bufio"
 	"bytes"
+	"context"
 	"encoding/json"
 	"errors"
 	"fmt"
@@ -30,11 +32,13 @@ import (
"path"
"regexp"
"strings"
"sync"
"time"
)

var (
shell = "/bin/sh"
shell = "/bin/sh"
eventWatch eventWatcher
)

const (
@@ -48,10 +52,26 @@ func init() {
 	}
 }

+type eventWatcher struct {
+	sync.Mutex
+
+	stdin   io.WriteCloser
+	stdout  io.ReadCloser
+	process *os.Process
+
+	// watches is a map of container IDs to container objects
+	watches map[string]*container
+}
+
 // container provides methods to manage a container within a simulator VM lifecycle.
 type container struct {
+	sync.Mutex
+
 	id   string
 	name string
+
+	cancelWatch context.CancelFunc
+	changes     chan struct{}
 }

 type networkSettings struct {
@@ -367,6 +387,7 @@ func create(ctx *Context, name string, id string, networks []string, volumes []s

 	var c container
 	c.name = constructContainerName(name, id)
+	c.changes = make(chan struct{})

 	for i := range volumes {
 		// we'll pre-create anonymous volumes, simply for labelling consistency
@@ -437,7 +458,11 @@ func (c *container) createVolume(name string, labels []string, files []tarEntry)
 // * if c.id is empty, or docker returns "No such object", will return an uninitializedContainer error
 // * err from either execution or parsing of json output
 func (c *container) inspect() (out []byte, detail containerDetails, err error) {
-	if c.id == "" {
+	c.Lock()
+	id := c.id
+	c.Unlock()
+
+	if id == "" {
 		err = uninitializedContainer(errors.New("inspect of uninitialized container"))
 		return
 	}
@@ -472,7 +497,11 @@ func (c *container) inspect() (out []byte, detail containerDetails, err error) {
 // start
 //   - if the container already exists, start it or unpause it.
 func (c *container) start(ctx *Context) error {
-	if c.id == "" {
+	c.Lock()
+	id := c.id
+	c.Unlock()
+
+	if id == "" {
 		return uninitializedContainer(errors.New("start of uninitialized container"))
 	}

@@ -497,7 +526,11 @@

 // pause the container (if any) for the given vm.
 func (c *container) pause(ctx *Context) error {
-	if c.id == "" {
+	c.Lock()
+	id := c.id
+	c.Unlock()
+
+	if id == "" {
 		return uninitializedContainer(errors.New("pause of uninitialized container"))
 	}

@@ -512,7 +545,11 @@

 // restart the container (if any) for the given vm.
 func (c *container) restart(ctx *Context) error {
-	if c.id == "" {
+	c.Lock()
+	id := c.id
+	c.Unlock()
+
+	if id == "" {
 		return uninitializedContainer(errors.New("restart of uninitialized container"))
 	}

@@ -527,7 +564,11 @@

 // stop the container (if any) for the given vm.
 func (c *container) stop(ctx *Context) error {
-	if c.id == "" {
+	c.Lock()
+	id := c.id
+	c.Unlock()
+
+	if id == "" {
 		return uninitializedContainer(errors.New("stop of uninitialized container"))
 	}

@@ -548,7 +589,11 @@
 // * uninitializedContainer error - if c.id is empty
 // * err from cmd execution
 func (c *container) exec(ctx *Context, args []string) (string, error) {
-	if c.id == "" {
+	c.Lock()
+	id := c.id
+	c.Unlock()
+
+	if id == "" {
 		return "", uninitializedContainer(errors.New("exec into uninitialized container"))
 	}

@@ -569,6 +614,9 @@ func (c *container) exec(ctx *Context, args []string) (string, error) {
 //
 // err - joined err from deletion of container and any volumes or networks that have coupled lifecycle
 func (c *container) remove(ctx *Context) error {
+	c.Lock()
+	defer c.Unlock()
+
 	if c.id == "" {
 		// consider absence success
 		return nil
@@ -620,53 +668,166 @@ func (c *container) remove(ctx *Context) error {
 		return fmt.Errorf("err: {%s}, lsverr: {%s}, rmverr: {%s}, lsnerr:{%s}, rmerr: {%s}", err, lsverr, rmverr, lsnerr, rmnerr)
 	}

+	if c.cancelWatch != nil {
+		c.cancelWatch()
+		eventWatch.ignore(c)
+	}
 	c.id = ""
 	return nil
 }

+// updated is a simple trigger allowing a caller to indicate that something has likely changed about the container
+// and interested parties should re-inspect as needed.
+func (c *container) updated() {
+	consolidationWindow := 250 * time.Millisecond
+	if d, err := time.ParseDuration(os.Getenv("VCSIM_EVENT_CONSOLIDATION_WINDOW")); err == nil {
+		consolidationWindow = d
+	}
+
+	select {
+	case c.changes <- struct{}{}:
+		time.Sleep(consolidationWindow)
+		// as this is only a hint to avoid waiting for the full inspect interval, we don't care about accumulating
+		// multiple triggers. We do pause to allow large numbers of sequential updates to consolidate
+	default:
+	}
+}
+
 // watchContainer monitors the underlying container and updates
 // properties based on the container status. This occurs until either
 // the container or the VM is removed.
 // returns:
 //
 //	err - uninitializedContainer error - if c.id is empty
-func (c *container) watchContainer(ctx *Context, updateFn func(*Context, *containerDetails, *container) error) error {
+func (c *container) watchContainer(ctx context.Context, updateFn func(*containerDetails, *container) error) error {
+	c.Lock()
+	defer c.Unlock()
+
 	if c.id == "" {
 		return uninitializedContainer(errors.New("Attempt to watch uninitialized container"))
 	}

+	eventWatch.watch(c)
+
+	cancelCtx, cancelFunc := context.WithCancel(ctx)
+	c.cancelWatch = cancelFunc
+
 	// Update the VM from the container at regular intervals until the done
 	// channel is closed.
 	go func() {
-		inspectInterval := time.Duration(5 * time.Second)
+		inspectInterval := 10 * time.Second
 		if d, err := time.ParseDuration(os.Getenv("VCSIM_INSPECT_INTERVAL")); err == nil {
 			inspectInterval = d
 		}
 		ticker := time.NewTicker(inspectInterval)

+		update := func() {
+			_, details, err := c.inspect()
+			var rmErr error
+			var removing bool
+			if _, ok := err.(uninitializedContainer); ok {
+				removing = true
+				rmErr = c.remove(SpoofContext())
+			}
+
+			updateErr := updateFn(&details, c)
+			// if we don't succeed we want to re-try
+			if removing && rmErr == nil && updateErr == nil {
+				ticker.Stop()
+				return
+			}
+			// TODO: log err?
+		}
+
 		for {
 			select {
+			case <-c.changes:
+				update()
 			case <-ticker.C:
-				_, details, err := c.inspect()
-				var rmErr error
-				var removing bool
-				if _, ok := err.(uninitializedContainer); ok {
-					removing = true
-					rmErr = c.remove(ctx)
-				}
-
-				updateErr := updateFn(ctx, &details, c)
-				// if we don't succeed we want to re-try
-				if removing && rmErr == nil && updateErr == nil {
-					ticker.Stop()
-					return
-				}
-				// TODO: log err?
-			case <-ctx.Done():
+				update()
+			case <-cancelCtx.Done():
 				return
 			}
 		}
 	}()

 	return nil
 }
+
+func (w *eventWatcher) watch(c *container) {
+	w.Lock()
+	defer w.Unlock()
+
+	if w.watches == nil {
+		w.watches = make(map[string]*container)
+	}
+
+	w.watches[c.id] = c
+
+	if w.stdin == nil {
+		cmd := exec.Command("docker", "events", "--format", "'{{.ID}}'", "--filter", "Type=container")
+		w.stdout, _ = cmd.StdoutPipe()
+		w.stdin, _ = cmd.StdinPipe()
+		err := cmd.Start()
+		if err != nil {
+			log.Printf("docker event watcher: %s %s", cmd.Args, err)
+			w.stdin = nil
+			w.stdout = nil
+			w.process = nil
+
+			return
+		}
+
+		w.process = cmd.Process
+
+		go w.monitor()
+	}
+}
+
+func (w *eventWatcher) ignore(c *container) {
+	w.Lock()
+
+	delete(w.watches, c.id)
+
+	if len(w.watches) == 0 && w.stdin != nil {
+		w.stop()
+	}
+
+	w.Unlock()
+}
+
+func (w *eventWatcher) monitor() {
+	w.Lock()
+	watches := len(w.watches)
+	w.Unlock()
+
+	if watches == 0 {
+		return
+	}
+
+	scanner := bufio.NewScanner(w.stdout)
+	for scanner.Scan() {
+		id := strings.Trim(strings.TrimSpace(scanner.Text()), "'") // strip the literal quotes added by the --format string
+
+		w.Lock()
+		container := w.watches[id]
+		w.Unlock()
+
+		if container != nil {
+			// this is called in a routine to allow an event consolidation window
+			go container.updated()
+		}
+	}
+}
+
+func (w *eventWatcher) stop() {
+	if w.stdin != nil {
+		w.stdin.Close()
+		w.stdin = nil
+	}
+	if w.stdout != nil {
+		w.stdout.Close()
+		w.stdout = nil
+	}
+	w.process.Kill()
+}
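
The updated trigger above coalesces bursts of docker events: a non-blocking send on the unbuffered changes channel wakes the watch goroutine, and any trigger arriving while an inspect is still in flight falls through to the default branch and is dropped. A standalone sketch of that coalescing behaviour, with illustrative durations (not the commit's code):

// Illustrative sketch only: many concurrent triggers coalesce into
// roughly one wakeup, mirroring the container.updated pattern above.
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	changes := make(chan struct{})

	// consumer: each wakeup stands in for one `docker inspect`
	go func() {
		for range changes {
			fmt.Println("re-inspect container")
			time.Sleep(100 * time.Millisecond) // simulated inspect cost
		}
	}()
	time.Sleep(10 * time.Millisecond) // let the consumer park on the channel

	// non-blocking send: triggers arriving while an inspect is in
	// flight hit the default branch and are dropped
	trigger := func() {
		select {
		case changes <- struct{}{}:
		default:
		}
	}

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ { // a burst of ten events...
		wg.Add(1)
		go func() {
			defer wg.Done()
			trigger()
		}()
	}
	wg.Wait()

	time.Sleep(200 * time.Millisecond) // ...prints once, perhaps twice
}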
17 changes: 10 additions & 7 deletions simulator/container_virtual_machine.go
@@ -18,6 +18,7 @@ package simulator

 import (
 	"archive/tar"
+	"context"
 	"encoding/hex"
 	"encoding/json"
 	"errors"
@@ -294,14 +295,16 @@ func (svm *simVM) start(ctx *Context) error {
 		log.Printf("%s inspect %s: %s", svm.vm.Name, svm.c.id, err)
 	}

-	callback := func(ctx *Context, details *containerDetails, c *container) error {
+	callback := func(details *containerDetails, c *container) error {
 		spoofctx := SpoofContext()

-		if c.id == "" {
-			// If the container cannot be found then destroy this VM.
+		// TODO: figure out if we should pass the vm/container via ctx or otherwise from the callback - this might cause locking issues.
+		if c.id == "" && svm.vm != nil {
+			// If the container cannot be found then destroy this VM unless the VM is no longer configured for container backing (svm.vm == nil)
 			taskRef := svm.vm.DestroyTask(spoofctx, &types.Destroy_Task{This: svm.vm.Self}).(*methods.Destroy_TaskBody).Res.Returnval
-			task := ctx.Map.Get(taskRef).(*Task)
+			task, ok := spoofctx.Map.Get(taskRef).(*Task)
+			if !ok {
+				panic(fmt.Sprintf("couldn't retrieve task for moref %+q while deleting VM %s", taskRef, svm.vm.Name))
+			}

 			// Wait for the task to complete and see if there is an error.
 			task.Wait()
@@ -317,10 +320,10 @@
 	}

 	// Start watching the container resource.
-	err = svm.c.watchContainer(ctx, callback)
+	err = svm.c.watchContainer(context.Background(), callback)
 	if _, ok := err.(uninitializedContainer); ok {
 		// the container has been deleted before we could watch, despite successful launch so clean up.
-		callback(ctx, nil, svm.c)
+		callback(nil, svm.c)

 		// successful launch so nil the error
 		return nil
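
Note the watch now runs on a context derived from context.Background(), with the CancelFunc stored as c.cancelWatch so that container.remove can stop the goroutine — the leak called out in the commit message. A minimal sketch of that cancellation shape (hypothetical names, not the commit's code):

// Illustrative sketch only: a stored CancelFunc stops the watch loop
// on removal, as c.cancelWatch now does in container.remove.
package main

import (
	"context"
	"fmt"
	"time"
)

type watcher struct {
	cancel context.CancelFunc
}

func (w *watcher) start(parent context.Context) {
	ctx, cancel := context.WithCancel(parent)
	w.cancel = cancel

	go func() {
		ticker := time.NewTicker(50 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				fmt.Println("periodic inspect")
			case <-ctx.Done():
				fmt.Println("watch stopped") // no goroutine leaked after removal
				return
			}
		}
	}()
}

func main() {
	w := &watcher{}
	w.start(context.Background())
	time.Sleep(120 * time.Millisecond)
	w.cancel() // what remove() does via the stored cancelWatch
	time.Sleep(60 * time.Millisecond)
}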
4 changes: 4 additions & 0 deletions simulator/virtual_machine.go
@@ -478,7 +478,11 @@ func (vm *VirtualMachine) applyExtraConfig(ctx *Context, spec *types.VirtualMach
 	} else if removedContainerBacking {
 		err := vm.svm.remove(ctx)
 		if err == nil {
+			// remove link from container to VM so callbacks no longer reflect state
+			vm.svm.vm = nil
+			// nil container backing reference to return this to a pure in-mem simulated VM
+			vm.svm = nil
 		} else {
 			// don't attempt to undo the changes already made - just return an error
 			// we'll retry the svm.start operation on pause/restart calls
