Skip to content

Commit

Permalink
libimage: add an events system
Browse files Browse the repository at this point in the history
Add an event system to libimage.  Callers can opt-in to using events by
requesting an event channel via `(*Runtime).EventChannel()`.  The
returned channel has a buffer of size 100 which should be sufficient
even under high loads.  But, to be on the safe side, writing an event
will time out after 2 seconds to prevent operations from blocking.

Currently, the only user of such an event system is Podman which will
need to convert the `Event` type to what's used internally in libpod.

Signed-off-by: Valentin Rothberg <[email protected]>
  • Loading branch information
vrothberg committed May 5, 2021
1 parent 42134aa commit a8950e5
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 1 deletion.
21 changes: 20 additions & 1 deletion libimage/events.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package libimage

import "time"
import (
"time"

"github.com/sirupsen/logrus"
)

// EventType indicates the type of an event. Currrently, there is only one
// supported type for container image but we may add more (e.g., for manifest
Expand Down Expand Up @@ -41,3 +45,18 @@ type Event struct {
// Type of the event.
Type EventType
}

// writeEvent writes the specified event to the Runtime's event channel. The
// event is discarded if no event channel has been registered (yet).
func (r *Runtime) writeEvent(event *Event) {
select {
case r.eventChannel <- event:
// Done
case <-time.After(2 * time.Second):
// The Runtime's event channel has a buffer of size 100 which
// should be enough even under high load. However, we
// shouldn't block too long in case the buffer runs full (could
// be an honest user error or bug).
logrus.Warnf("Discarding libimage event which was not read within 2 seconds: %v", event)
}
}
17 changes: 17 additions & 0 deletions libimage/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,10 @@ func (i *Image) remove(ctx context.Context, rmMap map[string]*RemoveImageReport,
return errors.Errorf("cannot remove read-only image %q", i.ID())
}

if i.runtime.eventChannel != nil {
i.runtime.writeEvent(&Event{ID: i.ID(), Name: referencedBy, Time: time.Now(), Type: EventTypeImageRemove})
}

