Skip to content

Commit

Permalink
cgroup2: implement containerd.events.TaskOOM event
Browse files Browse the repository at this point in the history
How to test (from opencontainers/runc#2352 (comment)):
  (host)$ sudo swapoff -a
  (host)$ sudo ctr run -t --rm --memory-limit $((1024*1024*32)) docker.io/library/alpine:latest foo
  (container)$ sh -c 'VAR=$(seq 1 100000000)'

An event `/tasks/oom {"container_id":"foo"}` will be displayed in `ctr events`.

Signed-off-by: Akihiro Suda <[email protected]>
  • Loading branch information
AkihiroSuda authored and fahedouch committed Aug 7, 2020
1 parent 54ea68c commit e328222
Show file tree
Hide file tree
Showing 17 changed files with 861 additions and 149 deletions.
15 changes: 0 additions & 15 deletions metrics/cgroups/v2/cgroups.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@ package v2
import (
"context"

"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/runtime"
"github.com/containerd/containerd/runtime/v1/linux"
metrics "github.com/docker/go-metrics"
)

Expand All @@ -48,19 +46,6 @@ func (m *cgroupsMonitor) Monitor(c runtime.Task) error {
if err := m.collector.Add(c); err != nil {
return err
}
t, ok := c.(*linux.Task)
if !ok {
return nil
}
cg, err := t.Cgroup()
if err != nil {
if errdefs.IsNotFound(err) {
return nil
}
return err
}
// OOM handler is not implemented yet
_ = cg
return nil
}

Expand Down
16 changes: 16 additions & 0 deletions metrics/cgroups/v2/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,4 +586,20 @@ var memoryMetrics = []*metric{
}
},
},
{
name: "memory_oom",
help: "The number of times a container has received an oom event",
unit: metrics.Total,
vt: prometheus.GaugeValue,
getValues: func(stats *v2.Metrics) []value {
if stats.MemoryEvents == nil {
return nil
}
return []value{
{
v: float64(stats.MemoryEvents.Oom),
},
}
},
},
}
30 changes: 30 additions & 0 deletions pkg/oom/oom.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// +build linux

/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package oom

import (
"context"
)

// Watcher watches OOM events
type Watcher interface {
Close() error
Run(ctx context.Context)
Add(id string, cg interface{}) error
}
26 changes: 16 additions & 10 deletions pkg/oom/epoll.go → pkg/oom/v1/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,38 @@
limitations under the License.
*/

package oom
package v1

import (
"context"
"sync"

"github.com/containerd/cgroups"
eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/pkg/oom"
"github.com/containerd/containerd/runtime"
"github.com/containerd/containerd/runtime/v2/shim"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sys/unix"
)

