Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: actor based state handling, streaming container stats and podman events #80

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
18 changes: 15 additions & 3 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ import (
)

type API struct {
baseUrl string
httpClient *http.Client
logger hclog.Logger
baseUrl string
httpClient *http.Client
httpStreamClient *http.Client
logger hclog.Logger
}

type ClientConfig struct {
Expand Down Expand Up @@ -56,6 +57,8 @@ func NewClient(logger hclog.Logger, config ClientConfig) *API {
ac.httpClient = &http.Client{
Timeout: config.HttpTimeout,
}
// we do not want a timeout for streaming requests.
ac.httpStreamClient = &http.Client{}
if strings.HasPrefix(baseUrl, "unix:") {
ac.baseUrl = "http://u"
path := strings.TrimPrefix(baseUrl, "unix:")
Expand All @@ -64,6 +67,7 @@ func NewClient(logger hclog.Logger, config ClientConfig) *API {
return net.Dial("unix", path)
},
}
ac.httpStreamClient.Transport = ac.httpClient.Transport
} else {
ac.baseUrl = baseUrl
}
Expand All @@ -84,6 +88,14 @@ func (c *API) Get(ctx context.Context, path string) (*http.Response, error) {
return c.Do(req)
}

func (c *API) GetStream(ctx context.Context, path string) (*http.Response, error) {
req, err := http.NewRequestWithContext(ctx, "GET", c.baseUrl+path, nil)
if err != nil {
return nil, err
}
return c.httpStreamClient.Do(req)
}

func (c *API) Post(ctx context.Context, path string, body io.Reader) (*http.Response, error) {
return c.PostWithHeaders(ctx, path, body, map[string]string{})
}
Expand Down
7 changes: 0 additions & 7 deletions api/container_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"io/ioutil"
"net/http"
"time"
)

// ContainerStart starts a container via id or name
Expand All @@ -23,11 +22,5 @@ func (c *API) ContainerStart(ctx context.Context, name string) error {
return fmt.Errorf("unknown error, status code: %d: %s", res.StatusCode, body)
}

// wait max 10 seconds for running state
// TODO: make timeout configurable
timeout, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()

err = c.ContainerWait(timeout, name, []string{"running", "exited"})
return err
}
53 changes: 53 additions & 0 deletions api/container_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"fmt"
"io/ioutil"
"net/http"

"github.com/mitchellh/go-linereader"
)

var ContainerNotFound = errors.New("No such Container")
Expand Down Expand Up @@ -45,3 +47,54 @@ func (c *API) ContainerStats(ctx context.Context, name string) (Stats, error) {

return stats, nil
}

// ContainerStatsStream streams stats for all containers
func (c *API) ContainerStatsStream(ctx context.Context) (chan ContainerStats, error) {

res, err := c.GetStream(ctx, "/v1.0.0/libpod/containers/stats?stream=true")
if err != nil {
return nil, err
}

if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unknown error, status code: %d", res.StatusCode)
}

statsChannel := make(chan ContainerStats, 5)
lr := linereader.New(res.Body)

go func() {
c.logger.Debug("Running stats stream")
defer func() {
res.Body.Close()
close(statsChannel)
c.logger.Debug("Stopped stats stream")
}()
for {
select {
case <-ctx.Done():
c.logger.Debug("Stopping stats stream")
return
case line, ok := <-lr.Ch:
if !ok {
c.logger.Debug("Stats reader channel was closed")
return
}
var statsReport ContainerStatsReport
if jerr := json.Unmarshal([]byte(line), &statsReport); jerr != nil {
c.logger.Error("Unable to unmarshal statsreport", "err", jerr)
return
}
if statsReport.Error != nil {
c.logger.Error("Stats stream is broken", "error", statsReport.Error)
return
}
for _, stat := range statsReport.Stats {
statsChannel <- stat
}
}
}
}()

return statsChannel, nil
}
136 changes: 136 additions & 0 deletions api/libpod_events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package api

import (
"context"
"encoding/json"
"fmt"
"net/http"
"strconv"

"github.com/mitchellh/go-linereader"
)

// PodmanEvent is the common header for all events
type PodmanEvent struct {
Type string
Action string
}

