From 73a8d544d11c9f2f0e20867464bddbb9c7657753 Mon Sep 17 00:00:00 2001 From: Thomas Weber Date: Mon, 28 Dec 2020 21:36:05 +0100 Subject: [PATCH 01/11] Basic stats stream plumbing --- api/container_stats.go | 50 ++++++++++++++++++++++++++++++++++++++++++ api/structs.go | 36 ++++++++++++++++++++++++++++++ driver.go | 40 +++++++++++++++++++++++++++++++++ go.mod | 1 + go.sum | 2 ++ 5 files changed, 129 insertions(+) diff --git a/api/container_stats.go b/api/container_stats.go index f50b14c8..537ad030 100644 --- a/api/container_stats.go +++ b/api/container_stats.go @@ -7,6 +7,8 @@ import ( "fmt" "io/ioutil" "net/http" + + "github.com/mitchellh/go-linereader" ) var ContainerNotFound = errors.New("No such Container") @@ -45,3 +47,51 @@ func (c *API) ContainerStats(ctx context.Context, name string) (Stats, error) { return stats, nil } + +// ContainerStatsStream streams stats for all containers +func (c *API) ContainerStatsStream(ctx context.Context) (chan ContainerStats, error) { + + res, err := c.Get(ctx, "/v1.0.0/libpod/containers/stats?stream=true") + if err != nil { + return nil, err + } + + if res.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unknown error, status code: %d", res.StatusCode) + } + + statsChannel := make(chan ContainerStats, 5) + lr := linereader.New(res.Body) + + go func() { + c.logger.Info("Running stats stream") + defer func() { + res.Body.Close() + close(statsChannel) + c.logger.Info("Stopped stats stream") + }() + for { + select { + case <-ctx.Done(): + c.logger.Debug("Stopping stats stream") + return + case line, ok := <-lr.Ch: + if !ok { + c.logger.Warn("Stats reader channel was closed") + return + } + var statsReport ContainerStatsReport + json.Unmarshal([]byte(line), &statsReport) + if statsReport.Error != nil { + c.logger.Error("Stats stream is broken", "error", statsReport.Error) + return + } + for _, stat := range statsReport.Stats { + statsChannel <- stat + } + } + } + }() + + return statsChannel, nil +} diff --git a/api/structs.go b/api/structs.go index 4c9ef090..db3f4422 100644 --- a/api/structs.go +++ b/api/structs.go @@ -1514,3 +1514,39 @@ type Version struct { Built int64 OsArch string } + +// ------------------------------------------------------------------------------------------------------- +// structs copied from https://github.com/containers/podman/blob/master/libpod/define/containerstate.go +// +// some unused parts are modified/commented out to not pull more dependencies +// +// some fields are reordert to make the linter happy (bytes maligned complains) +// ------------------------------------------------------------------------------------------------------- + +// ContainerStats contains the statistics information for a running container +type ContainerStats struct { + ContainerID string + Name string + PerCPU []uint64 + CPU float64 + CPUNano uint64 + CPUSystemNano uint64 + SystemNano uint64 + MemUsage uint64 + MemLimit uint64 + MemPerc float64 + NetInput uint64 + NetOutput uint64 + BlockInput uint64 + BlockOutput uint64 + PIDs uint64 +} + +// ContainerStatsReport is used for streaming container stats. +// https://github.com/containers/podman/blob/master/pkg/domain/entities/containers.go +type ContainerStatsReport struct { + // Error from reading stats. + Error error + // Results, set when there is no error. 
+ Stats []ContainerStats +} diff --git a/driver.go b/driver.go index 8f82fc07..aea89ea7 100644 --- a/driver.go +++ b/driver.go @@ -228,6 +228,11 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint { // it is used to toggle cgroup v1/v2, rootless/rootful behavior d.systemInfo = info d.cgroupV2 = info.Host.CGroupsVersion == "v2" + err = d.runStatsStreamer() + if err != nil { + d.logger.Error("Could not open stats stream", "err", err) + health = drivers.HealthStateUnhealthy + } } } @@ -238,6 +243,41 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint { } } +func (d *Driver) runStatsStreamer() error { + var err error + var statsChannel chan api.ContainerStats + + statsChannel, err = d.podman.ContainerStatsStream(d.ctx) + if err != nil { + return err + } + + go func() { + for { + select { + case <-d.ctx.Done(): + return + case stats, ok := <-statsChannel: + if !ok { + // re-run api request on http timeout/connection loss + statsChannel, err = d.podman.ContainerStatsStream(d.ctx) + if err != nil { + // throttle retries on error + d.logger.Warn("Failed to rerun stats stream api request", "err", err) + time.Sleep(time.Second * 3) + } + d.logger.Debug("Rerun stats stream") + continue + } + d.logger.Info("GOT", "stats", stats) + } + } + + }() + + return nil +} + // RecoverTask detects running tasks when nomad client or task driver is restarted. // When a driver is restarted it is not expected to persist any internal state to disk. // To support this, Nomad will attempt to recover a task that was previously started diff --git a/go.mod b/go.mod index aa7a0c74..68292003 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,7 @@ require ( github.com/hashicorp/nomad v1.0.0 github.com/hashicorp/nomad/api v0.0.0-20201208134522-a480eed0815c github.com/mattn/go-colorable v0.1.7 // indirect + github.com/mitchellh/go-linereader v0.0.0-20190213213312-1b945b3263eb github.com/mitchellh/go-ps v1.0.0 // indirect github.com/onsi/ginkgo v1.13.0 // indirect github.com/opencontainers/runtime-spec v1.0.3-0.20200728170252-4d89ac9fbff6 diff --git a/go.sum b/go.sum index c76bd55c..56cf6141 100644 --- a/go.sum +++ b/go.sum @@ -623,6 +623,8 @@ github.com/mitchellh/copystructure v1.0.0/go.mod h1:SNtv71yrdKgLRyLFxmLdkAbkKEFW github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= +github.com/mitchellh/go-linereader v0.0.0-20190213213312-1b945b3263eb h1:GRiLv4rgyqjqzxbhJke65IYUf4NCOOvrPOJbV/sPxkM= +github.com/mitchellh/go-linereader v0.0.0-20190213213312-1b945b3263eb/go.mod h1:OaY7UOoTkkrX3wRwjpYRKafIkkyeD0UtweSHAWWiqQM= github.com/mitchellh/go-ps v0.0.0-20190716172923-621e5597135b/go.mod h1:r1VsdOzOPt1ZSrGZWFoNhsAedKnEd6r9Np1+5blZCWk= github.com/mitchellh/go-ps v1.0.0 h1:i6ampVEEF4wQFF+bkYfwYgY+F/uYJDktmvLPf7qIgjc= github.com/mitchellh/go-ps v1.0.0/go.mod h1:J4lOc8z8yJs6vUwklHw2XEIiT4z4C40KtWVN3nvg8Pg= From 7c7a655f054cb56a8327767e1eabced8fd8f4d21 Mon Sep 17 00:00:00 2001 From: Thomas Weber Date: Thu, 31 Dec 2020 12:13:24 +0100 Subject: [PATCH 02/11] Stream stats from global podman listener into handles --- driver.go | 8 +++++--- handle.go | 23 ++++++++--------------- state.go | 43 +++++++++++++++++++++++++++++++++++++------ 3 files changed, 50 insertions(+), 24 deletions(-) 
diff --git a/driver.go b/driver.go index aea89ea7..e8781f3d 100644 --- a/driver.go +++ b/driver.go @@ -243,6 +243,7 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint { } } +// stream stats from a global podman listener into task handles func (d *Driver) runStatsStreamer() error { var err error var statsChannel chan api.ContainerStats @@ -257,7 +258,7 @@ func (d *Driver) runStatsStreamer() error { select { case <-d.ctx.Done(): return - case stats, ok := <-statsChannel: + case containerStats, ok := <-statsChannel: if !ok { // re-run api request on http timeout/connection loss statsChannel, err = d.podman.ContainerStatsStream(d.ctx) @@ -269,10 +270,11 @@ func (d *Driver) runStatsStreamer() error { d.logger.Debug("Rerun stats stream") continue } - d.logger.Info("GOT", "stats", stats) + if !d.tasks.UpdateContainerStats(containerStats) { + d.logger.Debug("Ignore stats for unknown container", "container", containerStats.Name) + } } } - }() return nil diff --git a/handle.go b/handle.go index b793005e..4d5d9327 100644 --- a/handle.go +++ b/handle.go @@ -15,7 +15,7 @@ import ( var ( measuredCPUStats = []string{"System Mode", "User Mode", "Percent"} - measuredMemStats = []string{"Usage", "Max Usage"} + measuredMemStats = []string{"Usage"} ) // TaskHandle is the podman specific handle for exactly one container @@ -39,7 +39,7 @@ type TaskHandle struct { removeContainerOnExit bool - containerStats api.Stats + containerStats api.ContainerStats } func (h *TaskHandle) taskStatus() *drivers.TaskStatus { @@ -108,10 +108,10 @@ func (h *TaskHandle) runStatsEmitter(ctx context.Context, statsChannel chan *dri t := time.Now() //FIXME implement cpu stats correctly - totalPercent := h.totalCPUStats.Percent(float64(h.containerStats.CPUStats.CPUUsage.TotalUsage)) + totalPercent := h.containerStats.CPU cs := &drivers.CpuStats{ - SystemMode: h.systemCPUStats.Percent(float64(h.containerStats.CPUStats.CPUUsage.UsageInKernelmode)), - UserMode: h.userCPUStats.Percent(float64(h.containerStats.CPUStats.CPUUsage.UsageInUsermode)), + SystemMode: h.systemCPUStats.Percent(float64(h.containerStats.CPUSystemNano)), + UserMode: h.userCPUStats.Percent(float64(h.containerStats.CPUNano)), Percent: totalPercent, TotalTicks: h.systemCPUStats.TicksConsumed(totalPercent), Measured: measuredCPUStats, @@ -120,9 +120,8 @@ func (h *TaskHandle) runStatsEmitter(ctx context.Context, statsChannel chan *dri //h.driver.logger.Info("stats", "cpu", containerStats.Cpu, "system", containerStats.System_nano, "user", containerStats.Cpu_nano, "percent", totalPercent, "ticks", cs.TotalTicks, "cpus", cpus, "available", available) ms := &drivers.MemoryStats{ - MaxUsage: h.containerStats.MemoryStats.MaxUsage, - Usage: h.containerStats.MemoryStats.Usage, - RSS: h.containerStats.MemoryStats.Usage, + Usage: h.containerStats.MemUsage, + RSS: h.containerStats.MemUsage, Measured: measuredMemStats, } h.stateLock.Unlock() @@ -160,7 +159,7 @@ func (h *TaskHandle) runContainerMonitor() { timer.Reset(interval) } - containerStats, statsErr := h.driver.podman.ContainerStats(h.driver.ctx, h.containerID) + _, statsErr := h.driver.podman.ContainerStats(h.driver.ctx, h.containerID) if statsErr != nil { gone := false if errors.Is(statsErr, api.ContainerNotFound) { @@ -201,11 +200,5 @@ func (h *TaskHandle) runContainerMonitor() { h.logger.Debug("Could not get container stats, unknown error", "err", fmt.Sprintf("%#v", statsErr)) continue } - - h.stateLock.Lock() - // keep last known containerStats in handle to - // have it available in the stats emitter - 
h.containerStats = containerStats - h.stateLock.Unlock() } } diff --git a/state.go b/state.go index bef9d61d..6f79287d 100644 --- a/state.go +++ b/state.go @@ -2,32 +2,63 @@ package main import ( "sync" + + "github.com/hashicorp/nomad-driver-podman/api" ) type taskStore struct { - store map[string]*TaskHandle - lock sync.RWMutex + taskIdToHandle map[string]*TaskHandle + containerIdToHandle map[string]*TaskHandle + lock sync.RWMutex } func newTaskStore() *taskStore { - return &taskStore{store: map[string]*TaskHandle{}} + return &taskStore{ + taskIdToHandle: map[string]*TaskHandle{}, + containerIdToHandle: map[string]*TaskHandle{}, + } } func (ts *taskStore) Set(id string, handle *TaskHandle) { ts.lock.Lock() defer ts.lock.Unlock() - ts.store[id] = handle + ts.taskIdToHandle[id] = handle + ts.containerIdToHandle[handle.containerID] = handle } func (ts *taskStore) Get(id string) (*TaskHandle, bool) { ts.lock.RLock() defer ts.lock.RUnlock() - t, ok := ts.store[id] + t, ok := ts.taskIdToHandle[id] + return t, ok +} + +func (ts *taskStore) GetByContainerId(containerID string) (*TaskHandle, bool) { + ts.lock.RLock() + defer ts.lock.RUnlock() + t, ok := ts.containerIdToHandle[containerID] return t, ok } func (ts *taskStore) Delete(id string) { ts.lock.Lock() defer ts.lock.Unlock() - delete(ts.store, id) + t, ok := ts.taskIdToHandle[id] + if ok { + delete(ts.containerIdToHandle, t.containerID) + } + delete(ts.taskIdToHandle, id) +} + +// keep last known containerStats in handle to +// have it available in the stats emitter +func (ts *taskStore) UpdateContainerStats(containerStats api.ContainerStats) bool { + taskHandle, ok := ts.GetByContainerId(containerStats.ContainerID) + if ok { + + taskHandle.stateLock.Lock() + taskHandle.containerStats = containerStats + taskHandle.stateLock.Unlock() + } + return ok } From 2345331a9bcf4dba79b44f47b590203dc328b058 Mon Sep 17 00:00:00 2001 From: Thomas Weber Date: Fri, 1 Jan 2021 17:46:48 +0100 Subject: [PATCH 03/11] Monitor container stop via libpod event --- api/container_stats.go | 6 +-- driver.go | 51 ++++++++++++++++---- handle.go | 103 ++++++++++++++--------------------------- state.go | 22 ++++++++- 4 files changed, 100 insertions(+), 82 deletions(-) diff --git a/api/container_stats.go b/api/container_stats.go index 537ad030..ea955201 100644 --- a/api/container_stats.go +++ b/api/container_stats.go @@ -64,11 +64,11 @@ func (c *API) ContainerStatsStream(ctx context.Context) (chan ContainerStats, er lr := linereader.New(res.Body) go func() { - c.logger.Info("Running stats stream") + c.logger.Debug("Running stats stream") defer func() { res.Body.Close() close(statsChannel) - c.logger.Info("Stopped stats stream") + c.logger.Debug("Stopped stats stream") }() for { select { @@ -77,7 +77,7 @@ func (c *API) ContainerStatsStream(ctx context.Context) (chan ContainerStats, er return case line, ok := <-lr.Ch: if !ok { - c.logger.Warn("Stats reader channel was closed") + c.logger.Debug("Stats reader channel was closed") return } var statsReport ContainerStatsReport diff --git a/driver.go b/driver.go index e8781f3d..2aca8b87 100644 --- a/driver.go +++ b/driver.go @@ -115,7 +115,7 @@ func NewPodmanDriver(logger hclog.Logger) drivers.DriverPlugin { return &Driver{ eventer: eventer.NewEventer(ctx, logger), config: &PluginConfig{}, - tasks: newTaskStore(), + tasks: newTaskStore(logger), ctx: ctx, signalShutdown: cancel, logger: logger.Named(pluginName), @@ -230,7 +230,12 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint { d.cgroupV2 = 
info.Host.CGroupsVersion == "v2" err = d.runStatsStreamer() if err != nil { - d.logger.Error("Could not open stats stream", "err", err) + d.logger.Error("Could not start stats stream", "err", err) + health = drivers.HealthStateUnhealthy + } + err = d.runEventStreamer() + if err != nil { + d.logger.Error("Could not start event stream", "err", err) health = drivers.HealthStateUnhealthy } } @@ -280,6 +285,41 @@ func (d *Driver) runStatsStreamer() error { return nil } +// stream events from a global podman listener into task handles +func (d *Driver) runEventStreamer() error { + var err error + var eventsChannel chan interface{} + + eventsChannel, err = d.podman.LibpodEventStream(d.ctx) + if err != nil { + return err + } + + go func() { + for { + select { + case <-d.ctx.Done(): + return + case event, ok := <-eventsChannel: + if !ok { + // re-run api request on http timeout/connection loss + eventsChannel, err = d.podman.LibpodEventStream(d.ctx) + if err != nil { + // throttle retries on error + d.logger.Warn("Failed to rerun event stream api request", "err", err) + time.Sleep(time.Second * 3) + } + d.logger.Debug("Rerun event stream") + continue + } + d.tasks.HandleLibpodEvent(event) + } + } + }() + + return nil +} + // RecoverTask detects running tasks when nomad client or task driver is restarted. // When a driver is restarted it is not expected to persist any internal state to disk. // To support this, Nomad will attempt to recover a task that was previously started @@ -312,7 +352,6 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error { taskConfig: taskState.TaskConfig, procState: drivers.TaskStateUnknown, startedAt: taskState.StartedAt, - exitResult: &drivers.ExitResult{}, logger: d.logger.Named("podmanHandle"), totalCPUStats: stats.NewCpuStats(), @@ -350,7 +389,6 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error { d.tasks.Set(taskState.TaskConfig.ID, h) - go h.runContainerMonitor() d.logger.Debug("Recovered container handle", "container", taskState.ContainerID) return nil @@ -567,7 +605,6 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive driver: d, taskConfig: cfg, procState: drivers.TaskStateRunning, - exitResult: &drivers.ExitResult{}, startedAt: time.Now().Round(time.Millisecond), logger: d.logger.Named("podmanHandle"), @@ -592,9 +629,6 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive } d.tasks.Set(cfg.ID, h) - - go h.runContainerMonitor() - d.logger.Info("Completely started container", "taskID", cfg.ID, "container", containerID, "ip", inspectData.NetworkSettings.IPAddress) return handle, net, nil @@ -661,6 +695,7 @@ func (d *Driver) StopTask(taskID string, timeout time.Duration, signal string) e // DestroyTask function cleans up and removes a task that has terminated. // If force is set to true, the driver must destroy the task even if it is still running. 
func (d *Driver) DestroyTask(taskID string, force bool) error { + d.logger.Info("Destroy task", "taskID", taskID) handle, ok := d.tasks.Get(taskID) if !ok { return drivers.ErrTaskNotFound diff --git a/handle.go b/handle.go index 4d5d9327..ba584adb 100644 --- a/handle.go +++ b/handle.go @@ -2,7 +2,6 @@ package main import ( "context" - "errors" "fmt" "sync" "time" @@ -27,6 +26,7 @@ type TaskHandle struct { totalCPUStats *stats.CpuStats userCPUStats *stats.CpuStats systemCPUStats *stats.CpuStats + diedChannel chan bool // stateLock syncs access to all fields below stateLock sync.RWMutex @@ -66,7 +66,6 @@ func (h *TaskHandle) isRunning() bool { } func (h *TaskHandle) runExitWatcher(ctx context.Context, exitChannel chan *drivers.ExitResult) { - timer := time.NewTimer(0) h.logger.Debug("Starting exitWatcher", "container", h.containerID) defer func() { @@ -79,15 +78,17 @@ func (h *TaskHandle) runExitWatcher(ctx context.Context, exitChannel chan *drive close(exitChannel) }() + if !h.isRunning() { + h.logger.Debug("No need to run exitWatcher on a stopped container") + return + } + for { - if !h.isRunning() { - return - } select { case <-ctx.Done(): return - case <-timer.C: - timer.Reset(time.Second) + case <-h.diedChannel: + return } } } @@ -107,7 +108,6 @@ func (h *TaskHandle) runStatsEmitter(ctx context.Context, statsChannel chan *dri h.stateLock.Lock() t := time.Now() - //FIXME implement cpu stats correctly totalPercent := h.containerStats.CPU cs := &drivers.CpuStats{ SystemMode: h.systemCPUStats.Percent(float64(h.containerStats.CPUSystemNano)), @@ -117,8 +117,6 @@ func (h *TaskHandle) runStatsEmitter(ctx context.Context, statsChannel chan *dri Measured: measuredCPUStats, } - //h.driver.logger.Info("stats", "cpu", containerStats.Cpu, "system", containerStats.System_nano, "user", containerStats.Cpu_nano, "percent", totalPercent, "ticks", cs.TotalTicks, "cpus", cpus, "available", available) - ms := &drivers.MemoryStats{ Usage: h.containerStats.MemUsage, RSS: h.containerStats.MemUsage, @@ -139,66 +137,33 @@ func (h *TaskHandle) runStatsEmitter(ctx context.Context, statsChannel chan *dri } } -func (h *TaskHandle) runContainerMonitor() { - - timer := time.NewTimer(0) - interval := time.Second * 1 - h.logger.Debug("Monitoring container", "container", h.containerID) - - cleanup := func() { - h.logger.Debug("Container monitor exits", "container", h.containerID) - } - defer cleanup() - - for { - select { - case <-h.driver.ctx.Done(): - return - - case <-timer.C: - timer.Reset(interval) +func (h *TaskHandle) onContainerDied(event api.ContainerDiedEvent) { + h.logger.Debug("Container is not running anymore", "event", event) + // container was stopped, get exit code and other post mortem infos + inspectData, err := h.driver.podman.ContainerInspect(h.driver.ctx, h.containerID) + h.stateLock.Lock() + h.completedAt = time.Now() + if err != nil { + h.exitResult.Err = fmt.Errorf("Driver was unable to get the exit code. 
%s: %v", h.containerID, err) + h.logger.Error("Failed to inspect stopped container, can not get exit code", "container", h.containerID, "err", err) + h.exitResult.Signal = 0 + } else { + h.exitResult.ExitCode = int(inspectData.State.ExitCode) + if len(inspectData.State.Error) > 0 { + h.exitResult.Err = fmt.Errorf(inspectData.State.Error) + h.logger.Error("Container error", "container", h.containerID, "err", h.exitResult.Err) } - - _, statsErr := h.driver.podman.ContainerStats(h.driver.ctx, h.containerID) - if statsErr != nil { - gone := false - if errors.Is(statsErr, api.ContainerNotFound) { - gone = true - } else if errors.Is(statsErr, api.ContainerWrongState) { - gone = true - } - if gone { - h.logger.Debug("Container is not running anymore", "container", h.containerID, "err", statsErr) - // container was stopped, get exit code and other post mortem infos - inspectData, err := h.driver.podman.ContainerInspect(h.driver.ctx, h.containerID) - h.stateLock.Lock() - h.completedAt = time.Now() - if err != nil { - h.exitResult.Err = fmt.Errorf("Driver was unable to get the exit code. %s: %v", h.containerID, err) - h.logger.Error("Failed to inspect stopped container, can not get exit code", "container", h.containerID, "err", err) - h.exitResult.Signal = 0 - } else { - h.exitResult.ExitCode = int(inspectData.State.ExitCode) - if len(inspectData.State.Error) > 0 { - h.exitResult.Err = fmt.Errorf(inspectData.State.Error) - h.logger.Error("Container error", "container", h.containerID, "err", h.exitResult.Err) - } - h.completedAt = inspectData.State.FinishedAt - if inspectData.State.OOMKilled { - h.exitResult.OOMKilled = true - h.exitResult.Err = fmt.Errorf("Podman container killed by OOM killer") - h.logger.Error("Podman container killed by OOM killer", "container", h.containerID) - } - } - - h.procState = drivers.TaskStateExited - h.stateLock.Unlock() - return - } - // continue and wait for next cycle, it should eventually - // fall into the "TaskStateExited" case - h.logger.Debug("Could not get container stats, unknown error", "err", fmt.Sprintf("%#v", statsErr)) - continue + h.completedAt = inspectData.State.FinishedAt + if inspectData.State.OOMKilled { + h.exitResult.OOMKilled = true + h.exitResult.Err = fmt.Errorf("Podman container killed by OOM killer") + h.logger.Error("Podman container killed by OOM killer", "container", h.containerID) } } + + h.procState = drivers.TaskStateExited + h.stateLock.Unlock() + // unblock exitWatcher go routine + close(h.diedChannel) + return } diff --git a/state.go b/state.go index 6f79287d..2900e979 100644 --- a/state.go +++ b/state.go @@ -3,17 +3,21 @@ package main import ( "sync" + "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad-driver-podman/api" + "github.com/hashicorp/nomad/plugins/drivers" ) type taskStore struct { taskIdToHandle map[string]*TaskHandle containerIdToHandle map[string]*TaskHandle lock sync.RWMutex + logger hclog.Logger } -func newTaskStore() *taskStore { +func newTaskStore(logger hclog.Logger) *taskStore { return &taskStore{ + logger: logger, taskIdToHandle: map[string]*TaskHandle{}, containerIdToHandle: map[string]*TaskHandle{}, } @@ -22,6 +26,8 @@ func newTaskStore() *taskStore { func (ts *taskStore) Set(id string, handle *TaskHandle) { ts.lock.Lock() defer ts.lock.Unlock() + handle.exitResult = new(drivers.ExitResult) + handle.diedChannel = make(chan bool) ts.taskIdToHandle[id] = handle ts.containerIdToHandle[handle.containerID] = handle } @@ -55,10 +61,22 @@ func (ts *taskStore) Delete(id string) { func 
(ts *taskStore) UpdateContainerStats(containerStats api.ContainerStats) bool { taskHandle, ok := ts.GetByContainerId(containerStats.ContainerID) if ok { - taskHandle.stateLock.Lock() taskHandle.containerStats = containerStats taskHandle.stateLock.Unlock() } return ok } + +// Manage task handle state by consuming libpod events +func (ts *taskStore) HandleLibpodEvent(e interface{}) { + switch e.(type) { + case api.ContainerDiedEvent: + event, _ := e.(api.ContainerDiedEvent) + taskHandle, ok := ts.GetByContainerId(event.ID) + if ok { + taskHandle.onContainerDied(event) + } + } + +} From b56c25a9ea81aaa9cd0ce18333e621ad0d65a760 Mon Sep 17 00:00:00 2001 From: Thomas Weber Date: Fri, 1 Jan 2021 20:34:52 +0100 Subject: [PATCH 04/11] Stream events from global podman listener into handles --- api/api.go | 18 +++++- api/container_start.go | 7 --- api/container_stats.go | 2 +- api/libpod_events.go | 135 +++++++++++++++++++++++++++++++++++++++++ driver.go | 33 +++++----- 5 files changed, 168 insertions(+), 27 deletions(-) create mode 100644 api/libpod_events.go diff --git a/api/api.go b/api/api.go index 94c4d60c..a962939e 100644 --- a/api/api.go +++ b/api/api.go @@ -14,9 +14,10 @@ import ( ) type API struct { - baseUrl string - httpClient *http.Client - logger hclog.Logger + baseUrl string + httpClient *http.Client + httpStreamClient *http.Client + logger hclog.Logger } type ClientConfig struct { @@ -49,6 +50,8 @@ func NewClient(logger hclog.Logger, config ClientConfig) *API { ac.httpClient = &http.Client{ Timeout: config.HttpTimeout, } + // we do not want a timeout for streaming requests. + ac.httpStreamClient = &http.Client{} if strings.HasPrefix(baseUrl, "unix:") { ac.baseUrl = "http://u" path := strings.TrimPrefix(baseUrl, "unix:") @@ -57,6 +60,7 @@ func NewClient(logger hclog.Logger, config ClientConfig) *API { return net.Dial("unix", path) }, } + ac.httpStreamClient.Transport = ac.httpClient.Transport } else { ac.baseUrl = baseUrl } @@ -77,6 +81,14 @@ func (c *API) Get(ctx context.Context, path string) (*http.Response, error) { return c.Do(req) } +func (c *API) GetStream(ctx context.Context, path string) (*http.Response, error) { + req, err := http.NewRequestWithContext(ctx, "GET", c.baseUrl+path, nil) + if err != nil { + return nil, err + } + return c.httpStreamClient.Do(req) +} + func (c *API) Post(ctx context.Context, path string, body io.Reader) (*http.Response, error) { req, err := http.NewRequestWithContext(ctx, "POST", c.baseUrl+path, body) if err != nil { diff --git a/api/container_start.go b/api/container_start.go index cedf3249..8234c34a 100644 --- a/api/container_start.go +++ b/api/container_start.go @@ -5,7 +5,6 @@ import ( "fmt" "io/ioutil" "net/http" - "time" ) // ContainerStart starts a container via id or name @@ -23,11 +22,5 @@ func (c *API) ContainerStart(ctx context.Context, name string) error { return fmt.Errorf("unknown error, status code: %d: %s", res.StatusCode, body) } - // wait max 10 seconds for running state - // TODO: make timeout configurable - timeout, cancel := context.WithTimeout(ctx, time.Second*10) - defer cancel() - - err = c.ContainerWait(timeout, name, "running") return err } diff --git a/api/container_stats.go b/api/container_stats.go index ea955201..3017a65c 100644 --- a/api/container_stats.go +++ b/api/container_stats.go @@ -51,7 +51,7 @@ func (c *API) ContainerStats(ctx context.Context, name string) (Stats, error) { // ContainerStatsStream streams stats for all containers func (c *API) ContainerStatsStream(ctx context.Context) (chan ContainerStats, error) 
{ - res, err := c.Get(ctx, "/v1.0.0/libpod/containers/stats?stream=true") + res, err := c.GetStream(ctx, "/v1.0.0/libpod/containers/stats?stream=true") if err != nil { return nil, err } diff --git a/api/libpod_events.go b/api/libpod_events.go new file mode 100644 index 00000000..8ace5dc7 --- /dev/null +++ b/api/libpod_events.go @@ -0,0 +1,135 @@ +package api + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "strconv" + + "github.com/mitchellh/go-linereader" +) + +// PodmanEvent is the common header for all events +type PodmanEvent struct { + Type string + Action string +} + +// ContainerEvent is a generic PodmanEvent for a single container +// example json: +// {"Type":"container","Action":"create","Actor":{"ID":"cc0d7849692360df2cba94eafb2715b9deec0cbd96ec41c3329dd8636cd070ce","Attributes":{"containerExitCode":"0","image":"docker.io/library/redis:latest","name":"redis-6f2b07a8-73e9-7098-83e1-55939851d46d"}},"scope":"local","time":1609413164,"timeNano":1609413164982188073} +type ContainerEvent struct { + // create/init/start/stop/died + Action string `json:"Action"` + Scope string `json:"scope"` + TimeNano uint64 `json:"timeNano"` + Time uint32 `json:"time"` + Actor ContainerEventActor `json:"Actor"` +} + +type ContainerEventActor struct { + ID string `json:"ID"` + Attributes ContainerEventAttributes `json:"Attributes"` +} + +type ContainerEventAttributes struct { + Image string `json:"image"` + Name string `json:"name"` + ContainerExitCode string `json:"containerExitCode"` +} + +// ContainerStartEvent is emitted when a container completely started +type ContainerStartEvent struct { + ID string + Name string +} + +// ContainerDiedEvent is emitted when a container exited +type ContainerDiedEvent struct { + ID string + Name string + ExitCode int +} + +// LibpodEventStream streams podman events +func (c *API) LibpodEventStream(ctx context.Context) (chan interface{}, error) { + + res, err := c.GetStream(ctx, "/v1.0.0/libpod/events?stream=true") + if err != nil { + return nil, err + } + + if res.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unknown error, status code: %d", res.StatusCode) + } + + eventsChannel := make(chan interface{}, 5) + lr := linereader.New(res.Body) + + go func() { + c.logger.Debug("Running libpod event stream") + defer func() { + res.Body.Close() + close(eventsChannel) + c.logger.Debug("Stopped libpod event stream") + }() + for { + select { + case <-ctx.Done(): + c.logger.Debug("Stopping libpod event stream") + return + case line, ok := <-lr.Ch: + if !ok { + c.logger.Debug("Event reader channel was closed") + return + } + var podmanEvent PodmanEvent + err := json.Unmarshal([]byte(line), &podmanEvent) + if err != nil { + c.logger.Error("Unable to parse libpod event", "error", err) + // no need to stop the stream, maybe we can parse the next event + continue + } + if podmanEvent.Type == "container" { + var containerEvent ContainerEvent + err := json.Unmarshal([]byte(line), &containerEvent) + if err != nil { + c.logger.Error("Unable to parse ContainerEvent", "error", err) + // no need to stop the stream, maybe we can parse the next event + continue + } + switch containerEvent.Action { + case "start": + eventsChannel <- ContainerStartEvent{ + ID: containerEvent.Actor.ID, + Name: containerEvent.Actor.Attributes.Name, + } + continue + case "died": + i, err := strconv.Atoi(containerEvent.Actor.Attributes.ContainerExitCode) + if err != nil { + c.logger.Error("Unable to parse ContainerEvent exitCode", "error", err) + // no need to stop the stream, 
maybe we can parse the next event + continue + } + eventsChannel <- ContainerDiedEvent{ + ID: containerEvent.Actor.ID, + Name: containerEvent.Actor.Attributes.Name, + ExitCode: i, + } + continue + } + // no action specific parser? emit what we've got + eventsChannel <- containerEvent + continue + } + + // emit a generic event if we do not have a parser for it + eventsChannel <- podmanEvent + } + } + }() + + return eventsChannel, nil +} diff --git a/driver.go b/driver.go index 2aca8b87..7ce93cc6 100644 --- a/driver.go +++ b/driver.go @@ -573,11 +573,28 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive containerID = createResponse.Id } + h := &TaskHandle{ + containerID: containerID, + driver: d, + taskConfig: cfg, + procState: drivers.TaskStateRunning, + startedAt: time.Now().Round(time.Millisecond), + logger: d.logger.Named("podmanHandle"), + + totalCPUStats: stats.NewCpuStats(), + userCPUStats: stats.NewCpuStats(), + systemCPUStats: stats.NewCpuStats(), + + removeContainerOnExit: d.config.GC.Container, + } + d.tasks.Set(cfg.ID, h) + cleanup := func() { d.logger.Debug("Cleaning up", "container", containerID) if err := d.podman.ContainerDelete(d.ctx, containerID, true, true); err != nil { d.logger.Error("failed to clean up from an error in Start", "error", err) } + d.tasks.Delete(cfg.ID) } if !recoverRunningContainer { @@ -600,21 +617,6 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive AutoAdvertise: true, } - h := &TaskHandle{ - containerID: containerID, - driver: d, - taskConfig: cfg, - procState: drivers.TaskStateRunning, - startedAt: time.Now().Round(time.Millisecond), - logger: d.logger.Named("podmanHandle"), - - totalCPUStats: stats.NewCpuStats(), - userCPUStats: stats.NewCpuStats(), - systemCPUStats: stats.NewCpuStats(), - - removeContainerOnExit: d.config.GC.Container, - } - driverState := TaskState{ ContainerID: containerID, TaskConfig: cfg, @@ -628,7 +630,6 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive return nil, nil, fmt.Errorf("failed to set driver state: %v", err) } - d.tasks.Set(cfg.ID, h) d.logger.Info("Completely started container", "taskID", cfg.ID, "container", containerID, "ip", inspectData.NetworkSettings.IPAddress) return handle, net, nil From db3b563931f20bed325d99d3f84a732b0c154952 Mon Sep 17 00:00:00 2001 From: Thomas Weber Date: Sat, 2 Jan 2021 12:22:02 +0100 Subject: [PATCH 05/11] Fixed caps unittest, adopted to newer podman version --- driver_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/driver_test.go b/driver_test.go index d3ffda57..dda3fc0c 100644 --- a/driver_test.go +++ b/driver_test.go @@ -1106,8 +1106,8 @@ func TestPodmanDriver_DefaultCaps(t *testing.T) { // a default container should not have SYS_TIME require.NotContains(t, inspectData.EffectiveCaps, "CAP_SYS_TIME") - // a default container gets MKNOD cap - require.Contains(t, inspectData.EffectiveCaps, "CAP_MKNOD") + // a default container gets CHOWN cap + require.Contains(t, inspectData.EffectiveCaps, "CAP_CHOWN") } // check modified capabilities (CapAdd/CapDrop) @@ -1120,14 +1120,14 @@ func TestPodmanDriver_Caps(t *testing.T) { // cap_drop = [ // "MKNOD", // ] - taskCfg.CapDrop = []string{"MKNOD"} + taskCfg.CapDrop = []string{"CHOWN"} inspectData := startDestroyInspect(t, taskCfg, "caps") // we added SYS_TIME, so we should see it in inspect require.Contains(t, inspectData.EffectiveCaps, "CAP_SYS_TIME") - // we dropped CAP_MKNOD, so we should NOT see it in 
inspect - require.NotContains(t, inspectData.EffectiveCaps, "CAP_MKNOD") + // we dropped CAP_CHOWN, so we should NOT see it in inspect + require.NotContains(t, inspectData.EffectiveCaps, "CAP_CHOWN") } // check dns server configuration From d0a2ccd7314fb12c8b9c2e694262e1be067804c8 Mon Sep 17 00:00:00 2001 From: Thomas Weber Date: Sat, 2 Jan 2021 12:22:33 +0100 Subject: [PATCH 06/11] Linter --- api/container_stats.go | 5 ++++- handle.go | 1 - state.go | 4 +--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/api/container_stats.go b/api/container_stats.go index 3017a65c..72e1aba7 100644 --- a/api/container_stats.go +++ b/api/container_stats.go @@ -81,7 +81,10 @@ func (c *API) ContainerStatsStream(ctx context.Context) (chan ContainerStats, er return } var statsReport ContainerStatsReport - json.Unmarshal([]byte(line), &statsReport) + if jerr := json.Unmarshal([]byte(line), &statsReport); jerr != nil { + c.logger.Error("Unable to unmarshal statsreport", "err", err) + return + } if statsReport.Error != nil { c.logger.Error("Stats stream is broken", "error", statsReport.Error) return diff --git a/handle.go b/handle.go index ba584adb..9bf3ee13 100644 --- a/handle.go +++ b/handle.go @@ -165,5 +165,4 @@ func (h *TaskHandle) onContainerDied(event api.ContainerDiedEvent) { h.stateLock.Unlock() // unblock exitWatcher go routine close(h.diedChannel) - return } diff --git a/state.go b/state.go index 2900e979..01a7d4d2 100644 --- a/state.go +++ b/state.go @@ -70,13 +70,11 @@ func (ts *taskStore) UpdateContainerStats(containerStats api.ContainerStats) boo // Manage task handle state by consuming libpod events func (ts *taskStore) HandleLibpodEvent(e interface{}) { - switch e.(type) { + switch event := e.(type) { case api.ContainerDiedEvent: - event, _ := e.(api.ContainerDiedEvent) taskHandle, ok := ts.GetByContainerId(event.ID) if ok { taskHandle.onContainerDied(event) } } - } From 8e1de65047435fe3051e8e3924fc23cba7f18fdb Mon Sep 17 00:00:00 2001 From: Thomas Weber Date: Sat, 2 Jan 2021 17:35:24 +0100 Subject: [PATCH 07/11] Improved statsEmitter moved state into go routine, avoid locks --- driver.go | 9 ------- handle.go | 73 +++++++++++++++++++++++++++++-------------------------- state.go | 8 +++--- 3 files changed, 41 insertions(+), 49 deletions(-) diff --git a/driver.go b/driver.go index 7ce93cc6..2998aca1 100644 --- a/driver.go +++ b/driver.go @@ -15,7 +15,6 @@ import ( "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad-driver-podman/api" "github.com/hashicorp/nomad-driver-podman/version" - "github.com/hashicorp/nomad/client/stats" "github.com/hashicorp/nomad/client/taskenv" "github.com/hashicorp/nomad/drivers/shared/eventer" nstructs "github.com/hashicorp/nomad/nomad/structs" @@ -354,10 +353,6 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error { startedAt: taskState.StartedAt, logger: d.logger.Named("podmanHandle"), - totalCPUStats: stats.NewCpuStats(), - userCPUStats: stats.NewCpuStats(), - systemCPUStats: stats.NewCpuStats(), - removeContainerOnExit: d.config.GC.Container, } @@ -581,10 +576,6 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive startedAt: time.Now().Round(time.Millisecond), logger: d.logger.Named("podmanHandle"), - totalCPUStats: stats.NewCpuStats(), - userCPUStats: stats.NewCpuStats(), - systemCPUStats: stats.NewCpuStats(), - removeContainerOnExit: d.config.GC.Container, } d.tasks.Set(cfg.ID, h) diff --git a/handle.go b/handle.go index 9bf3ee13..16ea2188 
100644 --- a/handle.go +++ b/handle.go @@ -23,10 +23,10 @@ type TaskHandle struct { logger hclog.Logger driver *Driver - totalCPUStats *stats.CpuStats - userCPUStats *stats.CpuStats - systemCPUStats *stats.CpuStats - diedChannel chan bool + diedChannel chan bool + + // receive container stats from global podman stats streamer + containerStatsChannel chan api.ContainerStats // stateLock syncs access to all fields below stateLock sync.RWMutex @@ -38,8 +38,6 @@ type TaskHandle struct { exitResult *drivers.ExitResult removeContainerOnExit bool - - containerStats api.ContainerStats } func (h *TaskHandle) taskStatus() *drivers.TaskStatus { @@ -95,45 +93,50 @@ func (h *TaskHandle) runExitWatcher(ctx context.Context, exitChannel chan *drive func (h *TaskHandle) runStatsEmitter(ctx context.Context, statsChannel chan *drivers.TaskResourceUsage, interval time.Duration) { timer := time.NewTimer(0) + + containerStats := api.ContainerStats{} + userCPUStats := stats.NewCpuStats() + systemCPUStats := stats.NewCpuStats() + h.logger.Debug("Starting statsEmitter", "container", h.containerID) for { select { + case <-ctx.Done(): h.logger.Debug("Stopping statsEmitter", "container", h.containerID) return - case <-timer.C: - timer.Reset(interval) - } - h.stateLock.Lock() - t := time.Now() - - totalPercent := h.containerStats.CPU - cs := &drivers.CpuStats{ - SystemMode: h.systemCPUStats.Percent(float64(h.containerStats.CPUSystemNano)), - UserMode: h.userCPUStats.Percent(float64(h.containerStats.CPUNano)), - Percent: totalPercent, - TotalTicks: h.systemCPUStats.TicksConsumed(totalPercent), - Measured: measuredCPUStats, - } + case containerStats = <-h.containerStatsChannel: + continue - ms := &drivers.MemoryStats{ - Usage: h.containerStats.MemUsage, - RSS: h.containerStats.MemUsage, - Measured: measuredMemStats, - } - h.stateLock.Unlock() + case <-timer.C: + timer.Reset(interval) - // update uasge - usage := drivers.TaskResourceUsage{ - ResourceUsage: &drivers.ResourceUsage{ - CpuStats: cs, - MemoryStats: ms, - }, - Timestamp: t.UTC().UnixNano(), + totalPercent := containerStats.CPU + + cs := &drivers.CpuStats{ + SystemMode: systemCPUStats.Percent(float64(containerStats.CPUSystemNano)), + UserMode: userCPUStats.Percent(float64(containerStats.CPUNano)), + Percent: totalPercent, + TotalTicks: systemCPUStats.TicksConsumed(totalPercent), + Measured: measuredCPUStats, + } + + ms := &drivers.MemoryStats{ + Usage: containerStats.MemUsage, + RSS: containerStats.MemUsage, + Measured: measuredMemStats, + } + + // send stats to nomad + statsChannel <- &drivers.TaskResourceUsage{ + ResourceUsage: &drivers.ResourceUsage{ + CpuStats: cs, + MemoryStats: ms, + }, + Timestamp: time.Now().UTC().UnixNano(), + } } - // send stats to nomad - statsChannel <- &usage } } diff --git a/state.go b/state.go index 01a7d4d2..a4a1ed5e 100644 --- a/state.go +++ b/state.go @@ -28,6 +28,7 @@ func (ts *taskStore) Set(id string, handle *TaskHandle) { defer ts.lock.Unlock() handle.exitResult = new(drivers.ExitResult) handle.diedChannel = make(chan bool) + handle.containerStatsChannel = make(chan api.ContainerStats, 5) ts.taskIdToHandle[id] = handle ts.containerIdToHandle[handle.containerID] = handle } @@ -56,14 +57,11 @@ func (ts *taskStore) Delete(id string) { delete(ts.taskIdToHandle, id) } -// keep last known containerStats in handle to -// have it available in the stats emitter +// UpdateContainerStats forwards containerStats to handle func (ts *taskStore) UpdateContainerStats(containerStats api.ContainerStats) bool { taskHandle, ok := 
ts.GetByContainerId(containerStats.ContainerID) if ok { - taskHandle.stateLock.Lock() - taskHandle.containerStats = containerStats - taskHandle.stateLock.Unlock() + taskHandle.containerStatsChannel <- containerStats } return ok } From f66d6dd7e0702ce1f554e297f5f29c57653c111e Mon Sep 17 00:00:00 2001 From: Thomas Weber Date: Sun, 3 Jan 2021 20:29:14 +0100 Subject: [PATCH 08/11] Replaced mutex based state store with actor based approach --- api/libpod_events.go | 1 + driver.go | 188 ++++++++++++++++++++++++++----------------- handle.go | 15 ++-- state.go | 78 ------------------ stateactor.go | 110 +++++++++++++++++++++++++ 5 files changed, 235 insertions(+), 157 deletions(-) delete mode 100644 state.go create mode 100644 stateactor.go diff --git a/api/libpod_events.go b/api/libpod_events.go index 8ace5dc7..db27a594 100644 --- a/api/libpod_events.go +++ b/api/libpod_events.go @@ -91,6 +91,7 @@ func (c *API) LibpodEventStream(ctx context.Context) (chan interface{}, error) { // no need to stop the stream, maybe we can parse the next event continue } + c.logger.Trace("libpod event", "event", line) if podmanEvent.Type == "container" { var containerEvent ContainerEvent err := json.Unmarshal([]byte(line), &containerEvent) diff --git a/driver.go b/driver.go index 2998aca1..6f23d511 100644 --- a/driver.go +++ b/driver.go @@ -75,9 +75,6 @@ type Driver struct { // nomadConfig is the client config from nomad nomadConfig *base.ClientDriverConfig - // tasks is the in memory datastore mapping taskIDs to rawExecDriverHandles - tasks *taskStore - // ctx is the context for the driver. It is passed to other subsystems to // coordinate shutdown ctx context.Context @@ -96,6 +93,9 @@ type Driver struct { systemInfo api.Info // Queried from systemInfo: is podman running on a cgroupv2 system? cgroupV2 bool + + // state actor inbox + stateActorChannel chan interface{} } // TaskState is the state which is encoded in the handle returned in @@ -112,12 +112,27 @@ type TaskState struct { func NewPodmanDriver(logger hclog.Logger) drivers.DriverPlugin { ctx, cancel := context.WithCancel(context.Background()) return &Driver{ - eventer: eventer.NewEventer(ctx, logger), - config: &PluginConfig{}, - tasks: newTaskStore(logger), - ctx: ctx, - signalShutdown: cancel, - logger: logger.Named(pluginName), + eventer: eventer.NewEventer(ctx, logger), + config: &PluginConfig{}, + ctx: ctx, + signalShutdown: cancel, + logger: logger.Named(pluginName), + stateActorChannel: make(chan interface{}, 5), + } +} + +// NewTaskHandle creates a new TaskHandle struct +func (d *Driver) NewTaskHandle(cfg *drivers.TaskConfig) *TaskHandle { + return &TaskHandle{ + driver: d, + taskConfig: cfg, + logger: d.logger.Named("podmanHandle"), + procState: drivers.TaskStateUnknown, + startedAt: time.Now().Round(time.Millisecond), + exitResult: new(drivers.ExitResult), + diedChannel: make(chan bool), + containerStatsChannel: make(chan api.ContainerStats, 5), + removeContainerOnExit: d.config.GC.Container, } } @@ -215,8 +230,6 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint { if err != nil { d.logger.Error("Could not get podman info", "err", err) } else { - // yay! 
we can enable the driver - health = drivers.HealthStateHealthy desc = "ready" attrs["driver.podman"] = pstructs.NewBoolAttribute(true) attrs["driver.podman.version"] = pstructs.NewStringAttribute(info.Version.Version) @@ -227,16 +240,8 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint { // it is used to toggle cgroup v1/v2, rootless/rootful behavior d.systemInfo = info d.cgroupV2 = info.Host.CGroupsVersion == "v2" - err = d.runStatsStreamer() - if err != nil { - d.logger.Error("Could not start stats stream", "err", err) - health = drivers.HealthStateUnhealthy - } - err = d.runEventStreamer() - if err != nil { - d.logger.Error("Could not start event stream", "err", err) - health = drivers.HealthStateUnhealthy - } + // run some final initialization after first podman contact + health = d.onInit() } } @@ -247,6 +252,28 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint { } } +// onInit is called after first successful podman api request. +func (d *Driver) onInit() drivers.HealthState { + var err error + + err = d.runStatsStreamer() + if err != nil { + d.logger.Error("Could not start stats stream", "err", err) + return drivers.HealthStateHealthy + } + err = d.runEventStreamer() + if err != nil { + d.logger.Error("Could not start event stream", "err", err) + return drivers.HealthStateHealthy + } + err = runStateActor(d.ctx, d.stateActorChannel, d.logger.Named("stateActor")) + if err != nil { + return drivers.HealthStateUnhealthy + } + // yay! we can enable the driver + return drivers.HealthStateHealthy +} + // stream stats from a global podman listener into task handles func (d *Driver) runStatsStreamer() error { var err error @@ -274,9 +301,7 @@ func (d *Driver) runStatsStreamer() error { d.logger.Debug("Rerun stats stream") continue } - if !d.tasks.UpdateContainerStats(containerStats) { - d.logger.Debug("Ignore stats for unknown container", "container", containerStats.Name) - } + d.stateActorChannel <- containerStats } } }() @@ -311,7 +336,7 @@ func (d *Driver) runEventStreamer() error { d.logger.Debug("Rerun event stream") continue } - d.tasks.HandleLibpodEvent(event) + d.stateActorChannel <- event } } }() @@ -329,7 +354,7 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error { return fmt.Errorf("error: handle cannot be nil") } - if _, ok := d.tasks.Get(handle.Config.ID); ok { + if _, err := d.GetTaskHandle(handle.Config.ID); err == drivers.ErrTaskNotFound { return nil } @@ -345,16 +370,9 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error { return nil } - h := &TaskHandle{ - containerID: taskState.ContainerID, - driver: d, - taskConfig: taskState.TaskConfig, - procState: drivers.TaskStateUnknown, - startedAt: taskState.StartedAt, - logger: d.logger.Named("podmanHandle"), - - removeContainerOnExit: d.config.GC.Container, - } + h := d.NewTaskHandle(taskState.TaskConfig) + h.containerID = taskState.ContainerID + h.startedAt = taskState.StartedAt if inspectData.State.Running { d.logger.Info("Recovered a still running container", "container", inspectData.State.Pid) @@ -381,8 +399,14 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error { d.logger.Warn("Recovery restart failed, unknown container state", "state", inspectData.State.Status, "container", taskState.ContainerID) h.procState = drivers.TaskStateUnknown } - - d.tasks.Set(taskState.TaskConfig.ID, h) + msg := TaskStartedMsg{ + TaskID: taskState.TaskConfig.ID, + TaskHandle: h, + Done: make(chan bool), + } + d.stateActorChannel <- msg + // wait until actor processed the request + 
<-msg.Done d.logger.Debug("Recovered container handle", "container", taskState.ContainerID) @@ -396,7 +420,8 @@ func BuildContainerName(cfg *drivers.TaskConfig) string { // StartTask creates and starts a new Container based on the given TaskConfig. func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drivers.DriverNetwork, error) { - if _, ok := d.tasks.Get(cfg.ID); ok { + + if _, err := d.GetTaskHandle(cfg.ID); err != drivers.ErrTaskNotFound { return nil, nil, fmt.Errorf("task with ID %q already started", cfg.ID) } @@ -568,24 +593,25 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive containerID = createResponse.Id } - h := &TaskHandle{ - containerID: containerID, - driver: d, - taskConfig: cfg, - procState: drivers.TaskStateRunning, - startedAt: time.Now().Round(time.Millisecond), - logger: d.logger.Named("podmanHandle"), + h := d.NewTaskHandle(cfg) + h.containerID = containerID + h.procState = drivers.TaskStateRunning - removeContainerOnExit: d.config.GC.Container, + msg := TaskStartedMsg{ + TaskID: cfg.ID, + TaskHandle: h, + Done: make(chan bool), } - d.tasks.Set(cfg.ID, h) + d.stateActorChannel <- msg + // wait until actor processed the request + <-msg.Done cleanup := func() { d.logger.Debug("Cleaning up", "container", containerID) if err := d.podman.ContainerDelete(d.ctx, containerID, true, true); err != nil { d.logger.Error("failed to clean up from an error in Start", "error", err) } - d.tasks.Delete(cfg.ID) + d.stateActorChannel <- TaskDeletedMsg{cfg.ID} } if !recoverRunningContainer { @@ -657,12 +683,13 @@ func memoryInBytes(strmem string) (int64, error) { // If WaitTask is called after DestroyTask, it should return drivers.ErrTaskNotFound as no task state should exist after DestroyTask is called. func (d *Driver) WaitTask(ctx context.Context, taskID string) (<-chan *drivers.ExitResult, error) { d.logger.Debug("WaitTask called", "task", taskID) - handle, ok := d.tasks.Get(taskID) - if !ok { - return nil, drivers.ErrTaskNotFound - } ch := make(chan *drivers.ExitResult) - go handle.runExitWatcher(ctx, ch) + // forward the request into the taskHandle + d.stateActorChannel <- WaitTaskMsg{ + TaskID: taskID, + Ctx: ctx, + ExitResultChannel: ch, + } return ch, nil } @@ -671,12 +698,12 @@ func (d *Driver) WaitTask(ctx context.Context, taskID string) (<-chan *drivers.E // StopTask does not clean up resources of the task or remove it from the driver's internal state. func (d *Driver) StopTask(taskID string, timeout time.Duration, signal string) error { d.logger.Info("Stopping task", "taskID", taskID, "signal", signal) - handle, ok := d.tasks.Get(taskID) - if !ok { - return drivers.ErrTaskNotFound + handle, err := d.GetTaskHandle(taskID) + if err != nil { + return err } // fixme send proper signal to container - err := d.podman.ContainerStop(d.ctx, handle.containerID, int(timeout.Seconds())) + err = d.podman.ContainerStop(d.ctx, handle.containerID, int(timeout.Seconds())) if err != nil { d.logger.Error("Could not stop/kill container", "containerID", handle.containerID, "err", err) return err @@ -688,9 +715,9 @@ func (d *Driver) StopTask(taskID string, timeout time.Duration, signal string) e // If force is set to true, the driver must destroy the task even if it is still running. 
func (d *Driver) DestroyTask(taskID string, force bool) error { d.logger.Info("Destroy task", "taskID", taskID) - handle, ok := d.tasks.Get(taskID) - if !ok { - return drivers.ErrTaskNotFound + handle, err := d.GetTaskHandle(taskID) + if err != nil { + return err } if handle.isRunning() && !force { @@ -722,17 +749,16 @@ func (d *Driver) DestroyTask(taskID string, force bool) error { d.logger.Warn("Could not remove container", "container", handle.containerID, "error", err) } } - - d.tasks.Delete(taskID) + d.stateActorChannel <- TaskDeletedMsg{taskID} return nil } // InspectTask function returns detailed status information for the referenced taskID. func (d *Driver) InspectTask(taskID string) (*drivers.TaskStatus, error) { d.logger.Debug("InspectTask called") - handle, ok := d.tasks.Get(taskID) - if !ok { - return nil, drivers.ErrTaskNotFound + handle, err := d.GetTaskHandle(taskID) + if err != nil { + return nil, err } return handle.taskStatus(), nil @@ -742,13 +768,13 @@ func (d *Driver) InspectTask(taskID string) (*drivers.TaskStatus, error) { // The driver must send stats at the given interval until the given context is canceled or the task terminates. func (d *Driver) TaskStats(ctx context.Context, taskID string, interval time.Duration) (<-chan *drivers.TaskResourceUsage, error) { d.logger.Debug("TaskStats called", "taskID", taskID) - handle, ok := d.tasks.Get(taskID) - if !ok { - return nil, drivers.ErrTaskNotFound + taskResourceChannel := make(chan *drivers.TaskResourceUsage) + d.stateActorChannel <- StartStatesEmitterMsg{ + TaskID: taskID, + Interval: interval, + TaskResourceChannel: taskResourceChannel, } - statsChannel := make(chan *drivers.TaskResourceUsage) - go handle.runStatsEmitter(ctx, statsChannel, interval) - return statsChannel, nil + return taskResourceChannel, nil } // TaskEvents function allows the driver to publish driver specific events about tasks and @@ -760,8 +786,8 @@ func (d *Driver) TaskEvents(ctx context.Context) (<-chan *drivers.TaskEvent, err // SignalTask function is used by drivers which support sending OS signals (SIGHUP, SIGKILL, SIGUSR1 etc.) to the task. // It is an optional function and is listed as a capability in the driver Capabilities struct. func (d *Driver) SignalTask(taskID string, signal string) error { - handle, ok := d.tasks.Get(taskID) - if !ok { + handle, err := d.GetTaskHandle(taskID) + if err != nil { return drivers.ErrTaskNotFound } @@ -906,6 +932,20 @@ func (d *Driver) portMappings(taskCfg *drivers.TaskConfig, driverCfg TaskConfig) return publishedPorts, nil } +// Get a TaskHandle from the state store +func (d *Driver) GetTaskHandle(id string) (*TaskHandle, error) { + msg := GetTaskHandleMsg{ + TaskID: id, + Result: make(chan *TaskHandle), + } + d.stateActorChannel <- msg + handle, ok := <-msg.Result + if !ok { + return nil, drivers.ErrTaskNotFound + } + return handle, nil +} + // expandPath returns the absolute path of dir, relative to base if dir is relative path. 
// base is expected to be an absolute path func expandPath(base, dir string) string { diff --git a/handle.go b/handle.go index 16ea2188..a23a536b 100644 --- a/handle.go +++ b/handle.go @@ -28,6 +28,8 @@ type TaskHandle struct { // receive container stats from global podman stats streamer containerStatsChannel chan api.ContainerStats + statsEmitterRunning bool + // stateLock syncs access to all fields below stateLock sync.RWMutex @@ -91,7 +93,7 @@ func (h *TaskHandle) runExitWatcher(ctx context.Context, exitChannel chan *drive } } -func (h *TaskHandle) runStatsEmitter(ctx context.Context, statsChannel chan *drivers.TaskResourceUsage, interval time.Duration) { +func (h *TaskHandle) runStatsEmitter(ctx context.Context, taskResourceChannel chan *drivers.TaskResourceUsage, interval time.Duration) { timer := time.NewTimer(0) containerStats := api.ContainerStats{} @@ -106,7 +108,10 @@ func (h *TaskHandle) runStatsEmitter(ctx context.Context, statsChannel chan *dri h.logger.Debug("Stopping statsEmitter", "container", h.containerID) return - case containerStats = <-h.containerStatsChannel: + case s := <-h.containerStatsChannel: + // keep latest known container stats in this go routine + // and convert/emit it to nomad based on interval + containerStats = s continue case <-timer.C: @@ -129,7 +134,7 @@ func (h *TaskHandle) runStatsEmitter(ctx context.Context, statsChannel chan *dri } // send stats to nomad - statsChannel <- &drivers.TaskResourceUsage{ + taskResourceChannel <- &drivers.TaskResourceUsage{ ResourceUsage: &drivers.ResourceUsage{ CpuStats: cs, MemoryStats: ms, @@ -140,8 +145,8 @@ func (h *TaskHandle) runStatsEmitter(ctx context.Context, statsChannel chan *dri } } -func (h *TaskHandle) onContainerDied(event api.ContainerDiedEvent) { - h.logger.Debug("Container is not running anymore", "event", event) +func (h *TaskHandle) onContainerDied() { + h.logger.Debug("Container is not running anymore") // container was stopped, get exit code and other post mortem infos inspectData, err := h.driver.podman.ContainerInspect(h.driver.ctx, h.containerID) h.stateLock.Lock() diff --git a/state.go b/state.go deleted file mode 100644 index a4a1ed5e..00000000 --- a/state.go +++ /dev/null @@ -1,78 +0,0 @@ -package main - -import ( - "sync" - - "github.com/hashicorp/go-hclog" - "github.com/hashicorp/nomad-driver-podman/api" - "github.com/hashicorp/nomad/plugins/drivers" -) - -type taskStore struct { - taskIdToHandle map[string]*TaskHandle - containerIdToHandle map[string]*TaskHandle - lock sync.RWMutex - logger hclog.Logger -} - -func newTaskStore(logger hclog.Logger) *taskStore { - return &taskStore{ - logger: logger, - taskIdToHandle: map[string]*TaskHandle{}, - containerIdToHandle: map[string]*TaskHandle{}, - } -} - -func (ts *taskStore) Set(id string, handle *TaskHandle) { - ts.lock.Lock() - defer ts.lock.Unlock() - handle.exitResult = new(drivers.ExitResult) - handle.diedChannel = make(chan bool) - handle.containerStatsChannel = make(chan api.ContainerStats, 5) - ts.taskIdToHandle[id] = handle - ts.containerIdToHandle[handle.containerID] = handle -} - -func (ts *taskStore) Get(id string) (*TaskHandle, bool) { - ts.lock.RLock() - defer ts.lock.RUnlock() - t, ok := ts.taskIdToHandle[id] - return t, ok -} - -func (ts *taskStore) GetByContainerId(containerID string) (*TaskHandle, bool) { - ts.lock.RLock() - defer ts.lock.RUnlock() - t, ok := ts.containerIdToHandle[containerID] - return t, ok -} - -func (ts *taskStore) Delete(id string) { - ts.lock.Lock() - defer ts.lock.Unlock() - t, 
ok := ts.taskIdToHandle[id] - if ok { - delete(ts.containerIdToHandle, t.containerID) - } - delete(ts.taskIdToHandle, id) -} - -// UpdateContainerStats forwards containerStats to handle -func (ts *taskStore) UpdateContainerStats(containerStats api.ContainerStats) bool { - taskHandle, ok := ts.GetByContainerId(containerStats.ContainerID) - if ok { - taskHandle.containerStatsChannel <- containerStats - } - return ok -} - -// Manage task handle state by consuming libpod events -func (ts *taskStore) HandleLibpodEvent(e interface{}) { - switch event := e.(type) { - case api.ContainerDiedEvent: - taskHandle, ok := ts.GetByContainerId(event.ID) - if ok { - taskHandle.onContainerDied(event) - } - } -} diff --git a/stateactor.go b/stateactor.go new file mode 100644 index 00000000..fae16bef --- /dev/null +++ b/stateactor.go @@ -0,0 +1,110 @@ +package main + +import ( + "context" + "time" + + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad-driver-podman/api" + "github.com/hashicorp/nomad/plugins/drivers" +) + +type GetTaskHandleMsg struct { + TaskID string + Result chan *TaskHandle +} + +type TaskStartedMsg struct { + TaskID string + TaskHandle *TaskHandle + Done chan bool +} + +type TaskDeletedMsg struct { + TaskID string +} + +type StartStatesEmitterMsg struct { + TaskID string + Interval time.Duration + TaskResourceChannel chan *drivers.TaskResourceUsage +} + +type WaitTaskMsg struct { + TaskID string + Ctx context.Context + ExitResultChannel chan *drivers.ExitResult +} + +// the state actor serializes requests to the TaskHandle by using a message channel. +// it also holds taskid->handle and containerid->handle maps +// and routes incoming podman stats and podman events to TaskHandles +func runStateActor(ctx context.Context, actorMessageChannel <-chan interface{}, logger hclog.Logger) error { + + taskIdToHandle := map[string]*TaskHandle{} + containerIdToHandle := map[string]*TaskHandle{} + + go func() { + logger.Debug("Starting state actor") + + for m := range actorMessageChannel { + switch message := m.(type) { + + case GetTaskHandleMsg: + taskHandle, ok := taskIdToHandle[message.TaskID] + if ok { + message.Result <- taskHandle + } else { + close(message.Result) + } + + case TaskStartedMsg: + logger.Trace("on started", "message", message) + taskIdToHandle[message.TaskID] = message.TaskHandle + containerIdToHandle[message.TaskHandle.containerID] = message.TaskHandle + // unblock caller + message.Done <- true + + case WaitTaskMsg: + logger.Trace("on wait task", "message", message) + taskHandle, ok := taskIdToHandle[message.TaskID] + if ok { + go taskHandle.runExitWatcher(message.Ctx, message.ExitResultChannel) + } + + case StartStatesEmitterMsg: + logger.Trace("on start emitter", "message", message) + taskHandle, ok := taskIdToHandle[message.TaskID] + if ok { + go taskHandle.runStatsEmitter(ctx, message.TaskResourceChannel, message.Interval) + taskHandle.statsEmitterRunning = true + } + + case api.ContainerStats: + taskHandle, ok := containerIdToHandle[message.ContainerID] + // avoid to block the actor if the target stats emitter is not running + if ok && taskHandle.statsEmitterRunning { + taskHandle.containerStatsChannel <- message + } + + case api.ContainerDiedEvent: + logger.Trace("on died", "message", message) + taskHandle, ok := containerIdToHandle[message.ID] + if ok { + go taskHandle.onContainerDied() + } + + case TaskDeletedMsg: + logger.Trace("on deleted", "message", message) + taskHandle, ok := taskIdToHandle[message.TaskID] + if ok { + 
delete(taskIdToHandle, message.TaskID) + delete(containerIdToHandle, taskHandle.containerID) + } + + } + } + logger.Debug("State actor stopped") + }() + return nil +} From 4762b922ddadecea8a3ea224632b034594dc820f Mon Sep 17 00:00:00 2001 From: Thomas Weber Date: Sun, 3 Jan 2021 20:39:08 +0100 Subject: [PATCH 09/11] Linter --- handle.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/handle.go b/handle.go index a23a536b..2d16b712 100644 --- a/handle.go +++ b/handle.go @@ -28,8 +28,6 @@ type TaskHandle struct { // receive container stats from global podman stats streamer containerStatsChannel chan api.ContainerStats - statsEmitterRunning bool - // stateLock syncs access to all fields below stateLock sync.RWMutex @@ -40,6 +38,7 @@ type TaskHandle struct { exitResult *drivers.ExitResult removeContainerOnExit bool + statsEmitterRunning bool } func (h *TaskHandle) taskStatus() *drivers.TaskStatus { From ea84f905779fbd073874523b9f88a2117f35f2eb Mon Sep 17 00:00:00 2001 From: Thomas Weber Date: Mon, 18 Jan 2021 19:55:38 +0100 Subject: [PATCH 10/11] Fixed critical fingerprint problem --- driver.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/driver.go b/driver.go index 6f23d511..77873574 100644 --- a/driver.go +++ b/driver.go @@ -93,6 +93,7 @@ type Driver struct { systemInfo api.Info // Queried from systemInfo: is podman running on a cgroupv2 system? cgroupV2 bool + health drivers.HealthState // state actor inbox stateActorChannel chan interface{} @@ -206,8 +207,10 @@ func (d *Driver) handleFingerprint(ctx context.Context, ch chan<- *drivers.Finge for { select { case <-ctx.Done(): + d.logger.Info("Fingerprint context is done") return case <-d.ctx.Done(): + d.logger.Info("Driver context is done") return case <-ticker.C: ticker.Reset(fingerprintPeriod) @@ -217,18 +220,17 @@ func (d *Driver) handleFingerprint(ctx context.Context, ch chan<- *drivers.Finge } func (d *Driver) buildFingerprint() *drivers.Fingerprint { - var health drivers.HealthState var desc string attrs := map[string]*pstructs.Attribute{} // be negative and guess that we will not be able to get a podman connection - health = drivers.HealthStateUndetected desc = "disabled" // try to connect and get version info info, err := d.podman.SystemInfo(d.ctx) if err != nil { d.logger.Error("Could not get podman info", "err", err) + d.health = drivers.HealthStateUndetected } else { desc = "ready" attrs["driver.podman"] = pstructs.NewBoolAttribute(true) @@ -236,18 +238,19 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint { attrs["driver.podman.rootless"] = pstructs.NewBoolAttribute(info.Host.Rootless) attrs["driver.podman.cgroupVersion"] = pstructs.NewStringAttribute(info.Host.CGroupsVersion) if d.systemInfo.Version.Version == "" { + d.logger.Info("Initializing podman streams") // keep first received systemInfo in driver struct // it is used to toggle cgroup v1/v2, rootless/rootful behavior d.systemInfo = info d.cgroupV2 = info.Host.CGroupsVersion == "v2" // run some final initialization after first podman contact - health = d.onInit() + d.health = d.onInit() } } return &drivers.Fingerprint{ Attributes: attrs, - Health: health, + Health: d.health, HealthDescription: desc, } } From a8cc19db8f0e226fee7adc73b6127abada0d5cea Mon Sep 17 00:00:00 2001 From: Thomas Weber Date: Thu, 28 Jan 2021 20:04:16 +0100 Subject: [PATCH 11/11] Fixed log --- api/container_stats.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/container_stats.go b/api/container_stats.go index 
72e1aba7..061344cf 100644 --- a/api/container_stats.go +++ b/api/container_stats.go @@ -82,7 +82,7 @@ func (c *API) ContainerStatsStream(ctx context.Context) (chan ContainerStats, er } var statsReport ContainerStatsReport if jerr := json.Unmarshal([]byte(line), &statsReport); jerr != nil { - c.logger.Error("Unable to unmarshal statsreport", "err", err) + c.logger.Error("Unable to unmarshal statsreport", "err", jerr) return } if statsReport.Error != nil {
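
A note on the registration handshake used by the state actor in the patches above: a TaskHandle only becomes visible to GetTaskHandleMsg lookups after the actor has processed a TaskStartedMsg, and the actor acknowledges that by sending on the message's Done channel. The sender side is not part of the hunks shown here, so the helper below is only a sketch of how a caller (for example StartTask or RecoverTask) could register a handle; the function name registerTaskHandle is hypothetical, while TaskStartedMsg, stateActorChannel and the Done acknowledgement come from the patches above.

// Hypothetical helper, not part of the patch series: hand a freshly built
// TaskHandle to the state actor and block until it has been stored, so a
// subsequent GetTaskHandle for this task cannot race the registration.
func (d *Driver) registerTaskHandle(taskID string, handle *TaskHandle) {
	msg := TaskStartedMsg{
		TaskID:     taskID,
		TaskHandle: handle,
		Done:       make(chan bool),
	}
	// the actor adds the handle to both the taskID and containerID lookup maps
	d.stateActorChannel <- msg
	// wait for the actor's acknowledgement (message.Done <- true in runStateActor)
	<-msg.Done
}

Because only the actor goroutine touches the two maps, no mutex is needed around handle lookups; the trade-off is that a slow message handler stalls every other request, which is why the api.ContainerStats case in runStateActor checks statsEmitterRunning before sending into a handle's stats channel.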