// This file is dummy.go from a fork of kube-HPC/k8s-dummy-device-plugin.
package main
import (
"encoding/json"
"flag"
"fmt"
"github.com/golang/glog"
"io/ioutil"
"net"
"os"
"os/signal"
"strings"
"syscall"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
)
// DummyDeviceManager is the device-plugin server for the dummy devices. It
// keeps the device list and the gRPC server that exposes it.
type DummyDeviceManager struct {
	// devices holds the advertised devices, keyed by device ID.
	devices map[string]*pluginapi.Device

	// socket is the path of the unix socket the gRPC server listens on.
	socket string

	// server is the gRPC server created by Start; Stop resets it to nil.
	server *grpc.Server

	// health carries devices that ListAndWatch marks as unhealthy and
	// re-advertises.
	health chan *pluginapi.Device
}
// Init prepares the dummy device manager. It performs no setup: it only logs
// a start-up message and always returns nil.
func (ddm *DummyDeviceManager) Init() error {
	const startupMsg = "Initializing dummy device plugin..."
	glog.Info(startupMsg)
	return nil
}
// discoverDummyResources loads the device list from ./dummyResources.json
// (resolved against the working directory) and replaces ddm.devices with it.
//
// The file must contain a JSON array of objects with string values. Each
// object's "name" value becomes the ID of a device that is reported as
// healthy. Entries with a missing or empty "name" are skipped with a warning
// instead of being registered under an empty device ID.
//
// It returns an error, annotated with the file name, if the file cannot be
// read or does not decode into that shape; ddm.devices is left untouched in
// that case. Errors are returned rather than printed so that the caller logs
// them exactly once.
//
// TODO: We currently only do this once at init, need to change it to do
// monitoring and health state update.
func (ddm *DummyDeviceManager) discoverDummyResources() error {
	const resourceFile = "./dummyResources.json"

	glog.Info("Discovering dummy devices")

	raw, err := ioutil.ReadFile(resourceFile)
	if err != nil {
		return fmt.Errorf("reading %s: %v", resourceFile, err)
	}

	var devs []map[string]string
	if err := json.Unmarshal(raw, &devs); err != nil {
		return fmt.Errorf("parsing %s: %v", resourceFile, err)
	}

	// Build the new map on the side and publish it in one assignment.
	devices := make(map[string]*pluginapi.Device, len(devs))
	for i, dev := range devs {
		name := dev["name"]
		if name == "" {
			glog.Warningf("Skipping entry %d in %s: missing or empty \"name\"", i, resourceFile)
			continue
		}
		devices[name] = &pluginapi.Device{ID: name, Health: pluginapi.Healthy}
	}
	ddm.devices = devices

	glog.Infof("Devices found: %v", ddm.devices)
	return nil
}
// Start starts the gRPC server of the device plugin.
//
// It removes any stale socket file, listens on ddm.socket, serves the
// DevicePlugin service in a background goroutine, and then opens a blocking
// test connection (with a five second timeout) to wait until the server
// accepts connections. On success the healthcheck loop is started in the
// background and nil is returned.
//
// It returns an error if the stale socket cannot be removed, if listening on
// the socket fails, or if the test connection cannot be established. In the
// last case the server is stopped again so that no half-started server is
// left behind.
func (ddm *DummyDeviceManager) Start() error {
	if err := ddm.cleanup(); err != nil {
		return err
	}

	socket := ddm.socket
	sock, err := net.Listen("unix", socket)
	if err != nil {
		return err
	}

	// Use a local handle in the serving goroutine: ddm.server is reset to
	// nil by Stop while that goroutine may still be running.
	srv := grpc.NewServer()
	ddm.server = srv
	pluginapi.RegisterDevicePluginServer(srv, ddm)
	go func() {
		if err := srv.Serve(sock); err != nil {
			glog.Errorf("device-plugin: gRPC server on %s stopped: %v", socket, err)
		}
	}()

	// Wait for server to start by launching a blocking connection
	conn, err := grpc.Dial(socket, grpc.WithInsecure(), grpc.WithBlock(),
		grpc.WithTimeout(5*time.Second),
		grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
			return net.DialTimeout("unix", addr, timeout)
		}),
	)
	if err != nil {
		// The server never became reachable: tear it down instead of
		// leaving it (and its socket file) behind.
		srv.Stop()
		ddm.server = nil
		if cleanupErr := ddm.cleanup(); cleanupErr != nil {
			glog.Errorf("device-plugin: removing %s after failed start: %v", socket, cleanupErr)
		}
		return err
	}
	// The connection was only a readiness probe; its close result is
	// irrelevant.
	conn.Close()

	go ddm.healthcheck()
	return nil
}
// Stop shuts down the gRPC server, if one is running, and removes the socket
// file. It is a no-op returning nil when no server is set. Otherwise it
// returns the result of removing the socket file.
func (ddm *DummyDeviceManager) Stop() error {
	srv := ddm.server
	if srv == nil {
		return nil
	}

	srv.Stop()
	ddm.server = nil

	return ddm.cleanup()
}
// healthcheck is the placeholder for device health monitoring. For now it
// only logs the device map once every 60 seconds, forever; it never returns.
// TODO: Currently does nothing, need to implement
func (ddm *DummyDeviceManager) healthcheck() error {
	const interval = 60 * time.Second

	for {
		glog.Info(ddm.devices)
		time.Sleep(interval)
	}
}
// cleanup removes the plugin's socket file. A socket file that does not exist
// is not an error; any other removal failure is returned to the caller.
func (ddm *DummyDeviceManager) cleanup() error {
	err := os.Remove(ddm.socket)
	if err == nil || os.IsNotExist(err) {
		return nil
	}
	return err
}
// Register announces this device plugin to the kubelet.
//
// It dials the kubelet's registration socket (pluginapi.KubeletSocket) over a
// unix connection and sends a RegisterRequest carrying the plugin API
// version, the plugin's socket name ("dummy.sock") and the resource name the
// plugin serves.
//
// It returns an error if the connection cannot be set up or if the kubelet
// rejects the registration.
func Register() error {
	conn, err := grpc.Dial(pluginapi.KubeletSocket, grpc.WithInsecure(),
		grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
			return net.DialTimeout("unix", addr, timeout)
		}))
	if err != nil {
		return fmt.Errorf("device-plugin: cannot connect to kubelet service: %v", err)
	}
	// Defer Close only after the error check: on a failed Dial conn is nil
	// and closing it would panic.
	defer conn.Close()

	client := pluginapi.NewRegistrationClient(conn)
	reqt := &pluginapi.RegisterRequest{
		Version: pluginapi.Version,
		// Name of the unix socket the device plugin is listening on
		// PATH = path.Join(DevicePluginPath, endpoint)
		Endpoint: "dummy.sock",
		// Schedulable resource name.
		ResourceName: "nvidia.com/gpu",
	}

	if _, err := client.Register(context.Background(), reqt); err != nil {
		return fmt.Errorf("device-plugin: cannot register to kubelet service: %v", err)
	}
	return nil
}
// ListAndWatch streams the device list to the kubelet.
//
// It sends the full device list once, then blocks. Every device received on
// ddm.health is marked unhealthy and the full list is sent again.
//
// The call returns nil when the stream's context is done, and returns the
// send error as soon as a send on the stream fails. Without these exits a
// closed or broken stream would leave this handler running forever and
// consuming health events meant for a newer stream.
func (ddm *DummyDeviceManager) ListAndWatch(emtpy *pluginapi.Empty, stream pluginapi.DevicePlugin_ListAndWatchServer) error {
	glog.Info("device-plugin: ListAndWatch start\n")

	if err := ddm.sendDevices(stream); err != nil {
		return err
	}

	done := stream.Context().Done()
	for {
		select {
		case <-done:
			glog.Info("device-plugin: ListAndWatch stream closed")
			return nil
		case d := <-ddm.health:
			d.Health = pluginapi.Unhealthy
			if err := ddm.sendDevices(stream); err != nil {
				return err
			}
		}
	}
}

