Skip to content

Commit

Permalink
feat(dsl): implement progress
Browse files Browse the repository at this point in the history
Noticed while working on ooni/probe#2502
  • Loading branch information
bassosimone committed Jul 14, 2023
1 parent 16d052b commit 1d32617
Show file tree
Hide file tree
Showing 14 changed files with 2,044 additions and 1,300 deletions.
3 changes: 3 additions & 0 deletions pkg/dsl/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ func NewASTLoader() *ASTLoader {
// parallel.go
al.RegisterCustomLoaderRule(&runStagesInParallelLoader{})

// progress.go
al.RegisterCustomLoaderRule(&wrapWithProgressLoader{})

// quichandshake.go
al.RegisterCustomLoaderRule(&quicHandshakeLoader{})

Expand Down
8 changes: 4 additions & 4 deletions pkg/dsl/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func Example_internalDSL() {

// Create a measurement runtime using measurexlite as the underlying
// measurement library such that we also collect observations.
rtx := dsl.NewMeasurexliteRuntime(log.Log, &dsl.NullMetrics{}, time.Now())
rtx := dsl.NewMeasurexliteRuntime(log.Log, &dsl.NullMetrics{}, &dsl.NullProgressMeter{}, time.Now())

// Create the void input for the pipeline.
input := dsl.NewValue(&dsl.Void{})
Expand Down Expand Up @@ -166,7 +166,7 @@ func Example_externalDSL() {

// Create a measurement runtime using measurexlite as the underlying
// measurement library such that we also collect observations.
rtx := dsl.NewMeasurexliteRuntime(log.Log, &dsl.NullMetrics{}, time.Now())
rtx := dsl.NewMeasurexliteRuntime(log.Log, &dsl.NullMetrics{}, &dsl.NullProgressMeter{}, time.Now())

// Create the void input for the pipeline. We need to cast the input to
// a generic Maybe because there's dynamic type checking when running an
Expand Down Expand Up @@ -211,7 +211,7 @@ func Example_singleEndpointInternalDSL() {
metrics := dsl.NewAccountingMetrics()

// create a measurement runtime
rtx := dsl.NewMeasurexliteRuntime(log.Log, metrics, time.Now())
rtx := dsl.NewMeasurexliteRuntime(log.Log, metrics, &dsl.NullProgressMeter{}, time.Now())

// run the measurement pipeline
_ = pipeline.Run(context.Background(), rtx, dsl.NewValue(&dsl.Void{}))
Expand Down Expand Up @@ -256,7 +256,7 @@ func Example_singleEndpointExternalDSL() {
metrics := dsl.NewAccountingMetrics()

// create a measurement runtime
rtx := dsl.NewMeasurexliteRuntime(log.Log, metrics, time.Now())
rtx := dsl.NewMeasurexliteRuntime(log.Log, metrics, &dsl.NullProgressMeter{}, time.Now())

// run the measurement pipeline
_ = runnable.Run(context.Background(), rtx, dsl.NewValue(&dsl.Void{}).AsGeneric())
Expand Down
16 changes: 15 additions & 1 deletion pkg/dsl/measurexlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ type MeasurexliteRuntime struct {
// metrics contains the metrics.
metrics Metrics

// progress is the ProgressMeter to use.
progress ProgressMeter

// runtime is the MinimalRuntime we compose with.
runtime *MinimalRuntime

Expand All @@ -25,9 +28,15 @@ type MeasurexliteRuntime struct {
}

// NewMeasurexliteRuntime creates a new [MeasurexliteRuntime].
func NewMeasurexliteRuntime(logger model.Logger, metrics Metrics, zeroTime time.Time) *MeasurexliteRuntime {
func NewMeasurexliteRuntime(
logger model.Logger,
metrics Metrics,
progress ProgressMeter,
zeroTime time.Time,
) *MeasurexliteRuntime {
return &MeasurexliteRuntime{
metrics: metrics,
progress: progress,
runtime: NewMinimalRuntime(logger),
zeroTime: zeroTime,
}
Expand All @@ -40,6 +49,11 @@ func (r *MeasurexliteRuntime) Close() error {
return r.runtime.Close()
}

// IncrementProgress implements Runtime.
func (r *MeasurexliteRuntime) IncrementProgress(delta float64) {
r.progress.IncrementProgress(delta)
}

// Metrics implements Runtime.
func (r *MeasurexliteRuntime) Metrics() Metrics {
return r.metrics
Expand Down
3 changes: 2 additions & 1 deletion pkg/dsl/measurexlite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ func TestMeasurexliteHTTPIncludeResponseBodySnapshot(t *testing.T) {
// define the function to run the measurement
measure := func(options ...HTTPTransactionOption) (Maybe[*HTTPResponse], *Observations) {
pipeline := makePipeline(options...)
rtx := NewMeasurexliteRuntime(model.DiscardLogger, &NullMetrics{}, time.Now())
meter := &NullProgressMeter{}
rtx := NewMeasurexliteRuntime(model.DiscardLogger, &NullMetrics{}, meter, time.Now())
input := NewValue(&Void{})
output := pipeline.Run(context.Background(), rtx, input)
observations := ReduceObservations(rtx.ExtractObservations()...)
Expand Down
127 changes: 127 additions & 0 deletions pkg/dsl/progress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package dsl

import (
"context"
"encoding/json"
"sync"

"github.com/ooni/probe-engine/pkg/model"
)

// ProgressMeter tracks progress.
type ProgressMeter interface {
// IncrementProgress increments the progress meter by adding the given delta
// to the current progress meter value. The progress meter value is a float
// number where 0 means beginning and 1.0 means we are done.
IncrementProgress(delta float64)
}

// NullProgressMeter is a [ProgressMeter] that does nothing. The zero
// value of this struct is ready to use.
type NullProgressMeter struct{}

var _ ProgressMeter = &NullProgressMeter{}

// IncrementProgress implements ProgressMeter.
func (pm *NullProgressMeter) IncrementProgress(value float64) {
// nothing
}

// ProgressMeterExperimentCallbacks wraps [model.ExperimentCallbacks] and
// implements [ProgressMeter]. The zero value is not ready to use; you should
// construct using the [NewProgressMeterExperimentCallbacks] factory.
type ProgressMeterExperimentCallbacks struct {
callbacks model.ExperimentCallbacks
mu sync.Mutex
total float64
}

// NewProgressMeterExperimentCallbacks constructs a new [ProgressMeterExperimentCallbacks].
func NewProgressMeterExperimentCallbacks(cb model.ExperimentCallbacks) *ProgressMeterExperimentCallbacks {
return &ProgressMeterExperimentCallbacks{
callbacks: cb,
mu: sync.Mutex{},
total: 0,
}
}

var _ ProgressMeter = &ProgressMeterExperimentCallbacks{}

// IncrementProgress implements ProgressMeter.
func (pm *ProgressMeterExperimentCallbacks) IncrementProgress(delta float64) {
pm.mu.Lock()
if delta >= 0 {
pm.total += delta
if pm.total > 1.0 {
pm.total = 1.0
}
}
total := pm.total
pm.mu.Unlock()
pm.callbacks.OnProgress(total, "")
}

// WrapWithProgress wraps a list of stages such that each stage increments the
// progress of running a measurement by an equal contribution.
func WrapWithProgress(input ...Stage[*Void, *Void]) (output []Stage[*Void, *Void]) {
var delta float64
if len(input) > 0 {
delta = 1 / float64(len(input))
}
for _, stage := range input {
output = append(output, &wrapWithProgressStage{delta, stage})
}
return output
}

type wrapWithProgressStage struct {
delta float64
stage Stage[*Void, *Void]
}

const wrapWithProgressStageName = "wrap_with_progress"

type wrapWithProgressStageArguments struct {
Delta float64 `json:"delta"`
}

// ASTNode implements Stage.
func (sx *wrapWithProgressStage) ASTNode() *SerializableASTNode {
return &SerializableASTNode{
StageName: wrapWithProgressStageName,
Arguments: &wrapWithProgressStageArguments{sx.delta},
Children: []*SerializableASTNode{sx.stage.ASTNode()},
}
}

type wrapWithProgressLoader struct{}

// Load implements ASTLoaderRule.
func (*wrapWithProgressLoader) Load(loader *ASTLoader, node *LoadableASTNode) (RunnableASTNode, error) {
var config wrapWithProgressStageArguments
if err := json.Unmarshal(node.Arguments, &config); err != nil {
return nil, err
}
runnables, err := loader.LoadChildren(node)
if err != nil {
return nil, err
}
if len(runnables) != 1 {
return nil, ErrInvalidNumberOfChildren
}
runnables0 := &RunnableASTNodeStage[*Void, *Void]{runnables[0]}
stage := &wrapWithProgressStage{config.Delta, runnables0}
return &StageRunnableASTNode[*Void, *Void]{stage}, nil
}

// StageName implements ASTLoaderRule.
func (*wrapWithProgressLoader) StageName() string {
return wrapWithProgressStageName
}

// Run implements Stage.
func (sx *wrapWithProgressStage) Run(ctx context.Context, rtx Runtime, input Maybe[*Void]) Maybe[*Void] {
output := sx.stage.Run(ctx, rtx, input)
rtx.IncrementProgress(sx.delta)
return output
}
2 changes: 1 addition & 1 deletion pkg/dsl/qa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func qaNewEnvironment() *netemx.QAEnv {

func qaRunNode(metrics dsl.Metrics, runnable dsl.RunnableASTNode) (*dsl.Observations, error) {
input := dsl.NewValue(&dsl.Void{}).AsGeneric()
rtx := dsl.NewMeasurexliteRuntime(log.Log, metrics, time.Now())
rtx := dsl.NewMeasurexliteRuntime(log.Log, metrics, &dsl.NullProgressMeter{}, time.Now())
if err := dsl.Try(runnable.Run(context.Background(), rtx, input)); err != nil {
return nil, err
}
Expand Down
12 changes: 11 additions & 1 deletion pkg/dsl/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ type Runtime interface {
// Close closes all the closers tracker by the runtime.
Close() error

// IncrementProgress increments the progress meter by adding the given delta
// to the current progress meter value. The progress meter value is a float
// number where 0 means beginning and 1.0 means we are done.
IncrementProgress(delta float64)

// Metrics returns the metrics to use.
Metrics() Metrics

Expand Down Expand Up @@ -82,7 +87,7 @@ func (r *MinimalRuntime) Close() error {
return nil
}

// ExtractObservations implements Trace.
// ExtractObservations implements Runtime.
func (r *MinimalRuntime) ExtractObservations() []*Observations {
defer r.mu.Unlock()
r.mu.Lock()
Expand All @@ -91,6 +96,11 @@ func (r *MinimalRuntime) ExtractObservations() []*Observations {
return out
}

// IncrementProgress implements Runtime.
func (r *MinimalRuntime) IncrementProgress(delta float64) {
// nothing
}

// Metrics implements Runtime.
func (r *MinimalRuntime) Metrics() Metrics {
return defaultNullMetrics
Expand Down
3 changes: 2 additions & 1 deletion pkg/experiment/fbmessenger/dslcore.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import "github.com/ooni/2023-05-richer-input/pkg/dsl"

// DSLToplevelFunc generates the Facebook Messenger measurement pipeline.
func DSLToplevelFunc(tk *TestKeys) dsl.Stage[*dsl.Void, *dsl.Void] {
return dsl.RunStagesInParallel(
stages := dsl.WrapWithProgress(

// stun
dsl.Compose4(
Expand Down Expand Up @@ -130,4 +130,5 @@ func DSLToplevelFunc(tk *TestKeys) dsl.Stage[*dsl.Void, *dsl.Void] {
),
),
)
return dsl.RunStagesInParallel(stages...)
}
4 changes: 3 additions & 1 deletion pkg/experiment/fbmessenger/measurer.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@ func (m *Measurer) Run(ctx context.Context, args *model.ExperimentArgs) error {
}

// create the DSL runtime
meter := dsl.NewProgressMeterExperimentCallbacks(args.Callbacks)
rtx := dsl.NewMeasurexliteRuntime(
args.Session.Logger(), &dsl.NullMetrics{}, args.Measurement.MeasurementStartTimeSaved)
args.Session.Logger(), &dsl.NullMetrics{}, meter,
args.Measurement.MeasurementStartTimeSaved)
defer rtx.Close()

// evaluate the pipeline and handle exceptions
Expand Down
8 changes: 4 additions & 4 deletions pkg/experiment/riseupvpn/riseupvpn.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,13 @@ func (m *Measurer) Run(ctx context.Context, args *model.ExperimentArgs) error {
// TODO(bassosimone): both fbmessenger and riseupvpn lack
//
// 1. an explicit mechanism to report the bytes sent and received, but the
// implicit context-based mechanism probably works;
//
// 2. a DSL-based mechanism to increment the test progress percentage.
// implicit context-based mechanism probably works.

// create the DSL runtime
progress := dsl.NewProgressMeterExperimentCallbacks(args.Callbacks)
rtx := dsl.NewMeasurexliteRuntime(
args.Session.Logger(), &dsl.NullMetrics{}, args.Measurement.MeasurementStartTimeSaved)
args.Session.Logger(), &dsl.NullMetrics{}, progress,
args.Measurement.MeasurementStartTimeSaved)
defer rtx.Close()

// evaluate the pipeline and handle exceptions
Expand Down
4 changes: 3 additions & 1 deletion pkg/ooniprobe/runner/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ var _ model.ExperimentCallbacks = &progressEmitterNettest{}
func (pe *progressEmitterNettest) OnProgress(progress float64, message string) {
// the view only supports setting the progress, so use the logger
// to make sure the message is not lost
pe.logger.Info(message)
if message != "" {
pe.logger.Info(message)
}
pe.view.UpdateProgressBarValueWithinRange(progress)
}
4 changes: 2 additions & 2 deletions pkg/x/cmd/riseupvpn/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
// isTCPGatewayAccessible returns whether a gateways is accessible.
func isTCPGatewayAccessible(stage dsl.Stage[*dsl.Void, *dsl.Void]) bool {
metrics := dsl.NewAccountingMetrics()
rtx := dsl.NewMeasurexliteRuntime(log.Log, metrics, time.Now())
rtx := dsl.NewMeasurexliteRuntime(log.Log, metrics, &dsl.NullProgressMeter{}, time.Now())
input := dsl.NewValue(&dsl.Void{})
ctx := context.Background()
runtimex.Try0(dsl.Try(stage.Run(ctx, rtx, input)))
Expand Down Expand Up @@ -84,7 +84,7 @@ func mustGenerateDSL(eipService *apiEIPService, rootCA string) dsl.Stage[*dsl.Vo
stages = append(stages, dslRuleFetchGeoServiceURL(rootCA))

// return the composed pipeline
return dsl.RunStagesInParallel(stages...)
return dsl.RunStagesInParallel(dsl.WrapWithProgress(stages...)...)
}

func main() {
Expand Down
Loading

0 comments on commit 1d32617

Please sign in to comment.