// New returns an epoll implementation that listens to OOM events
// from a container's cgroups.
func New(publisher shim.Publisher) (*Epoller, error) {
func New(publisher shim.Publisher) (oom.Watcher, error) {
fd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC)
if err != nil {
return nil, err
}
return &Epoller{
return &epoller{
fd: fd,
publisher: publisher,
set: make(map[uintptr]*item),
}, nil
}

// Epoller implementation for handling OOM events from a container's cgroup
type Epoller struct {
// epoller implementation for handling OOM events from a container's cgroup
type epoller struct {
mu sync.Mutex

fd int
Expand All @@ -59,12 +61,12 @@ type item struct {
}

// Close the epoll fd
func (e *Epoller) Close() error {
func (e *epoller) Close() error {
return unix.Close(e.fd)
}

// Run the epoll loop
func (e *Epoller) Run(ctx context.Context) {
func (e *epoller) Run(ctx context.Context) {
var events [128]unix.EpollEvent
for {
select {
Expand All @@ -86,8 +88,12 @@ func (e *Epoller) Run(ctx context.Context) {
}
}

// Add the cgroup to the epoll monitor
func (e *Epoller) Add(id string, cg cgroups.Cgroup) error {
// Add cgroups.Cgroup to the epoll monitor
func (e *epoller) Add(id string, cgx interface{}) error {
cg, ok := cgx.(cgroups.Cgroup)
if !ok {
return errors.Errorf("expected cgroups.Cgroup, got: %T", cgx)
}
e.mu.Lock()
defer e.mu.Unlock()
fd, err := cg.OOMEventFD()
Expand All @@ -105,7 +111,7 @@ func (e *Epoller) Add(id string, cg cgroups.Cgroup) error {
return unix.EpollCtl(e.fd, unix.EPOLL_CTL_ADD, int(fd), &event)
}

func (e *Epoller) process(ctx context.Context, fd uintptr) {
func (e *epoller) process(ctx context.Context, fd uintptr) {
flush(fd)
e.mu.Lock()
i, ok := e.set[fd]
Expand Down
113 changes: 113 additions & 0 deletions pkg/oom/v2/v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// +build linux

/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v2

import (
"context"

cgroupsv2 "github.com/containerd/cgroups/v2"
eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/pkg/oom"
"github.com/containerd/containerd/runtime"
"github.com/containerd/containerd/runtime/v2/shim"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

// New returns an implementation that listens to OOM events
// from a container's cgroups.
func New(publisher shim.Publisher) (oom.Watcher, error) {
return &watcher{
itemCh: make(chan item),
publisher: publisher,
}, nil
}

// watcher implementation for handling OOM events from a container's cgroup
type watcher struct {
itemCh chan item
publisher shim.Publisher
}

type item struct {
id string
ev cgroupsv2.Event
err error
}

// Close closes the watcher
func (w *watcher) Close() error {
return nil
}

// Run the loop
func (w *watcher) Run(ctx context.Context) {
lastOOMMap := make(map[string]uint64) // key: id, value: ev.OOM
for {
select {
case <-ctx.Done():
w.Close()
return
case i := <-w.itemCh:
if i.err != nil {
delete(lastOOMMap, i.id)
continue
}
lastOOM := lastOOMMap[i.id]
if i.ev.OOM > lastOOM {
if err := w.publisher.Publish(ctx, runtime.TaskOOMEventTopic, &eventstypes.TaskOOM{
ContainerID: i.id,
}); err != nil {
logrus.WithError(err).Error("publish OOM event")
}
}
if i.ev.OOM > 0 {
lastOOMMap[i.id] = i.ev.OOM
}
}
}
}

// Add cgroups.Cgroup to the epoll monitor
func (w *watcher) Add(id string, cgx interface{}) error {
cg, ok := cgx.(*cgroupsv2.Manager)
if !ok {
return errors.Errorf("expected *cgroupsv2.Manager, got: %T", cgx)
}
// FIXME: cgroupsv2.Manager does not support closing eventCh routine currently.
// The routine shuts down when an error happens, mostly when the cgroup is deleted.
eventCh, errCh := cg.EventChan()
go func() {
for {
i := item{id: id}
select {
case ev := <-eventCh:
i.ev = ev
w.itemCh <- i
case err := <-errCh:
i.err = err
w.itemCh <- i
// we no longer get any event/err when we got an err
logrus.WithError(err).Warn("error from *cgroupsv2.Manager.EventChan")
return
}
}
}()
return nil
}
5 changes: 3 additions & 2 deletions runtime/v2/runc/v1/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/pkg/oom"
oomv1 "github.com/containerd/containerd/pkg/oom/v1"
"github.com/containerd/containerd/pkg/process"
"github.com/containerd/containerd/pkg/stdio"
"github.com/containerd/containerd/runtime/v2/runc"
Expand All @@ -58,7 +59,7 @@ var (

// New returns a new shim service that can be used via GRPC
func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func()) (shim.Shim, error) {
ep, err := oom.New(publisher)
ep, err := oomv1.New(publisher)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -90,7 +91,7 @@ type service struct {
events chan interface{}
platform stdio.Platform
ec chan runcC.Exit
ep *oom.Epoller
ep oom.Watcher

id string
container *runc.Container
Expand Down
20 changes: 15 additions & 5 deletions runtime/v2/runc/v2/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import (
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/pkg/oom"
oomv1 "github.com/containerd/containerd/pkg/oom/v1"
oomv2 "github.com/containerd/containerd/pkg/oom/v2"
"github.com/containerd/containerd/pkg/process"
"github.com/containerd/containerd/pkg/stdio"
"github.com/containerd/containerd/runtime/v2/runc"
Expand Down Expand Up @@ -73,7 +75,15 @@ type spec struct {

// New returns a new shim service that can be used via GRPC
func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func()) (shim.Shim, error) {
ep, err := oom.New(publisher)
var (
ep oom.Watcher
err error
)
if cgroups.Mode() == cgroups.Unified {
ep, err = oomv2.New(publisher)
} else {
ep, err = oomv1.New(publisher)
}
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -106,7 +116,7 @@ type service struct {
events chan interface{}
platform stdio.Platform
ec chan runcC.Exit
ep *oom.Epoller
ep oom.Watcher

// id only used in cleanup case
id string
Expand Down Expand Up @@ -344,9 +354,9 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
logrus.WithError(err).Errorf("failed to enable controllers (%v)", allControllers)
}
}

// OOM monitor is not implemented yet
logrus.WithError(errdefs.ErrNotImplemented).Warn("add cg to OOM monitor")
if err := s.ep.Add(container.ID, cg); err != nil {
logrus.WithError(err).Error("add cg to OOM monitor")
}
}

s.send(&eventstypes.TaskStart{
Expand Down
2 changes: 1 addition & 1 deletion vendor.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ github.com/beorn7/perks v1.0.1
github.com/BurntSushi/toml v0.3.1
github.com/cespare/xxhash/v2 v2.1.1
github.com/containerd/btrfs 153935315f4ab9be5bf03650a1341454b05efa5d
github.com/containerd/cgroups b4448137398923af7f4918b8b2ad8249172ca7a6
github.com/containerd/cgroups 0dbf7f05ba59274095946e2c0c89540726e8a8aa
github.com/containerd/console v1.0.0
github.com/containerd/continuity d3ef23f19fbb106bb73ffde425d07a9187e30745
github.com/containerd/fifo f15a3290365b9d2627d189e619ab4008e0069caf
Expand Down
21 changes: 21 additions & 0 deletions vendor/github.com/containerd/cgroups/README.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit e328222

Please sign in to comment.