forked from kata-containers/kata-containers
-
Notifications
You must be signed in to change notification settings - Fork 0
/
monitor.go
153 lines (127 loc) · 3.02 KB
/
monitor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
package virtcontainers
import (
"context"
"sync"
"time"
"github.com/pkg/errors"
)
const (
defaultCheckInterval = 5 * time.Second
watcherChannelSize = 128
)
var monitorLog = virtLog.WithField("subsystem", "virtcontainers/monitor")
// nolint: govet
type monitor struct {
sandbox *Sandbox
stopCh chan bool
watchers []chan error
wg sync.WaitGroup
checkInterval time.Duration
sync.Mutex
running bool
}
func newMonitor(s *Sandbox) *monitor {
// there should only be one monitor for one sandbox,
// so it's safe to let monitorLog as a global variable.
monitorLog = monitorLog.WithField("sandbox", s.ID())
return &monitor{
sandbox: s,
checkInterval: defaultCheckInterval,
stopCh: make(chan bool, 1),
}
}
func (m *monitor) newWatcher(ctx context.Context) (chan error, error) {
m.Lock()
defer m.Unlock()
watcher := make(chan error, watcherChannelSize)
m.watchers = append(m.watchers, watcher)
if !m.running {
m.running = true
m.wg.Add(1)
// create and start agent watcher
go func() {
tick := time.NewTicker(m.checkInterval)
for {
select {
case <-m.stopCh:
tick.Stop()
m.wg.Done()
return
case <-tick.C:
m.watchHypervisor(ctx)
m.watchAgent(ctx)
}
}
}()
}
return watcher, nil
}
func (m *monitor) notify(ctx context.Context, err error) {
monitorLog.WithError(err).Warn("notify on errors")
m.sandbox.agent.markDead(ctx)
m.Lock()
defer m.Unlock()
if !m.running {
return
}
// a watcher is not supposed to close the channel
// but just in case...
defer func() {
if x := recover(); x != nil {
monitorLog.Warnf("watcher closed channel: %v", x)
}
}()
for _, c := range m.watchers {
monitorLog.WithError(err).Warn("write error to watcher")
// throw away message can not write to channel
// make it not stuck, the first error is useful.
select {
case c <- err:
default:
monitorLog.WithField("channel-size", watcherChannelSize).Warnf("watcher channel is full, throw notify message")
}
}
}
func (m *monitor) stop() {
// wait outside of monitor lock for the watcher channel to exit.
defer m.wg.Wait()
monitorLog.Info("stopping monitor")
m.Lock()
defer m.Unlock()
if !m.running {
return
}
m.stopCh <- true
defer func() {
m.watchers = nil
m.running = false
}()
// a watcher is not supposed to close the channel
// but just in case...
defer func() {
if x := recover(); x != nil {
monitorLog.Warnf("watcher closed channel: %v", x)
}
}()
for _, c := range m.watchers {
close(c)
}
}
func (m *monitor) watchAgent(ctx context.Context) {
err := m.sandbox.agent.check(ctx)
if err != nil {
// TODO: define and export error types
m.notify(ctx, errors.Wrapf(err, "failed to ping agent"))
}
}
func (m *monitor) watchHypervisor(ctx context.Context) error {
if err := m.sandbox.hypervisor.Check(); err != nil {
m.notify(ctx, errors.Wrapf(err, "failed to ping hypervisor process"))
return err
}
return nil
}