Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: events service logic code of all links #2071

Merged
merged 2 commits into from
Aug 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apis/server/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func initRoute(s *Server) http.Handler {
s.addRoute(r, http.MethodGet, "/info", s.info)
s.addRoute(r, http.MethodGet, "/version", s.version)
s.addRoute(r, http.MethodPost, "/auth", s.auth)
s.addRoute(r, http.MethodGet, "/events", s.events)

// daemon, we still list this API into system manager.
s.addRoute(r, http.MethodPost, "/daemon/update", s.updateDaemon)
Expand Down
91 changes: 91 additions & 0 deletions apis/server/system_bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,17 @@ package server
import (
"context"
"encoding/json"
"fmt"
"net/http"
"time"

"github.com/alibaba/pouch/apis/filters"
"github.com/alibaba/pouch/apis/types"
"github.com/alibaba/pouch/pkg/httputils"
"github.com/alibaba/pouch/pkg/utils"

"github.com/docker/docker/pkg/ioutils"
"github.com/pkg/errors"
)

func (s *Server) ping(context context.Context, rw http.ResponseWriter, req *http.Request) (err error) {
Expand Down Expand Up @@ -60,3 +67,87 @@ func (s *Server) auth(ctx context.Context, rw http.ResponseWriter, req *http.Req
}
return EncodeResponse(rw, http.StatusOK, authResp)
}

func (s *Server) events(ctx context.Context, rw http.ResponseWriter, req *http.Request) (err error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

rw.Header().Set("Content-Type", "application/json")
output := ioutils.NewWriteFlusher(rw)
defer output.Close()
output.Flush()
enc := json.NewEncoder(output)

// parse the since and until parameters
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we extract the logic of parse and validate parameter into one function?

since, err := eventTime(req.FormValue("since"))
if err != nil {
return err
}
until, err := eventTime(req.FormValue("until"))
if err != nil {
return err
}

var (
timeout <-chan time.Time
onlyPastEvents bool
)
if !until.IsZero() {
if until.Before(since) {
return fmt.Errorf("until time (%s) cannot be after since (%s)", req.FormValue("until"), req.FormValue("since"))
}

now := time.Now()
onlyPastEvents = until.Before(now)
if !onlyPastEvents {
dur := until.Sub(now)
timeout = time.NewTimer(dur).C
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to stop the timer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think it no need to stop it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why? The timer should be closed if the goroutine exits. If not, it maybe impact other timers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only call NewTimer when just need to, and when created the timer, we will wait until the timer close.

Do i missing something?

}
}

ef, err := filters.FromParam(req.FormValue("filters"))
if err != nil {
return err
}

// send past events
buffered, eventq, errq := s.SystemMgr.SubscribeToEvents(ctx, since, until, ef)
for _, ev := range buffered {
if err := enc.Encode(ev); err != nil {
return err
}
}

// if until time is before now(), we only send past events
if onlyPastEvents {
return nil
}

// start subscribe new pouchd events
for {
select {
case ev := <-eventq:
if err := enc.Encode(ev); err != nil {
return err
}
case err := <-errq:
if err != nil {
return errors.Wrapf(err, "subscribe failed")
}
return nil
case <-timeout:
return nil
}
}
}

func eventTime(formTime string) (time.Time, error) {
t, tNano, err := utils.ParseTimestamp(formTime, -1)
if err != nil {
return time.Time{}, err
}
if t == -1 {
return time.Time{}, nil
}
return time.Unix(t, tNano), nil
}
46 changes: 46 additions & 0 deletions apis/swagger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,52 @@ paths:
schema:
$ref: "#/definitions/DaemonUpdateConfig"

/events:
get:
summary: "Subscribe pouchd events to users"
description: |
Stream real-time events from the server.
Report various object events of pouchd when something happens to them.
Containers report these events: create`, `destroy`, `die`, `oom`, `pause`, `rename`, `resize`, `restart`, `start`, `stop`, `top`, `unpause`, and `update`
Images report these events: `pull`, `untag`
Volumes report these events: `create`, `destroy`
Networks report these events: `create`, `connect`, `disconnect`, `destroy`
produces:
- "application/json"
responses:
200:
description: "no error"
schema:
$ref: '#definitions/EventsMessage'
400:
description: "bad parameter"
schema:
$ref: '#/definitions/Error'
500:
$ref: "#/responses/500ErrorResponse"
parameters:
- name: "since"
in: "query"
description: "Show events created since this timestamp then stream new events."
type: "string"
- name: "until"
in: "query"
description: "Show events created until this timestamp then stop streaming"
type: "string"
- name: "filters"
in: "query"
description: |
A JSON encoded value of filters (a `map[string][]string`) to process on the event list. Available filters:
- `container=<string>` container name or ID
- `event=<string>` event type
- `image=<string>` image name or ID
- `label=<string>` image or container label
- `network=<string>` network name or ID
- `type=<string>` object to filter by, one of `container`, `image`, `volume`, `network`
- `volume=<string>` volume name
type: "string"


/images/create:
post:
summary: "Create an image by pulling from a registry or importing from an existing source file"
Expand Down
8 changes: 4 additions & 4 deletions apis/types/registry_service_config.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

143 changes: 143 additions & 0 deletions cli/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package main

import (
"context"
"encoding/json"
"fmt"
"io"
"os"
"sort"
"strings"
"time"

"github.com/alibaba/pouch/apis/filters"
"github.com/alibaba/pouch/apis/types"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a blank line please.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

"github.com/alibaba/pouch/pkg/utils"

"github.com/spf13/cobra"
)

// eventsDescription is used to describe events command in detail and auto generate command doc.
var eventsDescription = "events cli tool is used to subscribe pouchd events." +
"We support filter parameter to filter some events that we care about or not."

// EventsCommand use to implement 'events' command.
type EventsCommand struct {
baseCommand
since string
until string
filter []string
}

// Init initialize events command.
func (e *EventsCommand) Init(c *Cli) {
e.cli = c
e.cmd = &cobra.Command{
Use: "events [OPTIONS]",
Short: "Get real time events from the daemon",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
return e.runEvents()
},
Example: eventsExample(),
}
e.addFlags()
}

// addFlags adds flags for specific command.
func (e *EventsCommand) addFlags() {
flagSet := e.cmd.Flags()

flagSet.StringVarP(&e.since, "since", "s", "", "Show all events created since timestamp")
flagSet.StringVarP(&e.until, "until", "u", "", "Stream events until this timestamp")
flagSet.StringSliceVarP(&e.filter, "filter", "f", []string{}, "Filter output based on conditions provided")
}

// runEvents is the entry of events command.
func (e *EventsCommand) runEvents() error {
ctx := context.Background()
apiClient := e.cli.Client()

eventFilterArgs := filters.NewArgs()

// TODO: parse params
for _, f := range e.filter {
var err error
eventFilterArgs, err = filters.ParseFlag(f, eventFilterArgs)
if err != nil {
return err
}
}

responseBody, err := apiClient.Events(ctx, e.since, e.until, eventFilterArgs)
if err != nil {
return err
}

return streamEvents(responseBody, os.Stdout)
}

// streamEvents decodes prints the incoming events in the provided output.
func streamEvents(input io.Reader, output io.Writer) error {
return DecodeEvents(input, func(event types.EventsMessage, err error) error {
if err != nil {
return err
}
printOutput(event, output)
return nil
})
}

type eventProcessor func(event types.EventsMessage, err error) error

// printOutput prints all types of event information.
// Each output includes the event type, actor id, name and action.
// Actor attributes are printed at the end if the actor has any.
func printOutput(event types.EventsMessage, output io.Writer) {
if event.TimeNano != 0 {
fmt.Fprintf(output, "%s ", time.Unix(0, event.TimeNano).Format(utils.RFC3339NanoFixed))
} else if event.Time != 0 {
fmt.Fprintf(output, "%s ", time.Unix(event.Time, 0).Format(utils.RFC3339NanoFixed))
}

fmt.Fprintf(output, "%s %s %s", event.Type, event.Action, event.Actor.ID)

if len(event.Actor.Attributes) > 0 {
var attrs []string
var keys []string
for k := range event.Actor.Attributes {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
v := event.Actor.Attributes[k]
attrs = append(attrs, fmt.Sprintf("%s=%s", k, v))
}
fmt.Fprintf(output, " (%s)", strings.Join(attrs, ", "))
}
fmt.Fprint(output, "\n")
}

// DecodeEvents decodes event from input stream
func DecodeEvents(input io.Reader, ep eventProcessor) error {
dec := json.NewDecoder(input)
for {
var event types.EventsMessage
err := dec.Decode(&event)
if err != nil && err == io.EOF {
break
}

if procErr := ep(event, err); procErr != nil {
return procErr
}
}
return nil
}

func eventsExample() string {
return `$ pouch events -s "2018-08-10T10:52:05"
2018-08-10T10:53:15.071664386-04:00 volume create 9fff54f207615ccc5a29477f5ae2234c6b804ed8aad2f0dfc0dccb0cc69d4d12 (driver=local)
2018-08-10T10:53:15.091131306-04:00 container create f2b58eb6bc616d7a22bdb89de50b3f04e2c23134accdec1a9b9a7490d609d34c (image=registry.hub.docker.com/library/centos:latest, name=test)
2018-08-10T10:53:15.537704818-04:00 container start f2b58eb6bc616d7a22bdb89de50b3f04e2c23134accdec1a9b9a7490d609d34c (image=registry.hub.docker.com/library/centos:latest, name=test)`
}
1 change: 1 addition & 0 deletions cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func main() {
cli.AddCommand(base, &WaitCommand{})
cli.AddCommand(base, &DaemonUpdateCommand{})
cli.AddCommand(base, &CheckpointCommand{})
cli.AddCommand(base, &EventsCommand{})

// add generate doc command
cli.AddCommand(base, &GenDocCommand{})
Expand Down
2 changes: 2 additions & 0 deletions client/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"net"

"github.com/alibaba/pouch/apis/filters"
"github.com/alibaba/pouch/apis/types"
)

Expand Down Expand Up @@ -71,6 +72,7 @@ type SystemAPIClient interface {
SystemInfo(ctx context.Context) (*types.SystemInfo, error)
RegistryLogin(ctx context.Context, auth *types.AuthConfig) (*types.AuthResponse, error)
DaemonUpdate(ctx context.Context, daemonConfig *types.DaemonUpdateConfig) error
Events(ctx context.Context, since string, until string, filters filters.Args) (io.ReadCloser, error)
}

// NetworkAPIClient defines methods of Network client.
Expand Down
Loading