diff --git a/internal/experiment/riseupvpn/riseupvpn.go b/internal/experiment/riseupvpn/riseupvpn.go index 80dbced9aa..f89ff58904 100644 --- a/internal/experiment/riseupvpn/riseupvpn.go +++ b/internal/experiment/riseupvpn/riseupvpn.go @@ -11,6 +11,7 @@ import ( "github.com/ooni/probe-cli/v3/internal/experiment/urlgetter" "github.com/ooni/probe-cli/v3/internal/model" "github.com/ooni/probe-cli/v3/internal/netxlite" + "github.com/ooni/probe-cli/v3/internal/progress" ) const ( @@ -165,8 +166,8 @@ func (m Measurer) Run(ctx context.Context, args *model.ExperimentArgs) error { // TODO(https://github.com/ooni/probe/issues/2559): solve this problem by serving the // correct CA and the endpoints to probes using check-in v2 (aka richer input). - nullCallbacks := model.NewPrinterCallbacks(model.DiscardLogger) - for entry := range multi.CollectOverall(ctx, inputs, 0, 20, "riseupvpn", nullCallbacks) { + callbacksStage1 := progress.NewScaler(callbacks, 0, 0.25) + for entry := range multi.Collect(ctx, inputs, "riseupvpn", callbacksStage1) { tk := entry.TestKeys testkeys.AddCACertFetchTestKeys(tk) if tk.Failure != nil { @@ -203,7 +204,9 @@ func (m Measurer) Run(ctx context.Context, args *model.ExperimentArgs) error { FailOnHTTPError: true, }}, } - for entry := range multi.CollectOverall(ctx, inputs, 1, 20, "riseupvpn", nullCallbacks) { + + callbacksStage2 := progress.NewScaler(callbacks, 0.25, 0.5) + for entry := range multi.Collect(ctx, inputs, "riseupvpn", callbacksStage2) { testkeys.UpdateProviderAPITestKeys(entry) tk := entry.TestKeys if tk.Failure != nil { @@ -216,12 +219,10 @@ func (m Measurer) Run(ctx context.Context, args *model.ExperimentArgs) error { gateways := parseGateways(testkeys) openvpnEndpoints := generateMultiInputs(gateways, "openvpn") obfs4Endpoints := generateMultiInputs(gateways, "obfs4") - overallCount := 1 + len(inputs) + len(openvpnEndpoints) + len(obfs4Endpoints) - startCount := 1 + len(inputs) // measure openvpn in parallel - for entry := range multi.CollectOverall( - ctx, openvpnEndpoints, startCount, overallCount, "riseupvpn", callbacks) { + callbacksStage3 := progress.NewScaler(callbacks, 0.5, 0.75) + for entry := range multi.Collect(ctx, openvpnEndpoints, "riseupvpn", callbacksStage3) { testkeys.AddGatewayConnectTestKeys(entry, "openvpn") } @@ -229,9 +230,8 @@ func (m Measurer) Run(ctx context.Context, args *model.ExperimentArgs) error { // TODO(bassosimone): when urlgetter is able to do obfs4 handshakes, here // can possibly also test for the obfs4 handshake. // See https://github.com/ooni/probe/issues/1463. - startCount += len(openvpnEndpoints) - for entry := range multi.CollectOverall( - ctx, obfs4Endpoints, startCount, overallCount, "riseupvpn", callbacks) { + callbacksStage4 := progress.NewScaler(callbacks, 0.75, 1) + for entry := range multi.Collect(ctx, obfs4Endpoints, "riseupvpn", callbacksStage4) { testkeys.AddGatewayConnectTestKeys(entry, "obfs4") } diff --git a/internal/progress/progress.go b/internal/progress/progress.go new file mode 100644 index 0000000000..f73e534a2f --- /dev/null +++ b/internal/progress/progress.go @@ -0,0 +1,44 @@ +// Package progress contains utilities to emit progress. +package progress + +import ( + "github.com/ooni/probe-cli/v3/internal/model" + "github.com/ooni/probe-cli/v3/internal/runtimex" +) + +// Scaler implements [model.ExperimentCallbacks] and scales progress +// as instructed through the [NewScaler] constructor. +// +// The [*Scaler] is safe to use from multiple goroutine contexts. +type Scaler struct { + cbs model.ExperimentCallbacks + offset float64 + total float64 +} + +// NewScaler constructs a new [*Scaler] using the given offset and total +// and emitting progress using the given [model.ExperimentCallbacks]. +// +// The offset is added to each progress value we emit. The total is +// used to scale the 100% to a suitable subset. +// +// For example, with offset equal to 0.1 and total equal to 0.5, the value +// 0.5 corresponds to 0.3 and the value 1 (i.e., 100%) is 0.5. +// +// This func PANICS if offset<0, offset >= total, total<=0, total>1. +func NewScaler(callbacks model.ExperimentCallbacks, offset, total float64) *Scaler { + runtimex.Assert(offset >= 0.0 && offset < total, "NewScaler: offset must be >= 0 and < total") + runtimex.Assert(total > 0.0 && total <= 1, "NewScaler: total must be > 0 and <= 1") + return &Scaler{ + cbs: callbacks, + offset: offset, + total: total, + } +} + +var _ model.ExperimentCallbacks = &Scaler{} + +// OnProgress implements model.ExperimentCallbacks. +func (s *Scaler) OnProgress(percentage float64, message string) { + s.cbs.OnProgress(s.offset+percentage*(s.total-s.offset), message) +} diff --git a/internal/progress/progress_test.go b/internal/progress/progress_test.go new file mode 100644 index 0000000000..4d05f56d92 --- /dev/null +++ b/internal/progress/progress_test.go @@ -0,0 +1,87 @@ +package progress + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/ooni/probe-cli/v3/internal/model" +) + +type capturerCallbacks struct { + value float64 +} + +var _ model.ExperimentCallbacks = &capturerCallbacks{} + +// OnProgress implements model.ExperimentCallbacks. +func (v *capturerCallbacks) OnProgress(percentage float64, message string) { + v.value = percentage +} + +func TestScaler(t *testing.T) { + // testcase is a test case run by this function. + type testcase struct { + // name is the test case name. + name string + + // offset is the offset (>=0, 0, <=1) + total float64 + + // emit is the list of progress values to emit. + emit []float64 + + // expect is the list of progress values we expect in output. + expect []float64 + } + + cases := []testcase{{ + name: "with offset==0 and total=1", + offset: 0, + total: 1, + emit: []float64{0, 0.2, 0.4, 0.6, 0.8, 1}, + expect: []float64{0, 0.2, 0.4, 0.6, 0.8, 1}, + }, { + name: "with offset==0 and total=0.5", + offset: 0, + total: 0.5, + emit: []float64{0, 0.2, 0.4, 0.6, 0.8, 1}, + expect: []float64{0, 0.1, 0.2, 0.3, 0.4, 0.5}, + }, { + name: "with offset==0.5 and total=1", + offset: 0.5, + total: 1, + emit: []float64{0, 0.2, 0.4, 0.6, 0.8, 1}, + expect: []float64{0.5, 0.6, 0.7, 0.8, 0.9, 1}, + }, { + name: "with offset==0.2 and total=0.7", + offset: 0.2, + total: 0.7, + emit: []float64{0, 0.2, 0.4, 0.6, 0.8, 1}, + expect: []float64{0.2, 0.3, 0.4, 0.5, 0.6, 0.7}, + }, { + name: "with offset=0.4 and total=0.5", + offset: 0.4, + total: 0.5, + emit: []float64{0, 0.2, 0.4, 0.6, 0.8, 1}, + expect: []float64{0.4, 0.42, 0.44, 0.46, 0.48, 0.5}, + }} + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + var got []float64 + for _, v := range tc.emit { + cc := &capturerCallbacks{} + wrapper := NewScaler(cc, tc.offset, tc.total) + wrapper.OnProgress(v, "") + got = append(got, cc.value) + } + if diff := cmp.Diff(tc.expect, got, cmpopts.EquateApprox(0, 0.01)); diff != "" { + t.Fatal(diff) + } + }) + } +}