// sendDevices sends every device in ddm.devices on stream as a single
// ListAndWatchResponse. A send failure is logged here, because the returned
// error only travels back over the (likely broken) stream, and is then
// returned so the caller can stop.
func (ddm *DummyDeviceManager) sendDevices(stream pluginapi.DevicePlugin_ListAndWatchServer) error {
	resp := new(pluginapi.ListAndWatchResponse)
	for _, dev := range ddm.devices {
		glog.Info("dev ", dev)
		resp.Devices = append(resp.Devices, dev)
	}
	glog.Info("resp.Devices ", resp.Devices)

	if err := stream.Send(resp); err != nil {
		glog.Errorf("Failed to send response to kubelet: %v", err)
		return err
	}
	return nil
}
// Allocate answers a kubelet allocation request.
//
// For each container request it checks that every requested device ID is
// present in ddm.devices. The first unknown ID aborts the whole call with an
// error and a nil response. Otherwise each container gets a response whose
// DUMMY_DEVICES environment variable holds its device IDs joined by commas.
// Container responses are returned in request order.
func (ddm *DummyDeviceManager) Allocate(ctx context.Context, reqs *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
	glog.Info("Allocate")

	out := new(pluginapi.AllocateResponse)
	for _, containerReq := range reqs.ContainerRequests {
		ids := containerReq.DevicesIDs

		for _, id := range ids {
			if _, known := ddm.devices[id]; known {
				continue
			}
			glog.Errorf("Can't allocate interface %s", id)
			return nil, fmt.Errorf("invalid allocation request: unknown device: %s", id)
		}
		glog.Info("Allocated interfaces ", ids)

		env := map[string]string{"DUMMY_DEVICES": strings.Join(ids, ",")}
		out.ContainerResponses = append(out.ContainerResponses,
			&pluginapi.ContainerAllocateResponse{Envs: env})
	}
	return out, nil
}
// GetDevicePluginOptions returns the options to be communicated to the
// kubelet's device manager. This plugin sets none: it always returns an empty
// DevicePluginOptions and a nil error.
func (ddm *DummyDeviceManager) GetDevicePluginOptions(context.Context, *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) {
	opts := new(pluginapi.DevicePluginOptions)
	return opts, nil
}
// PreStartContainer is called, if indicated by the device plugin during the
// registration phase, before each container start. A device plugin can use it
// to run device-specific operations, such as resetting the device, before the
// devices are made available to the container.
//
// This plugin does nothing here: it always returns an empty response and a
// nil error.
func (ddm *DummyDeviceManager) PreStartContainer(context.Context, *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) {
	resp := new(pluginapi.PreStartContainerResponse)
	return resp, nil
}
func main() {
flag.Parse()
flag.Lookup("logtostderr").Value.Set("true")
// Create new dummy device manager
ddm := &DummyDeviceManager{
devices: make(map[string]*pluginapi.Device),
socket: pluginapi.DevicePluginPath + "dummy.sock",
health: make(chan *pluginapi.Device),
}
// Populate device list
err := ddm.discoverDummyResources()
if err != nil {
glog.Fatal(err)
}
// Respond to syscalls for termination
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
// Start grpc server
err = ddm.Start()
if err != nil {
glog.Fatalf("Could not start device plugin: %v", err)
}
glog.Infof("Starting to serve on %s", ddm.socket)
// Registers with Kubelet.
err = Register()
if err != nil {
glog.Fatal(err)
}
fmt.Printf("device-plugin registered\n")
select {
case s := <-sigs:
glog.Infof("Received signal \"%v\", shutting down.", s)
ddm.Stop()
return
}
}