forked from ooni/probe-cli
-
Notifications
You must be signed in to change notification settings - Fork 0
/
taskrunner.go
350 lines (328 loc) · 11.3 KB
/
taskrunner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
package oonimkall
import (
"context"
"encoding/json"
"fmt"
"net/url"
"time"
"github.com/ooni/probe-cli/v3/internal/engine"
"github.com/ooni/probe-cli/v3/internal/model"
"github.com/ooni/probe-cli/v3/internal/runtimex"
)
// runnerForTask runs a specific task
type runnerForTask struct {
emitter *taskEmitterWrapper
kvStoreBuilder taskKVStoreFSBuilder
sessionBuilder taskSessionBuilder
settings *settings
}
var _ taskRunner = &runnerForTask{}
// newRunner creates a new task runner
func newRunner(settings *settings, emitter taskEmitter) *runnerForTask {
return &runnerForTask{
emitter: &taskEmitterWrapper{emitter},
kvStoreBuilder: &taskKVStoreFSBuilderEngine{},
sessionBuilder: &taskSessionBuilderEngine{},
settings: settings,
}
}
// failureInvalidVersion is the failure returned when Version is invalid
const failureInvalidVersion = "invalid Settings.Version number"
func (r *runnerForTask) hasUnsupportedSettings() bool {
if r.settings.Version < taskABIVersion {
r.emitter.EmitFailureStartup(failureInvalidVersion)
return true
}
return false
}
func (r *runnerForTask) newsession(ctx context.Context, logger model.Logger) (taskSession, error) {
kvstore, err := r.kvStoreBuilder.NewFS(r.settings.StateDir)
if err != nil {
return nil, err
}
var proxyURL *url.URL
if r.settings.Proxy != "" {
var err error
proxyURL, err = url.Parse(r.settings.Proxy)
if err != nil {
return nil, err
}
}
config := engine.SessionConfig{
KVStore: kvstore,
Logger: logger,
ProxyURL: proxyURL,
SoftwareName: r.settings.Options.SoftwareName,
SoftwareVersion: r.settings.Options.SoftwareVersion,
TempDir: r.settings.TempDir,
TunnelDir: r.settings.TunnelDir,
}
if r.settings.Options.ProbeServicesBaseURL != "" {
config.AvailableProbeServices = []model.OOAPIService{{
Type: "https",
Address: r.settings.Options.ProbeServicesBaseURL,
}}
}
return r.sessionBuilder.NewSession(ctx, config)
}
// contextForExperiment ensurs that for measuring we only use an
// interruptible context when we can interrupt the experiment
func (r *runnerForTask) contextForExperiment(
ctx context.Context, builder taskExperimentBuilder,
) context.Context {
if builder.Interruptible() {
return ctx
}
return context.Background()
}
type runnerCallbacks struct {
emitter taskEmitter
}
func (cb *runnerCallbacks) OnProgress(percentage float64, message string) {
cb.emitter.Emit(eventTypeStatusProgress, eventStatusProgress{
Percentage: 0.4 + (percentage * 0.6), // open report is 40%
Message: message,
})
}
// Run runs the runner until completion. The context argument controls
// when to stop when processing multiple inputs, as well as when to stop
// experiments explicitly marked as interruptible.
func (r *runnerForTask) Run(rootCtx context.Context) {
// Implementation note: this function uses these contexts:
//
// - rootCtx is the root context and is controlled by the user;
//
// - measCtx derives from rootCtx and is possibly tied to the
// maximum runtime and is used to choose when to stop measuring;
//
// - submitCtx is like measCtx but, in case we're using a max
// runtime, is given more time to finish submitting.
//
// See https://github.com/ooni/probe/issues/2037.
var logger model.Logger = newTaskLogger(r.emitter, r.settings.LogLevel)
r.emitter.Emit(eventTypeStatusQueued, eventEmpty{})
if r.hasUnsupportedSettings() {
// event failureStartup already emitted
return
}
r.emitter.Emit(eventTypeStatusStarted, eventEmpty{})
sess, err := r.newsession(rootCtx, logger)
if err != nil {
r.emitter.EmitFailureStartup(err.Error())
return
}
endEvent := new(eventStatusEnd)
defer func() {
sess.Close()
r.emitter.Emit(eventTypeStatusEnd, endEvent)
}()
builder, err := sess.NewExperimentBuilderByName(r.settings.Name)
if err != nil {
r.emitter.EmitFailureStartup(err.Error())
return
}
logger.Info("Looking up OONI backends... please, be patient")
if err := sess.MaybeLookupBackendsContext(rootCtx); err != nil {
r.emitter.EmitFailureStartup(err.Error())
return
}
r.emitter.EmitStatusProgress(0.1, "contacted bouncer")
logger.Info("Looking up your location... please, be patient")
if err := sess.MaybeLookupLocationContext(rootCtx); err != nil {
r.emitter.EmitFailureGeneric(eventTypeFailureIPLookup, err.Error())
r.emitter.EmitFailureGeneric(eventTypeFailureASNLookup, err.Error())
r.emitter.EmitFailureGeneric(eventTypeFailureCCLookup, err.Error())
r.emitter.EmitFailureGeneric(eventTypeFailureResolverLookup, err.Error())
return
}
r.emitter.EmitStatusProgress(0.2, "geoip lookup")
r.emitter.EmitStatusProgress(0.3, "resolver lookup")
r.emitter.Emit(eventTypeStatusGeoIPLookup, eventStatusGeoIPLookup{
ProbeIP: sess.ProbeIP(),
ProbeASN: sess.ProbeASNString(),
ProbeCC: sess.ProbeCC(),
ProbeNetworkName: sess.ProbeNetworkName(),
})
r.emitter.Emit(eventTypeStatusResolverLookup, eventStatusResolverLookup{
ResolverASN: sess.ResolverASNString(),
ResolverIP: sess.ResolverIP(),
ResolverNetworkName: sess.ResolverNetworkName(),
})
builder.SetCallbacks(&runnerCallbacks{emitter: r.emitter})
// TODO(bassosimone): replace the following code with an
// invocation of the InputLoader. Since I am making these
// changes before a release and I've already changed the
// code a lot, I'd rather avoid changing it even more,
// for the following reason:
//
// If we add an call InputLoader here, this code will
// magically invoke check-in for InputOrQueryBackend,
// which we need to make sure the app can handle. This is
// the main reason why now I don't fill like properly
// fixing this code and use InputLoader: too much work
// in too little time, so mistakes more likely.
//
// In fact, our current app assumes that it's its
// responsibility to load the inputs, not oonimkall's.
switch builder.InputPolicy() {
case model.InputOrQueryBackend, model.InputStrictlyRequired:
if len(r.settings.Inputs) <= 0 {
r.emitter.EmitFailureStartup("no input provided")
return
}
case model.InputOrStaticDefault:
if len(r.settings.Inputs) <= 0 {
inputs, err := engine.StaticBareInputForExperiment(r.settings.Name)
if err != nil {
r.emitter.EmitFailureStartup("no default static input for this experiment")
return
}
r.settings.Inputs = inputs
}
case model.InputOptional:
if len(r.settings.Inputs) <= 0 {
r.settings.Inputs = append(r.settings.Inputs, "")
}
default: // treat this case as engine.InputNone.
if len(r.settings.Inputs) > 0 {
r.emitter.EmitFailureStartup("experiment does not accept input")
return
}
r.settings.Inputs = append(r.settings.Inputs, "")
}
experiment := builder.NewExperimentInstance()
defer func() {
endEvent.DownloadedKB = experiment.KibiBytesReceived()
endEvent.UploadedKB = experiment.KibiBytesSent()
}()
if !r.settings.Options.NoCollector {
logger.Info("Opening report... please, be patient")
if err := experiment.OpenReportContext(rootCtx); err != nil {
r.emitter.EmitFailureGeneric(eventTypeFailureReportCreate, err.Error())
return
}
r.emitter.EmitStatusProgress(0.4, "open report")
r.emitter.Emit(eventTypeStatusReportCreate, eventStatusReportGeneric{
ReportID: experiment.ReportID(),
})
}
measCtx, measCancel := context.WithCancel(rootCtx)
defer measCancel()
submitCtx, submitCancel := context.WithCancel(rootCtx)
defer submitCancel()
// This deviates a little bit from measurement-kit, for which
// a zero timeout is actually valid. Since it does not make much
// sense, here we're changing the behaviour.
//
// See https://github.com/measurement-kit/measurement-kit/issues/1922
if r.settings.Options.MaxRuntime > 0 {
// We want to honour max_runtime only when we're running an
// experiment that clearly wants specific input. We could refine
// this policy in the future, but for now this covers in a
// reasonable way web connectivity, so we should be ok.
switch builder.InputPolicy() {
case model.InputOrQueryBackend, model.InputStrictlyRequired:
var (
cancelMeas context.CancelFunc
cancelSubmit context.CancelFunc
)
// We give the context used for submitting extra time so that
// it's possible to submit the last measurement.
//
// See https://github.com/ooni/probe/issues/2037 for more info.
maxRuntime := time.Duration(r.settings.Options.MaxRuntime) * time.Second
measCtx, cancelMeas = context.WithTimeout(measCtx, maxRuntime)
defer cancelMeas()
maxRuntime += 30 * time.Second
submitCtx, cancelSubmit = context.WithTimeout(submitCtx, maxRuntime)
defer cancelSubmit()
}
}
inputCount := len(r.settings.Inputs)
start := time.Now()
inflatedMaxRuntime := r.settings.Options.MaxRuntime + r.settings.Options.MaxRuntime/10
eta := start.Add(time.Duration(inflatedMaxRuntime) * time.Second)
for idx, input := range r.settings.Inputs {
if measCtx.Err() != nil {
break
}
logger.Infof("Starting measurement with index %d", idx)
r.emitter.Emit(eventTypeStatusMeasurementStart, eventMeasurementGeneric{
Idx: int64(idx),
Input: input,
})
if input != "" && inputCount > 0 {
var percentage float64
if r.settings.Options.MaxRuntime > 0 {
now := time.Now()
percentage = (now.Sub(start).Seconds()/eta.Sub(start).Seconds())*0.6 + 0.4
} else {
percentage = (float64(idx)/float64(inputCount))*0.6 + 0.4
}
r.emitter.EmitStatusProgress(percentage, fmt.Sprintf(
"processing %s", input,
))
}
m, err := experiment.MeasureWithContext(
r.contextForExperiment(measCtx, builder),
input,
)
if builder.Interruptible() && measCtx.Err() != nil {
// We want to stop here only if interruptible otherwise we want to
// submit measurement and stop at beginning of next iteration
break
}
if err != nil {
r.emitter.Emit(eventTypeFailureMeasurement, eventMeasurementGeneric{
Failure: err.Error(),
Idx: int64(idx),
Input: input,
})
// Historical note: here we used to fallthrough but, since we have
// implemented async measurements, the case where there is an error
// and we also have a valid measurement cant't happen anymore. So,
// now the only valid strategy here is to continue.
continue
}
m.AddAnnotations(r.settings.Annotations)
data, err := json.Marshal(m)
runtimex.PanicOnError(err, "measurement.MarshalJSON failed")
r.emitter.Emit(eventTypeMeasurement, eventMeasurementGeneric{
Idx: int64(idx),
Input: input,
JSONStr: string(data),
})
if !r.settings.Options.NoCollector {
logger.Info("Submitting measurement... please, be patient")
err := experiment.SubmitAndUpdateMeasurementContext(submitCtx, m)
warnOnFailure(logger, "cannot submit measurement", err)
r.emitter.Emit(measurementSubmissionEventName(err), eventMeasurementGeneric{
Idx: int64(idx),
Input: input,
JSONStr: string(data),
Failure: measurementSubmissionFailure(err),
})
}
r.emitter.Emit(eventTypeStatusMeasurementDone, eventMeasurementGeneric{
Idx: int64(idx),
Input: input,
})
}
}
func warnOnFailure(logger model.Logger, message string, err error) {
if err != nil {
logger.Warnf("%s: %s (%+v)", message, err.Error(), err)
}
}
func measurementSubmissionEventName(err error) string {
if err != nil {
return eventTypeFailureMeasurementSubmission
}
return eventTypeStatusMeasurementSubmission
}
func measurementSubmissionFailure(err error) string {
if err != nil {
return err.Error()
}
return ""
}