From 6c25fe5f2ca9a5b4ed2b99e7d7a80053d523aabd Mon Sep 17 00:00:00 2001 From: Simone Basso Date: Wed, 29 Mar 2023 16:14:19 +0000 Subject: [PATCH 01/11] feat: minimal OONI engine --- internal/cmd/exp/main.go | 126 +++++++ internal/engine/experiment.go | 2 +- internal/engine/session.go | 2 +- internal/miniengine/callbacks.go | 27 ++ internal/miniengine/doc.go | 1 + internal/miniengine/event.go | 29 ++ internal/miniengine/location.go | 35 ++ internal/miniengine/logger.go | 78 +++++ internal/miniengine/session.go | 495 ++++++++++++++++++++++++++++ internal/miniengine/task.go | 48 +++ internal/miniengine/void.go | 4 + internal/probeservices/collector.go | 37 ++- pkg/oonimkall/tasklogger.go | 12 +- 13 files changed, 882 insertions(+), 14 deletions(-) create mode 100644 internal/cmd/exp/main.go create mode 100644 internal/miniengine/callbacks.go create mode 100644 internal/miniengine/doc.go create mode 100644 internal/miniengine/event.go create mode 100644 internal/miniengine/location.go create mode 100644 internal/miniengine/logger.go create mode 100644 internal/miniengine/session.go create mode 100644 internal/miniengine/task.go create mode 100644 internal/miniengine/void.go diff --git a/internal/cmd/exp/main.go b/internal/cmd/exp/main.go new file mode 100644 index 0000000000..c253d00e6b --- /dev/null +++ b/internal/cmd/exp/main.go @@ -0,0 +1,126 @@ +package main + +import ( + "context" + "path/filepath" + + "github.com/apex/log" + "github.com/ooni/probe-cli/v3/internal/miniengine" + "github.com/ooni/probe-cli/v3/internal/model" + "github.com/ooni/probe-cli/v3/internal/platform" + "github.com/ooni/probe-cli/v3/internal/runtimex" +) + +type awaitableTask interface { + Done() <-chan any + Events() <-chan *miniengine.Event +} + +func awaitTask(task awaitableTask) { + for { + select { + case <-task.Done(): + return + case ev := <-task.Events(): + switch ev.EventType { + case miniengine.EventTypeProgress: + log.Infof("PROGRESS %f %s", ev.Progress, ev.Message) + case miniengine.EventTypeInfo: + log.Infof("%s", ev.Message) + case miniengine.EventTypeWarning: + log.Warnf("%s", ev.Message) + case miniengine.EventTypeDebug: + log.Debugf("%s", ev.Message) + } + } + } +} + +func main() { + log.SetLevel(log.DebugLevel) + ctx := context.Background() + + // create session config + sessionConfig := &miniengine.SessionConfig{ + BackendURL: "", + ProxyURL: "", + SnowflakeRendezvousMethod: "", + SoftwareName: "miniooni", + SoftwareVersion: "0.1.0-dev", + StateDir: filepath.Join("x", "state"), + TempDir: filepath.Join("x", "tmp"), + TorArgs: []string{}, + TorBinary: "", + TunnelDir: filepath.Join("x", "tunnel"), + Verbose: false, + } + + // create session + sess := runtimex.Try1(miniengine.NewSession(sessionConfig)) + defer sess.Close() + + // bootstrap the session + bootstrapTask := sess.Bootstrap(ctx) + awaitTask(bootstrapTask) + _ = runtimex.Try1(bootstrapTask.Result()) + + // geolocate the probe + locationTask := sess.Geolocate(ctx) + awaitTask(locationTask) + location := runtimex.Try1(locationTask.Result()) + log.Infof("%+v", location) + + // TODO(bassosimone): we want a factory to construct a new OOAPICheckInConfig + // using the information available to the session. + + // call the check-in API + checkInConfig := &model.OOAPICheckInConfig{ + Charging: false, + OnWiFi: false, + Platform: platform.Name(), + ProbeASN: location.ProbeASNString, + ProbeCC: location.ProbeCC, + RunType: model.RunTypeTimed, + SoftwareName: "miniooni", + SoftwareVersion: "0.1.0-dev", + WebConnectivity: model.OOAPICheckInConfigWebConnectivity{ + CategoryCodes: []string{}, + }, + } + checkInTask := sess.CheckIn(ctx, checkInConfig) + awaitTask(checkInTask) + checkInResult := runtimex.Try1(checkInTask.Result()) + log.Infof("%+v", checkInResult) + + // measure and submit all the URLs + runtimex.Assert(checkInResult.Tests.WebConnectivity != nil, "nil WebConnectivity") + for _, entry := range checkInResult.Tests.WebConnectivity.URLs { + // TODO(bassosimone): the only problem with this style of measurement + // is that we create a new report ID for each measurement + // + // There are two options here: + // + // 1. we create a new Experiment explicitly + // + // 2. we change the way in which we determine whether to open a new report + // + // I think the first option is way better than the second. + + // perform the measurement + measurementTask := sess.Measure( + ctx, + "web_connectivity@v0.5", + entry.URL, + make(map[string]any), + ) + awaitTask(measurementTask) + measurementResult := runtimex.Try1(measurementTask.Result()) + log.Infof("%+v", measurementResult) + + // submit the measurement + submitTask := sess.Submit(ctx, measurementResult.Measurement) + awaitTask(submitTask) + reportID := runtimex.Try1(submitTask.Result()) + log.Infof("%+v", reportID) + } +} diff --git a/internal/engine/experiment.go b/internal/engine/experiment.go index 5a850693ae..30ced3783a 100644 --- a/internal/engine/experiment.go +++ b/internal/engine/experiment.go @@ -193,7 +193,7 @@ func (e *experiment) SubmitAndUpdateMeasurementContext( if e.report == nil { return errors.New("report is not open") } - return e.report.SubmitMeasurement(ctx, measurement) + return e.report.SubmitMeasurement(ctx, measurement, true) } // newMeasurement creates a new measurement for this experiment with the given input. diff --git a/internal/engine/session.go b/internal/engine/session.go index fb988b31b0..47c99d3c83 100644 --- a/internal/engine/session.go +++ b/internal/engine/session.go @@ -442,7 +442,7 @@ func (s *Session) NewProbeServicesClient(ctx context.Context) (*probeservices.Cl } // NewSubmitter creates a new submitter instance. -func (s *Session) NewSubmitter(ctx context.Context) (Submitter, error) { +func (s *Session) NewSubmitter(ctx context.Context) (*probeservices.Submitter, error) { psc, err := s.NewProbeServicesClient(ctx) if err != nil { return nil, err diff --git a/internal/miniengine/callbacks.go b/internal/miniengine/callbacks.go new file mode 100644 index 0000000000..f48a7eb14e --- /dev/null +++ b/internal/miniengine/callbacks.go @@ -0,0 +1,27 @@ +package miniengine + +// +// Measurement callbacks +// + +import "github.com/ooni/probe-cli/v3/internal/model" + +// callbacks implements [model.ExperimentCallbacks] +type callbacks struct { + emitter chan<- *Event +} + +var _ model.ExperimentCallbacks = &callbacks{} + +// OnProgress implements model.ExperimentCallbacks +func (c *callbacks) OnProgress(progress float64, message string) { + event := &Event{ + EventType: EventTypeProgress, + Message: message, + Progress: progress, + } + select { + case c.emitter <- event: + default: + } +} diff --git a/internal/miniengine/doc.go b/internal/miniengine/doc.go new file mode 100644 index 0000000000..d7bea4b205 --- /dev/null +++ b/internal/miniengine/doc.go @@ -0,0 +1 @@ +package miniengine diff --git a/internal/miniengine/event.go b/internal/miniengine/event.go new file mode 100644 index 0000000000..6b4f755ee9 --- /dev/null +++ b/internal/miniengine/event.go @@ -0,0 +1,29 @@ +package miniengine + +// +// Log and progress events. +// + +// EventTypeDebug is an [Event] containing a DEBUG message. +const EventTypeDebug = "DEBUG" + +// EventTypeInfo is an [Event] containing an INFO message. +const EventTypeInfo = "INFO" + +// EventTypeProgress is an [Event] containing a PROGRESS message. +const EventTypeProgress = "PROGRESS" + +// EventTypeWarning is an [Event] containing a WARNING message. +const EventTypeWarning = "WARNING" + +// Event is an interim event emitted by this implementation. +type Event struct { + // EventType is one of "DEBUG", "INFO", "PROGRESS", and "WARNING". + EventType string + + // Message is the string message. + Message string + + // Progress is the progress as a number between zero and one. + Progress float64 +} diff --git a/internal/miniengine/location.go b/internal/miniengine/location.go new file mode 100644 index 0000000000..774541ccfb --- /dev/null +++ b/internal/miniengine/location.go @@ -0,0 +1,35 @@ +package miniengine + +// +// Probe location +// + +// Location is the probe location. +type Location struct { + // ProbeASN is the probe AS number. + ProbeASN int64 `json:"probe_asn"` + + // ProbeASNString is the probe AS number as a string. + ProbeASNString string `json:"probe_asn_string"` + + // ProbeCC is the probe country code. + ProbeCC string `json:"probe_cc"` + + // ProbeNetworkName is the probe network name. + ProbeNetworkName string `json:"probe_network_name"` + + // IP is the probe IP. + ProbeIP string `json:"probe_ip"` + + // ResolverASN is the resolver ASN. + ResolverASN int64 `json:"resolver_asn"` + + // ResolverASNString is the resolver AS number as a string. + ResolverASNString string `json:"resolver_asn_string"` + + // ResolverIP is the resolver IP. + ResolverIP string `json:"resolver_ip"` + + // ResolverNetworkName is the resolver network name. + ResolverNetworkName string `json:"resolver_network_name"` +} diff --git a/internal/miniengine/logger.go b/internal/miniengine/logger.go new file mode 100644 index 0000000000..151cdc214e --- /dev/null +++ b/internal/miniengine/logger.go @@ -0,0 +1,78 @@ +package miniengine + +// +// Emitting log messages as events. +// + +import ( + "fmt" + + "github.com/ooni/probe-cli/v3/internal/model" +) + +// loggerEmitter is a [model.Logger] using an [eventEmitter]. +type loggerEmitter struct { + // emitter is the channel where to emit events. + emitter chan<- *Event + + // isVerbose indicates whether to emit debug logs. + isVerbose bool +} + +// ensure that taskLogger implements model.Logger. +var _ model.Logger = &loggerEmitter{} + +// newLoggerEmitter creates a new [loggerEmitter] instance. +func newLoggerEmitter(emitter chan<- *Event, isVerbose bool) *loggerEmitter { + return &loggerEmitter{ + emitter: emitter, + isVerbose: isVerbose, + } +} + +// Debug implements model.Logger.Debug. +func (cl *loggerEmitter) Debug(msg string) { + if cl.isVerbose { + cl.emit(EventTypeDebug, msg) + } +} + +// Debugf implements model.Logger.Debugf. +func (cl *loggerEmitter) Debugf(format string, v ...interface{}) { + if cl.isVerbose { + cl.Debug(fmt.Sprintf(format, v...)) + } +} + +// Info implements model.Logger.Info. +func (cl *loggerEmitter) Info(msg string) { + cl.emit(EventTypeInfo, msg) +} + +// Infof implements model.Logger.Infof. +func (cl *loggerEmitter) Infof(format string, v ...interface{}) { + cl.Info(fmt.Sprintf(format, v...)) +} + +// Warn implements model.Logger.Warn. +func (cl *loggerEmitter) Warn(msg string) { + cl.emit(EventTypeWarning, msg) +} + +// Warnf implements model.Logger.Warnf. +func (cl *loggerEmitter) Warnf(format string, v ...interface{}) { + cl.Warn(fmt.Sprintf(format, v...)) +} + +// emit is the code that actually emits the log event. +func (cl *loggerEmitter) emit(level string, message string) { + event := &Event{ + EventType: level, + Message: message, + Progress: 0, + } + select { + case cl.emitter <- event: + default: + } +} diff --git a/internal/miniengine/session.go b/internal/miniengine/session.go new file mode 100644 index 0000000000..2f70190096 --- /dev/null +++ b/internal/miniengine/session.go @@ -0,0 +1,495 @@ +package miniengine + +// +// Measurement session +// + +import ( + "errors" + "fmt" + "net/url" + "os" + "sync" + + "github.com/ooni/probe-cli/v3/internal/engine" + "github.com/ooni/probe-cli/v3/internal/kvstore" + "github.com/ooni/probe-cli/v3/internal/model" + "github.com/ooni/probe-cli/v3/internal/probeservices" + "github.com/ooni/probe-cli/v3/internal/runtimex" + "golang.org/x/net/context" +) + +// SessionConfig contains configuration for a [Session]. The zero value is +// invalid; please, initialize all the fields marked as MANDATORY. +type SessionConfig struct { + // BackendURL allows you to OPTIONALLY force the + // usage of a specific OONI backend instance. + BackendURL string `json:"backend_url"` + + // Proxy allows you to OPTIONALLY force a specific proxy + // rather than using no proxy (the default). + // + // Use `psiphon:///` to force using Psiphon with the + // embedded configuration file. Not all builds have + // an embedded configuration file, but OONI builds have + // such a file, so they can use this functionality. + // + // Use `tor:///` and `torsf:///` to respectively use vanilla tor + // and tor plus snowflake as tunnels. + // + // Use `socks5://127.0.0.1:9050/` to connect to a SOCKS5 + // proxy running on 127.0.0.1:9050. This could be, for example, + // a suitably configured `tor` instance. + ProxyURL string `json:"proxy_url"` + + // SnowflakeRendezvousMethod OPTIONALLY allows you to specify + // which snowflake rendezvous method to use. Valid methods to use + // here are "amp" and "domain_fronting". + SnowflakeRendezvousMethod string `json:"snowflake_rendezvous_method"` + + // SoftwareName is the MANDATORY name of the application + // that will be using the new [Session]. + SoftwareName string `json:"software_name"` + + // SoftwareVersion is the MANDATORY version of the application + // that will be using the new [Session]. + SoftwareVersion string `json:"software_version"` + + // StateDir is the MANDATORY directory where to store state + // information required by a [Session]. + StateDir string `json:"state_dir"` + + // TempDir is the MANDATORY directory inside which the [Session] shall + // store temporary files deleted when we close the [Session]. + TempDir string `json:"temp_dir"` + + // TorArgs contains OPTIONAL arguments to pass to the "tor" binary + // when ProxyURL is `tor:///` or `torsf:///`. + TorArgs []string `json:"tor_args"` + + // TorBinary is the OPTIONAL "tor" binary to use. When using this code + // on mobile devices, we link with tor directly, so there is no need to + // specify this argument when running on a mobile device. + TorBinary string `json:"tor_binary"` + + // TunnelDir is the MANDATORY directory where the [Session] shall store + // persistent data regarding circumvention tunnels. + TunnelDir string `json:"tunnel_dir"` + + // Verbose OPTIONALLY configures the [Session] logger to be verbose. + Verbose bool `json:"verbose"` +} + +// ErrSessionConfig indicates that the [SessionConfig] is invalid. +var ErrSessionConfig = errors.New("invalid SessionConfig") + +// check checks whether the [SessionConfig] is valid. +func (sc *SessionConfig) check() error { + if sc.SoftwareName == "" { + return fmt.Errorf("%w: %s", ErrSessionConfig, "SoftwareName is empty") + } + if sc.SoftwareVersion == "" { + return fmt.Errorf("%w: %s", ErrSessionConfig, "SoftwareVersion is empty") + } + if sc.StateDir == "" { + return fmt.Errorf("%w: %s", ErrSessionConfig, "StateDir is empty") + } + if sc.TempDir == "" { + return fmt.Errorf("%w: %s", ErrSessionConfig, "TempDir is empty") + } + if sc.TunnelDir == "" { + return fmt.Errorf("%w: %s", ErrSessionConfig, "TunnelDir is empty") + } + return nil +} + +// mkdirAll ensures all the required directories exist. +func (sc *SessionConfig) mkdirAll() error { + if err := os.MkdirAll(sc.StateDir, 0700); err != nil { + return err + } + if err := os.MkdirAll(sc.TempDir, 0700); err != nil { + return err + } + if err := os.MkdirAll(sc.TunnelDir, 0700); err != nil { + return err + } + return nil +} + +// Session is a measurement session. The zero value is invalid; please +// create a new instance using the [NewSession] factory. +type Session struct { + // child is a thread-safe wrapper for the underlying [engine.Session]. + child *engine.Session + + // config is the [engine.SessionConfig] to use. + config *engine.SessionConfig + + // closeJustOnce ensures we close this [Session] just once. + closeJustOnce sync.Once + + // emitter is the [emitter] to use. + emitter chan *Event + + // logger is the [model.Logger] to use. + logger model.Logger + + // mu provides mutual exclusion. + mu sync.Mutex + + // psc is the [probeservices.Client] to use. + psc *probeservices.Client + + // submitter is the [probeservices.Submitter] to use. + submitter *probeservices.Submitter +} + +// NewSession creates a new [Session] instance. +func NewSession(config *SessionConfig) (*Session, error) { + // check whether the [SessionConfig] is valid. + if err := config.check(); err != nil { + return nil, err + } + + // make sure all the required directories exist. + if err := config.mkdirAll(); err != nil { + return nil, err + } + + // create keyvalue store inside the user provided StateDir. + kvstore, err := kvstore.NewFS(config.StateDir) + if err != nil { + return nil, err + } + + // honor user-provided backend service, if any. + var availableps []model.OOAPIService + if config.BackendURL != "" { + availableps = append(availableps, model.OOAPIService{ + Address: config.BackendURL, + Type: "https", + }) + } + + // honor user-provided proxy, if any. + var proxyURL *url.URL + if config.ProxyURL != "" { + var err error + proxyURL, err = url.Parse(config.ProxyURL) + if err != nil { + return nil, err + } + } + + // create the base event emitter + const buffer = 1024 + emitter := make(chan *Event, buffer) + + // create a logger using the base event emitter + logger := newLoggerEmitter(emitter, config.Verbose) + + // create the underlying session using the [engine] package. + engineConfig := engine.SessionConfig{ + AvailableProbeServices: availableps, + KVStore: kvstore, + Logger: logger, + ProxyURL: proxyURL, + SoftwareName: config.SoftwareName, + SoftwareVersion: config.SoftwareVersion, + TempDir: config.TempDir, + TorArgs: config.TorArgs, + TorBinary: config.TorBinary, + SnowflakeRendezvous: config.SnowflakeRendezvousMethod, + TunnelDir: config.TunnelDir, + } + + // assemble and return a session. + sess := &Session{ + child: nil, + config: &engineConfig, + closeJustOnce: sync.Once{}, + emitter: emitter, + logger: logger, + mu: sync.Mutex{}, + } + return sess, nil +} + +// Bootstrap ensures that we bootstrap the [Session]. +func (sess *Session) Bootstrap(ctx context.Context) *Task[Void] { + task := &Task[Void]{ + done: make(chan any), + events: sess.emitter, + failure: nil, + result: Void{}, + } + + go func() { + // synchronize with Task.Result + defer close(task.done) + + // create a new underlying session instance + child, err := engine.NewSession(ctx, *sess.config) + if err != nil { + task.failure = err + return + } + + // create a probeservices client + psc, err := child.NewProbeServicesClient(ctx) + if err != nil { + child.Close() + task.failure = err + return + } + + // create a submitter + submitter, err := child.NewSubmitter(ctx) + if err != nil { + child.Close() + task.failure = err + return + } + + // lock and store the underlying fields + defer sess.mu.Unlock() + sess.mu.Lock() + sess.child = child + sess.psc = psc + sess.submitter = submitter + }() + + return task +} + +// ErrNoBootstrap indicates that you did not bootstrap the [Session]. +var ErrNoBootstrap = errors.New("bootstrap the session first") + +// CheckIn invokes the backend check-in API using the [Session]. +func (sess *Session) CheckIn( + ctx context.Context, config *model.OOAPICheckInConfig) *Task[*model.OOAPICheckInResult] { + task := &Task[*model.OOAPICheckInResult]{ + done: make(chan any), + events: sess.emitter, + failure: nil, + result: nil, + } + + go func() { + // synchronize with Task.Result + defer close(task.done) + + // lock and access the underlying session + sess.mu.Lock() + defer sess.mu.Unlock() + + // handle the case where we did not bootstrap + if sess.child == nil { + task.failure = ErrNoBootstrap + return + } + runtimex.Assert(sess.psc != nil, "sess.psc is nil") + + // invoke the backend check-in API + result, err := sess.psc.CheckIn(ctx, *config) + if err != nil { + task.failure = err + return + } + + // pass result to the caller + task.result = result + }() + + return task +} + +// Geolocate uses the [Session] to geolocate the probe. +func (sess *Session) Geolocate(ctx context.Context) *Task[*Location] { + task := &Task[*Location]{ + done: make(chan any), + events: sess.emitter, + failure: nil, + result: nil, + } + + go func() { + // synchronize with Task.Result + defer close(task.done) + + // lock and access the underlying session + sess.mu.Lock() + defer sess.mu.Unlock() + + // handle the case where we did not bootstrap + if sess.child == nil { + task.failure = ErrNoBootstrap + return + } + + // perform geolocation and handle failure + if err := sess.child.MaybeLookupLocationContext(ctx); err != nil { + task.failure = err + return + } + + // copy results to the caller + task.result = &Location{ + ProbeASN: int64(sess.child.ProbeASN()), + ProbeASNString: sess.child.ProbeASNString(), + ProbeCC: sess.child.ProbeCC(), + ProbeNetworkName: sess.child.ProbeNetworkName(), + ProbeIP: sess.child.ProbeIP(), + ResolverASN: int64(sess.child.ResolverASN()), + ResolverASNString: sess.child.ResolverASNString(), + ResolverIP: sess.child.ResolverIP(), + ResolverNetworkName: sess.child.ResolverNetworkName(), + } + }() + + return task +} + +// MeasurementResult contains the results of [Session.Measure] +type MeasurementResult struct { + // KibiBytesReceived contains the KiB we received + KibiBytesReceived float64 + + // KibiBytesSent contains the KiB we sent + KibiBytesSent float64 + + // Measurement is the generated [model.Measurement] + Measurement *model.Measurement `json:"measurement"` + + // Summary is the corresponding summary. + Summary any `json:"summary"` +} + +// Measure performs a measurement using the given experiment name, the +// given input, and the given opaque experiment options. +func (sess *Session) Measure( + ctx context.Context, + name string, + input string, + options map[string]any, +) *Task[*MeasurementResult] { + task := &Task[*MeasurementResult]{ + done: make(chan any), + events: sess.emitter, + failure: nil, + result: nil, + } + + go func() { + // synchronize with Task.Result + defer close(task.done) + + // lock and access the underlying session + sess.mu.Lock() + defer sess.mu.Unlock() + + // handle the case where we did not bootstrap + if sess.child == nil { + task.failure = ErrNoBootstrap + return + } + + // TODO(bassosimone): there is a bug where we create a new report ID for + // each measurement because there's a different TestStartTime + + // create a [model.ExperimentBuilder] + builder, err := sess.child.NewExperimentBuilder(name) + if err != nil { + task.failure = err + return + } + + // set the proper callbacks for the experiment + callbacks := &callbacks{sess.emitter} + builder.SetCallbacks(callbacks) + + // set the proper options for the experiment + builder.SetOptionsAny(options) + + // create an experiment instance + exp := builder.NewExperiment() + + // perform the measurement + meas, err := exp.MeasureWithContext(ctx, input) + if err != nil { + task.failure = err + return + } + + // obtain the summary + summary, err := exp.GetSummaryKeys(meas) + if err != nil { + task.failure = err + return + } + + // pass response to the caller + task.result = &MeasurementResult{ + KibiBytesReceived: exp.KibiBytesReceived(), + KibiBytesSent: exp.KibiBytesSent(), + Measurement: meas, + Summary: summary, + } + + }() + + return task +} + +// Submit submits a [model.Measurement] to the OONI backend. +func (sess *Session) Submit(ctx context.Context, meas *model.Measurement) *Task[string] { + task := &Task[string]{ + done: make(chan any), + events: sess.emitter, + failure: nil, + result: "", + } + + go func() { + // synchronize with Task.Result + defer close(task.done) + + // lock and access the underlying session + sess.mu.Lock() + defer sess.mu.Unlock() + + // handle the case where we did not bootstrap + if sess.child == nil { + task.failure = ErrNoBootstrap + return + } + runtimex.Assert(sess.submitter != nil, "sess.psc is nil") + + // submit without causing data races + reportID, err := sess.submitter.SubmitWithoutModifyingMeasurement(ctx, meas) + if err != nil { + task.failure = err + return + } + + // pass the reportID to the caller + task.result = reportID + }() + + return task +} + +// Close closes the [Session]. This function does not attempt +// to close an already closed [Session]. +func (sess *Session) Close() (err error) { + sess.closeJustOnce.Do(func() { + sess.mu.Lock() + if sess.child != nil { + err = sess.child.Close() + sess.child = nil + } + sess.mu.Unlock() + }) + return err +} diff --git a/internal/miniengine/task.go b/internal/miniengine/task.go new file mode 100644 index 0000000000..d151fdcc1b --- /dev/null +++ b/internal/miniengine/task.go @@ -0,0 +1,48 @@ +package miniengine + +// +// Task +// + +import "golang.org/x/net/context" + +// Task is a long running operation that emits [Event] while it is running and +// produces a given Result. The zero value of this struct is invalid; you cannot +// create a valid [Task] outside of this package. +type Task[Result any] struct { + // done is closed when the [Task] is done. + done chan any + + // events is where the [Task] emits [Event]. + events chan *Event + + // failure is the [Task] failure or nil. + failure error + + // result is the [Task] result (zero on failure). + result Result +} + +// TaskRunner runs the main function that produces a [Task] result. +type TaskRunner[Result any] interface { + // Main is the [Task] main function. + Main(ctx context.Context) (Result, error) +} + +// Done returns a channel closed when the [Task] is done. +func (t *Task[Result]) Done() <-chan any { + return t.done +} + +// Events returns a channel where a running [Task] emits [Event]. +func (t *Task[Result]) Events() <-chan *Event { + return t.events +} + +// Result returns the [Task] result (if the task succeded) or the error that +// occurred (in case of failure). This method blocks until the channel returned +// by the [Task.Done] method has been closed. +func (t *Task[Result]) Result() (Result, error) { + <-t.done // synchronize with TaskRunner.Main + return t.result, t.failure +} diff --git a/internal/miniengine/void.go b/internal/miniengine/void.go new file mode 100644 index 0000000000..f598d2dde0 --- /dev/null +++ b/internal/miniengine/void.go @@ -0,0 +1,4 @@ +package miniengine + +// Void is a structure without any content. +type Void struct{} diff --git a/internal/probeservices/collector.go b/internal/probeservices/collector.go index 54818f2749..34eea6c3c5 100644 --- a/internal/probeservices/collector.go +++ b/internal/probeservices/collector.go @@ -81,9 +81,15 @@ func (r reportChan) CanSubmit(m *model.Measurement) bool { // such that it contains the report ID for which it has been // submitted. Otherwise, we'll set the report ID to the empty // string, so that you know which measurements weren't submitted. -func (r reportChan) SubmitMeasurement(ctx context.Context, m *model.Measurement) error { +func (r reportChan) SubmitMeasurement( + ctx context.Context, + m *model.Measurement, + edit bool, +) error { var updateResponse model.OOAPICollectorUpdateResponse - m.ReportID = r.ID + if edit { + m.ReportID = r.ID + } err := r.client.APIClientTemplate.WithBodyLogging().Build().PostJSON( ctx, fmt.Sprintf("/report/%s", r.ID), model.OOAPICollectorUpdateRequest{ Format: "json", @@ -91,7 +97,9 @@ func (r reportChan) SubmitMeasurement(ctx context.Context, m *model.Measurement) }, &updateResponse, ) if err != nil { - m.ReportID = "" + if edit { + m.ReportID = "" + } return err } return nil @@ -107,7 +115,7 @@ func (r reportChan) ReportID() string { type ReportChannel interface { CanSubmit(m *model.Measurement) bool ReportID() string - SubmitMeasurement(ctx context.Context, m *model.Measurement) error + SubmitMeasurement(ctx context.Context, m *model.Measurement, edit bool) error } var _ ReportChannel = &reportChan{} @@ -140,15 +148,32 @@ func NewSubmitter(opener ReportOpener, logger model.Logger) *Submitter { // Submit submits the current measurement to the OONI backend created using // the ReportOpener passed to the constructor. func (sub *Submitter) Submit(ctx context.Context, m *model.Measurement) error { + _, err := sub.submit(ctx, m, true) + return err +} + +// SubmitWithoutModifyingMeasurement is like Submit but does not modify +// the measurement in place (which may cause a data race). +func (sub *Submitter) SubmitWithoutModifyingMeasurement( + ctx context.Context, + m *model.Measurement, +) (string, error) { + return sub.submit(ctx, m, false) +} + +func (sub *Submitter) submit(ctx context.Context, m *model.Measurement, edit bool) (string, error) { var err error sub.mu.Lock() defer sub.mu.Unlock() if sub.channel == nil || !sub.channel.CanSubmit(m) { sub.channel, err = sub.opener.OpenReport(ctx, NewReportTemplate(m)) if err != nil { - return err + return "", err } sub.logger.Infof("New reportID: %s", sub.channel.ReportID()) } - return sub.channel.SubmitMeasurement(ctx, m) + if err := sub.channel.SubmitMeasurement(ctx, m, edit); err != nil { + return "", err + } + return sub.channel.ReportID(), nil } diff --git a/pkg/oonimkall/tasklogger.go b/pkg/oonimkall/tasklogger.go index b6550dd3d8..7793b2a76f 100644 --- a/pkg/oonimkall/tasklogger.go +++ b/pkg/oonimkall/tasklogger.go @@ -1,17 +1,17 @@ package oonimkall -import ( - "fmt" - - "github.com/ooni/probe-cli/v3/internal/model" -) - // // This file implements the logger used by a task. Outside // of this file, the rest of the codebase just sees a generic // model.Logger that can log events. // +import ( + "fmt" + + "github.com/ooni/probe-cli/v3/internal/model" +) + // taskLogger is the logger used by a task. type taskLogger struct { // emitter is the event emitter. From 8de36b6a437fadf811cebb68bf1989ed1501e58f Mon Sep 17 00:00:00 2001 From: Simone Basso Date: Wed, 29 Mar 2023 16:18:19 +0000 Subject: [PATCH 02/11] fix: make code more ergonomic to use --- internal/cmd/exp/main.go | 10 +++------- internal/miniengine/session.go | 16 ++++++++++++++++ 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/internal/cmd/exp/main.go b/internal/cmd/exp/main.go index c253d00e6b..f707396edc 100644 --- a/internal/cmd/exp/main.go +++ b/internal/cmd/exp/main.go @@ -7,7 +7,6 @@ import ( "github.com/apex/log" "github.com/ooni/probe-cli/v3/internal/miniengine" "github.com/ooni/probe-cli/v3/internal/model" - "github.com/ooni/probe-cli/v3/internal/platform" "github.com/ooni/probe-cli/v3/internal/runtimex" ) @@ -70,19 +69,16 @@ func main() { location := runtimex.Try1(locationTask.Result()) log.Infof("%+v", location) - // TODO(bassosimone): we want a factory to construct a new OOAPICheckInConfig - // using the information available to the session. - // call the check-in API checkInConfig := &model.OOAPICheckInConfig{ Charging: false, OnWiFi: false, - Platform: platform.Name(), + Platform: sess.Platform(), ProbeASN: location.ProbeASNString, ProbeCC: location.ProbeCC, RunType: model.RunTypeTimed, - SoftwareName: "miniooni", - SoftwareVersion: "0.1.0-dev", + SoftwareName: sess.SoftwareName(), + SoftwareVersion: sess.SoftwareVersion(), WebConnectivity: model.OOAPICheckInConfigWebConnectivity{ CategoryCodes: []string{}, }, diff --git a/internal/miniengine/session.go b/internal/miniengine/session.go index 2f70190096..9bc1356e66 100644 --- a/internal/miniengine/session.go +++ b/internal/miniengine/session.go @@ -14,6 +14,7 @@ import ( "github.com/ooni/probe-cli/v3/internal/engine" "github.com/ooni/probe-cli/v3/internal/kvstore" "github.com/ooni/probe-cli/v3/internal/model" + "github.com/ooni/probe-cli/v3/internal/platform" "github.com/ooni/probe-cli/v3/internal/probeservices" "github.com/ooni/probe-cli/v3/internal/runtimex" "golang.org/x/net/context" @@ -216,6 +217,21 @@ func NewSession(config *SessionConfig) (*Session, error) { return sess, nil } +// Platform returns the operating system platform name. +func (sess *Session) Platform() string { + return platform.Name() +} + +// SoftwareName returns the configured software name. +func (sess *Session) SoftwareName() string { + return sess.config.SoftwareName +} + +// SoftwareVersion returns the configured software version. +func (sess *Session) SoftwareVersion() string { + return sess.config.SoftwareVersion +} + // Bootstrap ensures that we bootstrap the [Session]. func (sess *Session) Bootstrap(ctx context.Context) *Task[Void] { task := &Task[Void]{ From a765167a27ad39f707061cecf39707a3ad81413e Mon Sep 17 00:00:00 2001 From: Simone Basso Date: Wed, 29 Mar 2023 16:32:18 +0000 Subject: [PATCH 03/11] fix: avoid creating a new report ID for each measurement --- internal/cmd/exp/main.go | 28 +++++------- internal/miniengine/session.go | 79 ++++++++++++++++++++++------------ 2 files changed, 62 insertions(+), 45 deletions(-) diff --git a/internal/cmd/exp/main.go b/internal/cmd/exp/main.go index f707396edc..1d81aef923 100644 --- a/internal/cmd/exp/main.go +++ b/internal/cmd/exp/main.go @@ -1,5 +1,9 @@ package main +// +// Demonstrates using the fundamental OONI Engine API +// + import ( "context" "path/filepath" @@ -10,11 +14,14 @@ import ( "github.com/ooni/probe-cli/v3/internal/runtimex" ) +// awaitableTask is a [miniengine.Task] that we can await and for which +// we can obtain the interim events while it's running. type awaitableTask interface { Done() <-chan any Events() <-chan *miniengine.Event } +// awaitTask awaits for the task to be done and emits interim events func awaitTask(task awaitableTask) { for { select { @@ -88,27 +95,14 @@ func main() { checkInResult := runtimex.Try1(checkInTask.Result()) log.Infof("%+v", checkInResult) + // create an instance of the Web Connectivity experiment + exp := runtimex.Try1(sess.NewExperiment("web_connectivity", make(map[string]any))) + // measure and submit all the URLs runtimex.Assert(checkInResult.Tests.WebConnectivity != nil, "nil WebConnectivity") for _, entry := range checkInResult.Tests.WebConnectivity.URLs { - // TODO(bassosimone): the only problem with this style of measurement - // is that we create a new report ID for each measurement - // - // There are two options here: - // - // 1. we create a new Experiment explicitly - // - // 2. we change the way in which we determine whether to open a new report - // - // I think the first option is way better than the second. - // perform the measurement - measurementTask := sess.Measure( - ctx, - "web_connectivity@v0.5", - entry.URL, - make(map[string]any), - ) + measurementTask := sess.Measure(ctx, exp, entry.URL) awaitTask(measurementTask) measurementResult := runtimex.Try1(measurementTask.Result()) log.Infof("%+v", measurementResult) diff --git a/internal/miniengine/session.go b/internal/miniengine/session.go index 9bc1356e66..529b48e703 100644 --- a/internal/miniengine/session.go +++ b/internal/miniengine/session.go @@ -382,14 +382,58 @@ type MeasurementResult struct { Summary any `json:"summary"` } -// Measure performs a measurement using the given experiment name, the -// given input, and the given opaque experiment options. +// Experiment is this package's view of a [model.Experiment] +type Experiment interface { + // GetSummaryKeys returns a data structure containing a + // summary of the test keys for ooniprobe. + GetSummaryKeys(m *model.Measurement) (any, error) + + // KibiBytesReceived accounts for the KibiBytes received by the experiment. + KibiBytesReceived() float64 + + // KibiBytesSent is like KibiBytesReceived but for the bytes sent. + KibiBytesSent() float64 + + // MeasureWithContext performs a synchronous measurement. + // + // Return value: strictly either a non-nil measurement and + // a nil error or a nil measurement and a non-nil error. + MeasureWithContext(ctx context.Context, input string) (*model.Measurement, error) +} + +// NewExperiment creates a new [Experiment] instance, +func (sess *Session) NewExperiment(name string, options map[string]any) (Experiment, error) { + // lock and access the underlying session + sess.mu.Lock() + defer sess.mu.Unlock() + + // handle the case where we did not bootstrap + if sess.child == nil { + return nil, ErrNoBootstrap + } + + // create a [model.ExperimentBuilder] + builder, err := sess.child.NewExperimentBuilder(name) + if err != nil { + return nil, err + } + + // set the proper callbacks for the experiment + callbacks := &callbacks{sess.emitter} + builder.SetCallbacks(callbacks) + + // set the proper options for the experiment + if err := builder.SetOptionsAny(options); err != nil { + return nil, err + } + + // create an experiment instance + return builder.NewExperiment(), nil +} + +// Measure performs a measurement using the given experiment and input. func (sess *Session) Measure( - ctx context.Context, - name string, - input string, - options map[string]any, -) *Task[*MeasurementResult] { + ctx context.Context, exp Experiment, input string) *Task[*MeasurementResult] { task := &Task[*MeasurementResult]{ done: make(chan any), events: sess.emitter, @@ -411,26 +455,6 @@ func (sess *Session) Measure( return } - // TODO(bassosimone): there is a bug where we create a new report ID for - // each measurement because there's a different TestStartTime - - // create a [model.ExperimentBuilder] - builder, err := sess.child.NewExperimentBuilder(name) - if err != nil { - task.failure = err - return - } - - // set the proper callbacks for the experiment - callbacks := &callbacks{sess.emitter} - builder.SetCallbacks(callbacks) - - // set the proper options for the experiment - builder.SetOptionsAny(options) - - // create an experiment instance - exp := builder.NewExperiment() - // perform the measurement meas, err := exp.MeasureWithContext(ctx, input) if err != nil { @@ -452,7 +476,6 @@ func (sess *Session) Measure( Measurement: meas, Summary: summary, } - }() return task From 19d06a7ec99e48e7a4b601219e87a51d1073b217 Mon Sep 17 00:00:00 2001 From: Simone Basso Date: Thu, 30 Mar 2023 14:21:13 +0000 Subject: [PATCH 04/11] refactor: simplify operations taking advantage of check-in API --- internal/cmd/exp/main.go | 79 ++--- internal/miniengine/bootstrap.go | 258 ++++++++++++++++ internal/miniengine/measure.go | 107 +++++++ internal/miniengine/optional.go | 37 +++ internal/miniengine/session.go | 459 +++++----------------------- internal/miniengine/submit.go | 44 +++ internal/probeservices/collector.go | 19 ++ 7 files changed, 579 insertions(+), 424 deletions(-) create mode 100644 internal/miniengine/bootstrap.go create mode 100644 internal/miniengine/measure.go create mode 100644 internal/miniengine/optional.go create mode 100644 internal/miniengine/submit.go diff --git a/internal/cmd/exp/main.go b/internal/cmd/exp/main.go index 1d81aef923..70e9820bb2 100644 --- a/internal/cmd/exp/main.go +++ b/internal/cmd/exp/main.go @@ -48,69 +48,70 @@ func main() { // create session config sessionConfig := &miniengine.SessionConfig{ + SoftwareName: "miniooni", + SoftwareVersion: "0.1.0-dev", + StateDir: filepath.Join("x", "state"), + TempDir: filepath.Join("x", "tmp"), + TunnelDir: filepath.Join("x", "tunnel"), + Verbose: false, + } + + // create session + sess := runtimex.Try1(miniengine.NewSession(sessionConfig)) + defer sess.Close() + + // create the bootstrap config + bootstrapConfig := &miniengine.BootstrapConfig{ BackendURL: "", + CategoryCodes: []string{}, + Charging: false, + OnWiFi: false, ProxyURL: "", + RunType: model.RunTypeTimed, SnowflakeRendezvousMethod: "", - SoftwareName: "miniooni", - SoftwareVersion: "0.1.0-dev", - StateDir: filepath.Join("x", "state"), - TempDir: filepath.Join("x", "tmp"), TorArgs: []string{}, TorBinary: "", - TunnelDir: filepath.Join("x", "tunnel"), - Verbose: false, } - // create session - sess := runtimex.Try1(miniengine.NewSession(sessionConfig)) - defer sess.Close() - // bootstrap the session - bootstrapTask := sess.Bootstrap(ctx) + bootstrapTask := sess.Bootstrap(ctx, bootstrapConfig) awaitTask(bootstrapTask) _ = runtimex.Try1(bootstrapTask.Result()) - // geolocate the probe - locationTask := sess.Geolocate(ctx) - awaitTask(locationTask) - location := runtimex.Try1(locationTask.Result()) + // obtain the probe geolocation + location := runtimex.Try1(sess.GeolocateResult()) log.Infof("%+v", location) - // call the check-in API - checkInConfig := &model.OOAPICheckInConfig{ - Charging: false, - OnWiFi: false, - Platform: sess.Platform(), - ProbeASN: location.ProbeASNString, - ProbeCC: location.ProbeCC, - RunType: model.RunTypeTimed, - SoftwareName: sess.SoftwareName(), - SoftwareVersion: sess.SoftwareVersion(), - WebConnectivity: model.OOAPICheckInConfigWebConnectivity{ - CategoryCodes: []string{}, - }, - } - checkInTask := sess.CheckIn(ctx, checkInConfig) - awaitTask(checkInTask) - checkInResult := runtimex.Try1(checkInTask.Result()) + // obtain the check-in API response + checkInResult := runtimex.Try1(sess.CheckInResult()) log.Infof("%+v", checkInResult) - // create an instance of the Web Connectivity experiment - exp := runtimex.Try1(sess.NewExperiment("web_connectivity", make(map[string]any))) + // obtain check-in information for the web connectivity experiment + runtimex.Assert(checkInResult.Tests.WebConnectivity != nil, "nil WebConnectivity") + webConnectivity := checkInResult.Tests.WebConnectivity + + log.Infof("report ID: %s", webConnectivity.ReportID) // measure and submit all the URLs - runtimex.Assert(checkInResult.Tests.WebConnectivity != nil, "nil WebConnectivity") - for _, entry := range checkInResult.Tests.WebConnectivity.URLs { + for _, entry := range webConnectivity.URLs { // perform the measurement - measurementTask := sess.Measure(ctx, exp, entry.URL) + options := make(map[string]any) + measurementTask := sess.Measure(ctx, "web_connectivity", options, entry.URL) awaitTask(measurementTask) measurementResult := runtimex.Try1(measurementTask.Result()) log.Infof("%+v", measurementResult) + // set the report ID + measurementResult.Measurement.ReportID = webConnectivity.ReportID + // submit the measurement submitTask := sess.Submit(ctx, measurementResult.Measurement) awaitTask(submitTask) - reportID := runtimex.Try1(submitTask.Result()) - log.Infof("%+v", reportID) + _ = runtimex.Try1(submitTask.Result()) + log.Infof( + "https://explorer.ooni.org/measurement/%s?input=%s", + webConnectivity.ReportID, + entry.URL, + ) } } diff --git a/internal/miniengine/bootstrap.go b/internal/miniengine/bootstrap.go new file mode 100644 index 0000000000..3ac560d889 --- /dev/null +++ b/internal/miniengine/bootstrap.go @@ -0,0 +1,258 @@ +package miniengine + +// +// Code to bootstrap a measurement session +// + +import ( + "net/url" + + "github.com/ooni/probe-cli/v3/internal/engine" + "github.com/ooni/probe-cli/v3/internal/kvstore" + "github.com/ooni/probe-cli/v3/internal/model" + "github.com/ooni/probe-cli/v3/internal/platform" + "github.com/ooni/probe-cli/v3/internal/probeservices" + "golang.org/x/net/context" +) + +// BootstrapConfig contains the config for [Session.Bootstrap]. The zero value +// is invalid; please fill all fields marked as MANDATORY. +type BootstrapConfig struct { + // BackendURL allows you to OPTIONALLY force the + // usage of a specific OONI backend instance. + BackendURL string `json:"backend_url"` + + // CategoryCodes contains OPTIONAL category codes for the check-in API. + CategoryCodes []string `json:"category_codes"` + + // Charging is the OPTIONAL charging hint for the check-in API. + Charging bool `json:"charging"` + + // OnWiFi is the OPTIONAL on-wifi hint for the check-in API. + OnWiFi bool `json:"on_wifi"` + + // ProxyURL allows you to OPTIONALLY force a specific proxy + // rather than using no proxy (the default). + // + // Use `psiphon:///` to force using Psiphon with the + // embedded configuration file. Not all builds have + // an embedded configuration file, but OONI builds have + // such a file, so they can use this functionality. + // + // Use `tor:///` and `torsf:///` to respectively use vanilla tor + // and tor plus snowflake as tunnels. + // + // Use `socks5://127.0.0.1:9050/` to connect to a SOCKS5 + // proxy running on 127.0.0.1:9050. This could be, for example, + // a suitably configured `tor` instance. + ProxyURL string `json:"proxy_url"` + + // RunType is the MANDATORY run-type for the check-in API. + RunType model.RunType `json:"run_type"` + + // SnowflakeRendezvousMethod OPTIONALLY allows you to specify + // which snowflake rendezvous method to use. Valid methods to use + // here are "amp" and "domain_fronting". + SnowflakeRendezvousMethod string `json:"snowflake_rendezvous_method"` + + // TorArgs contains OPTIONAL arguments to pass to the "tor" binary + // when ProxyURL is `tor:///` or `torsf:///`. + TorArgs []string `json:"tor_args"` + + // TorBinary is the OPTIONAL "tor" binary to use. When using this code + // on mobile devices, we link with tor directly, so there is no need to + // specify this argument when running on a mobile device. + TorBinary string `json:"tor_binary"` +} + +// Bootstrap ensures that we bootstrap the [Session]. This function +// is safe to call multiple times. We'll only bootstrap on the first +// invocation and do nothing for subsequent invocations. +func (s *Session) Bootstrap(ctx context.Context, config *BootstrapConfig) *Task[Void] { + task := &Task[Void]{ + done: make(chan any), + events: s.emitter, + failure: nil, + result: Void{}, + } + go s.bootstrapAsync(ctx, config, task) + return task +} + +// bootstrapAsync runs the bootstrap in a background goroutine. +func (s *Session) bootstrapAsync(ctx context.Context, config *BootstrapConfig, task *Task[Void]) { + // synchronize with Task.Result + defer close(task.done) + + // make the whole operation locked with respect to s + defer s.mu.Unlock() + s.mu.Lock() + + // handle the case where bootstrap already occurred while we were locked + if !s.state.IsNone() { + return + } + + // perform a sync bootstrap + err := s.bootstrapSyncLocked(ctx, config) + + // pass result to the caller + task.failure = err +} + +// bootstrapSyncLocked executes a synchronous bootstrap. This function MUST be +// run while holding the s.mu mutex because it mutates s. +func (s *Session) bootstrapSyncLocked(ctx context.Context, config *BootstrapConfig) error { + // create a new instance of the [engineSessionState] type. + ess, err := s.newEngineSessionState(ctx, config) + if err != nil { + return err + } + + // MUTATE s to store the state + s.state = some(ess) + return nil +} + +// newEngineSessionState creates a new instance of [engineSessionState]. +func (s *Session) newEngineSessionState( + ctx context.Context, config *BootstrapConfig) (*engineSessionState, error) { + // create configuration for [engine.NewSession] + engineConfig, err := s.newEngineSessionConfig(config) + if err != nil { + return nil, err + } + + // create a new underlying session instance + child, err := engine.NewSession(ctx, *engineConfig) + if err != nil { + return nil, err + } + + // create a probeservices client + psc, err := child.NewProbeServicesClient(ctx) + if err != nil { + child.Close() + return nil, err + } + + // geolocate the probe. + location, err := s.geolocate(ctx, child) + if err != nil { + child.Close() + return nil, err + } + + // call the check-in API. + resp, err := s.checkIn(ctx, location, child, psc, config) + if err != nil { + child.Close() + return nil, err + } + + // create [engineSessionState] + ess := &engineSessionState{ + checkIn: resp, + geoloc: location, + psc: psc, + sess: child, + } + return ess, nil +} + +// geolocate performs the geolocation during the bootstrap. +func (s *Session) geolocate(ctx context.Context, sess *engine.Session) (*Location, error) { + // perform geolocation and handle failure + if err := sess.MaybeLookupLocationContext(ctx); err != nil { + return nil, err + } + + // copy the result of the geolocation + location := &Location{ + ProbeASN: int64(sess.ProbeASN()), + ProbeASNString: sess.ProbeASNString(), + ProbeCC: sess.ProbeCC(), + ProbeNetworkName: sess.ProbeNetworkName(), + ProbeIP: sess.ProbeIP(), + ResolverASN: int64(sess.ResolverASN()), + ResolverASNString: sess.ResolverASNString(), + ResolverIP: sess.ResolverIP(), + ResolverNetworkName: sess.ResolverNetworkName(), + } + return location, nil +} + +// checkIn invokes the checkIn API. +func (s *Session) checkIn( + ctx context.Context, + location *Location, + sess *engine.Session, + psc *probeservices.Client, + config *BootstrapConfig, +) (*model.OOAPICheckInResult, error) { + categoryCodes := config.CategoryCodes + if len(categoryCodes) <= 0 { + // make sure it not nil because this would + // actually break the check-in API + categoryCodes = []string{} + } + apiConfig := model.OOAPICheckInConfig{ + Charging: config.Charging, + OnWiFi: config.OnWiFi, + Platform: platform.Name(), + ProbeASN: location.ProbeASNString, + ProbeCC: location.ProbeCC, + RunType: config.RunType, + SoftwareName: sess.SoftwareName(), + SoftwareVersion: sess.SoftwareVersion(), + WebConnectivity: model.OOAPICheckInConfigWebConnectivity{ + CategoryCodes: categoryCodes, + }, + } + return psc.CheckIn(ctx, apiConfig) +} + +// newEngineSessionConfig creates a new [engine.SessionConfig] instance. +func (s *Session) newEngineSessionConfig(config *BootstrapConfig) (*engine.SessionConfig, error) { + // create keyvalue store inside the user provided stateDir. + kvstore, err := kvstore.NewFS(s.stateDir) + if err != nil { + return nil, err + } + + // honor user-provided backend service, if any. + var availableps []model.OOAPIService + if config.BackendURL != "" { + availableps = append(availableps, model.OOAPIService{ + Address: config.BackendURL, + Type: "https", + }) + } + + // honor user-provided proxy, if any. + var proxyURL *url.URL + if config.ProxyURL != "" { + var err error + proxyURL, err = url.Parse(config.ProxyURL) + if err != nil { + return nil, err + } + } + + // create the underlying session using the [engine] package. + engineConfig := &engine.SessionConfig{ + AvailableProbeServices: availableps, + KVStore: kvstore, + Logger: s.logger, + ProxyURL: proxyURL, + SoftwareName: s.softwareName, + SoftwareVersion: s.softwareVersion, + TempDir: s.tempDir, + TorArgs: config.TorArgs, + TorBinary: config.TorBinary, + SnowflakeRendezvous: config.SnowflakeRendezvousMethod, + TunnelDir: s.tunnelDir, + } + + return engineConfig, nil +} diff --git a/internal/miniengine/measure.go b/internal/miniengine/measure.go new file mode 100644 index 0000000000..7d2181859a --- /dev/null +++ b/internal/miniengine/measure.go @@ -0,0 +1,107 @@ +package miniengine + +// +// Code to run measurements +// + +import ( + "github.com/ooni/probe-cli/v3/internal/model" + "golang.org/x/net/context" +) + +// MeasurementResult contains the results of [Session.Measure] +type MeasurementResult struct { + // KibiBytesReceived contains the KiB we received + KibiBytesReceived float64 + + // KibiBytesSent contains the KiB we sent + KibiBytesSent float64 + + // Measurement is the generated [model.Measurement] + Measurement *model.Measurement `json:"measurement"` + + // Summary is the corresponding summary. + Summary any `json:"summary"` +} + +// Measure performs a measurement using the given experiment and input. +func (s *Session) Measure( + ctx context.Context, + name string, + options map[string]any, + input string, +) *Task[*MeasurementResult] { + task := &Task[*MeasurementResult]{ + done: make(chan any), + events: s.emitter, + failure: nil, + result: nil, + } + go s.measureAsync(ctx, name, options, input, task) + return task +} + +// measureAsync runs the measurement in a background goroutine. +func (s *Session) measureAsync( + ctx context.Context, + name string, + options map[string]any, + input string, + task *Task[*MeasurementResult], +) { + // synchronize with Task.Result + defer close(task.done) + + // lock and access the underlying session + s.mu.Lock() + defer s.mu.Unlock() + + // handle the case where we did not bootstrap + if s.state.IsNone() { + task.failure = ErrNoBootstrap + return + } + sess := s.state.Unwrap().sess + + // create a [model.ExperimentBuilder] + builder, err := sess.NewExperimentBuilder(name) + if err != nil { + task.failure = err + return + } + + // set the proper callbacks for the experiment + callbacks := &callbacks{s.emitter} + builder.SetCallbacks(callbacks) + + // set the proper options for the experiment + if err := builder.SetOptionsAny(options); err != nil { + task.failure = err + return + } + + // create an experiment instance + exp := builder.NewExperiment() + + // perform the measurement + meas, err := exp.MeasureWithContext(ctx, input) + if err != nil { + task.failure = err + return + } + + // obtain the summary + summary, err := exp.GetSummaryKeys(meas) + if err != nil { + task.failure = err + return + } + + // pass response to the caller + task.result = &MeasurementResult{ + KibiBytesReceived: exp.KibiBytesReceived(), + KibiBytesSent: exp.KibiBytesSent(), + Measurement: meas, + Summary: summary, + } +} diff --git a/internal/miniengine/optional.go b/internal/miniengine/optional.go new file mode 100644 index 0000000000..5560f4fb92 --- /dev/null +++ b/internal/miniengine/optional.go @@ -0,0 +1,37 @@ +package miniengine + +// +// Internally-used optional type +// + +import "github.com/ooni/probe-cli/v3/internal/runtimex" + +// optional is an optional container. +type optional[Type any] struct { + v *Type +} + +// some creates an initialized optional instance. +func some[Type any](v Type) optional[Type] { + return optional[Type]{ + v: &v, + } +} + +// none creates an empty optional instance. +func none[Type any]() optional[Type] { + return optional[Type]{ + v: nil, + } +} + +// IsNone returns whether the optional is empty. +func (o *optional[Type]) IsNone() bool { + return o.v == nil +} + +// Unwrap returns the optional value. +func (o *optional[Type]) Unwrap() Type { + runtimex.Assert(!o.IsNone(), "optional[Type] is none") + return *o.v +} diff --git a/internal/miniengine/session.go b/internal/miniengine/session.go index 529b48e703..381279f6ed 100644 --- a/internal/miniengine/session.go +++ b/internal/miniengine/session.go @@ -7,47 +7,17 @@ package miniengine import ( "errors" "fmt" - "net/url" "os" "sync" "github.com/ooni/probe-cli/v3/internal/engine" - "github.com/ooni/probe-cli/v3/internal/kvstore" "github.com/ooni/probe-cli/v3/internal/model" - "github.com/ooni/probe-cli/v3/internal/platform" "github.com/ooni/probe-cli/v3/internal/probeservices" - "github.com/ooni/probe-cli/v3/internal/runtimex" - "golang.org/x/net/context" ) // SessionConfig contains configuration for a [Session]. The zero value is // invalid; please, initialize all the fields marked as MANDATORY. type SessionConfig struct { - // BackendURL allows you to OPTIONALLY force the - // usage of a specific OONI backend instance. - BackendURL string `json:"backend_url"` - - // Proxy allows you to OPTIONALLY force a specific proxy - // rather than using no proxy (the default). - // - // Use `psiphon:///` to force using Psiphon with the - // embedded configuration file. Not all builds have - // an embedded configuration file, but OONI builds have - // such a file, so they can use this functionality. - // - // Use `tor:///` and `torsf:///` to respectively use vanilla tor - // and tor plus snowflake as tunnels. - // - // Use `socks5://127.0.0.1:9050/` to connect to a SOCKS5 - // proxy running on 127.0.0.1:9050. This could be, for example, - // a suitably configured `tor` instance. - ProxyURL string `json:"proxy_url"` - - // SnowflakeRendezvousMethod OPTIONALLY allows you to specify - // which snowflake rendezvous method to use. Valid methods to use - // here are "amp" and "domain_fronting". - SnowflakeRendezvousMethod string `json:"snowflake_rendezvous_method"` - // SoftwareName is the MANDATORY name of the application // that will be using the new [Session]. SoftwareName string `json:"software_name"` @@ -64,15 +34,6 @@ type SessionConfig struct { // store temporary files deleted when we close the [Session]. TempDir string `json:"temp_dir"` - // TorArgs contains OPTIONAL arguments to pass to the "tor" binary - // when ProxyURL is `tor:///` or `torsf:///`. - TorArgs []string `json:"tor_args"` - - // TorBinary is the OPTIONAL "tor" binary to use. When using this code - // on mobile devices, we link with tor directly, so there is no need to - // specify this argument when running on a mobile device. - TorBinary string `json:"tor_binary"` - // TunnelDir is the MANDATORY directory where the [Session] shall store // persistent data regarding circumvention tunnels. TunnelDir string `json:"tunnel_dir"` @@ -121,12 +82,6 @@ func (sc *SessionConfig) mkdirAll() error { // Session is a measurement session. The zero value is invalid; please // create a new instance using the [NewSession] factory. type Session struct { - // child is a thread-safe wrapper for the underlying [engine.Session]. - child *engine.Session - - // config is the [engine.SessionConfig] to use. - config *engine.SessionConfig - // closeJustOnce ensures we close this [Session] just once. closeJustOnce sync.Once @@ -139,11 +94,38 @@ type Session struct { // mu provides mutual exclusion. mu sync.Mutex + // softwareName is the software name. + softwareName string + + // softwareVersion is the software version. + softwareVersion string + + // stateDir is the directory containing state. + stateDir string + + // state contains the optional state. + state optional[*engineSessionState] + + // tempDir is the temporary directory root. + tempDir string + + // tunnelDir is the directory containing tunnel state. + tunnelDir string +} + +// engineSessionState contains the state associated with an [engine.Session]. +type engineSessionState struct { + // checkIn contains the check-in API response. + checkIn *model.OOAPICheckInResult + + // geoloc contains the geolocation. + geoloc *Location + // psc is the [probeservices.Client] to use. psc *probeservices.Client - // submitter is the [probeservices.Submitter] to use. - submitter *probeservices.Submitter + // sess is the underlying [engine.Session]. + sess *engine.Session } // NewSession creates a new [Session] instance. @@ -158,31 +140,6 @@ func NewSession(config *SessionConfig) (*Session, error) { return nil, err } - // create keyvalue store inside the user provided StateDir. - kvstore, err := kvstore.NewFS(config.StateDir) - if err != nil { - return nil, err - } - - // honor user-provided backend service, if any. - var availableps []model.OOAPIService - if config.BackendURL != "" { - availableps = append(availableps, model.OOAPIService{ - Address: config.BackendURL, - Type: "https", - }) - } - - // honor user-provided proxy, if any. - var proxyURL *url.URL - if config.ProxyURL != "" { - var err error - proxyURL, err = url.Parse(config.ProxyURL) - if err != nil { - return nil, err - } - } - // create the base event emitter const buffer = 1024 emitter := make(chan *Event, buffer) @@ -190,345 +147,77 @@ func NewSession(config *SessionConfig) (*Session, error) { // create a logger using the base event emitter logger := newLoggerEmitter(emitter, config.Verbose) - // create the underlying session using the [engine] package. - engineConfig := engine.SessionConfig{ - AvailableProbeServices: availableps, - KVStore: kvstore, - Logger: logger, - ProxyURL: proxyURL, - SoftwareName: config.SoftwareName, - SoftwareVersion: config.SoftwareVersion, - TempDir: config.TempDir, - TorArgs: config.TorArgs, - TorBinary: config.TorBinary, - SnowflakeRendezvous: config.SnowflakeRendezvousMethod, - TunnelDir: config.TunnelDir, - } - // assemble and return a session. sess := &Session{ - child: nil, - config: &engineConfig, - closeJustOnce: sync.Once{}, - emitter: emitter, - logger: logger, - mu: sync.Mutex{}, + closeJustOnce: sync.Once{}, + emitter: emitter, + logger: logger, + mu: sync.Mutex{}, + softwareName: config.SoftwareName, + softwareVersion: config.SoftwareVersion, + stateDir: config.StateDir, + state: none[*engineSessionState](), + tempDir: config.TempDir, + tunnelDir: config.TunnelDir, } return sess, nil } -// Platform returns the operating system platform name. -func (sess *Session) Platform() string { - return platform.Name() -} - -// SoftwareName returns the configured software name. -func (sess *Session) SoftwareName() string { - return sess.config.SoftwareName -} - -// SoftwareVersion returns the configured software version. -func (sess *Session) SoftwareVersion() string { - return sess.config.SoftwareVersion -} - -// Bootstrap ensures that we bootstrap the [Session]. -func (sess *Session) Bootstrap(ctx context.Context) *Task[Void] { - task := &Task[Void]{ - done: make(chan any), - events: sess.emitter, - failure: nil, - result: Void{}, - } - - go func() { - // synchronize with Task.Result - defer close(task.done) - - // create a new underlying session instance - child, err := engine.NewSession(ctx, *sess.config) - if err != nil { - task.failure = err - return - } - - // create a probeservices client - psc, err := child.NewProbeServicesClient(ctx) - if err != nil { - child.Close() - task.failure = err - return - } - - // create a submitter - submitter, err := child.NewSubmitter(ctx) - if err != nil { - child.Close() - task.failure = err - return - } - - // lock and store the underlying fields - defer sess.mu.Unlock() - sess.mu.Lock() - sess.child = child - sess.psc = psc - sess.submitter = submitter - }() - - return task -} - // ErrNoBootstrap indicates that you did not bootstrap the [Session]. var ErrNoBootstrap = errors.New("bootstrap the session first") -// CheckIn invokes the backend check-in API using the [Session]. -func (sess *Session) CheckIn( - ctx context.Context, config *model.OOAPICheckInConfig) *Task[*model.OOAPICheckInResult] { - task := &Task[*model.OOAPICheckInResult]{ - done: make(chan any), - events: sess.emitter, - failure: nil, - result: nil, - } - - go func() { - // synchronize with Task.Result - defer close(task.done) - - // lock and access the underlying session - sess.mu.Lock() - defer sess.mu.Unlock() - - // handle the case where we did not bootstrap - if sess.child == nil { - task.failure = ErrNoBootstrap - return - } - runtimex.Assert(sess.psc != nil, "sess.psc is nil") - - // invoke the backend check-in API - result, err := sess.psc.CheckIn(ctx, *config) - if err != nil { - task.failure = err - return - } - - // pass result to the caller - task.result = result - }() +// CheckInResult returns the check-in API result. +func (s *Session) CheckInResult() (*model.OOAPICheckInResult, error) { + // make sure this method is synchronized + defer s.mu.Unlock() + s.mu.Lock() - return task -} - -// Geolocate uses the [Session] to geolocate the probe. -func (sess *Session) Geolocate(ctx context.Context) *Task[*Location] { - task := &Task[*Location]{ - done: make(chan any), - events: sess.emitter, - failure: nil, - result: nil, + // handle the case where there's no state. + if s.state.IsNone() { + return nil, ErrNoBootstrap } - go func() { - // synchronize with Task.Result - defer close(task.done) - - // lock and access the underlying session - sess.mu.Lock() - defer sess.mu.Unlock() - - // handle the case where we did not bootstrap - if sess.child == nil { - task.failure = ErrNoBootstrap - return - } - - // perform geolocation and handle failure - if err := sess.child.MaybeLookupLocationContext(ctx); err != nil { - task.failure = err - return - } - - // copy results to the caller - task.result = &Location{ - ProbeASN: int64(sess.child.ProbeASN()), - ProbeASNString: sess.child.ProbeASNString(), - ProbeCC: sess.child.ProbeCC(), - ProbeNetworkName: sess.child.ProbeNetworkName(), - ProbeIP: sess.child.ProbeIP(), - ResolverASN: int64(sess.child.ResolverASN()), - ResolverASNString: sess.child.ResolverASNString(), - ResolverIP: sess.child.ResolverIP(), - ResolverNetworkName: sess.child.ResolverNetworkName(), - } - }() - - return task -} - -// MeasurementResult contains the results of [Session.Measure] -type MeasurementResult struct { - // KibiBytesReceived contains the KiB we received - KibiBytesReceived float64 - - // KibiBytesSent contains the KiB we sent - KibiBytesSent float64 - - // Measurement is the generated [model.Measurement] - Measurement *model.Measurement `json:"measurement"` - - // Summary is the corresponding summary. - Summary any `json:"summary"` -} - -// Experiment is this package's view of a [model.Experiment] -type Experiment interface { - // GetSummaryKeys returns a data structure containing a - // summary of the test keys for ooniprobe. - GetSummaryKeys(m *model.Measurement) (any, error) - - // KibiBytesReceived accounts for the KibiBytes received by the experiment. - KibiBytesReceived() float64 - - // KibiBytesSent is like KibiBytesReceived but for the bytes sent. - KibiBytesSent() float64 - - // MeasureWithContext performs a synchronous measurement. - // - // Return value: strictly either a non-nil measurement and - // a nil error or a nil measurement and a non-nil error. - MeasureWithContext(ctx context.Context, input string) (*model.Measurement, error) + // return the underlying value + return s.state.Unwrap().checkIn, nil } -// NewExperiment creates a new [Experiment] instance, -func (sess *Session) NewExperiment(name string, options map[string]any) (Experiment, error) { - // lock and access the underlying session - sess.mu.Lock() - defer sess.mu.Unlock() +// GeolocateResult returns the geolocation result. +func (s *Session) GeolocateResult() (*Location, error) { + // make sure this method is synchronized + defer s.mu.Unlock() + s.mu.Lock() - // handle the case where we did not bootstrap - if sess.child == nil { + // handle the case where there's no state. + if s.state.IsNone() { return nil, ErrNoBootstrap } - // create a [model.ExperimentBuilder] - builder, err := sess.child.NewExperimentBuilder(name) - if err != nil { - return nil, err - } - - // set the proper callbacks for the experiment - callbacks := &callbacks{sess.emitter} - builder.SetCallbacks(callbacks) - - // set the proper options for the experiment - if err := builder.SetOptionsAny(options); err != nil { - return nil, err - } - - // create an experiment instance - return builder.NewExperiment(), nil + // return the underlying value + return s.state.Unwrap().geoloc, nil } -// Measure performs a measurement using the given experiment and input. -func (sess *Session) Measure( - ctx context.Context, exp Experiment, input string) *Task[*MeasurementResult] { - task := &Task[*MeasurementResult]{ - done: make(chan any), - events: sess.emitter, - failure: nil, - result: nil, - } - - go func() { - // synchronize with Task.Result - defer close(task.done) - - // lock and access the underlying session - sess.mu.Lock() - defer sess.mu.Unlock() - - // handle the case where we did not bootstrap - if sess.child == nil { - task.failure = ErrNoBootstrap - return - } - - // perform the measurement - meas, err := exp.MeasureWithContext(ctx, input) - if err != nil { - task.failure = err - return - } +// Close closes the [Session]. This function is safe to call multiple +// times. We'll close underlying resources on the first invocation and +// otherwise do nothing for subsequent invocations. +func (s *Session) Close() (err error) { + s.closeJustOnce.Do(func() { + // make sure the cleanup is synchronized. + defer s.mu.Unlock() + s.mu.Lock() - // obtain the summary - summary, err := exp.GetSummaryKeys(meas) - if err != nil { - task.failure = err + // handle the case where there is no state. + if s.state.IsNone() { return } - // pass response to the caller - task.result = &MeasurementResult{ - KibiBytesReceived: exp.KibiBytesReceived(), - KibiBytesSent: exp.KibiBytesSent(), - Measurement: meas, - Summary: summary, - } - }() + // obtain the underlying state + state := s.state.Unwrap() - return task -} + // replace with empty state + s.state = none[*engineSessionState]() -// Submit submits a [model.Measurement] to the OONI backend. -func (sess *Session) Submit(ctx context.Context, meas *model.Measurement) *Task[string] { - task := &Task[string]{ - done: make(chan any), - events: sess.emitter, - failure: nil, - result: "", - } - - go func() { - // synchronize with Task.Result - defer close(task.done) - - // lock and access the underlying session - sess.mu.Lock() - defer sess.mu.Unlock() - - // handle the case where we did not bootstrap - if sess.child == nil { - task.failure = ErrNoBootstrap - return - } - runtimex.Assert(sess.submitter != nil, "sess.psc is nil") - - // submit without causing data races - reportID, err := sess.submitter.SubmitWithoutModifyingMeasurement(ctx, meas) - if err != nil { - task.failure = err - return - } - - // pass the reportID to the caller - task.result = reportID - }() - - return task -} - -// Close closes the [Session]. This function does not attempt -// to close an already closed [Session]. -func (sess *Session) Close() (err error) { - sess.closeJustOnce.Do(func() { - sess.mu.Lock() - if sess.child != nil { - err = sess.child.Close() - sess.child = nil - } - sess.mu.Unlock() + // close the underlying session + err = state.sess.Close() }) return err } diff --git a/internal/miniengine/submit.go b/internal/miniengine/submit.go new file mode 100644 index 0000000000..20fde5ef8b --- /dev/null +++ b/internal/miniengine/submit.go @@ -0,0 +1,44 @@ +package miniengine + +// +// Code to submit a measurement to the OONI backend +// + +import ( + "github.com/ooni/probe-cli/v3/internal/model" + "golang.org/x/net/context" +) + +// Submit submits a [model.Measurement] to the OONI backend. You MUST initialize +// the measurement's report ID. You can find the report ID for each experiment +// in the results of the check-in API. +func (s *Session) Submit(ctx context.Context, meas *model.Measurement) *Task[Void] { + task := &Task[Void]{ + done: make(chan any), + events: s.emitter, + failure: nil, + result: Void{}, + } + go s.submitAsync(ctx, meas, task) + return task +} + +// submitAsync submits the measurement in a background goroutine. +func (s *Session) submitAsync(ctx context.Context, meas *model.Measurement, task *Task[Void]) { + // synchronize with Task.Result + defer close(task.done) + + // lock and access the underlying session + s.mu.Lock() + defer s.mu.Unlock() + + // handle the case where we did not bootstrap + if s.state.IsNone() { + task.failure = ErrNoBootstrap + return + } + state := s.state.Unwrap() + + // submit the measurement to the backend + task.failure = state.psc.SubmitMeasurement(ctx, meas) +} diff --git a/internal/probeservices/collector.go b/internal/probeservices/collector.go index 34eea6c3c5..9df89b3df5 100644 --- a/internal/probeservices/collector.go +++ b/internal/probeservices/collector.go @@ -23,6 +23,25 @@ var ( ErrJSONFormatNotSupported = errors.New("JSON format not supported") ) +// ErrEmptyReportID indicates you passed a measurement with an empty ReportID +// to a function that submits measurements directly. +var ErrEmptyReportID = errors.New("probeservices: empty report ID") + +// SubmitMeasurement submits the given measurement to the OONI collector. You MUST initialize +// the report ID of the measurement, otherwise the submission will fail immediately. +func (c Client) SubmitMeasurement(ctx context.Context, m *model.Measurement) error { + if m.ReportID == "" { + return ErrEmptyReportID + } + var updateResponse model.OOAPICollectorUpdateResponse + return c.APIClientTemplate.WithBodyLogging().Build().PostJSON( + ctx, fmt.Sprintf("/report/%s", m.ReportID), model.OOAPICollectorUpdateRequest{ + Format: "json", + Content: m, + }, &updateResponse, + ) +} + // NewReportTemplate creates a new ReportTemplate from a Measurement. func NewReportTemplate(m *model.Measurement) model.OOAPIReportTemplate { return model.OOAPIReportTemplate{ From 053e638b0faaed886ae5151c4326a4a2062896e5 Mon Sep 17 00:00:00 2001 From: Simone Basso Date: Thu, 30 Mar 2023 14:22:32 +0000 Subject: [PATCH 05/11] cleanup: back out unnecessary changes --- internal/engine/experiment.go | 2 +- internal/engine/session.go | 2 +- internal/probeservices/collector.go | 37 +++++------------------------ 3 files changed, 8 insertions(+), 33 deletions(-) diff --git a/internal/engine/experiment.go b/internal/engine/experiment.go index 30ced3783a..5a850693ae 100644 --- a/internal/engine/experiment.go +++ b/internal/engine/experiment.go @@ -193,7 +193,7 @@ func (e *experiment) SubmitAndUpdateMeasurementContext( if e.report == nil { return errors.New("report is not open") } - return e.report.SubmitMeasurement(ctx, measurement, true) + return e.report.SubmitMeasurement(ctx, measurement) } // newMeasurement creates a new measurement for this experiment with the given input. diff --git a/internal/engine/session.go b/internal/engine/session.go index 47c99d3c83..fb988b31b0 100644 --- a/internal/engine/session.go +++ b/internal/engine/session.go @@ -442,7 +442,7 @@ func (s *Session) NewProbeServicesClient(ctx context.Context) (*probeservices.Cl } // NewSubmitter creates a new submitter instance. -func (s *Session) NewSubmitter(ctx context.Context) (*probeservices.Submitter, error) { +func (s *Session) NewSubmitter(ctx context.Context) (Submitter, error) { psc, err := s.NewProbeServicesClient(ctx) if err != nil { return nil, err diff --git a/internal/probeservices/collector.go b/internal/probeservices/collector.go index 9df89b3df5..132795aa20 100644 --- a/internal/probeservices/collector.go +++ b/internal/probeservices/collector.go @@ -100,15 +100,9 @@ func (r reportChan) CanSubmit(m *model.Measurement) bool { // such that it contains the report ID for which it has been // submitted. Otherwise, we'll set the report ID to the empty // string, so that you know which measurements weren't submitted. -func (r reportChan) SubmitMeasurement( - ctx context.Context, - m *model.Measurement, - edit bool, -) error { +func (r reportChan) SubmitMeasurement(ctx context.Context, m *model.Measurement) error { var updateResponse model.OOAPICollectorUpdateResponse - if edit { - m.ReportID = r.ID - } + m.ReportID = r.ID err := r.client.APIClientTemplate.WithBodyLogging().Build().PostJSON( ctx, fmt.Sprintf("/report/%s", r.ID), model.OOAPICollectorUpdateRequest{ Format: "json", @@ -116,9 +110,7 @@ func (r reportChan) SubmitMeasurement( }, &updateResponse, ) if err != nil { - if edit { - m.ReportID = "" - } + m.ReportID = "" return err } return nil @@ -134,7 +126,7 @@ func (r reportChan) ReportID() string { type ReportChannel interface { CanSubmit(m *model.Measurement) bool ReportID() string - SubmitMeasurement(ctx context.Context, m *model.Measurement, edit bool) error + SubmitMeasurement(ctx context.Context, m *model.Measurement) error } var _ ReportChannel = &reportChan{} @@ -167,32 +159,15 @@ func NewSubmitter(opener ReportOpener, logger model.Logger) *Submitter { // Submit submits the current measurement to the OONI backend created using // the ReportOpener passed to the constructor. func (sub *Submitter) Submit(ctx context.Context, m *model.Measurement) error { - _, err := sub.submit(ctx, m, true) - return err -} - -// SubmitWithoutModifyingMeasurement is like Submit but does not modify -// the measurement in place (which may cause a data race). -func (sub *Submitter) SubmitWithoutModifyingMeasurement( - ctx context.Context, - m *model.Measurement, -) (string, error) { - return sub.submit(ctx, m, false) -} - -func (sub *Submitter) submit(ctx context.Context, m *model.Measurement, edit bool) (string, error) { var err error sub.mu.Lock() defer sub.mu.Unlock() if sub.channel == nil || !sub.channel.CanSubmit(m) { sub.channel, err = sub.opener.OpenReport(ctx, NewReportTemplate(m)) if err != nil { - return "", err + return err } sub.logger.Infof("New reportID: %s", sub.channel.ReportID()) } - if err := sub.channel.SubmitMeasurement(ctx, m, edit); err != nil { - return "", err - } - return sub.channel.ReportID(), nil + return sub.channel.SubmitMeasurement(ctx, m) } From 71142d1ad9c6ad09726b748880529ed94a6b71af Mon Sep 17 00:00:00 2001 From: Simone Basso Date: Thu, 30 Mar 2023 17:38:32 +0000 Subject: [PATCH 06/11] feat: start adapting ooniprobe to use miniengine --- cmd/ooniprobe/internal/cli/geoip/geoip.go | 44 ++++++++-- cmd/ooniprobe/internal/cli/info/info.go | 4 +- cmd/ooniprobe/internal/cli/root/root.go | 9 -- cmd/ooniprobe/internal/nettests/dash.go | 10 +-- cmd/ooniprobe/internal/nettests/dnscheck.go | 35 +------- .../internal/nettests/facebook_messenger.go | 8 +- .../http_header_field_manipulation.go | 8 +- .../nettests/http_invalid_request_line.go | 8 +- cmd/ooniprobe/internal/nettests/ndt.go | 10 +-- cmd/ooniprobe/internal/nettests/nettests.go | 84 ++++++++++--------- cmd/ooniprobe/internal/nettests/psiphon.go | 8 +- cmd/ooniprobe/internal/nettests/riseupvpn.go | 9 +- cmd/ooniprobe/internal/nettests/run.go | 37 ++++++-- cmd/ooniprobe/internal/nettests/signal.go | 8 +- .../internal/nettests/stunreachability.go | 33 +------- cmd/ooniprobe/internal/nettests/tasks.go | 55 ++++++++++++ cmd/ooniprobe/internal/nettests/telegram.go | 8 +- cmd/ooniprobe/internal/nettests/tor.go | 8 +- cmd/ooniprobe/internal/nettests/torsf.go | 10 +-- cmd/ooniprobe/internal/nettests/vanillator.go | 10 +-- .../internal/nettests/web_connectivity.go | 46 +++------- cmd/ooniprobe/internal/nettests/whatsapp.go | 8 +- cmd/ooniprobe/internal/ooni/ooni.go | 50 ++++------- cmd/ooniprobe/internal/ooni/ooni_test.go | 3 +- internal/database/actions.go | 13 ++- internal/miniengine/bootstrap.go | 6 ++ 26 files changed, 255 insertions(+), 277 deletions(-) create mode 100644 cmd/ooniprobe/internal/nettests/tasks.go diff --git a/cmd/ooniprobe/internal/cli/geoip/geoip.go b/cmd/ooniprobe/internal/cli/geoip/geoip.go index 21d4ea59d8..b5074b29dd 100644 --- a/cmd/ooniprobe/internal/cli/geoip/geoip.go +++ b/cmd/ooniprobe/internal/cli/geoip/geoip.go @@ -8,6 +8,7 @@ import ( "github.com/ooni/probe-cli/v3/cmd/ooniprobe/internal/cli/root" "github.com/ooni/probe-cli/v3/cmd/ooniprobe/internal/ooni" "github.com/ooni/probe-cli/v3/cmd/ooniprobe/internal/output" + "github.com/ooni/probe-cli/v3/internal/miniengine" "github.com/ooni/probe-cli/v3/internal/model" ) @@ -20,13 +21,13 @@ func init() { type dogeoipconfig struct { Logger log.Interface - NewProbeCLI func() (ooni.ProbeCLI, error) + NewProbeCLI func() (*ooni.Probe, error) SectionTitle func(string) } var defaultconfig = dogeoipconfig{ Logger: log.Log, - NewProbeCLI: root.NewProbeCLI, + NewProbeCLI: root.Init, SectionTitle: output.SectionTitle, } @@ -37,23 +38,48 @@ func dogeoip(config dogeoipconfig) error { return err } - engine, err := probeCLI.NewProbeEngine(context.Background(), model.RunTypeManual) + // create a measurement session + sessConfig := probeCLI.NewSessionConfig(model.RunTypeManual) + sess, err := miniengine.NewSession(sessConfig) if err != nil { + log.WithError(err).Error("Failed to create a measurement session") return err } - defer engine.Close() + defer sess.Close() - err = engine.MaybeLookupLocation() + // XXX: not very lightweight to perform a full bootstrap here + + // bootstrap the measurement session + bootstrapConfig := &miniengine.BootstrapConfig{ + BackendURL: "", + CategoryCodes: []string{}, + Charging: true, + OnWiFi: true, + ProxyURL: probeCLI.ProxyURL(), + RunType: model.RunTypeManual, + SnowflakeRendezvousMethod: "", + TorArgs: []string{}, + TorBinary: "", + } + bootstrapTask := sess.Bootstrap(context.Background(), bootstrapConfig) + // XXX: skipping log messages here + <-bootstrapTask.Done() + if _, err := bootstrapTask.Result(); err != nil { + log.WithError(err).Error("Failed to bootstrap a measurement session") + return err + } + + location, err := sess.GeolocateResult() if err != nil { return err } config.Logger.WithFields(log.Fields{ "type": "table", - "asn": engine.ProbeASNString(), - "network_name": engine.ProbeNetworkName(), - "country_code": engine.ProbeCC(), - "ip": engine.ProbeIP(), + "asn": location.ProbeASNString, + "network_name": location.ProbeNetworkName, + "country_code": location.ProbeCC, + "ip": location.ProbeIP, }).Info("Looked up your location") return nil diff --git a/cmd/ooniprobe/internal/cli/info/info.go b/cmd/ooniprobe/internal/cli/info/info.go index 09974cdcb7..83c82724b1 100644 --- a/cmd/ooniprobe/internal/cli/info/info.go +++ b/cmd/ooniprobe/internal/cli/info/info.go @@ -16,12 +16,12 @@ func init() { type doinfoconfig struct { Logger log.Interface - NewProbeCLI func() (ooni.ProbeCLI, error) + NewProbeCLI func() (*ooni.Probe, error) } var defaultconfig = doinfoconfig{ Logger: log.Log, - NewProbeCLI: root.NewProbeCLI, + NewProbeCLI: root.Init, } func doinfo(config doinfoconfig) error { diff --git a/cmd/ooniprobe/internal/cli/root/root.go b/cmd/ooniprobe/internal/cli/root/root.go index 3bbcce9eb2..bbd51b02fd 100644 --- a/cmd/ooniprobe/internal/cli/root/root.go +++ b/cmd/ooniprobe/internal/cli/root/root.go @@ -20,15 +20,6 @@ var Command = Cmd.Command // Init should be called by all subcommand that care to have a ooni.Context instance var Init func() (*ooni.Probe, error) -// NewProbeCLI is like Init but returns a ooni.ProbeCLI instead. -func NewProbeCLI() (ooni.ProbeCLI, error) { - probeCLI, err := Init() - if err != nil { - return nil, err - } - return probeCLI, nil -} - func init() { configPath := Cmd.Flag("config", "Set a custom config file path").Short('c').String() diff --git a/cmd/ooniprobe/internal/nettests/dash.go b/cmd/ooniprobe/internal/nettests/dash.go index 80f8618dff..59ac3b39d4 100644 --- a/cmd/ooniprobe/internal/nettests/dash.go +++ b/cmd/ooniprobe/internal/nettests/dash.go @@ -6,9 +6,9 @@ type Dash struct { // Run starts the test func (d Dash) Run(ctl *Controller) error { - builder, err := ctl.Session.NewExperimentBuilder("dash") - if err != nil { - return err - } - return ctl.Run(builder, []string{""}) + return ctl.Run( + "dash", + "", // TODO(bassosimone) + []string{""}, + ) } diff --git a/cmd/ooniprobe/internal/nettests/dnscheck.go b/cmd/ooniprobe/internal/nettests/dnscheck.go index beebcd306d..d0d5deffb3 100644 --- a/cmd/ooniprobe/internal/nettests/dnscheck.go +++ b/cmd/ooniprobe/internal/nettests/dnscheck.go @@ -1,42 +1,11 @@ package nettests -import ( - "context" - - engine "github.com/ooni/probe-cli/v3/internal/engine" - "github.com/ooni/probe-cli/v3/internal/model" -) +import "errors" // DNSCheck nettest implementation. type DNSCheck struct{} -func (n DNSCheck) lookupURLs(ctl *Controller) ([]string, error) { - inputloader := &engine.InputLoader{ - CheckInConfig: &model.OOAPICheckInConfig{ - // not needed because we have default static input in the engine - }, - ExperimentName: "dnscheck", - InputPolicy: model.InputOrStaticDefault, - Session: ctl.Session, - SourceFiles: ctl.InputFiles, - StaticInputs: ctl.Inputs, - } - testlist, err := inputloader.Load(context.Background()) - if err != nil { - return nil, err - } - return ctl.BuildAndSetInputIdxMap(testlist) -} - // Run starts the nettest. func (n DNSCheck) Run(ctl *Controller) error { - builder, err := ctl.Session.NewExperimentBuilder("dnscheck") - if err != nil { - return err - } - urls, err := n.lookupURLs(ctl) - if err != nil { - return err - } - return ctl.Run(builder, urls) + return errors.New("not implemented") } diff --git a/cmd/ooniprobe/internal/nettests/facebook_messenger.go b/cmd/ooniprobe/internal/nettests/facebook_messenger.go index 1316babee5..2fef9ebca0 100644 --- a/cmd/ooniprobe/internal/nettests/facebook_messenger.go +++ b/cmd/ooniprobe/internal/nettests/facebook_messenger.go @@ -6,11 +6,9 @@ type FacebookMessenger struct { // Run starts the test func (h FacebookMessenger) Run(ctl *Controller) error { - builder, err := ctl.Session.NewExperimentBuilder( + return ctl.Run( "facebook_messenger", + "", // TODO(bassosimone) + []string{""}, ) - if err != nil { - return err - } - return ctl.Run(builder, []string{""}) } diff --git a/cmd/ooniprobe/internal/nettests/http_header_field_manipulation.go b/cmd/ooniprobe/internal/nettests/http_header_field_manipulation.go index 6fdd39688f..7ebe15cfcb 100644 --- a/cmd/ooniprobe/internal/nettests/http_header_field_manipulation.go +++ b/cmd/ooniprobe/internal/nettests/http_header_field_manipulation.go @@ -6,11 +6,9 @@ type HTTPHeaderFieldManipulation struct { // Run starts the test func (h HTTPHeaderFieldManipulation) Run(ctl *Controller) error { - builder, err := ctl.Session.NewExperimentBuilder( + return ctl.Run( "http_header_field_manipulation", + "", // TODO(bassosimone) + []string{""}, ) - if err != nil { - return err - } - return ctl.Run(builder, []string{""}) } diff --git a/cmd/ooniprobe/internal/nettests/http_invalid_request_line.go b/cmd/ooniprobe/internal/nettests/http_invalid_request_line.go index fb87e462d6..6292f1a26c 100644 --- a/cmd/ooniprobe/internal/nettests/http_invalid_request_line.go +++ b/cmd/ooniprobe/internal/nettests/http_invalid_request_line.go @@ -6,11 +6,9 @@ type HTTPInvalidRequestLine struct { // Run starts the test func (h HTTPInvalidRequestLine) Run(ctl *Controller) error { - builder, err := ctl.Session.NewExperimentBuilder( + return ctl.Run( "http_invalid_request_line", + "", // TODO(bassosimone) + []string{""}, ) - if err != nil { - return err - } - return ctl.Run(builder, []string{""}) } diff --git a/cmd/ooniprobe/internal/nettests/ndt.go b/cmd/ooniprobe/internal/nettests/ndt.go index b8848b0056..11de96d4f0 100644 --- a/cmd/ooniprobe/internal/nettests/ndt.go +++ b/cmd/ooniprobe/internal/nettests/ndt.go @@ -7,9 +7,9 @@ type NDT struct { // Run starts the test func (n NDT) Run(ctl *Controller) error { // Since 2020-03-18 probe-engine exports v7 as "ndt". - builder, err := ctl.Session.NewExperimentBuilder("ndt") - if err != nil { - return err - } - return ctl.Run(builder, []string{""}) + return ctl.Run( + "ndt", + "", // TODO(bassosimone) + []string{""}, + ) } diff --git a/cmd/ooniprobe/internal/nettests/nettests.go b/cmd/ooniprobe/internal/nettests/nettests.go index bee5be39f7..aac6f95248 100644 --- a/cmd/ooniprobe/internal/nettests/nettests.go +++ b/cmd/ooniprobe/internal/nettests/nettests.go @@ -3,14 +3,16 @@ package nettests import ( "context" "database/sql" + "encoding/json" "fmt" + "os" "time" "github.com/apex/log" "github.com/fatih/color" "github.com/ooni/probe-cli/v3/cmd/ooniprobe/internal/ooni" "github.com/ooni/probe-cli/v3/cmd/ooniprobe/internal/output" - engine "github.com/ooni/probe-cli/v3/internal/engine" + "github.com/ooni/probe-cli/v3/internal/miniengine" "github.com/ooni/probe-cli/v3/internal/model" "github.com/pkg/errors" ) @@ -22,7 +24,7 @@ type Nettest interface { // NewController creates a nettest controller func NewController( - nt Nettest, probe *ooni.Probe, res *model.DatabaseResult, sess *engine.Session) *Controller { + nt Nettest, probe *ooni.Probe, res *model.DatabaseResult, sess *miniengine.Session) *Controller { return &Controller{ Probe: probe, nt: nt, @@ -35,7 +37,7 @@ func NewController( // each nettest instance has one controller type Controller struct { Probe *ooni.Probe - Session *engine.Session + Session *miniengine.Session res *model.DatabaseResult nt Nettest ntCount int @@ -115,22 +117,19 @@ func (c *Controller) SetNettestIndex(i, n int) { c.ntIndex = i } +var _ model.ExperimentCallbacks = &Controller{} + // Run runs the selected nettest using the related experiment // with the specified inputs. // // This function will continue to run in most cases but will // immediately halt if something's wrong with the file system. -func (c *Controller) Run(builder model.ExperimentBuilder, inputs []string) error { +func (c *Controller) Run( + experimentName string, + checkInReportID string, + inputs []string, +) error { db := c.Probe.DB() - // This will configure the controller as handler for the callbacks - // called by ooni/probe-engine/experiment.Experiment. - builder.SetCallbacks(model.ExperimentCallbacks(c)) - c.numInputs = len(inputs) - exp := builder.NewExperiment() - defer func() { - c.res.DataUsageDown += exp.KibiBytesReceived() - c.res.DataUsageUp += exp.KibiBytesSent() - }() c.msmts = make(map[int64]*model.DatabaseMeasurement) @@ -141,15 +140,9 @@ func (c *Controller) Run(builder model.ExperimentBuilder, inputs []string) error log.Debug(color.RedString("status.queued")) log.Debug(color.RedString("status.started")) - if c.Probe.Config().Sharing.UploadResults { - if err := exp.OpenReportContext(context.Background()); err != nil { - log.Debugf( - "%s: %s", color.RedString("failure.report_create"), err.Error(), - ) - } else { - log.Debugf(color.RedString("status.report_create")) - reportID = sql.NullString{String: exp.ReportID(), Valid: true} - } + canSubmit := c.Probe.Config().Sharing.UploadResults && checkInReportID != "" + if canSubmit { + reportID = sql.NullString{String: checkInReportID, Valid: true} } maxRuntime := time.Duration(c.Probe.Config().Nettests.WebsitesMaxRuntime) * time.Second @@ -186,7 +179,7 @@ func (c *Controller) Run(builder model.ExperimentBuilder, inputs []string) error } msmt, err := db.CreateMeasurement( - reportID, exp.Name(), c.res.MeasurementDir, idx, resultID, urlID, + reportID, experimentName, c.res.MeasurementDir, idx, resultID, urlID, ) if err != nil { return errors.Wrap(err, "failed to create measurement") @@ -196,7 +189,10 @@ func (c *Controller) Run(builder model.ExperimentBuilder, inputs []string) error if input != "" { c.OnProgress(0, fmt.Sprintf("processing input: %s", input)) } - measurement, err := exp.MeasureWithContext(context.Background(), input) + options := make(map[string]any) + measurementTask := c.Session.Measure(context.Background(), experimentName, options, input) + awaitTask(measurementTask, c) + measurementResult, err := measurementTask.Result() if err != nil { log.WithError(err).Debug(color.RedString("failure.measurement")) if err := db.Failed(c.msmts[idx64], err.Error()); err != nil { @@ -212,12 +208,21 @@ func (c *Controller) Run(builder model.ExperimentBuilder, inputs []string) error continue } + // update the data usage counters + c.res.DataUsageDown += measurementResult.KibiBytesReceived + c.res.DataUsageUp += measurementResult.KibiBytesSent + + // set the measurement's reportID + measurementResult.Measurement.ReportID = checkInReportID + saveToDisk := true - if c.Probe.Config().Sharing.UploadResults { + if canSubmit { // Implementation note: SubmitMeasurement will fail here if we did fail // to open the report but we still want to continue. There will be a // bit of a spew in the logs, perhaps, but stopping seems less efficient. - if err := exp.SubmitAndUpdateMeasurementContext(context.Background(), measurement); err != nil { + submitTask := c.Session.Submit(context.Background(), measurementResult.Measurement) + awaitTask(submitTask, model.NewPrinterCallbacks(taskLogger)) + if _, err := submitTask.Result(); err != nil { log.Debug(color.RedString("failure.measurement_submission")) if err := db.UploadFailed(c.msmts[idx64], err.Error()); err != nil { return errors.Wrap(err, "failed to mark upload as failed") @@ -231,35 +236,36 @@ func (c *Controller) Run(builder model.ExperimentBuilder, inputs []string) error } // We only save the measurement to disk if we failed to upload the measurement if saveToDisk { - if err := exp.SaveMeasurement(measurement, msmt.MeasurementFilePath.String); err != nil { + if err := c.saveMeasurement(measurementResult.Measurement, msmt.MeasurementFilePath.String); err != nil { return errors.Wrap(err, "failed to save measurement on disk") } } + // make the measurement as done if err := db.Done(c.msmts[idx64]); err != nil { return errors.Wrap(err, "failed to mark measurement as done") } - // We're not sure whether it's enough to log the error or we should - // instead also mark the measurement as failed. Strictly speaking this - // is an inconsistency between the code that generate the measurement - // and the code that process the measurement. We do have some data - // but we're not gonna have a summary. To be reconsidered. - tk, err := exp.GetSummaryKeys(measurement) - if err != nil { - log.WithError(err).Error("failed to obtain testKeys") - continue - } - log.Debugf("Fetching: %d %v", idx, c.msmts[idx64]) - if err := db.AddTestKeys(c.msmts[idx64], tk); err != nil { + // write the measurement summary into the database + if err := db.AddTestKeys(c.msmts[idx64], measurementResult.Summary); err != nil { return errors.Wrap(err, "failed to add test keys to summary") } } + db.UpdateUploadedStatus(c.res) log.Debugf("status.end") return nil } +// saveMeasurement saves a measurement to disk +func (c *Controller) saveMeasurement(meas *model.Measurement, filepath string) error { + data, err := json.Marshal(meas) + if err != nil { + return err + } + return os.WriteFile(filepath, data, 0600) +} + // OnProgress should be called when a new progress event is available. func (c *Controller) OnProgress(perc float64, msg string) { // when we have maxRuntime, honor it diff --git a/cmd/ooniprobe/internal/nettests/psiphon.go b/cmd/ooniprobe/internal/nettests/psiphon.go index 940340cc4e..d22e195668 100644 --- a/cmd/ooniprobe/internal/nettests/psiphon.go +++ b/cmd/ooniprobe/internal/nettests/psiphon.go @@ -6,11 +6,9 @@ type Psiphon struct { // Run starts the test func (h Psiphon) Run(ctl *Controller) error { - builder, err := ctl.Session.NewExperimentBuilder( + return ctl.Run( "psiphon", + "", // TODO(bassosimone) + []string{""}, ) - if err != nil { - return err - } - return ctl.Run(builder, []string{""}) } diff --git a/cmd/ooniprobe/internal/nettests/riseupvpn.go b/cmd/ooniprobe/internal/nettests/riseupvpn.go index 185fbcefe4..f87b2701f5 100644 --- a/cmd/ooniprobe/internal/nettests/riseupvpn.go +++ b/cmd/ooniprobe/internal/nettests/riseupvpn.go @@ -6,12 +6,9 @@ type RiseupVPN struct { // Run starts the test func (h RiseupVPN) Run(ctl *Controller) error { - builder, err := ctl.Session.NewExperimentBuilder( + return ctl.Run( "riseupvpn", + "", // TODO(bassosimone) + []string{""}, ) - if err != nil { - return err - } - - return ctl.Run(builder, []string{""}) } diff --git a/cmd/ooniprobe/internal/nettests/run.go b/cmd/ooniprobe/internal/nettests/run.go index 25385a647e..0e34179294 100644 --- a/cmd/ooniprobe/internal/nettests/run.go +++ b/cmd/ooniprobe/internal/nettests/run.go @@ -8,7 +8,9 @@ import ( "github.com/apex/log" "github.com/ooni/probe-cli/v3/cmd/ooniprobe/internal/ooni" + "github.com/ooni/probe-cli/v3/internal/miniengine" "github.com/ooni/probe-cli/v3/internal/model" + "github.com/ooni/probe-cli/v3/internal/runtimex" "github.com/pkg/errors" ) @@ -59,28 +61,45 @@ func RunGroup(config RunGroupConfig) error { return nil } - sess, err := config.Probe.NewSession(context.Background(), config.RunType) + // create a measurement session + sessConfig := config.Probe.NewSessionConfig(config.RunType) + sess, err := miniengine.NewSession(sessConfig) if err != nil { log.WithError(err).Error("Failed to create a measurement session") return err } defer sess.Close() - err = sess.MaybeLookupLocation() - if err != nil { - log.WithError(err).Error("Failed to lookup the location of the probe") + // bootstrap the measurement session + log.Debugf("Enabled category codes are the following %v", config.Probe.Config().Nettests.WebsitesEnabledCategoryCodes) + bootstrapConfig := &miniengine.BootstrapConfig{ + BackendURL: "", + CategoryCodes: config.Probe.Config().Nettests.WebsitesEnabledCategoryCodes, + Charging: true, + OnWiFi: true, + ProxyURL: config.Probe.ProxyURL(), + RunType: config.RunType, + SnowflakeRendezvousMethod: "", + TorArgs: []string{}, + TorBinary: "", + } + bootstrapTask := sess.Bootstrap(context.Background(), bootstrapConfig) + awaitTask(bootstrapTask, model.NewPrinterCallbacks(taskLogger)) + if _, err := bootstrapTask.Result(); err != nil { + log.WithError(err).Error("Failed to bootstrap a measurement session") return err } + + // obtain the probe location + location := runtimex.Try1(sess.GeolocateResult()) + + // create the corresponding network inside the database db := config.Probe.DB() - network, err := db.CreateNetwork(sess) + network, err := db.CreateNetwork(location) if err != nil { log.WithError(err).Error("Failed to create the network row") return err } - if err := sess.MaybeLookupBackends(); err != nil { - log.WithError(err).Warn("Failed to discover OONI backends") - return err - } group, ok := All[config.GroupName] if !ok { diff --git a/cmd/ooniprobe/internal/nettests/signal.go b/cmd/ooniprobe/internal/nettests/signal.go index 3a2df6b874..3f95d57620 100644 --- a/cmd/ooniprobe/internal/nettests/signal.go +++ b/cmd/ooniprobe/internal/nettests/signal.go @@ -5,11 +5,9 @@ type Signal struct{} // Run starts the nettest. func (h Signal) Run(ctl *Controller) error { - builder, err := ctl.Session.NewExperimentBuilder( + return ctl.Run( "signal", + "", // TODO(bassosimone) + []string{""}, ) - if err != nil { - return err - } - return ctl.Run(builder, []string{""}) } diff --git a/cmd/ooniprobe/internal/nettests/stunreachability.go b/cmd/ooniprobe/internal/nettests/stunreachability.go index 186bb7bb98..f8fa58377b 100644 --- a/cmd/ooniprobe/internal/nettests/stunreachability.go +++ b/cmd/ooniprobe/internal/nettests/stunreachability.go @@ -1,42 +1,13 @@ package nettests import ( - "context" - - engine "github.com/ooni/probe-cli/v3/internal/engine" - "github.com/ooni/probe-cli/v3/internal/model" + "errors" ) // STUNReachability nettest implementation. type STUNReachability struct{} -func (n STUNReachability) lookupURLs(ctl *Controller) ([]string, error) { - inputloader := &engine.InputLoader{ - CheckInConfig: &model.OOAPICheckInConfig{ - // not needed because we have default static input in the engine - }, - ExperimentName: "stunreachability", - InputPolicy: model.InputOrStaticDefault, - Session: ctl.Session, - SourceFiles: ctl.InputFiles, - StaticInputs: ctl.Inputs, - } - testlist, err := inputloader.Load(context.Background()) - if err != nil { - return nil, err - } - return ctl.BuildAndSetInputIdxMap(testlist) -} - // Run starts the nettest. func (n STUNReachability) Run(ctl *Controller) error { - builder, err := ctl.Session.NewExperimentBuilder("stunreachability") - if err != nil { - return err - } - urls, err := n.lookupURLs(ctl) - if err != nil { - return err - } - return ctl.Run(builder, urls) + return errors.New("not implemented") } diff --git a/cmd/ooniprobe/internal/nettests/tasks.go b/cmd/ooniprobe/internal/nettests/tasks.go new file mode 100644 index 0000000000..a242e823d0 --- /dev/null +++ b/cmd/ooniprobe/internal/nettests/tasks.go @@ -0,0 +1,55 @@ +package nettests + +import ( + "github.com/apex/log" + "github.com/ooni/probe-cli/v3/internal/miniengine" + "github.com/ooni/probe-cli/v3/internal/model" +) + +// taskLogger is the logger used for logging tasks. +var taskLogger = log.WithFields(log.Fields{ + "type": "engine", +}) + +// runningTask is a [miniengine.Task] that is still running. +type runningTask interface { + Done() <-chan any + Events() <-chan *miniengine.Event +} + +// TODO(bassosimone): we need to set the verbosity + +// processTaskEvent processes an event emitted by a runingTask. +func processTaskEvent(callbacks model.ExperimentCallbacks, ev *miniengine.Event) { + switch ev.EventType { + case miniengine.EventTypeDebug: + taskLogger.Debug(ev.Message) + case miniengine.EventTypeInfo: + taskLogger.Info(ev.Message) + case miniengine.EventTypeProgress: + callbacks.OnProgress(ev.Progress, ev.Message) + case miniengine.EventTypeWarning: + taskLogger.Warn(ev.Message) + default: + taskLogger.Warnf("UNHANDLED EVENT: %+v", ev) + } +} + +// awaitTask waits for the given runningTask to terminate. +func awaitTask(task runningTask, callbacks model.ExperimentCallbacks) { + for { + select { + case <-task.Done(): + for { + select { + case ev := <-task.Events(): + processTaskEvent(callbacks, ev) + default: + return + } + } + case ev := <-task.Events(): + processTaskEvent(callbacks, ev) + } + } +} diff --git a/cmd/ooniprobe/internal/nettests/telegram.go b/cmd/ooniprobe/internal/nettests/telegram.go index 82d75d82c2..6a0ee3334b 100644 --- a/cmd/ooniprobe/internal/nettests/telegram.go +++ b/cmd/ooniprobe/internal/nettests/telegram.go @@ -6,11 +6,9 @@ type Telegram struct { // Run starts the test func (h Telegram) Run(ctl *Controller) error { - builder, err := ctl.Session.NewExperimentBuilder( + return ctl.Run( "telegram", + "", // TODO(bassosimone) + []string{""}, ) - if err != nil { - return err - } - return ctl.Run(builder, []string{""}) } diff --git a/cmd/ooniprobe/internal/nettests/tor.go b/cmd/ooniprobe/internal/nettests/tor.go index 96bfbb7d2f..307a73278b 100644 --- a/cmd/ooniprobe/internal/nettests/tor.go +++ b/cmd/ooniprobe/internal/nettests/tor.go @@ -6,11 +6,9 @@ type Tor struct { // Run starts the test func (h Tor) Run(ctl *Controller) error { - builder, err := ctl.Session.NewExperimentBuilder( + return ctl.Run( "tor", + "", // TODO(bassosimone) + []string{""}, ) - if err != nil { - return err - } - return ctl.Run(builder, []string{""}) } diff --git a/cmd/ooniprobe/internal/nettests/torsf.go b/cmd/ooniprobe/internal/nettests/torsf.go index 494dae4743..02db3550a7 100644 --- a/cmd/ooniprobe/internal/nettests/torsf.go +++ b/cmd/ooniprobe/internal/nettests/torsf.go @@ -6,11 +6,11 @@ type TorSf struct { // Run starts the test func (h TorSf) Run(ctl *Controller) error { - builder, err := ctl.Session.NewExperimentBuilder("torsf") - if err != nil { - return err - } - return ctl.Run(builder, []string{""}) + return ctl.Run( + "torsf", + "", // TODO(bassosimone) + []string{""}, + ) } func (h TorSf) onlyBackground() {} diff --git a/cmd/ooniprobe/internal/nettests/vanillator.go b/cmd/ooniprobe/internal/nettests/vanillator.go index 11d7266549..2aca20ac2d 100644 --- a/cmd/ooniprobe/internal/nettests/vanillator.go +++ b/cmd/ooniprobe/internal/nettests/vanillator.go @@ -6,11 +6,11 @@ type VanillaTor struct { // Run starts the test func (h VanillaTor) Run(ctl *Controller) error { - builder, err := ctl.Session.NewExperimentBuilder("vanilla_tor") - if err != nil { - return err - } - return ctl.Run(builder, []string{""}) + return ctl.Run( + "vanilla_tor", + "", // TODO(bassosimone) + []string{""}, + ) } func (h VanillaTor) onlyBackground() {} diff --git a/cmd/ooniprobe/internal/nettests/web_connectivity.go b/cmd/ooniprobe/internal/nettests/web_connectivity.go index fc643c0091..8a44f27202 100644 --- a/cmd/ooniprobe/internal/nettests/web_connectivity.go +++ b/cmd/ooniprobe/internal/nettests/web_connectivity.go @@ -1,52 +1,28 @@ package nettests import ( - "context" - - "github.com/apex/log" - engine "github.com/ooni/probe-cli/v3/internal/engine" - "github.com/ooni/probe-cli/v3/internal/model" + "errors" ) -func (n WebConnectivity) lookupURLs(ctl *Controller, categories []string) ([]string, error) { - inputloader := &engine.InputLoader{ - CheckInConfig: &model.OOAPICheckInConfig{ - // Setting Charging and OnWiFi to true causes the CheckIn - // API to return to us as much URL as possible with the - // given RunType hint. - Charging: true, - OnWiFi: true, - RunType: ctl.RunType, - WebConnectivity: model.OOAPICheckInConfigWebConnectivity{ - CategoryCodes: categories, - }, - }, - ExperimentName: "web_connectivity", - InputPolicy: model.InputOrQueryBackend, - Session: ctl.Session, - SourceFiles: ctl.InputFiles, - StaticInputs: ctl.Inputs, - } - testlist, err := inputloader.Load(context.Background()) - if err != nil { - return nil, err - } - return ctl.BuildAndSetInputIdxMap(testlist) -} - // WebConnectivity test implementation type WebConnectivity struct{} // Run starts the test func (n WebConnectivity) Run(ctl *Controller) error { - log.Debugf("Enabled category codes are the following %v", ctl.Probe.Config().Nettests.WebsitesEnabledCategoryCodes) - urls, err := n.lookupURLs(ctl, ctl.Probe.Config().Nettests.WebsitesEnabledCategoryCodes) + results, err := ctl.Session.CheckInResult() if err != nil { return err } - builder, err := ctl.Session.NewExperimentBuilder("web_connectivity") + if results.Tests.WebConnectivity == nil { + return errors.New("no web_connectivity data") + } + urls, err := ctl.BuildAndSetInputIdxMap(results.Tests.WebConnectivity.URLs) if err != nil { return err } - return ctl.Run(builder, urls) + return ctl.Run( + "web_connectivity", + results.Tests.WebConnectivity.ReportID, + urls, + ) } diff --git a/cmd/ooniprobe/internal/nettests/whatsapp.go b/cmd/ooniprobe/internal/nettests/whatsapp.go index 4660abe006..682faa7b23 100644 --- a/cmd/ooniprobe/internal/nettests/whatsapp.go +++ b/cmd/ooniprobe/internal/nettests/whatsapp.go @@ -6,11 +6,9 @@ type WhatsApp struct { // Run starts the test func (h WhatsApp) Run(ctl *Controller) error { - builder, err := ctl.Session.NewExperimentBuilder( + return ctl.Run( "whatsapp", + "", // TODO(bassosimone) + []string{""}, ) - if err != nil { - return err - } - return ctl.Run(builder, []string{""}) } diff --git a/cmd/ooniprobe/internal/ooni/ooni.go b/cmd/ooniprobe/internal/ooni/ooni.go index dcffdab6c4..3c18218742 100644 --- a/cmd/ooniprobe/internal/ooni/ooni.go +++ b/cmd/ooniprobe/internal/ooni/ooni.go @@ -3,7 +3,6 @@ package ooni import ( "context" _ "embed" // because we embed a file - "io/ioutil" "net/url" "os" "os/signal" @@ -14,9 +13,8 @@ import ( "github.com/ooni/probe-cli/v3/cmd/ooniprobe/internal/config" "github.com/ooni/probe-cli/v3/cmd/ooniprobe/internal/utils" "github.com/ooni/probe-cli/v3/internal/database" - "github.com/ooni/probe-cli/v3/internal/engine" - "github.com/ooni/probe-cli/v3/internal/kvstore" "github.com/ooni/probe-cli/v3/internal/legacy/assetsdir" + "github.com/ooni/probe-cli/v3/internal/miniengine" "github.com/ooni/probe-cli/v3/internal/model" "github.com/pkg/errors" ) @@ -24,11 +22,6 @@ import ( // DefaultSoftwareName is the default software name. const DefaultSoftwareName = "ooniprobe-cli" -// logger is the logger used by the engine. -var logger = log.WithFields(log.Fields{ - "type": "engine", -}) - // ProbeCLI is the OONI Probe CLI context. type ProbeCLI interface { Config() *config.Config @@ -110,6 +103,14 @@ func (p *Probe) Terminate() { p.isTerminated.Add(1) } +// ProxyURL returns the configured proxy URL +func (p *Probe) ProxyURL() string { + if p.proxyURL != nil { + return p.proxyURL.String() + } + return "" +} + // ListenForSignals will listen for SIGINT and SIGTERM. When it receives those // signals it will set isTerminatedAtomicInt to non-zero, which will cleanly // shutdown the test logic. @@ -193,7 +194,7 @@ func (p *Probe) Init(softwareName, softwareVersion, proxy string) error { // the return value as it does not matter to us here. _, _ = assetsdir.Cleanup(utils.AssetsDir(p.home)) - tempDir, err := ioutil.TempDir("", "ooni") + tempDir, err := os.MkdirTemp("", "ooni") if err != nil { return errors.Wrap(err, "creating TempDir") } @@ -211,19 +212,8 @@ func (p *Probe) Init(softwareName, softwareVersion, proxy string) error { return nil } -// NewSession creates a new ooni/probe-engine session using the -// current configuration inside the context. The caller must close -// the session when done using it, by calling sess.Close(). -func (p *Probe) NewSession(ctx context.Context, runType model.RunType) (*engine.Session, error) { - kvstore, err := kvstore.NewFS( - utils.EngineDir(p.home), - ) - if err != nil { - return nil, errors.Wrap(err, "creating engine's kvstore") - } - if err := os.MkdirAll(p.tunnelDir, 0700); err != nil { - return nil, errors.Wrap(err, "creating tunnel dir") - } +// NewSessionConfig creates a new [miniengine.SessionConfig]. +func (p *Probe) NewSessionConfig(runType model.RunType) *miniengine.SessionConfig { // When the software name is the default software name and we're running // in unattended mode, adjust the software name accordingly. // @@ -232,24 +222,14 @@ func (p *Probe) NewSession(ctx context.Context, runType model.RunType) (*engine. if runType == model.RunTypeTimed && softwareName == DefaultSoftwareName { softwareName = DefaultSoftwareName + "-unattended" } - return engine.NewSession(ctx, engine.SessionConfig{ - KVStore: kvstore, - Logger: logger, + return &miniengine.SessionConfig{ SoftwareName: softwareName, SoftwareVersion: p.softwareVersion, + StateDir: utils.EngineDir(p.home), TempDir: p.tempDir, TunnelDir: p.tunnelDir, - ProxyURL: p.proxyURL, - }) -} - -// NewProbeEngine creates a new ProbeEngine instance. -func (p *Probe) NewProbeEngine(ctx context.Context, runType model.RunType) (ProbeEngine, error) { - sess, err := p.NewSession(ctx, runType) - if err != nil { - return nil, err + Verbose: false, } - return sess, nil } // NewProbe creates a new probe instance. diff --git a/cmd/ooniprobe/internal/ooni/ooni_test.go b/cmd/ooniprobe/internal/ooni/ooni_test.go index 4f534c7f52..60b441323c 100644 --- a/cmd/ooniprobe/internal/ooni/ooni_test.go +++ b/cmd/ooniprobe/internal/ooni/ooni_test.go @@ -1,14 +1,13 @@ package ooni import ( - "io/ioutil" "os" "path" "testing" ) func TestInit(t *testing.T) { - ooniHome, err := ioutil.TempDir("", "oonihome") + ooniHome, err := os.MkdirTemp("", "oonihome") if err != nil { t.Fatal(err) } diff --git a/internal/database/actions.go b/internal/database/actions.go index ac6bfa390d..7d2f841f65 100644 --- a/internal/database/actions.go +++ b/internal/database/actions.go @@ -12,6 +12,7 @@ import ( "time" "github.com/apex/log" + "github.com/ooni/probe-cli/v3/internal/miniengine" "github.com/ooni/probe-cli/v3/internal/model" "github.com/pkg/errors" "github.com/upper/db/v4" @@ -33,8 +34,6 @@ type Database struct { sess db.Session } -var _ model.WritableDatabase = &Database{} - // Session implements Writable/ReadableDatabase.Session func (d *Database) Session() db.Session { return d.sess @@ -293,14 +292,14 @@ func (d *Database) CreateResult(homePath string, testGroupName string, networkID } // CreateNetwork implements WritableDatabase.CreateNetwork -func (d *Database) CreateNetwork(loc model.LocationProvider) (*model.DatabaseNetwork, error) { +func (d *Database) CreateNetwork(loc *miniengine.Location) (*model.DatabaseNetwork, error) { network := model.DatabaseNetwork{ - ASN: loc.ProbeASN(), - CountryCode: loc.ProbeCC(), - NetworkName: loc.ProbeNetworkName(), + ASN: uint(loc.ProbeASN), + CountryCode: loc.ProbeCC, + NetworkName: loc.ProbeNetworkName, // On desktop we consider it to always be wifi NetworkType: "wifi", - IP: loc.ProbeIP(), + IP: loc.ProbeIP, } newID, err := d.sess.Collection("networks").Insert(network) if err != nil { diff --git a/internal/miniengine/bootstrap.go b/internal/miniengine/bootstrap.go index 3ac560d889..f1591a762f 100644 --- a/internal/miniengine/bootstrap.go +++ b/internal/miniengine/bootstrap.go @@ -143,6 +143,12 @@ func (s *Session) newEngineSessionState( return nil, err } + // lookup the available backends. + if err := child.MaybeLookupBackendsContext(ctx); err != nil { + child.Close() + return nil, err + } + // call the check-in API. resp, err := s.checkIn(ctx, location, child, psc, config) if err != nil { From 16dfe088d630ce6a87946ea93d8bc84968b1ed28 Mon Sep 17 00:00:00 2001 From: Simone Basso Date: Thu, 6 Apr 2023 14:20:22 +0000 Subject: [PATCH 07/11] feat(miniengine): implement awaiting for task It's a common use case for Go code using the task, so better to just put this code inside of the miniengine package. --- internal/miniengine/measure.go | 4 +-- internal/miniengine/task.go | 45 +++++++++++++++++++++++++++++++++- 2 files changed, 46 insertions(+), 3 deletions(-) diff --git a/internal/miniengine/measure.go b/internal/miniengine/measure.go index 7d2181859a..fc09a4892c 100644 --- a/internal/miniengine/measure.go +++ b/internal/miniengine/measure.go @@ -12,10 +12,10 @@ import ( // MeasurementResult contains the results of [Session.Measure] type MeasurementResult struct { // KibiBytesReceived contains the KiB we received - KibiBytesReceived float64 + KibiBytesReceived float64 `json:"kibi_bytes_received"` // KibiBytesSent contains the KiB we sent - KibiBytesSent float64 + KibiBytesSent float64 `json:"kibi_bytes_sent"` // Measurement is the generated [model.Measurement] Measurement *model.Measurement `json:"measurement"` diff --git a/internal/miniengine/task.go b/internal/miniengine/task.go index d151fdcc1b..45dfc1f51e 100644 --- a/internal/miniengine/task.go +++ b/internal/miniengine/task.go @@ -4,7 +4,10 @@ package miniengine // Task // -import "golang.org/x/net/context" +import ( + "github.com/ooni/probe-cli/v3/internal/model" + "golang.org/x/net/context" +) // Task is a long running operation that emits [Event] while it is running and // produces a given Result. The zero value of this struct is invalid; you cannot @@ -46,3 +49,43 @@ func (t *Task[Result]) Result() (Result, error) { <-t.done // synchronize with TaskRunner.Main return t.result, t.failure } + +// Await waits for the task to complete and properly emits log messages +// using the given logger and the given callbacks for progress. +func (t *Task[Result]) Await( + ctx context.Context, + logger model.Logger, + callbacks model.ExperimentCallbacks, +) { + for { + select { + case <-ctx.Done(): + return + case <-t.Done(): + for { + select { + case ev := <-t.Events(): + t.emit(logger, callbacks, ev) + default: + return + } + } + case ev := <-t.Events(): + t.emit(logger, callbacks, ev) + } + } +} + +// emit is the helper function for emitting events called by Await. +func (t *Task[Result]) emit(logger model.Logger, callbacks model.ExperimentCallbacks, ev *Event) { + switch ev.EventType { + case EventTypeProgress: + callbacks.OnProgress(ev.Progress, ev.Message) + case EventTypeDebug: + logger.Debug(ev.Message) + case EventTypeWarning: + logger.Warn(ev.Message) + default: + logger.Info(ev.Message) + } +} From 518543b3b1907e00e3bb299d7ba2323b8d8472f8 Mon Sep 17 00:00:00 2001 From: Simone Basso Date: Thu, 6 Apr 2023 15:05:54 +0000 Subject: [PATCH 08/11] cleanup(miniengine): use the ./internal/optional package --- internal/miniengine/bootstrap.go | 3 ++- internal/miniengine/optional.go | 37 -------------------------------- internal/miniengine/session.go | 7 +++--- 3 files changed, 6 insertions(+), 41 deletions(-) delete mode 100644 internal/miniengine/optional.go diff --git a/internal/miniengine/bootstrap.go b/internal/miniengine/bootstrap.go index f1591a762f..decba46fcd 100644 --- a/internal/miniengine/bootstrap.go +++ b/internal/miniengine/bootstrap.go @@ -10,6 +10,7 @@ import ( "github.com/ooni/probe-cli/v3/internal/engine" "github.com/ooni/probe-cli/v3/internal/kvstore" "github.com/ooni/probe-cli/v3/internal/model" + "github.com/ooni/probe-cli/v3/internal/optional" "github.com/ooni/probe-cli/v3/internal/platform" "github.com/ooni/probe-cli/v3/internal/probeservices" "golang.org/x/net/context" @@ -110,7 +111,7 @@ func (s *Session) bootstrapSyncLocked(ctx context.Context, config *BootstrapConf } // MUTATE s to store the state - s.state = some(ess) + s.state = optional.Some(ess) return nil } diff --git a/internal/miniengine/optional.go b/internal/miniengine/optional.go deleted file mode 100644 index 5560f4fb92..0000000000 --- a/internal/miniengine/optional.go +++ /dev/null @@ -1,37 +0,0 @@ -package miniengine - -// -// Internally-used optional type -// - -import "github.com/ooni/probe-cli/v3/internal/runtimex" - -// optional is an optional container. -type optional[Type any] struct { - v *Type -} - -// some creates an initialized optional instance. -func some[Type any](v Type) optional[Type] { - return optional[Type]{ - v: &v, - } -} - -// none creates an empty optional instance. -func none[Type any]() optional[Type] { - return optional[Type]{ - v: nil, - } -} - -// IsNone returns whether the optional is empty. -func (o *optional[Type]) IsNone() bool { - return o.v == nil -} - -// Unwrap returns the optional value. -func (o *optional[Type]) Unwrap() Type { - runtimex.Assert(!o.IsNone(), "optional[Type] is none") - return *o.v -} diff --git a/internal/miniengine/session.go b/internal/miniengine/session.go index 381279f6ed..aae002758c 100644 --- a/internal/miniengine/session.go +++ b/internal/miniengine/session.go @@ -12,6 +12,7 @@ import ( "github.com/ooni/probe-cli/v3/internal/engine" "github.com/ooni/probe-cli/v3/internal/model" + "github.com/ooni/probe-cli/v3/internal/optional" "github.com/ooni/probe-cli/v3/internal/probeservices" ) @@ -104,7 +105,7 @@ type Session struct { stateDir string // state contains the optional state. - state optional[*engineSessionState] + state optional.Value[*engineSessionState] // tempDir is the temporary directory root. tempDir string @@ -156,7 +157,7 @@ func NewSession(config *SessionConfig) (*Session, error) { softwareName: config.SoftwareName, softwareVersion: config.SoftwareVersion, stateDir: config.StateDir, - state: none[*engineSessionState](), + state: optional.None[*engineSessionState](), tempDir: config.TempDir, tunnelDir: config.TunnelDir, } @@ -214,7 +215,7 @@ func (s *Session) Close() (err error) { state := s.state.Unwrap() // replace with empty state - s.state = none[*engineSessionState]() + s.state = optional.None[*engineSessionState]() // close the underlying session err = state.sess.Close() From f5ed297bd593dd7965e25b5908e11f1a71b67d7d Mon Sep 17 00:00:00 2001 From: Simone Basso Date: Thu, 6 Apr 2023 15:22:55 +0000 Subject: [PATCH 09/11] miniengine: prepare for refactoring and cleanups --- internal/miniengine/bootstrap.go | 9 +++++++- internal/miniengine/callbacks.go | 5 ++++- internal/miniengine/close.go | 36 ++++++++++++++++++++++++++++++++ internal/miniengine/logger.go | 3 ++- internal/miniengine/measure.go | 2 +- internal/miniengine/session.go | 26 ----------------------- internal/miniengine/submit.go | 2 +- 7 files changed, 52 insertions(+), 31 deletions(-) create mode 100644 internal/miniengine/close.go diff --git a/internal/miniengine/bootstrap.go b/internal/miniengine/bootstrap.go index decba46fcd..1e42622009 100644 --- a/internal/miniengine/bootstrap.go +++ b/internal/miniengine/bootstrap.go @@ -1,7 +1,7 @@ package miniengine // -// Code to bootstrap a measurement session +// The "bootstrap" task // import ( @@ -66,6 +66,13 @@ type BootstrapConfig struct { TorBinary string `json:"tor_binary"` } +// TODO(bassosimone): rather than having calls that return the geolocation and +// the result of the check-in, we should modify Bootstrap to return something +// like a Task[BootstrapResult] that contains both. The Bootstrap will still be +// idempotent and short circuit already existing results if they are available. +// +// By doing that, we would simplify the corresponding C API. + // Bootstrap ensures that we bootstrap the [Session]. This function // is safe to call multiple times. We'll only bootstrap on the first // invocation and do nothing for subsequent invocations. diff --git a/internal/miniengine/callbacks.go b/internal/miniengine/callbacks.go index f48a7eb14e..47ee11583a 100644 --- a/internal/miniengine/callbacks.go +++ b/internal/miniengine/callbacks.go @@ -6,8 +6,10 @@ package miniengine import "github.com/ooni/probe-cli/v3/internal/model" -// callbacks implements [model.ExperimentCallbacks] +// callbacks implements [model.ExperimentCallbacks] and emits +// the callbacks events using the given channel. type callbacks struct { + // emitter is the channel where to emit events. emitter chan<- *Event } @@ -20,6 +22,7 @@ func (c *callbacks) OnProgress(progress float64, message string) { Message: message, Progress: progress, } + // Implementation note: it's fine to lose interim events select { case c.emitter <- event: default: diff --git a/internal/miniengine/close.go b/internal/miniengine/close.go new file mode 100644 index 0000000000..400c71c9ee --- /dev/null +++ b/internal/miniengine/close.go @@ -0,0 +1,36 @@ +package miniengine + +// +// The "close" task +// + +import "github.com/ooni/probe-cli/v3/internal/optional" + +// TODO(bassosimone): we should refactor this code to return a Task[Void], which +// allows us to print logs while closing the session. + +// Close closes the [Session]. This function is safe to call multiple +// times. We'll close underlying resources on the first invocation and +// otherwise do nothing for subsequent invocations. +func (s *Session) Close() (err error) { + s.closeJustOnce.Do(func() { + // make sure the cleanup is synchronized. + defer s.mu.Unlock() + s.mu.Lock() + + // handle the case where there is no state. + if s.state.IsNone() { + return + } + + // obtain the underlying state + state := s.state.Unwrap() + + // replace with empty state + s.state = optional.None[*engineSessionState]() + + // close the underlying session + err = state.sess.Close() + }) + return err +} diff --git a/internal/miniengine/logger.go b/internal/miniengine/logger.go index 151cdc214e..9c885ef9da 100644 --- a/internal/miniengine/logger.go +++ b/internal/miniengine/logger.go @@ -10,7 +10,7 @@ import ( "github.com/ooni/probe-cli/v3/internal/model" ) -// loggerEmitter is a [model.Logger] using an [eventEmitter]. +// loggerEmitter is a [model.Logger] and emits events using the given channel. type loggerEmitter struct { // emitter is the channel where to emit events. emitter chan<- *Event @@ -71,6 +71,7 @@ func (cl *loggerEmitter) emit(level string, message string) { Message: message, Progress: 0, } + // Implementation note: it's fine to lose interim events select { case cl.emitter <- event: default: diff --git a/internal/miniengine/measure.go b/internal/miniengine/measure.go index fc09a4892c..6e898d856f 100644 --- a/internal/miniengine/measure.go +++ b/internal/miniengine/measure.go @@ -1,7 +1,7 @@ package miniengine // -// Code to run measurements +// The "measure" task // import ( diff --git a/internal/miniengine/session.go b/internal/miniengine/session.go index aae002758c..8d76b8e5eb 100644 --- a/internal/miniengine/session.go +++ b/internal/miniengine/session.go @@ -196,29 +196,3 @@ func (s *Session) GeolocateResult() (*Location, error) { // return the underlying value return s.state.Unwrap().geoloc, nil } - -// Close closes the [Session]. This function is safe to call multiple -// times. We'll close underlying resources on the first invocation and -// otherwise do nothing for subsequent invocations. -func (s *Session) Close() (err error) { - s.closeJustOnce.Do(func() { - // make sure the cleanup is synchronized. - defer s.mu.Unlock() - s.mu.Lock() - - // handle the case where there is no state. - if s.state.IsNone() { - return - } - - // obtain the underlying state - state := s.state.Unwrap() - - // replace with empty state - s.state = optional.None[*engineSessionState]() - - // close the underlying session - err = state.sess.Close() - }) - return err -} diff --git a/internal/miniengine/submit.go b/internal/miniengine/submit.go index 20fde5ef8b..bceac2f7f9 100644 --- a/internal/miniengine/submit.go +++ b/internal/miniengine/submit.go @@ -1,7 +1,7 @@ package miniengine // -// Code to submit a measurement to the OONI backend +// The "submit" task // import ( From 405e8a8b3760163a0038c78f76dbc5e65f79b98b Mon Sep 17 00:00:00 2001 From: Simone Basso Date: Thu, 6 Apr 2023 15:43:06 +0000 Subject: [PATCH 10/11] doc(miniengine): document other requirements --- internal/miniengine/task.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/internal/miniengine/task.go b/internal/miniengine/task.go index 45dfc1f51e..317ec1d781 100644 --- a/internal/miniengine/task.go +++ b/internal/miniengine/task.go @@ -26,6 +26,11 @@ type Task[Result any] struct { result Result } +// TODO(bassosimone): +// +// 1. we need a way to cancel/interrupt a running Task, which would +// simplify the C API implementation a bit. + // TaskRunner runs the main function that produces a [Task] result. type TaskRunner[Result any] interface { // Main is the [Task] main function. From 0b2396a25bc4fee9cd447ce54b429e3e788b961f Mon Sep 17 00:00:00 2001 From: Simone Basso Date: Thu, 6 Apr 2023 15:44:49 +0000 Subject: [PATCH 11/11] cleanup(miniengine): remove unused type --- internal/miniengine/task.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/internal/miniengine/task.go b/internal/miniengine/task.go index 317ec1d781..a944947933 100644 --- a/internal/miniengine/task.go +++ b/internal/miniengine/task.go @@ -31,12 +31,6 @@ type Task[Result any] struct { // 1. we need a way to cancel/interrupt a running Task, which would // simplify the C API implementation a bit. -// TaskRunner runs the main function that produces a [Task] result. -type TaskRunner[Result any] interface { - // Main is the [Task] main function. - Main(ctx context.Context) (Result, error) -} - // Done returns a channel closed when the [Task] is done. func (t *Task[Result]) Done() <-chan any { return t.done