// ContainerEvent is a generic PodmanEvent for a single container
// example json:
// {"Type":"container","Action":"create","Actor":{"ID":"cc0d7849692360df2cba94eafb2715b9deec0cbd96ec41c3329dd8636cd070ce","Attributes":{"containerExitCode":"0","image":"docker.io/library/redis:latest","name":"redis-6f2b07a8-73e9-7098-83e1-55939851d46d"}},"scope":"local","time":1609413164,"timeNano":1609413164982188073}
type ContainerEvent struct {
// create/init/start/stop/died
Action string `json:"Action"`
Scope string `json:"scope"`
TimeNano uint64 `json:"timeNano"`
Time uint32 `json:"time"`
Actor ContainerEventActor `json:"Actor"`
}

type ContainerEventActor struct {
ID string `json:"ID"`
Attributes ContainerEventAttributes `json:"Attributes"`
}

type ContainerEventAttributes struct {
Image string `json:"image"`
Name string `json:"name"`
ContainerExitCode string `json:"containerExitCode"`
}

// ContainerStartEvent is emitted when a container completely started
type ContainerStartEvent struct {
ID string
Name string
}

// ContainerDiedEvent is emitted when a container exited
type ContainerDiedEvent struct {
ID string
Name string
ExitCode int
}

// LibpodEventStream streams podman events
func (c *API) LibpodEventStream(ctx context.Context) (chan interface{}, error) {

res, err := c.GetStream(ctx, "/v1.0.0/libpod/events?stream=true")
if err != nil {
return nil, err
}

if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unknown error, status code: %d", res.StatusCode)
}

eventsChannel := make(chan interface{}, 5)
lr := linereader.New(res.Body)

go func() {
c.logger.Debug("Running libpod event stream")
defer func() {
res.Body.Close()
close(eventsChannel)
c.logger.Debug("Stopped libpod event stream")
}()
for {
select {
case <-ctx.Done():
c.logger.Debug("Stopping libpod event stream")
return
case line, ok := <-lr.Ch:
if !ok {
c.logger.Debug("Event reader channel was closed")
return
}
var podmanEvent PodmanEvent
err := json.Unmarshal([]byte(line), &podmanEvent)
if err != nil {
c.logger.Error("Unable to parse libpod event", "error", err)
// no need to stop the stream, maybe we can parse the next event
continue
}
c.logger.Trace("libpod event", "event", line)
if podmanEvent.Type == "container" {
var containerEvent ContainerEvent
err := json.Unmarshal([]byte(line), &containerEvent)
if err != nil {
c.logger.Error("Unable to parse ContainerEvent", "error", err)
// no need to stop the stream, maybe we can parse the next event
continue
}
switch containerEvent.Action {
case "start":
eventsChannel <- ContainerStartEvent{
ID: containerEvent.Actor.ID,
Name: containerEvent.Actor.Attributes.Name,
}
continue
case "died":
i, err := strconv.Atoi(containerEvent.Actor.Attributes.ContainerExitCode)
if err != nil {
c.logger.Error("Unable to parse ContainerEvent exitCode", "error", err)
// no need to stop the stream, maybe we can parse the next event
continue
}
eventsChannel <- ContainerDiedEvent{
ID: containerEvent.Actor.ID,
Name: containerEvent.Actor.Attributes.Name,
ExitCode: i,
}
continue
}
// no action specific parser? emit what we've got
eventsChannel <- containerEvent
continue
}

// emit a generic event if we do not have a parser for it
eventsChannel <- podmanEvent
}
}
}()

return eventsChannel, nil
}
36 changes: 36 additions & 0 deletions api/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1535,3 +1535,39 @@ type Version struct {
Built int64
OsArch string
}

// -------------------------------------------------------------------------------------------------------
// structs copied from https://github.com/containers/podman/blob/master/libpod/define/containerstate.go
//
// some unused parts are modified/commented out to not pull more dependencies
//
// some fields are reordert to make the linter happy (bytes maligned complains)
// -------------------------------------------------------------------------------------------------------

// ContainerStats contains the statistics information for a running container
type ContainerStats struct {
ContainerID string
Name string
PerCPU []uint64
CPU float64
CPUNano uint64
CPUSystemNano uint64
SystemNano uint64
MemUsage uint64
MemLimit uint64
MemPerc float64
NetInput uint64
NetOutput uint64
BlockInput uint64
BlockOutput uint64
PIDs uint64
}

// ContainerStatsReport is used for streaming container stats.
// https://github.com/containers/podman/blob/master/pkg/domain/entities/containers.go
type ContainerStatsReport struct {
// Error from reading stats.
Error error
// Results, set when there is no error.
Stats []ContainerStats
}
Loading