diff --git a/api/api.go b/api/api.go
index c700ca89..71e328de 100644
--- a/api/api.go
+++ b/api/api.go
@@ -16,9 +16,10 @@ import (
 )
 
 type API struct {
-    baseUrl    string
-    httpClient *http.Client
-    logger     hclog.Logger
+    baseUrl          string
+    httpClient       *http.Client
+    httpStreamClient *http.Client
+    logger           hclog.Logger
 }
 
 type ClientConfig struct {
@@ -56,6 +57,8 @@ func NewClient(logger hclog.Logger, config ClientConfig) *API {
     ac.httpClient = &http.Client{
         Timeout: config.HttpTimeout,
     }
+    // we do not want a timeout for streaming requests.
+    ac.httpStreamClient = &http.Client{}
     if strings.HasPrefix(baseUrl, "unix:") {
         ac.baseUrl = "http://u"
         path := strings.TrimPrefix(baseUrl, "unix:")
@@ -64,6 +67,7 @@ func NewClient(logger hclog.Logger, config ClientConfig) *API {
                 return net.Dial("unix", path)
             },
         }
+        ac.httpStreamClient.Transport = ac.httpClient.Transport
     } else {
         ac.baseUrl = baseUrl
     }
@@ -84,6 +88,14 @@ func (c *API) Get(ctx context.Context, path string) (*http.Response, error) {
     return c.Do(req)
 }
 
+func (c *API) GetStream(ctx context.Context, path string) (*http.Response, error) {
+    req, err := http.NewRequestWithContext(ctx, "GET", c.baseUrl+path, nil)
+    if err != nil {
+        return nil, err
+    }
+    return c.httpStreamClient.Do(req)
+}
+
 func (c *API) Post(ctx context.Context, path string, body io.Reader) (*http.Response, error) {
     return c.PostWithHeaders(ctx, path, body, map[string]string{})
 }
diff --git a/api/container_start.go b/api/container_start.go
index 71d6f886..8234c34a 100644
--- a/api/container_start.go
+++ b/api/container_start.go
@@ -5,7 +5,6 @@ import (
     "fmt"
     "io/ioutil"
     "net/http"
-    "time"
 )
 
 // ContainerStart starts a container via id or name
@@ -23,11 +22,5 @@ func (c *API) ContainerStart(ctx context.Context, name string) error {
         return fmt.Errorf("unknown error, status code: %d: %s", res.StatusCode, body)
     }
 
-    // wait max 10 seconds for running state
-    // TODO: make timeout configurable
-    timeout, cancel := context.WithTimeout(ctx, time.Second*10)
-    defer cancel()
-
-    err = c.ContainerWait(timeout, name, []string{"running", "exited"})
     return err
 }
diff --git a/api/container_stats.go b/api/container_stats.go
index f50b14c8..061344cf 100644
--- a/api/container_stats.go
+++ b/api/container_stats.go
@@ -7,6 +7,8 @@ import (
     "fmt"
     "io/ioutil"
     "net/http"
+
+    "github.com/mitchellh/go-linereader"
 )
 
 var ContainerNotFound = errors.New("No such Container")
@@ -45,3 +47,54 @@ func (c *API) ContainerStats(ctx context.Context, name string) (Stats, error) {
     return stats, nil
 }
+
+// ContainerStatsStream streams stats for all containers
+func (c *API) ContainerStatsStream(ctx context.Context) (chan ContainerStats, error) {
+
+    res, err := c.GetStream(ctx, "/v1.0.0/libpod/containers/stats?stream=true")
+    if err != nil {
+        return nil, err
+    }
+
+    if res.StatusCode != http.StatusOK {
+        return nil, fmt.Errorf("unknown error, status code: %d", res.StatusCode)
+    }
+
+    statsChannel := make(chan ContainerStats, 5)
+    lr := linereader.New(res.Body)
+
+    go func() {
+        c.logger.Debug("Running stats stream")
+        defer func() {
+            res.Body.Close()
+            close(statsChannel)
+            c.logger.Debug("Stopped stats stream")
+        }()
+        for {
+            select {
+            case <-ctx.Done():
+                c.logger.Debug("Stopping stats stream")
+                return
+            case line, ok := <-lr.Ch:
+                if !ok {
+                    c.logger.Debug("Stats reader channel was closed")
+                    return
+                }
+                var statsReport ContainerStatsReport
+                if jerr := json.Unmarshal([]byte(line), &statsReport); jerr != nil {
+                    c.logger.Error("Unable to unmarshal statsreport", "err", jerr)
+                    return
+                }
+                if statsReport.Error != nil {
+                    c.logger.Error("Stats stream is broken", "error", statsReport.Error)
+                    return
+                }
+                for _, stat := range statsReport.Stats {
+                    statsChannel <- stat
+                }
+            }
+        }
+    }()
+
+    return statsChannel, nil
+}
diff --git a/api/libpod_events.go b/api/libpod_events.go
new file mode 100644
index 00000000..db27a594
--- /dev/null
+++ b/api/libpod_events.go
@@ -0,0 +1,136 @@
+package api
+
+import (
+    "context"
+    "encoding/json"
+    "fmt"
+    "net/http"
+    "strconv"
+
+    "github.com/mitchellh/go-linereader"
+)
+
+// PodmanEvent is the common header for all events
+type PodmanEvent struct {
+    Type   string
+    Action string
+}
+
+// ContainerEvent is a generic PodmanEvent for a single container
+// example json:
+// {"Type":"container","Action":"create","Actor":{"ID":"cc0d7849692360df2cba94eafb2715b9deec0cbd96ec41c3329dd8636cd070ce","Attributes":{"containerExitCode":"0","image":"docker.io/library/redis:latest","name":"redis-6f2b07a8-73e9-7098-83e1-55939851d46d"}},"scope":"local","time":1609413164,"timeNano":1609413164982188073}
+type ContainerEvent struct {
+    // create/init/start/stop/died
+    Action   string              `json:"Action"`
+    Scope    string              `json:"scope"`
+    TimeNano uint64              `json:"timeNano"`
+    Time     uint32              `json:"time"`
+    Actor    ContainerEventActor `json:"Actor"`
+}
+
+type ContainerEventActor struct {
+    ID         string                   `json:"ID"`
+    Attributes ContainerEventAttributes `json:"Attributes"`
+}
+
+type ContainerEventAttributes struct {
+    Image             string `json:"image"`
+    Name              string `json:"name"`
+    ContainerExitCode string `json:"containerExitCode"`
+}
+
+// ContainerStartEvent is emitted when a container has completely started
+type ContainerStartEvent struct {
+    ID   string
+    Name string
+}
+
+// ContainerDiedEvent is emitted when a container exited
+type ContainerDiedEvent struct {
+    ID       string
+    Name     string
+    ExitCode int
+}
+
+// LibpodEventStream streams podman events
+func (c *API) LibpodEventStream(ctx context.Context) (chan interface{}, error) {
+
+    res, err := c.GetStream(ctx, "/v1.0.0/libpod/events?stream=true")
+    if err != nil {
+        return nil, err
+    }
+
+    if res.StatusCode != http.StatusOK {
+        return nil, fmt.Errorf("unknown error, status code: %d", res.StatusCode)
+    }
+
+    eventsChannel := make(chan interface{}, 5)
+    lr := linereader.New(res.Body)
+
+    go func() {
+        c.logger.Debug("Running libpod event stream")
+        defer func() {
+            res.Body.Close()
+            close(eventsChannel)
+            c.logger.Debug("Stopped libpod event stream")
+        }()
+        for {
+            select {
+            case <-ctx.Done():
+                c.logger.Debug("Stopping libpod event stream")
+                return
+            case line, ok := <-lr.Ch:
+                if !ok {
+                    c.logger.Debug("Event reader channel was closed")
+                    return
+                }
+                var podmanEvent PodmanEvent
+                err := json.Unmarshal([]byte(line), &podmanEvent)
+                if err != nil {
+                    c.logger.Error("Unable to parse libpod event", "error", err)
+                    // no need to stop the stream, maybe we can parse the next event
+                    continue
+                }
+                c.logger.Trace("libpod event", "event", line)
+                if podmanEvent.Type == "container" {
+                    var containerEvent ContainerEvent
+                    err := json.Unmarshal([]byte(line), &containerEvent)
+                    if err != nil {
+                        c.logger.Error("Unable to parse ContainerEvent", "error", err)
+                        // no need to stop the stream, maybe we can parse the next event
+                        continue
+                    }
+                    switch containerEvent.Action {
+                    case "start":
+                        eventsChannel <- ContainerStartEvent{
+                            ID:   containerEvent.Actor.ID,
+                            Name: containerEvent.Actor.Attributes.Name,
+                        }
+                        continue
+                    case "died":
+                        i, err := strconv.Atoi(containerEvent.Actor.Attributes.ContainerExitCode)
+                        if err != nil {
+                            c.logger.Error("Unable to parse ContainerEvent exitCode", "error", err)
+                            // no need to stop the stream, maybe we can parse the next event
+                            continue
+                        }
+                        eventsChannel <- ContainerDiedEvent{
+                            ID:       containerEvent.Actor.ID,
+                            Name:     containerEvent.Actor.Attributes.Name,
+                            ExitCode: i,
+                        }
+                        continue
+                    }
+                    // no action specific parser? emit what we've got
+                    eventsChannel <- containerEvent
+                    continue
+                }
+
+                // emit a generic event if we do not have a parser for it
+                eventsChannel <- podmanEvent
+            }
+        }
+    }()
+
+    return eventsChannel, nil
+}
diff --git a/api/structs.go b/api/structs.go
index 694a475b..e6085a21 100644
--- a/api/structs.go
+++ b/api/structs.go
@@ -1535,3 +1535,39 @@ type Version struct {
     Built  int64
     OsArch string
 }
+
+// -------------------------------------------------------------------------------------------------------
+// structs copied from https://github.com/containers/podman/blob/master/libpod/define/containerstate.go
+//
+// some unused parts are modified/commented out to avoid pulling in more dependencies
+//
+// some fields are reordered to keep the linter happy (the "maligned" check complains about wasted bytes)
+// -------------------------------------------------------------------------------------------------------
+
+// ContainerStats contains the statistics information for a running container
+type ContainerStats struct {
+    ContainerID   string
+    Name          string
+    PerCPU        []uint64
+    CPU           float64
+    CPUNano       uint64
+    CPUSystemNano uint64
+    SystemNano    uint64
+    MemUsage      uint64
+    MemLimit      uint64
+    MemPerc       float64
+    NetInput      uint64
+    NetOutput     uint64
+    BlockInput    uint64
+    BlockOutput   uint64
+    PIDs          uint64
+}
+
+// ContainerStatsReport is used for streaming container stats.
+// https://github.com/containers/podman/blob/master/pkg/domain/entities/containers.go
+type ContainerStatsReport struct {
+    // Error from reading stats.
+    Error error
+    // Results, set when there is no error.
+    Stats []ContainerStats
+}
diff --git a/driver.go b/driver.go
index 8085833a..66f38f73 100644
--- a/driver.go
+++ b/driver.go
@@ -19,7 +19,6 @@ import (
     "github.com/hashicorp/go-hclog"
     "github.com/hashicorp/nomad-driver-podman/api"
     "github.com/hashicorp/nomad-driver-podman/version"
-    "github.com/hashicorp/nomad/client/stats"
     "github.com/hashicorp/nomad/client/taskenv"
     "github.com/hashicorp/nomad/drivers/shared/eventer"
     shelpers "github.com/hashicorp/nomad/helper/stats"
@@ -79,9 +78,6 @@ type Driver struct {
     // nomadConfig is the client config from nomad
     nomadConfig *base.ClientDriverConfig
 
-    // tasks is the in memory datastore mapping taskIDs to rawExecDriverHandles
-    tasks *taskStore
-
     // ctx is the context for the driver. It is passed to other subsystems to
     // coordinate shutdown
     ctx context.Context
@@ -100,6 +96,10 @@ type Driver struct {
     systemInfo api.Info
     // Queried from systemInfo: is podman running on a cgroupv2 system?
     cgroupV2 bool
+    health drivers.HealthState
+
+    // state actor inbox
+    stateActorChannel chan interface{}
 }
 
 // TaskState is the state which is encoded in the handle returned in
@@ -116,12 +116,27 @@ type TaskState struct {
 func NewPodmanDriver(logger hclog.Logger) drivers.DriverPlugin {
     ctx, cancel := context.WithCancel(context.Background())
     return &Driver{
-        eventer:        eventer.NewEventer(ctx, logger),
-        config:         &PluginConfig{},
-        tasks:          newTaskStore(),
-        ctx:            ctx,
-        signalShutdown: cancel,
-        logger:         logger.Named(pluginName),
+        eventer:           eventer.NewEventer(ctx, logger),
+        config:            &PluginConfig{},
+        ctx:               ctx,
+        signalShutdown:    cancel,
+        logger:            logger.Named(pluginName),
+        stateActorChannel: make(chan interface{}, 5),
+    }
+}
+
+// NewTaskHandle creates a new TaskHandle struct
+func (d *Driver) NewTaskHandle(cfg *drivers.TaskConfig) *TaskHandle {
+    return &TaskHandle{
+        driver:                d,
+        taskConfig:            cfg,
+        logger:                d.logger.Named("podmanHandle"),
+        procState:             drivers.TaskStateUnknown,
+        startedAt:             time.Now().Round(time.Millisecond),
+        exitResult:            new(drivers.ExitResult),
+        diedChannel:           make(chan bool),
+        containerStatsChannel: make(chan api.ContainerStats, 5),
+        removeContainerOnExit: d.config.GC.Container,
     }
 }
 
@@ -195,8 +210,10 @@ func (d *Driver) handleFingerprint(ctx context.Context, ch chan<- *drivers.Finge
     for {
         select {
         case <-ctx.Done():
+            d.logger.Info("Fingerprint context is done")
             return
         case <-d.ctx.Done():
+            d.logger.Info("Driver context is done")
             return
         case <-ticker.C:
             ticker.Reset(fingerprintPeriod)
@@ -206,41 +223,133 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint {
-    var health drivers.HealthState
     var desc string
     attrs := map[string]*pstructs.Attribute{}
 
     // be negative and guess that we will not be able to get a podman connection
-    health = drivers.HealthStateUndetected
     desc = "disabled"
 
     // try to connect and get version info
     info, err := d.podman.SystemInfo(d.ctx)
     if err != nil {
-        d.logger.Error("Could not get podman info", "error", err)
+        d.logger.Error("Could not get podman info", "err", err)
+        d.health = drivers.HealthStateUndetected
     } else {
-        // yay! we can enable the driver
-        health = drivers.HealthStateHealthy
         desc = "ready"
         attrs["driver.podman"] = pstructs.NewBoolAttribute(true)
         attrs["driver.podman.version"] = pstructs.NewStringAttribute(info.Version.Version)
         attrs["driver.podman.rootless"] = pstructs.NewBoolAttribute(info.Host.Security.Rootless)
         attrs["driver.podman.cgroupVersion"] = pstructs.NewStringAttribute(info.Host.CGroupsVersion)
         if d.systemInfo.Version.Version == "" {
+            d.logger.Info("Initializing podman streams")
             // keep first received systemInfo in driver struct
             // it is used to toggle cgroup v1/v2, rootless/rootful behavior
             d.systemInfo = info
             d.cgroupV2 = info.Host.CGroupsVersion == "v2"
+            // run some final initialization after first podman contact
+            d.health = d.onInit()
         }
     }
 
     return &drivers.Fingerprint{
         Attributes:        attrs,
-        Health:            health,
+        Health:            d.health,
         HealthDescription: desc,
     }
 }
 
+// onInit is called after first successful podman api request.
+func (d *Driver) onInit() drivers.HealthState {
+    var err error
+
+    err = d.runStatsStreamer()
+    if err != nil {
+        d.logger.Error("Could not start stats stream", "err", err)
+        return drivers.HealthStateHealthy
+    }
+    err = d.runEventStreamer()
+    if err != nil {
+        d.logger.Error("Could not start event stream", "err", err)
+        return drivers.HealthStateHealthy
+    }
+    err = runStateActor(d.ctx, d.stateActorChannel, d.logger.Named("stateActor"))
+    if err != nil {
+        return drivers.HealthStateUnhealthy
+    }
+    // yay! we can enable the driver
+    return drivers.HealthStateHealthy
+}
+
+// stream stats from a global podman listener into task handles
+func (d *Driver) runStatsStreamer() error {
+    var err error
+    var statsChannel chan api.ContainerStats
+
+    statsChannel, err = d.podman.ContainerStatsStream(d.ctx)
+    if err != nil {
+        return err
+    }
+
+    go func() {
+        for {
+            select {
+            case <-d.ctx.Done():
+                return
+            case containerStats, ok := <-statsChannel:
+                if !ok {
+                    // re-run api request on http timeout/connection loss
+                    statsChannel, err = d.podman.ContainerStatsStream(d.ctx)
+                    if err != nil {
+                        // throttle retries on error
+                        d.logger.Warn("Failed to rerun stats stream api request", "err", err)
+                        time.Sleep(time.Second * 3)
+                    }
+                    d.logger.Debug("Rerun stats stream")
+                    continue
+                }
+                d.stateActorChannel <- containerStats
+            }
+        }
+    }()
+
+    return nil
+}
+
+// stream events from a global podman listener into task handles
+func (d *Driver) runEventStreamer() error {
+    var err error
+    var eventsChannel chan interface{}
+
+    eventsChannel, err = d.podman.LibpodEventStream(d.ctx)
+    if err != nil {
+        return err
+    }
+
+    go func() {
+        for {
+            select {
+            case <-d.ctx.Done():
+                return
+            case event, ok := <-eventsChannel:
+                if !ok {
+                    // re-run api request on http timeout/connection loss
+                    eventsChannel, err = d.podman.LibpodEventStream(d.ctx)
+                    if err != nil {
+                        // throttle retries on error
+                        d.logger.Warn("Failed to rerun event stream api request", "err", err)
+                        time.Sleep(time.Second * 3)
+                    }
+                    d.logger.Debug("Rerun event stream")
+                    continue
+                }
+                d.stateActorChannel <- event
+            }
+        }
+    }()
+
+    return nil
+}
+
 // RecoverTask detects running tasks when nomad client or task driver is restarted.
 // When a driver is restarted it is not expected to persist any internal state to disk.
 // To support this, Nomad will attempt to recover a task that was previously started
@@ -251,7 +360,7 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
         return fmt.Errorf("error: handle cannot be nil")
     }
 
-    if _, ok := d.tasks.Get(handle.Config.ID); ok {
+    if _, err := d.GetTaskHandle(handle.Config.ID); err == drivers.ErrTaskNotFound {
         return nil
     }
 
@@ -270,21 +379,9 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
         return nil
     }
 
-    h := &TaskHandle{
-        containerID: taskState.ContainerID,
-        driver:      d,
-        taskConfig:  taskState.TaskConfig,
-        procState:   drivers.TaskStateUnknown,
-        startedAt:   taskState.StartedAt,
-        exitResult:  &drivers.ExitResult{},
-        logger:      d.logger.Named("podmanHandle"),
-
-        totalCPUStats:  stats.NewCpuStats(),
-        userCPUStats:   stats.NewCpuStats(),
-        systemCPUStats: stats.NewCpuStats(),
-
-        removeContainerOnExit: d.config.GC.Container,
-    }
+    h := d.NewTaskHandle(taskState.TaskConfig)
+    h.containerID = taskState.ContainerID
+    h.startedAt = taskState.StartedAt
 
     if inspectData.State.Running {
         d.logger.Info("Recovered a still running container", "container", inspectData.State.Pid)
@@ -311,10 +408,15 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
         d.logger.Warn("Recovery restart failed, unknown container state", "state", inspectData.State.Status, "container", taskState.ContainerID)
         h.procState = drivers.TaskStateUnknown
     }
+    msg := TaskStartedMsg{
+        TaskID:     taskState.TaskConfig.ID,
+        TaskHandle: h,
+        Done:       make(chan bool),
+    }
+    d.stateActorChannel <- msg
+    // wait until actor processed the request
+    <-msg.Done
 
-    d.tasks.Set(taskState.TaskConfig.ID, h)
-
-    go h.runContainerMonitor()
     d.logger.Debug("Recovered container handle", "container", taskState.ContainerID)
 
     return nil
@@ -334,7 +436,7 @@ func BuildContainerNameForTask(taskName string, cfg *drivers.TaskConfig) string
 func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drivers.DriverNetwork, error) {
     rootless := d.systemInfo.Host.Security.Rootless
 
-    if _, ok := d.tasks.Get(cfg.ID); ok {
+    if _, err := d.GetTaskHandle(cfg.ID); err != drivers.ErrTaskNotFound {
         return nil, nil, fmt.Errorf("task with ID %q already started", cfg.ID)
     }
 
@@ -523,11 +625,25 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive
         containerID = createResponse.Id
     }
 
+    h := d.NewTaskHandle(cfg)
+    h.containerID = containerID
+    h.procState = drivers.TaskStateRunning
+
+    msg := TaskStartedMsg{
+        TaskID:     cfg.ID,
+        TaskHandle: h,
+        Done:       make(chan bool),
+    }
+    d.stateActorChannel <- msg
+    // wait until actor processed the request
+    <-msg.Done
+
     cleanup := func() {
         d.logger.Debug("Cleaning up", "container", containerID)
         if err := d.podman.ContainerDelete(d.ctx, containerID, true, true); err != nil {
             d.logger.Error("failed to clean up from an error in Start", "error", err)
         }
+        d.stateActorChannel <- TaskDeletedMsg{cfg.ID}
     }
 
     if !recoverRunningContainer {
@@ -550,22 +666,6 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive
         AutoAdvertise: true,
     }
 
-    h := &TaskHandle{
-        containerID: containerID,
-        driver:      d,
-        taskConfig:  cfg,
-        procState:   drivers.TaskStateRunning,
-        exitResult:  &drivers.ExitResult{},
-        startedAt:   time.Now().Round(time.Millisecond),
-        logger:      d.logger.Named("podmanHandle"),
-
-        totalCPUStats:  stats.NewCpuStats(),
-        userCPUStats:   stats.NewCpuStats(),
-        systemCPUStats: stats.NewCpuStats(),
-
-        removeContainerOnExit: d.config.GC.Container,
-    }
-
     driverState := TaskState{
         ContainerID: containerID,
         TaskConfig:  cfg,
@@ -579,10 +679,6 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive
         return nil, nil, fmt.Errorf("failed to set driver state: %v", err)
     }
 
-    d.tasks.Set(cfg.ID, h)
-
-    go h.runContainerMonitor()
-
     d.logger.Info("Completely started container", "taskID", cfg.ID, "container", containerID, "ip", inspectData.NetworkSettings.IPAddress)
 
     return handle, net, nil
@@ -729,12 +825,13 @@ func parseImage(image string) (types.ImageReference, error) {
 // If WaitTask is called after DestroyTask, it should return drivers.ErrTaskNotFound as no task state should exist after DestroyTask is called.
 func (d *Driver) WaitTask(ctx context.Context, taskID string) (<-chan *drivers.ExitResult, error) {
     d.logger.Debug("WaitTask called", "task", taskID)
-    handle, ok := d.tasks.Get(taskID)
-    if !ok {
-        return nil, drivers.ErrTaskNotFound
-    }
     ch := make(chan *drivers.ExitResult)
-    go handle.runExitWatcher(ctx, ch)
+    // forward the request into the taskHandle
+    d.stateActorChannel <- WaitTaskMsg{
+        TaskID:            taskID,
+        Ctx:               ctx,
+        ExitResultChannel: ch,
+    }
     return ch, nil
 }
 
@@ -743,13 +840,13 @@ func (d *Driver) WaitTask(ctx context.Context, taskID string) (<-chan *drivers.E
 // StopTask does not clean up resources of the task or remove it from the driver's internal state.
 func (d *Driver) StopTask(taskID string, timeout time.Duration, signal string) error {
     d.logger.Info("Stopping task", "taskID", taskID, "signal", signal)
-    handle, ok := d.tasks.Get(taskID)
-    if !ok {
-        return drivers.ErrTaskNotFound
+    handle, err := d.GetTaskHandle(taskID)
+    if err != nil {
+        return err
     }
 
     // fixme send proper signal to container
-    err := d.podman.ContainerStop(d.ctx, handle.containerID, int(timeout.Seconds()), true)
+    err = d.podman.ContainerStop(d.ctx, handle.containerID, int(timeout.Seconds()), true)
     if err == nil {
         return nil
     } else if err == api.ContainerNotFound {
@@ -764,9 +861,10 @@ func (d *Driver) StopTask(taskID string, timeout time.Duration, signal string) e
 // DestroyTask function cleans up and removes a task that has terminated.
 // If force is set to true, the driver must destroy the task even if it is still running.
 func (d *Driver) DestroyTask(taskID string, force bool) error {
-    handle, ok := d.tasks.Get(taskID)
-    if !ok {
-        return drivers.ErrTaskNotFound
+    d.logger.Info("Destroy task", "taskID", taskID)
+    handle, err := d.GetTaskHandle(taskID)
+    if err != nil {
+        return err
     }
 
     if handle.isRunning() && !force {
@@ -798,17 +896,16 @@ func (d *Driver) DestroyTask(taskID string, force bool) error {
             d.logger.Warn("Could not remove container", "container", handle.containerID, "error", err)
         }
     }
-
-    d.tasks.Delete(taskID)
+    d.stateActorChannel <- TaskDeletedMsg{taskID}
     return nil
 }
 
 // InspectTask function returns detailed status information for the referenced taskID.
 func (d *Driver) InspectTask(taskID string) (*drivers.TaskStatus, error) {
     d.logger.Debug("InspectTask called")
-    handle, ok := d.tasks.Get(taskID)
-    if !ok {
-        return nil, drivers.ErrTaskNotFound
+    handle, err := d.GetTaskHandle(taskID)
+    if err != nil {
+        return nil, err
     }
 
     return handle.taskStatus(), nil
@@ -818,13 +915,13 @@ func (d *Driver) InspectTask(taskID string) (*drivers.TaskStatus, error) {
 // The driver must send stats at the given interval until the given context is canceled or the task terminates.
 func (d *Driver) TaskStats(ctx context.Context, taskID string, interval time.Duration) (<-chan *drivers.TaskResourceUsage, error) {
     d.logger.Debug("TaskStats called", "taskID", taskID)
-    handle, ok := d.tasks.Get(taskID)
-    if !ok {
-        return nil, drivers.ErrTaskNotFound
+    taskResourceChannel := make(chan *drivers.TaskResourceUsage)
+    d.stateActorChannel <- StartStatesEmitterMsg{
+        TaskID:              taskID,
+        Interval:            interval,
+        TaskResourceChannel: taskResourceChannel,
     }
-    statsChannel := make(chan *drivers.TaskResourceUsage)
-    go handle.runStatsEmitter(ctx, statsChannel, interval)
-    return statsChannel, nil
+    return taskResourceChannel, nil
 }
 
 // TaskEvents function allows the driver to publish driver specific events about tasks and
@@ -836,8 +933,8 @@ func (d *Driver) TaskEvents(ctx context.Context) (<-chan *drivers.TaskEvent, err
 // SignalTask function is used by drivers which support sending OS signals (SIGHUP, SIGKILL, SIGUSR1 etc.) to the task.
 // It is an optional function and is listed as a capability in the driver Capabilities struct.
 func (d *Driver) SignalTask(taskID string, signal string) error {
-    handle, ok := d.tasks.Get(taskID)
-    if !ok {
+    handle, err := d.GetTaskHandle(taskID)
+    if err != nil {
         return drivers.ErrTaskNotFound
     }
 
@@ -846,9 +943,9 @@ func (d *Driver) SignalTask(taskID string, signal string) error {
 // ExecTask function is used by the Nomad client to execute scripted health checks inside the task execution context.
 func (d *Driver) ExecTask(taskID string, cmd []string, timeout time.Duration) (*drivers.ExecTaskResult, error) {
-    handle, ok := d.tasks.Get(taskID)
-    if !ok {
-        return nil, drivers.ErrTaskNotFound
+    handle, err := d.GetTaskHandle(taskID)
+    if err != nil {
+        return nil, err
     }
     createRequest := api.ExecConfig{
         Command: cmd,
@@ -908,9 +1005,9 @@ func (d *Driver) ExecTask(taskID string, cmd []string, timeout time.Duration) (*
 // ExecTask function is used by the Nomad client to execute commands inside the task execution context.
 // i.E. nomad alloc exec ....
 func (d *Driver) ExecTaskStreaming(ctx context.Context, taskID string, execOptions *drivers.ExecOptions) (*drivers.ExitResult, error) {
-    handle, ok := d.tasks.Get(taskID)
-    if !ok {
-        return nil, drivers.ErrTaskNotFound
+    handle, err := d.GetTaskHandle(taskID)
+    if err != nil {
+        return nil, err
     }
 
     createRequest := api.ExecConfig{
@@ -1088,6 +1185,20 @@ func (d *Driver) portMappings(taskCfg *drivers.TaskConfig, driverCfg TaskConfig)
     return publishedPorts, nil
 }
 
+// Get a TaskHandle from the state store
+func (d *Driver) GetTaskHandle(id string) (*TaskHandle, error) {
+    msg := GetTaskHandleMsg{
+        TaskID: id,
+        Result: make(chan *TaskHandle),
+    }
+    d.stateActorChannel <- msg
+    handle, ok := <-msg.Result
+    if !ok {
+        return nil, drivers.ErrTaskNotFound
+    }
+    return handle, nil
+}
+
 // expandPath returns the absolute path of dir, relative to base if dir is relative path.
 // base is expected to be an absolute path
 func expandPath(base, dir string) string {
diff --git a/go.mod b/go.mod
index c7805916..aeae2878 100644
--- a/go.mod
+++ b/go.mod
@@ -17,6 +17,7 @@ require (
     github.com/hashicorp/nomad v1.0.4-0.20210415141937-ee9bb3cc4f40
     github.com/hashicorp/nomad/api v0.0.0-20210415141937-ee9bb3cc4f40
     github.com/mattn/go-colorable v0.1.7 // indirect
+    github.com/mitchellh/go-linereader v0.0.0-20190213213312-1b945b3263eb
     github.com/mitchellh/go-ps v1.0.0 // indirect
     github.com/onsi/ginkgo v1.13.0 // indirect
     github.com/opencontainers/runtime-spec v1.0.3-0.20200929063507-e6143ca7d51d
diff --git a/go.sum b/go.sum
index 3d54b336..1d133a1a 100644
--- a/go.sum
+++ b/go.sum
@@ -729,6 +729,8 @@ github.com/mitchellh/copystructure v1.0.0/go.mod h1:SNtv71yrdKgLRyLFxmLdkAbkKEFW
 github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
 github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
 github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
+github.com/mitchellh/go-linereader v0.0.0-20190213213312-1b945b3263eb h1:GRiLv4rgyqjqzxbhJke65IYUf4NCOOvrPOJbV/sPxkM=
+github.com/mitchellh/go-linereader v0.0.0-20190213213312-1b945b3263eb/go.mod h1:OaY7UOoTkkrX3wRwjpYRKafIkkyeD0UtweSHAWWiqQM=
 github.com/mitchellh/go-ps v0.0.0-20190716172923-621e5597135b/go.mod h1:r1VsdOzOPt1ZSrGZWFoNhsAedKnEd6r9Np1+5blZCWk=
 github.com/mitchellh/go-ps v1.0.0 h1:i6ampVEEF4wQFF+bkYfwYgY+F/uYJDktmvLPf7qIgjc=
 github.com/mitchellh/go-ps v1.0.0/go.mod h1:J4lOc8z8yJs6vUwklHw2XEIiT4z4C40KtWVN3nvg8Pg=
diff --git a/handle.go b/handle.go
index 58fe61c4..e46507cd 100644
--- a/handle.go
+++ b/handle.go
@@ -2,7 +2,6 @@ package main
 
 import (
     "context"
-    "errors"
     "fmt"
     "sync"
     "time"
@@ -15,7 +14,7 @@ import (
 
 var (
     measuredCPUStats = []string{"System Mode", "User Mode", "Percent"}
-    measuredMemStats = []string{"Usage", "Max Usage"}
+    measuredMemStats = []string{"Usage"}
 )
 
 // TaskHandle is the podman specific handle for exactly one container
@@ -24,9 +23,10 @@ type TaskHandle struct {
     logger hclog.Logger
     driver *Driver
 
-    totalCPUStats  *stats.CpuStats
-    userCPUStats   *stats.CpuStats
-    systemCPUStats *stats.CpuStats
+    diedChannel chan bool
+
+    // receive container stats from global podman stats streamer
+    containerStatsChannel chan api.ContainerStats
 
     // stateLock syncs access to all fields below
     stateLock sync.RWMutex
@@ -38,8 +38,7 @@ type TaskHandle struct {
     exitResult *drivers.ExitResult
 
     removeContainerOnExit bool
-
-    containerStats api.Stats
+    statsEmitterRunning   bool
 }
 
 func (h *TaskHandle) taskStatus() *drivers.TaskStatus {
@@ -66,7 +65,6 @@ func (h *TaskHandle) isRunning() bool {
 }
 
 func (h *TaskHandle) runExitWatcher(ctx context.Context, exitChannel chan *drivers.ExitResult) {
-    timer := time.NewTimer(0)
     h.logger.Debug("Starting exitWatcher", "container", h.containerID)
 
     defer func() {
@@ -79,131 +77,99 @@ func (h *TaskHandle) runExitWatcher(ctx context.Context, exitChannel chan *drive
         close(exitChannel)
     }()
 
+    if !h.isRunning() {
+        h.logger.Debug("No need to run exitWatcher on a stopped container")
+        return
+    }
+
     for {
-        if !h.isRunning() {
-            return
-        }
         select {
         case <-ctx.Done():
             return
-        case <-timer.C:
-            timer.Reset(time.Second)
+        case <-h.diedChannel:
+            return
         }
     }
 }
 
-func (h *TaskHandle) runStatsEmitter(ctx context.Context, statsChannel chan *drivers.TaskResourceUsage, interval time.Duration) {
+func (h *TaskHandle) runStatsEmitter(ctx context.Context, taskResourceChannel chan *drivers.TaskResourceUsage, interval time.Duration) {
     timer := time.NewTimer(0)
+
+    containerStats := api.ContainerStats{}
+    userCPUStats := stats.NewCpuStats()
+    systemCPUStats := stats.NewCpuStats()
+
     h.logger.Debug("Starting statsEmitter", "container", h.containerID)
     for {
         select {
+
         case <-ctx.Done():
             h.logger.Debug("Stopping statsEmitter", "container", h.containerID)
             return
+
+        case s := <-h.containerStatsChannel:
+            // keep latest known container stats in this go routine
+            // and convert/emit it to nomad based on interval
+            containerStats = s
+            continue
+
         case <-timer.C:
             timer.Reset(interval)
-        }
-        h.stateLock.Lock()
-        t := time.Now()
-
-        //FIXME implement cpu stats correctly
-        totalPercent := h.totalCPUStats.Percent(float64(h.containerStats.CPUStats.CPUUsage.TotalUsage))
-        cs := &drivers.CpuStats{
-            SystemMode: h.systemCPUStats.Percent(float64(h.containerStats.CPUStats.CPUUsage.UsageInKernelmode)),
-            UserMode:   h.userCPUStats.Percent(float64(h.containerStats.CPUStats.CPUUsage.UsageInUsermode)),
-            Percent:    totalPercent,
-            TotalTicks: h.systemCPUStats.TicksConsumed(totalPercent),
-            Measured:   measuredCPUStats,
-        }
+            totalPercent := containerStats.CPU
 
-        ms := &drivers.MemoryStats{
-            MaxUsage: h.containerStats.MemoryStats.MaxUsage,
-            Usage:    h.containerStats.MemoryStats.Usage,
-            RSS:      h.containerStats.MemoryStats.Usage,
-            Measured: measuredMemStats,
-        }
-        h.stateLock.Unlock()
+            cs := &drivers.CpuStats{
+                SystemMode: systemCPUStats.Percent(float64(containerStats.CPUSystemNano)),
+                UserMode:   userCPUStats.Percent(float64(containerStats.CPUNano)),
+                Percent:    totalPercent,
+                TotalTicks: systemCPUStats.TicksConsumed(totalPercent),
+                Measured:   measuredCPUStats,
+            }
 
-        // update uasge
-        usage := drivers.TaskResourceUsage{
-            ResourceUsage: &drivers.ResourceUsage{
-                CpuStats:    cs,
-                MemoryStats: ms,
-            },
-            Timestamp: t.UTC().UnixNano(),
+            ms := &drivers.MemoryStats{
+                Usage:    containerStats.MemUsage,
+                RSS:      containerStats.MemUsage,
+                Measured: measuredMemStats,
+            }
+
+            // send stats to nomad
+            taskResourceChannel <- &drivers.TaskResourceUsage{
+                ResourceUsage: &drivers.ResourceUsage{
+                    CpuStats:    cs,
+                    MemoryStats: ms,
+                },
+                Timestamp: time.Now().UTC().UnixNano(),
+            }
         }
-        // send stats to nomad
-        statsChannel <- &usage
     }
 }
 
-func (h *TaskHandle) runContainerMonitor() {
-
-    timer := time.NewTimer(0)
-    interval := time.Second * 1
-    h.logger.Debug("Monitoring container", "container", h.containerID)
-
-    cleanup := func() {
-        h.logger.Debug("Container monitor exits", "container", h.containerID)
-    }
-    defer cleanup()
-
-    for {
-        select {
-        case <-h.driver.ctx.Done():
-            return
-
-        case <-timer.C:
-            timer.Reset(interval)
+func (h *TaskHandle) onContainerDied() {
+    h.logger.Debug("Container is not running anymore")
+    // container was stopped, get exit code and other post mortem infos
+    inspectData, err := h.driver.podman.ContainerInspect(h.driver.ctx, h.containerID)
+    h.stateLock.Lock()
+    h.completedAt = time.Now()
+    if err != nil {
+        h.exitResult.Err = fmt.Errorf("Driver was unable to get the exit code. %s: %v", h.containerID, err)
+        h.logger.Error("Failed to inspect stopped container, can not get exit code", "container", h.containerID, "error", err)
+        h.exitResult.Signal = 0
+    } else {
+        h.exitResult.ExitCode = int(inspectData.State.ExitCode)
+        if len(inspectData.State.Error) > 0 {
+            h.exitResult.Err = fmt.Errorf(inspectData.State.Error)
+            h.logger.Error("Container error", "container", h.containerID, "error", h.exitResult.Err)
         }
-
-        containerStats, statsErr := h.driver.podman.ContainerStats(h.driver.ctx, h.containerID)
-        if statsErr != nil {
-            gone := false
-            if errors.Is(statsErr, api.ContainerNotFound) {
-                gone = true
-            } else if errors.Is(statsErr, api.ContainerWrongState) {
-                gone = true
-            }
-            if gone {
-                h.logger.Debug("Container is not running anymore", "container", h.containerID, "error", statsErr)
-                // container was stopped, get exit code and other post mortem infos
-                inspectData, err := h.driver.podman.ContainerInspect(h.driver.ctx, h.containerID)
-                h.stateLock.Lock()
-                h.completedAt = time.Now()
-                if err != nil {
-                    h.exitResult.Err = fmt.Errorf("Driver was unable to get the exit code. %s: %v", h.containerID, err)
-                    h.logger.Error("Failed to inspect stopped container, can not get exit code", "container", h.containerID, "error", err)
-                    h.exitResult.Signal = 0
-                } else {
-                    h.exitResult.ExitCode = int(inspectData.State.ExitCode)
-                    if len(inspectData.State.Error) > 0 {
-                        h.exitResult.Err = fmt.Errorf(inspectData.State.Error)
-                        h.logger.Error("Container error", "container", h.containerID, "error", h.exitResult.Err)
-                    }
-                    h.completedAt = inspectData.State.FinishedAt
-                    if inspectData.State.OOMKilled {
-                        h.exitResult.OOMKilled = true
-                        h.exitResult.Err = fmt.Errorf("Podman container killed by OOM killer")
-                        h.logger.Error("Podman container killed by OOM killer", "container", h.containerID)
-                    }
-                }
-
-                h.procState = drivers.TaskStateExited
-                h.stateLock.Unlock()
-                return
-            }
-            // continue and wait for next cycle, it should eventually
-            // fall into the "TaskStateExited" case
-            h.logger.Debug("Could not get container stats, unknown error", "error", fmt.Sprintf("%#v", statsErr))
-            continue
+        h.completedAt = inspectData.State.FinishedAt
+        if inspectData.State.OOMKilled {
+            h.exitResult.OOMKilled = true
+            h.exitResult.Err = fmt.Errorf("Podman container killed by OOM killer")
+            h.logger.Error("Podman container killed by OOM killer", "container", h.containerID)
         }
-
-        h.stateLock.Lock()
-        // keep last known containerStats in handle to
-        // have it available in the stats emitter
-        h.containerStats = containerStats
-        h.stateLock.Unlock()
     }
+
+    h.procState = drivers.TaskStateExited
+    h.stateLock.Unlock()
+    // unblock exitWatcher go routine
+    close(h.diedChannel)
 }
diff --git a/state.go b/state.go
deleted file mode 100644
index bef9d61d..00000000
--- a/state.go
+++ /dev/null
@@ -1,33 +0,0 @@
-package main
-
-import (
-    "sync"
-)
-
-type taskStore struct {
-    store map[string]*TaskHandle
-    lock  sync.RWMutex
-}
-
-func newTaskStore() *taskStore {
-    return &taskStore{store: map[string]*TaskHandle{}}
-}
-
-func (ts *taskStore) Set(id string, handle *TaskHandle) {
-    ts.lock.Lock()
-    defer ts.lock.Unlock()
-    ts.store[id] = handle
-}
-
-func (ts *taskStore) Get(id string) (*TaskHandle, bool) {
-    ts.lock.RLock()
-    defer ts.lock.RUnlock()
-    t, ok := ts.store[id]
-    return t, ok
-}
-
-func (ts *taskStore) Delete(id string) {
-    ts.lock.Lock()
-    defer ts.lock.Unlock()
-    delete(ts.store, id)
-}
diff --git a/stateactor.go b/stateactor.go
new file mode 100644
index 00000000..fae16bef
--- /dev/null
+++ b/stateactor.go
@@ -0,0 +1,110 @@
+package main
+
+import (
+    "context"
+    "time"
+
+    "github.com/hashicorp/go-hclog"
+    "github.com/hashicorp/nomad-driver-podman/api"
+    "github.com/hashicorp/nomad/plugins/drivers"
+)
+
+type GetTaskHandleMsg struct {
+    TaskID string
+    Result chan *TaskHandle
+}
+
+type TaskStartedMsg struct {
+    TaskID     string
+    TaskHandle *TaskHandle
+    Done       chan bool
+}
+
+type TaskDeletedMsg struct {
+    TaskID string
+}
+
+type StartStatesEmitterMsg struct {
+    TaskID              string
+    Interval            time.Duration
+    TaskResourceChannel chan *drivers.TaskResourceUsage
+}
+
+type WaitTaskMsg struct {
+    TaskID            string
+    Ctx               context.Context
+    ExitResultChannel chan *drivers.ExitResult
+}
+
+// the state actor serializes requests to the TaskHandle by using a message channel.
+// it also holds taskid->handle and containerid->handle maps
+// and routes incoming podman stats and podman events to TaskHandles
+func runStateActor(ctx context.Context, actorMessageChannel <-chan interface{}, logger hclog.Logger) error {
+
+    taskIdToHandle := map[string]*TaskHandle{}
+    containerIdToHandle := map[string]*TaskHandle{}
+
+    go func() {
+        logger.Debug("Starting state actor")
+
+        for m := range actorMessageChannel {
+            switch message := m.(type) {
+
+            case GetTaskHandleMsg:
+                taskHandle, ok := taskIdToHandle[message.TaskID]
+                if ok {
+                    message.Result <- taskHandle
+                } else {
+                    close(message.Result)
+                }
+
+            case TaskStartedMsg:
+                logger.Trace("on started", "message", message)
+                taskIdToHandle[message.TaskID] = message.TaskHandle
+                containerIdToHandle[message.TaskHandle.containerID] = message.TaskHandle
+                // unblock caller
+                message.Done <- true
+
+            case WaitTaskMsg:
+                logger.Trace("on wait task", "message", message)
+                taskHandle, ok := taskIdToHandle[message.TaskID]
+                if ok {
+                    go taskHandle.runExitWatcher(message.Ctx, message.ExitResultChannel)
+                }
+
+            case StartStatesEmitterMsg:
+                logger.Trace("on start emitter", "message", message)
+                taskHandle, ok := taskIdToHandle[message.TaskID]
+                if ok {
+                    go taskHandle.runStatsEmitter(ctx, message.TaskResourceChannel, message.Interval)
+                    taskHandle.statsEmitterRunning = true
+                }
+
+            case api.ContainerStats:
+                taskHandle, ok := containerIdToHandle[message.ContainerID]
+                // avoid blocking the actor if the target stats emitter is not running
+                if ok && taskHandle.statsEmitterRunning {
+                    taskHandle.containerStatsChannel <- message
+                }
+
+            case api.ContainerDiedEvent:
+                logger.Trace("on died", "message", message)
+                taskHandle, ok := containerIdToHandle[message.ID]
+                if ok {
+                    go taskHandle.onContainerDied()
+                }
+
+            case TaskDeletedMsg:
+                logger.Trace("on deleted", "message", message)
+                taskHandle, ok := taskIdToHandle[message.TaskID]
+                if ok {
+                    delete(taskIdToHandle, message.TaskID)
+                    delete(containerIdToHandle, taskHandle.containerID)
+                }
+
+            }
+        }
+        logger.Debug("State actor stopped")
+    }()
+    return nil
+}
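
Note on the dedicated stream client added in api/api.go: http.Client.Timeout applies to the whole exchange, including reading the response body, so a client with a timeout would cut off the long-lived stats and events streams; the patch therefore keeps a second client without a timeout and relies on the request context for cancellation. The sketch below illustrates that pattern under stated assumptions — the URL and the fixed 30-second cancel are placeholders, not part of this patch (the real endpoints are /v1.0.0/libpod/events?stream=true and /v1.0.0/libpod/containers/stats?stream=true on the podman API socket):

```go
package main

import (
	"bufio"
	"context"
	"fmt"
	"net/http"
	"time"
)

func main() {
	// No Timeout here: the stream is expected to stay open indefinitely.
	streamClient := &http.Client{}

	// Cancellation happens through the request context instead.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// hypothetical NDJSON streaming endpoint, one JSON object per line
	req, err := http.NewRequestWithContext(ctx, "GET", "http://localhost:8080/events?stream=true", nil)
	if err != nil {
		panic(err)
	}

	res, err := streamClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer res.Body.Close()

	// stop the stream after a while; canceling the context unblocks the body read
	go func() {
		time.Sleep(30 * time.Second)
		cancel()
	}()

	// read line by line until the server closes the stream or the context is canceled
	scanner := bufio.NewScanner(res.Body)
	for scanner.Scan() {
		fmt.Println("event:", scanner.Text())
	}
}
```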
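Note on stateactor.go: the mutex-guarded taskStore from state.go is replaced by an actor goroutine that owns the taskID→handle and containerID→handle maps and processes one message at a time, so callers never touch the maps directly and get request/reply semantics over channels. A minimal, self-contained sketch of the same pattern follows; the getMsg/setMsg types and the string store are illustrative only, not part of the patch:

```go
package main

import "fmt"

// getMsg asks the actor for a value; the reply channel is closed when the key is unknown.
type getMsg struct {
	id    string
	reply chan string
}

// setMsg stores a value; done is closed once the actor has applied the write.
type setMsg struct {
	id, value string
	done      chan struct{}
}

// runActor owns the map exclusively; all reads and writes are serialized
// through the inbox channel, so no mutex is needed.
func runActor(inbox <-chan interface{}) {
	store := map[string]string{}
	for m := range inbox {
		switch msg := m.(type) {
		case setMsg:
			store[msg.id] = msg.value
			close(msg.done)
		case getMsg:
			if v, ok := store[msg.id]; ok {
				msg.reply <- v
			} else {
				close(msg.reply) // receiver sees ok == false, analogous to drivers.ErrTaskNotFound
			}
		}
	}
}

func main() {
	inbox := make(chan interface{}, 5)
	go runActor(inbox)

	// write, then wait until the actor has processed the message
	done := make(chan struct{})
	inbox <- setMsg{id: "task-1", value: "container-abc", done: done}
	<-done

	// request/reply lookup
	reply := make(chan string)
	inbox <- getMsg{id: "task-1", reply: reply}
	if v, ok := <-reply; ok {
		fmt.Println("found:", v)
	} else {
		fmt.Println("not found")
	}
}
```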