-
Notifications
You must be signed in to change notification settings - Fork 2k
/
csi_hook.go
429 lines (362 loc) · 11.7 KB
/
csi_hook.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
package allocrunner
import (
"context"
"fmt"
"strings"
"sync"
"time"
hclog "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
)
// csiHook will wait for remote csi volumes to be attached to the host before
// continuing.
//
// It is a noop for allocs that do not depend on CSI Volumes.
type csiHook struct {
alloc *structs.Allocation
logger hclog.Logger
csimanager csimanager.Manager
// interfaces implemented by the allocRunner
rpcClient RPCer
taskCapabilityGetter taskCapabilityGetter
updater hookResourceSetter
nodeSecret string
volumeRequests map[string]*volumeAndRequest
minBackoffInterval time.Duration
maxBackoffInterval time.Duration
maxBackoffDuration time.Duration
shutdownCtx context.Context
shutdownCancelFn context.CancelFunc
}
// implemented by allocrunner
type taskCapabilityGetter interface {
GetTaskDriverCapabilities(string) (*drivers.Capabilities, error)
}
func newCSIHook(alloc *structs.Allocation, logger hclog.Logger, csi csimanager.Manager, rpcClient RPCer, taskCapabilityGetter taskCapabilityGetter, updater hookResourceSetter, nodeSecret string) *csiHook {
shutdownCtx, shutdownCancelFn := context.WithCancel(context.Background())
return &csiHook{
alloc: alloc,
logger: logger.Named("csi_hook"),
csimanager: csi,
rpcClient: rpcClient,
taskCapabilityGetter: taskCapabilityGetter,
updater: updater,
nodeSecret: nodeSecret,
volumeRequests: map[string]*volumeAndRequest{},
minBackoffInterval: time.Second,
maxBackoffInterval: time.Minute,
maxBackoffDuration: time.Hour * 24,
shutdownCtx: shutdownCtx,
shutdownCancelFn: shutdownCancelFn,
}
}
func (c *csiHook) Name() string {
return "csi_hook"
}
func (c *csiHook) Prerun() error {
if !c.shouldRun() {
return nil
}
volumes, err := c.claimVolumesFromAlloc()
if err != nil {
return fmt.Errorf("claim volumes: %v", err)
}
c.volumeRequests = volumes
mounts := make(map[string]*csimanager.MountInfo, len(volumes))
for alias, pair := range volumes {
// We use this context only to attach hclog to the gRPC
// context. The lifetime is the lifetime of the gRPC stream,
// not specific RPC timeouts, but we manage the stream
// lifetime via Close in the pluginmanager.
mounter, err := c.csimanager.MounterForPlugin(c.shutdownCtx, pair.volume.PluginID)
if err != nil {
return err
}
usageOpts := &csimanager.UsageOptions{
ReadOnly: pair.request.ReadOnly,
AttachmentMode: pair.request.AttachmentMode,
AccessMode: pair.request.AccessMode,
MountOptions: pair.request.MountOptions,
}
mountInfo, err := mounter.MountVolume(
c.shutdownCtx, pair.volume, c.alloc, usageOpts, pair.publishContext)
if err != nil {
return err
}
mounts[alias] = mountInfo
}
res := c.updater.GetAllocHookResources()
res.CSIMounts = mounts
c.updater.SetAllocHookResources(res)
return nil
}
// Postrun sends an RPC to the server to unpublish the volume. This may
// forward client RPCs to the node plugins or to the controller plugins,
// depending on whether other allocations on this node have claims on this
// volume.
func (c *csiHook) Postrun() error {
if !c.shouldRun() {
return nil
}
var wg sync.WaitGroup
errs := make(chan error, len(c.volumeRequests))
for _, pair := range c.volumeRequests {
wg.Add(1)
// CSI RPCs can potentially take a long time. Split the work
// into goroutines so that operators could potentially reuse
// one of a set of volumes
go func(pair *volumeAndRequest) {
defer wg.Done()
err := c.unmountImpl(pair)
if err != nil {
// we can recover an unmount failure if the operator
// brings the plugin back up, so retry every few minutes
// but eventually give up. Don't block shutdown so that
// we don't block shutting down the client in -dev mode
go func(pair *volumeAndRequest) {
err := c.unmountWithRetry(pair)
if err != nil {
c.logger.Error("volume could not be unmounted")
}
err = c.unpublish(pair)
if err != nil {
c.logger.Error("volume could not be unpublished")
}
}(pair)
}
// we can't recover from this RPC error client-side; the
// volume claim GC job will have to clean up for us once
// the allocation is marked terminal
errs <- c.unpublish(pair)
}(pair)
}
wg.Wait()
close(errs) // so we don't block waiting if there were no errors
var mErr *multierror.Error
for err := range errs {
mErr = multierror.Append(mErr, err)
}
return mErr.ErrorOrNil()
}
type volumeAndRequest struct {
volume *structs.CSIVolume
request *structs.VolumeRequest
// When volumeAndRequest was returned from a volume claim, this field will be
// populated for plugins that require it.
publishContext map[string]string
}
// claimVolumesFromAlloc is used by the pre-run hook to fetch all of the volume
// metadata and claim it for use by this alloc/node at the same time.
func (c *csiHook) claimVolumesFromAlloc() (map[string]*volumeAndRequest, error) {
result := make(map[string]*volumeAndRequest)
tg := c.alloc.Job.LookupTaskGroup(c.alloc.TaskGroup)
// Initially, populate the result map with all of the requests
for alias, volumeRequest := range tg.Volumes {
if volumeRequest.Type == structs.VolumeTypeCSI {
for _, task := range tg.Tasks {
caps, err := c.taskCapabilityGetter.GetTaskDriverCapabilities(task.Name)
if err != nil {
return nil, fmt.Errorf("could not validate task driver capabilities: %v", err)
}
if caps.MountConfigs == drivers.MountConfigSupportNone {
return nil, fmt.Errorf(
"task driver %q for %q does not support CSI", task.Driver, task.Name)
}
}
result[alias] = &volumeAndRequest{request: volumeRequest}
}
}
// Iterate over the result map and upsert the volume field as each volume gets
// claimed by the server.
for alias, pair := range result {
claimType := structs.CSIVolumeClaimWrite
if pair.request.ReadOnly {
claimType = structs.CSIVolumeClaimRead
}
source := pair.request.Source
if pair.request.PerAlloc {
source = source + structs.AllocSuffix(c.alloc.Name)
}
req := &structs.CSIVolumeClaimRequest{
VolumeID: source,
AllocationID: c.alloc.ID,
NodeID: c.alloc.NodeID,
Claim: claimType,
AccessMode: pair.request.AccessMode,
AttachmentMode: pair.request.AttachmentMode,
WriteRequest: structs.WriteRequest{
Region: c.alloc.Job.Region,
Namespace: c.alloc.Job.Namespace,
AuthToken: c.nodeSecret,
},
}
resp, err := c.claimWithRetry(req)
if err != nil {
return nil, fmt.Errorf("could not claim volume %s: %w", req.VolumeID, err)
}
if resp.Volume == nil {
return nil, fmt.Errorf("Unexpected nil volume returned for ID: %v", pair.request.Source)
}
result[alias].request = pair.request
result[alias].volume = resp.Volume
result[alias].publishContext = resp.PublishContext
}
return result, nil
}
// claimWithRetry tries to claim the volume on the server, retrying
// with exponential backoff capped to a maximum interval
func (c *csiHook) claimWithRetry(req *structs.CSIVolumeClaimRequest) (*structs.CSIVolumeClaimResponse, error) {
ctx, cancel := context.WithTimeout(c.shutdownCtx, c.maxBackoffDuration)
defer cancel()
var resp structs.CSIVolumeClaimResponse
var err error
backoff := c.minBackoffInterval
t, stop := helper.NewSafeTimer(0)
defer stop()
for {
select {
case <-ctx.Done():
return nil, err
case <-t.C:
}
err = c.rpcClient.RPC("CSIVolume.Claim", req, &resp)
if err == nil {
break
}
if !isRetryableClaimRPCError(err) {
break
}
if backoff < c.maxBackoffInterval {
backoff = backoff * 2
if backoff > c.maxBackoffInterval {
backoff = c.maxBackoffInterval
}
}
c.logger.Debug(
"volume could not be claimed because it is in use", "retry_in", backoff)
t.Reset(backoff)
}
return &resp, err
}
// isRetryableClaimRPCError looks for errors where we need to retry
// with backoff because we expect them to be eventually resolved.
func isRetryableClaimRPCError(err error) bool {
// note: because these errors are returned via RPC which breaks error
// wrapping, we can't check with errors.Is and need to read the string
errMsg := err.Error()
if strings.Contains(errMsg, structs.ErrCSIVolumeMaxClaims.Error()) {
return true
}
if strings.Contains(errMsg, structs.ErrCSIClientRPCRetryable.Error()) {
return true
}
if strings.Contains(errMsg, "no servers") {
return true
}
if strings.Contains(errMsg, structs.ErrNoLeader.Error()) {
return true
}
return false
}
func (c *csiHook) shouldRun() bool {
tg := c.alloc.Job.LookupTaskGroup(c.alloc.TaskGroup)
for _, vol := range tg.Volumes {
if vol.Type == structs.VolumeTypeCSI {
return true
}
}
return false
}
func (c *csiHook) unpublish(pair *volumeAndRequest) error {
mode := structs.CSIVolumeClaimRead
if !pair.request.ReadOnly {
mode = structs.CSIVolumeClaimWrite
}
source := pair.request.Source
if pair.request.PerAlloc {
// NOTE: PerAlloc can't be set if we have canaries
source = source + structs.AllocSuffix(c.alloc.Name)
}
req := &structs.CSIVolumeUnpublishRequest{
VolumeID: source,
Claim: &structs.CSIVolumeClaim{
AllocationID: c.alloc.ID,
NodeID: c.alloc.NodeID,
Mode: mode,
State: structs.CSIVolumeClaimStateUnpublishing,
},
WriteRequest: structs.WriteRequest{
Region: c.alloc.Job.Region,
Namespace: c.alloc.Job.Namespace,
AuthToken: c.nodeSecret,
},
}
return c.rpcClient.RPC("CSIVolume.Unpublish",
req, &structs.CSIVolumeUnpublishResponse{})
}
// unmountWithRetry tries to unmount/unstage the volume, retrying with
// exponential backoff capped to a maximum interval
func (c *csiHook) unmountWithRetry(pair *volumeAndRequest) error {
ctx, cancel := context.WithTimeout(c.shutdownCtx, c.maxBackoffDuration)
defer cancel()
var err error
backoff := c.minBackoffInterval
t, stop := helper.NewSafeTimer(0)
defer stop()
for {
select {
case <-ctx.Done():
return err
case <-t.C:
}
err = c.unmountImpl(pair)
if err == nil {
break
}
if backoff < c.maxBackoffInterval {
backoff = backoff * 2
if backoff > c.maxBackoffInterval {
backoff = c.maxBackoffInterval
}
}
c.logger.Debug("volume could not be unmounted", "retry_in", backoff)
t.Reset(backoff)
}
return nil
}
// unmountImpl implements the call to the CSI plugin manager to
// unmount the volume. Each retry will write an "Unmount volume"
// NodeEvent
func (c *csiHook) unmountImpl(pair *volumeAndRequest) error {
mounter, err := c.csimanager.MounterForPlugin(c.shutdownCtx, pair.volume.PluginID)
if err != nil {
return err
}
usageOpts := &csimanager.UsageOptions{
ReadOnly: pair.request.ReadOnly,
AttachmentMode: pair.request.AttachmentMode,
AccessMode: pair.request.AccessMode,
MountOptions: pair.request.MountOptions,
}
return mounter.UnmountVolume(c.shutdownCtx,
pair.volume.ID, pair.volume.RemoteID(), c.alloc.ID, usageOpts)
}
// Shutdown will get called when the client is gracefully
// stopping. Cancel our shutdown context so that we don't block client
// shutdown while in the CSI RPC retry loop.
func (c *csiHook) Shutdown() {
c.logger.Trace("shutting down hook")
c.shutdownCancelFn()
}
// Destroy will get called when an allocation gets GC'd on the client
// or when a -dev mode client is stopped. Cancel our shutdown context
// so that we don't block client shutdown while in the CSI RPC retry
// loop.
func (c *csiHook) Destroy() {
c.logger.Trace("destroying hook")
c.shutdownCancelFn()
}