-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
Copy pathexecutors.go
346 lines (302 loc) · 12.3 KB
/
executors.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
/*
*
* k6 - a next-generation load testing tool
* Copyright (C) 2019 Load Impact
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package lib
import (
"context"
"encoding/json"
"fmt"
"sort"
"strings"
"sync"
"time"
"github.com/sirupsen/logrus"
"github.com/loadimpact/k6/stats"
"github.com/loadimpact/k6/ui/pb"
)
// Package-level registry of executor config constructors, keyed by executor
// type name. RegisterExecutorConfigType adds entries to it and
// GetParsedExecutorConfig looks entries up; both hold
// executorConfigTypesMutex while they touch the map.
//
// TODO: remove globals and use some type of explicit dependency injection?
//nolint:gochecknoglobals
var (
// executorConfigTypesMutex guards executorConfigConstructors.
executorConfigTypesMutex sync.RWMutex
// executorConfigConstructors maps an executor type name to the constructor
// registered for it.
executorConfigConstructors = make(map[string]ExecutorConfigConstructor)
)
// ExecutionStep is used by different executors to specify the planned number of
// VUs they will need at a particular time. The times are relative to their
// StartTime, i.e. they don't take into account the specific starting time of
// the executor, as that is added separately by the code that combines the
// requirements of all executors (see
// ScenarioConfigs.GetFullExecutionRequirements).
//
// A slice [{t1, v1}, {t2, v2}, {t3, v3}, ..., {tn, vn}] of execution steps
// means that an executor will need 0 VUs until t1, it will need v1 number of
// VUs from time t1 until t2, need v2 number of VUs from time t2 to t3, and so
// on. t1 is usually 0, tn is usually the same as GetMaxDuration() and vn is
// usually 0.
//
// Keep in mind that t(i) may be exactly equal to t(i+1), when there's an abrupt
// transition in the number of VUs required by an executor. For example, the
// ramping-vus executor may have 0-duration stages, or it may scale up
// VUs in its last stage right until the end. These immediate transitions cannot
// be ignored, since the gracefulStop/gracefulRampDown options potentially allow
// any started iterations to finish.
//
// []ExecutionStep is also used by the ScenarioConfigs, to represent the
// amount of needed VUs among all executors, during the whole execution of a
// test script. In that context, each executor's StartTime is accounted for and
// included in the offsets.
type ExecutionStep struct {
// TimeOffset is the moment from which this step applies: relative to the
// executor's own start for per-executor steps, and including the
// executor's start time in the combined, test-wide view.
TimeOffset time.Duration
// PlannedVUs is the number of VUs needed from TimeOffset until the next
// step in the slice.
PlannedVUs uint64
// MaxUnplannedVUs is tracked and summed alongside PlannedVUs when steps
// are combined.
// NOTE(review): its exact meaning is not explained in this file;
// presumably the maximum number of extra VUs that may be needed on demand
// during this step - confirm against the executors that set it.
MaxUnplannedVUs uint64
}
// TODO: make []ExecutionStep or []ExecutorConfig their own type?
// ExecutorConfig is an interface that should be implemented by all executor config types
type ExecutorConfig interface {
// Validate returns the problems found in this config. An empty result is
// treated as "no configuration errors" by ScenarioConfigs.Validate.
Validate() []error
// GetName returns the config's name. Names are unique and are used as the
// tie-breaker when configs are sorted (see GetSortedConfigs).
GetName() string
// GetType returns the config's type.
// NOTE(review): presumably the executor type name the config was
// registered under - confirm.
GetType() string
// GetStartTime returns the executor's start time as an offset. It is added
// to every step's TimeOffset when the requirements of all executors are
// combined.
GetStartTime() time.Duration
// GetGracefulStop returns the configured graceful stop duration.
// NOTE(review): presumably the extra time started iterations are given to
// finish after the executor's scheduled end - confirm.
GetGracefulStop() time.Duration
// This is used to validate whether a particular script can run in the cloud
// or, in the future, in the native k6 distributed execution. Currently only
// the externally-controlled executor should return false.
IsDistributable() bool
// GetEnv returns a string-to-string map.
// NOTE(review): presumably executor-specific environment variables -
// confirm.
GetEnv() map[string]string
// Allows us to get the non-default function the executor should run, if it
// has been specified.
//
// TODO: use interface{} so plain http requests can be specified?
GetExec() string
// GetTags returns a string-to-string map.
// NOTE(review): presumably tags attached to whatever this executor emits -
// confirm.
GetTags() map[string]string
// Calculates the VU requirements in different stages of the executor's
// execution, including any extensions caused by waiting for iterations to
// finish with graceful stops or ramp-downs.
GetExecutionRequirements(*ExecutionTuple) []ExecutionStep
// Return a human-readable description of the executor
GetDescription(*ExecutionTuple) string
// NewExecutor builds the Executor for this config from the given execution
// state and logger entry, or returns an error.
NewExecutor(*ExecutionState, *logrus.Entry) (Executor, error)
// HasWork reports whether there is any work for the executor to do with a given segment.
HasWork(*ExecutionTuple) bool
}
// InitVUFunc is just a shorthand so we don't have to type the function
// signature every time. A function of this type receives a context and a
// logger entry, and returns an InitializedVU or an error.
type InitVUFunc func(context.Context, *logrus.Entry) (InitializedVU, error)
// Executor is the interface all executors should implement
type Executor interface {
// GetConfig returns the ExecutorConfig of this executor.
GetConfig() ExecutorConfig
// GetProgress returns the executor's progress bar.
GetProgress() *pb.ProgressBar
// GetLogger returns the executor's logger entry.
GetLogger() *logrus.Entry
// Init prepares the executor; a non-nil error reports a failed
// initialization.
// NOTE(review): presumably called once before Run - confirm with the
// caller.
Init(ctx context.Context) error
// Run executes the executor's work under ctx. engineOut is a send-only
// channel of stats.SampleContainer values.
// NOTE(review): presumably the produced metric samples are pushed to
// engineOut and Run returns when the work is done or ctx is cancelled -
// confirm.
Run(ctx context.Context, engineOut chan<- stats.SampleContainer) error
}
// PausableExecutor should be implemented by the executors that can be paused
// and resumed in the middle of the test execution. Currently, only the
// externally controlled executor implements it.
type PausableExecutor interface {
// SetPaused takes the desired paused state and reports a failure through
// the returned error.
// NOTE(review): presumably true pauses and false resumes - confirm against
// the implementation.
SetPaused(bool) error
}
// LiveUpdatableExecutor should be implemented for the executors whose
// configuration can be modified in the middle of the test execution. Currently,
// only the manual execution executor implements it.
//
// NOTE(review): "manual execution executor" presumably refers to the same
// executor that other comments in this file call "externally controlled" -
// confirm and unify the naming.
type LiveUpdatableExecutor interface {
// UpdateConfig applies newConfig to the running executor. newConfig is an
// untyped value, so the implementation decides which concrete types it
// accepts; a non-nil error reports that the update failed.
UpdateConfig(ctx context.Context, newConfig interface{}) error
}
// ExecutorConfigConstructor is a simple function that returns a concrete
// Config instance with the specified name and all default values correctly
// initialized. rawJSON is the raw JSON of the config being constructed, as
// passed through by GetParsedExecutorConfig; a non-nil error reports that no
// config could be built from it.
type ExecutorConfigConstructor func(name string, rawJSON []byte) (ExecutorConfig, error)
// RegisterExecutorConfigType stores constructor as the factory for executor
// configs of type configType in the package-level constructor registry. The
// registry is modified while holding its mutex, so registrations may happen
// from multiple goroutines.
//
// It panics when constructor is nil, or when a constructor has already been
// registered for configType.
func RegisterExecutorConfigType(configType string, constructor ExecutorConfigConstructor) {
	executorConfigTypesMutex.Lock()
	defer executorConfigTypesMutex.Unlock()

	// The nil check comes first, so a nil constructor is reported even when
	// the type name is already taken.
	switch _, registered := executorConfigConstructors[configType]; {
	case constructor == nil:
		panic("executor configs: constructor is nil")
	case registered:
		panic("executor configs: lib.RegisterExecutorConfigType called twice for " + configType)
	}

	executorConfigConstructors[configType] = constructor
}
// ScenarioConfigs can contain mixed executor config types. It maps each
// scenario's name to the executor config of that scenario.
type ScenarioConfigs map[string]ExecutorConfig
// UnmarshalJSON implements json.Unmarshaler in two passes: the input is first
// decoded into per-scenario proto configs, which only expose the executor
// type and the scenario's raw JSON, and each of those is then turned into the
// concrete config type registered for its executor.
//
// Empty input and a JSON null leave the receiver untouched. The receiver is
// only replaced after every scenario has been parsed successfully.
func (scs *ScenarioConfigs) UnmarshalJSON(data []byte) error {
	if len(data) == 0 || string(data) == "null" {
		return nil // nothing to decode
	}

	// TODO: use a more sophisticated combination of dec.Token() and dec.More(),
	// which would allow us to support both arrays and maps for this config?
	var parsed map[string]protoExecutorConfig
	if err := StrictJSONUnmarshal(data, &parsed); err != nil {
		return err
	}

	configs := make(ScenarioConfigs, len(parsed))
	for scenarioName, proto := range parsed {
		if proto.executorType == "" {
			return fmt.Errorf("scenario '%s' doesn't have a specified executor type", scenarioName)
		}

		parsedConfig, err := GetParsedExecutorConfig(scenarioName, proto.executorType, proto.rawJSON)
		if err != nil {
			return err
		}
		configs[scenarioName] = parsedConfig
	}

	*scs = configs
	return nil
}
// Validate checks if all of the specified executor options make sense.
//
// Every scenario's config is validated, and one error is returned for each
// scenario that has problems, with that scenario's individual errors joined
// by ", ". Scenarios are visited in ascending order of their names, so the
// returned errors are always in the same order for the same configuration
// (ranging over the map directly would yield them in a random order). The
// result is nil when no scenario has configuration errors.
func (scs ScenarioConfigs) Validate() (errors []error) {
	names := make([]string, 0, len(scs))
	for name := range scs {
		names = append(names, name)
	}
	sort.Strings(names)

	for _, name := range names {
		if execErr := scs[name].Validate(); len(execErr) != 0 {
			errors = append(errors,
				fmt.Errorf("scenario %s has configuration errors: %s", name, ConcatErrors(execErr, ", ")))
		}
	}
	return errors
}
// GetSortedConfigs returns all executor configs as a slice with a consistent,
// predictable order. Working with the slice avoids both repeated string-keyed
// map lookups and Go's unpredictable map iteration order.
//
// The configs are ordered by ascending start time; configs that start at the
// same time are ordered alphabetically by their (unique) names.
func (scs ScenarioConfigs) GetSortedConfigs() []ExecutorConfig {
	// Gather the map values; their order is arbitrary until sorted below.
	configs := make([]ExecutorConfig, 0, len(scs))
	for _, cfg := range scs {
		configs = append(configs, cfg)
	}

	sort.Slice(configs, func(i, j int) bool {
		left, right := configs[i], configs[j]
		if leftStart, rightStart := left.GetStartTime(), right.GetStartTime(); leftStart != rightStart {
			return leftStart < rightStart
		}
		// Same start time: fall back to the name.
		return strings.Compare(left.GetName(), right.GetName()) < 0
	})
	return configs
}
// GetFullExecutionRequirements merges the execution requirements of all
// configured executors into a single timeline holding the total VU
// requirements for each moment of the test run. The steps of every executor
// are shifted by that executor's start time before they are merged.
//
// A step is appended to the result each time processing one executor's step
// changes the summed PlannedVUs or MaxUnplannedVUs; the first processed step
// is always appended. Steps are not merged by time, so consecutive entries of
// the result may share the same TimeOffset.
func (scs ScenarioConfigs) GetFullExecutionRequirements(et *ExecutionTuple) []ExecutionStep {
	// ownedStep remembers which executor, identified by its index in the
	// sorted config slice, a time-shifted step belongs to.
	type ownedStep struct {
		ExecutionStep
		owner int
	}

	timeline := []ownedStep{}
	for owner, cfg := range scs.GetSortedConfigs() { // a slice, so the order is predictable
		startTime := cfg.GetStartTime()
		for _, step := range cfg.GetExecutionRequirements(et) {
			// Turn the executor-relative offset into a test-relative one.
			step.TimeOffset += startTime
			timeline = append(timeline, ownedStep{ExecutionStep: step, owner: owner})
		}
	}

	// Order by (time offset, executor index). The sort must be stable: one
	// executor can emit several steps for the same instant, and their
	// relative order is significant.
	sort.SliceStable(timeline, func(i, j int) bool {
		if timeline[i].TimeOffset != timeline[j].TimeOffset {
			return timeline[i].TimeOffset < timeline[j].TimeOffset
		}
		return timeline[i].owner < timeline[j].owner
	})

	// Most recent requirements of every executor, indexed like the sorted
	// config slice. All executors start out needing nothing.
	plannedByExecutor := make([]uint64, len(scs))
	unplannedByExecutor := make([]uint64, len(scs))
	total := func(values []uint64) uint64 {
		var acc uint64
		for i := range values {
			acc += values[i]
		}
		return acc
	}

	merged := []ExecutionStep{}
	var offset time.Duration
	for _, step := range timeline {
		// TODO: optimize by skipping some steps
		offset = step.TimeOffset
		plannedByExecutor[step.owner] = step.PlannedVUs
		unplannedByExecutor[step.owner] = step.MaxUnplannedVUs

		planned, unplanned := total(plannedByExecutor), total(unplannedByExecutor)
		if n := len(merged); n > 0 &&
			merged[n-1].PlannedVUs == planned &&
			merged[n-1].MaxUnplannedVUs == unplanned {
			continue // the totals did not change, so there is nothing new to record
		}
		merged = append(merged, ExecutionStep{
			TimeOffset:      offset,
			PlannedVUs:      planned,
			MaxUnplannedVUs: unplanned,
		})
	}
	return merged
}
// GetParsedExecutorConfig returns a struct instance corresponding to the supplied
// config type. It will be fully initialized - with both the default values of
// the type, as well as with whatever the user had specified in the JSON.
//
// name and rawJSON are handed to the constructor registered for configType.
// An error is returned when no constructor is registered for configType, or
// when the constructor itself fails.
func GetParsedExecutorConfig(name, configType string, rawJSON []byte) (result ExecutorConfig, err error) {
	// The registry is only read here, so the shared (read) lock is enough and
	// concurrent lookups don't serialize each other. The lock is released
	// before the constructor runs, so it is never held while executing code
	// that lives outside of this function.
	executorConfigTypesMutex.RLock()
	constructor, exists := executorConfigConstructors[configType]
	executorConfigTypesMutex.RUnlock()

	if !exists {
		return nil, fmt.Errorf("unknown executor type '%s'", configType)
	}
	return constructor(name, rawJSON)
}
// protoExecutorConfig is the result of the first decoding pass over a single
// scenario: just enough information to pick the right constructor, plus the
// untouched JSON that the constructor will parse in the second pass.
type protoExecutorConfig struct {
	executorType string          // value of the "executor" JSON property
	rawJSON      json.RawMessage // private copy of the scenario's complete JSON
}

// UnmarshalJSON unmarshals the base config (to get the type), but it also
// stores the unprocessed JSON so we can parse the full config in the next step.
//
// The stored JSON is a copy of b: the json.Unmarshaler contract requires
// implementations to copy the data if they keep it after returning, because
// the caller may reuse the buffer. If b cannot be decoded the error is
// returned and the receiver is left unmodified.
func (pc *protoExecutorConfig) UnmarshalJSON(b []byte) error {
	var tmp struct {
		ExecutorType string `json:"executor"`
	}
	if err := json.Unmarshal(b, &tmp); err != nil {
		return err
	}

	*pc = protoExecutorConfig{
		executorType: tmp.ExecutorType,
		rawJSON:      append(json.RawMessage(nil), b...),
	}
	return nil
}