-
Notifications
You must be signed in to change notification settings - Fork 2k
/
lazy_handle.go
152 lines (122 loc) · 3.65 KB
/
lazy_handle.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package taskrunner
import (
"context"
"fmt"
"sync"
"time"
log "github.com/hashicorp/go-hclog"
cstructs "github.com/hashicorp/nomad/client/structs"
bstructs "github.com/hashicorp/nomad/plugins/base/structs"
)
// Tunables for the retry loop that re-acquires a driver handle.
const (
	// retrieveBackoffBaseline is the delay used after the first failed
	// retrieval; later delays grow exponentially from this value.
	retrieveBackoffBaseline time.Duration = 250 * time.Millisecond

	// retrieveBackoffLimit is the upper bound applied to the exponential
	// backoff between retrieval attempts.
	retrieveBackoffLimit time.Duration = 5 * time.Second

	// retrieveFailureLimit is the number of attempts made to retrieve a new
	// handle before giving up.
	retrieveFailureLimit = 5
)
// retrieveHandleFn is used to retrieve the latest driver handle. LazyHandle
// treats a nil return value as "no handle available" and retries.
type retrieveHandleFn func() *DriverHandle
// LazyHandle is used to front calls to a DriverHandle where it is expected the
// existing handle may no longer be valid because the backing plugin has
// shut down. LazyHandle detects the plugin shutting down and retrieves a new
// handle so that the consumer does not need to worry whether the handle is to
// the latest driver instance.
type LazyHandle struct {
	// retrieveHandle is used to retrieve the latest handle
	retrieveHandle retrieveHandleFn

	// h is the current handle and may be nil. It is read and replaced only
	// while the embedded mutex is held.
	h *DriverHandle

	// shutdownCtx is used to cancel retries if the agent is shutting down
	shutdownCtx context.Context

	// logger receives debug output from the retrieval retry loop.
	logger log.Logger

	// Mutex serializes access to h across getHandle and refreshHandle.
	sync.Mutex
}
// NewLazyHandle builds a LazyHandle around fn, the function that yields the
// latest driver handle. fn is invoked once immediately to seed the current
// handle (which may be nil). shutdownCtx cancels later retrieval retries, and
// logger is wrapped in a sub-logger named "lazy_handle".
func NewLazyHandle(shutdownCtx context.Context, fn retrieveHandleFn, logger log.Logger) *LazyHandle {
	lh := new(LazyHandle)
	lh.shutdownCtx = shutdownCtx
	lh.retrieveHandle = fn
	// Seed the handle before deriving the logger, matching the order in
	// which the two calls have always been made.
	lh.h = fn()
	lh.logger = logger.Named("lazy_handle")
	return lh
}
// getHandle returns the cached handle when one is present; otherwise it runs
// the retrying retrieval and returns its result. The mutex is held for the
// whole call.
func (l *LazyHandle) getHandle() (*DriverHandle, error) {
	l.Lock()
	defer l.Unlock()

	if l.h == nil {
		// Nothing cached: fall back to the retry loop.
		return l.refreshHandleLocked()
	}
	return l.h, nil
}
// refreshHandle unconditionally retrieves a new handle, replacing whatever is
// cached. It acquires the mutex and delegates to refreshHandleLocked.
func (l *LazyHandle) refreshHandle() (*DriverHandle, error) {
	l.Lock()
	defer l.Unlock()

	fresh, err := l.refreshHandleLocked()
	return fresh, err
}
// refreshHandleLocked retrieves a new handle and must be called with the lock
// held. It makes up to retrieveFailureLimit attempts, waiting between
// attempts with an exponential backoff (capped at retrieveBackoffLimit) to
// give the client time to restart the driver and restore the handle.
//
// It returns the new handle on success, the shutdown context's error if the
// context is cancelled while waiting, or a "no driver handle" error once all
// attempts have failed. l.h is overwritten on every attempt, so it is nil
// whenever an error is returned.
func (l *LazyHandle) refreshHandleLocked() (*DriverHandle, error) {
	for i := 0; i < retrieveFailureLimit; i++ {
		l.h = l.retrieveHandle()
		if l.h != nil {
			return l.h, nil
		}

		// No further attempt follows the last one, so waiting here would only
		// delay the error while keeping the lock held.
		if i == retrieveFailureLimit-1 {
			break
		}

		// Calculate the new backoff: baseline * 4^i, capped at the limit.
		backoff := (1 << (2 * uint64(i))) * retrieveBackoffBaseline
		if backoff > retrieveBackoffLimit {
			backoff = retrieveBackoffLimit
		}

		l.logger.Debug("failed to retrieve handle", "backoff", backoff)

		// Use an explicit timer so it can be released right away when the
		// shutdown context fires first.
		timer := time.NewTimer(backoff)
		select {
		case <-l.shutdownCtx.Done():
			timer.Stop()
			return nil, l.shutdownCtx.Err()
		case <-timer.C:
		}
	}

	return nil, fmt.Errorf("no driver handle")
}
// Exec forwards to Exec on the current driver handle. If that call fails with
// bstructs.ErrPluginShutdown, the handle is refreshed and the call is retried
// exactly once; the retry's results are returned as-is. If no handle can be
// obtained up front, it returns (nil, 0, err). If the refresh fails, the
// output and int result of the first call are returned together with the
// refresh error.
func (l *LazyHandle) Exec(timeout time.Duration, cmd string, args []string) ([]byte, int, error) {
	handle, err := l.getHandle()
	if err != nil {
		return nil, 0, err
	}

	output, code, err := handle.Exec(timeout, cmd, args)
	if err != bstructs.ErrPluginShutdown {
		// Success or an unrelated failure: nothing to retry.
		return output, code, err
	}

	// The plugin behind the handle has shut down; get a new handle and retry
	// a single time.
	handle, err = l.refreshHandle()
	if err != nil {
		return output, code, err
	}
	return handle.Exec(timeout, cmd, args)
}
// Stats forwards to Stats on the current driver handle. If that call fails
// with bstructs.ErrPluginShutdown, the handle is refreshed and the call is
// retried exactly once; the retry's results are returned as-is. If no handle
// can be obtained up front, it returns (nil, err). If the refresh fails, the
// channel from the first call is returned together with the refresh error.
func (l *LazyHandle) Stats(ctx context.Context, interval time.Duration) (<-chan *cstructs.TaskResourceUsage, error) {
	handle, err := l.getHandle()
	if err != nil {
		return nil, err
	}

	usageCh, err := handle.Stats(ctx, interval)
	if err != bstructs.ErrPluginShutdown {
		// Success or an unrelated failure: nothing to retry.
		return usageCh, err
	}

	// The plugin behind the handle has shut down; get a new handle and retry
	// a single time.
	handle, err = l.refreshHandle()
	if err != nil {
		return usageCh, err
	}
	return handle.Stats(ctx, interval)
}