// Check if already visisted this image.
report, exists := rmMap[i.ID()]
if exists {
Expand Down Expand Up @@ -423,6 +427,9 @@ func (i *Image) Tag(name string) error {
}

logrus.Debugf("Tagging image %s with %q", i.ID(), ref.String())
if i.runtime.eventChannel != nil {
i.runtime.writeEvent(&Event{ID: i.ID(), Name: name, Time: time.Now(), Type: EventTypeImageTag})
}

newNames := append(i.Names(), ref.String())
if err := i.runtime.store.SetNames(i.ID(), newNames); err != nil {
Expand Down Expand Up @@ -454,6 +461,9 @@ func (i *Image) Untag(name string) error {
name = ref.String()

logrus.Debugf("Untagging %q from image %s", ref.String(), i.ID())
if i.runtime.eventChannel != nil {
i.runtime.writeEvent(&Event{ID: i.ID(), Name: name, Time: time.Now(), Type: EventTypeImageUntag})
}

removedName := false
newNames := []string{}
Expand Down Expand Up @@ -593,6 +603,10 @@ func (i *Image) RepoDigests() ([]string, error) {
// are directly passed down to the containers storage. Returns the fully
// evaluated path to the mount point.
func (i *Image) Mount(ctx context.Context, mountOptions []string, mountLabel string) (string, error) {
if i.runtime.eventChannel != nil {
i.runtime.writeEvent(&Event{ID: i.ID(), Name: "", Time: time.Now(), Type: EventTypeImageMount})
}

mountPoint, err := i.runtime.store.MountImage(i.ID(), mountOptions, mountLabel)
if err != nil {
return "", err
Expand Down Expand Up @@ -634,6 +648,9 @@ func (i *Image) Mountpoint() (string, error) {
// Unmount the image. Use force to ignore the reference counter and forcefully
// unmount.
func (i *Image) Unmount(force bool) error {
if i.runtime.eventChannel != nil {
i.runtime.writeEvent(&Event{ID: i.ID(), Name: "", Time: time.Now(), Type: EventTypeImageUnmount})
}
logrus.Debugf("Unmounted image %s", i.ID())
_, err := i.runtime.store.UnmountImage(i.ID(), force)
return err
Expand Down
3 changes: 3 additions & 0 deletions libimage/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"os"
"time"

dirTransport "github.com/containers/image/v5/directory"
dockerArchiveTransport "github.com/containers/image/v5/docker/archive"
Expand All @@ -23,6 +24,8 @@ type LoadOptions struct {
func (r *Runtime) Load(ctx context.Context, path string, options *LoadOptions) ([]string, error) {
logrus.Debugf("Loading image from %q", path)

r.writeEvent(&Event{ID: "", Name: path, Time: time.Now(), Type: EventTypeImageLoad})

var (
loadedImages []string
loadError error
Expand Down
5 changes: 5 additions & 0 deletions libimage/manifest_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package libimage
import (
"context"
"fmt"
"time"

"github.com/containers/common/libimage/manifests"
imageCopy "github.com/containers/image/v5/copy"
Expand Down Expand Up @@ -364,6 +365,10 @@ func (m *ManifestList) Push(ctx context.Context, destination string, options *Ma
}
}

if m.image.runtime.eventChannel != nil {
m.image.runtime.writeEvent(&Event{ID: m.ID(), Name: destination, Time: time.Now(), Type: EventTypeImagePush})
}

// NOTE: we're using the logic in copier to create a proper
// types.SystemContext. This prevents us from having an error prone
// code duplicate here.
Expand Down
5 changes: 5 additions & 0 deletions libimage/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"strings"
"time"

"github.com/containers/common/pkg/config"
dirTransport "github.com/containers/image/v5/directory"
Expand Down Expand Up @@ -80,6 +81,10 @@ func (r *Runtime) Pull(ctx context.Context, name string, pullPolicy config.PullP
return nil, errors.Errorf("pulling all tags is not supported for %s transport", ref.Transport().Name())
}

if r.eventChannel != nil {
r.writeEvent(&Event{ID: "", Name: name, Time: time.Now(), Type: EventTypeImagePull})
}

var (
pulledImages []string
pullError error
Expand Down
5 changes: 5 additions & 0 deletions libimage/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package libimage

import (
"context"
"time"

dockerArchiveTransport "github.com/containers/image/v5/docker/archive"
"github.com/containers/image/v5/docker/reference"
Expand Down Expand Up @@ -61,6 +62,10 @@ func (r *Runtime) Push(ctx context.Context, source, destination string, options
destRef = dockerRef
}

if r.eventChannel != nil {
r.writeEvent(&Event{ID: image.ID(), Name: destination, Time: time.Now(), Type: EventTypeImagePush})
}

// Buildah compat: Make sure to tag the destination image if it's a
// Docker archive. This way, we preseve the image name.
if destRef.Transport().Name() == dockerArchiveTransport.Transport.Name() {
Expand Down
17 changes: 17 additions & 0 deletions libimage/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (

// RuntimeOptions allow for creating a customized Runtime.
type RuntimeOptions struct {
// The base system context of the runtime which will be used throughout
// the entire lifespan of the Runtime. Certain options in some
// functions may override specific fields.
SystemContext *types.SystemContext
}

Expand All @@ -41,6 +44,8 @@ func setRegistriesConfPath(systemContext *types.SystemContext) {
// Runtime is responsible for image management and storing them in a containers
// storage.
type Runtime struct {
// Use to send events out to users.
eventChannel chan *Event
// Underlying storage store.
store storage.Store
// Global system context. No pointer to simplify copying and modifying
Expand All @@ -55,6 +60,18 @@ func (r *Runtime) systemContextCopy() *types.SystemContext {
return &sys
}

// EventChannel creates a buffered channel for events that the Runtime will use
// to write events to. Callers are expected to read from the channel in a
// timely manner.
// Can be called once for a given Runtime.
func (r *Runtime) EventChannel() chan *Event {
if r.eventChannel != nil {
return r.eventChannel
}
r.eventChannel = make(chan *Event, 100)
return r.eventChannel
}

// RuntimeFromStore returns a Runtime for the specified store.
func RuntimeFromStore(store storage.Store, options *RuntimeOptions) (*Runtime, error) {
if options == nil {
Expand Down
8 changes: 8 additions & 0 deletions libimage/save.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package libimage
import (
"context"
"strings"
"time"

dirTransport "github.com/containers/image/v5/directory"
dockerArchiveTransport "github.com/containers/image/v5/docker/archive"
Expand Down Expand Up @@ -74,6 +75,10 @@ func (r *Runtime) saveSingleImage(ctx context.Context, name, format, path string
return err
}

if r.eventChannel != nil {
r.writeEvent(&Event{ID: image.ID(), Name: path, Time: time.Now(), Type: EventTypeImageSave})
}

// Unless the image was referenced by ID, use the resolved name as a
// tag.
var tag string
Expand Down Expand Up @@ -160,6 +165,9 @@ func (r *Runtime) saveDockerArchive(ctx context.Context, names []string, path st
}
}
localImages[image.ID()] = local
if r.eventChannel != nil {
r.writeEvent(&Event{ID: image.ID(), Name: path, Time: time.Now(), Type: EventTypeImageSave})
}
}

writer, err := dockerArchiveTransport.NewWriter(r.systemContextCopy(), path)
Expand Down

0 comments on commit a8950e5

Please sign in to comment.