Skip to content

Commit

Permalink
feat: minipipeline to analyze measurements (ooni#1393)
Browse files Browse the repository at this point in the history
This diff introduces the `minipipeline` package and command. The general
idea here is to analyze experiments results using an architecture that
matches https://github.com/ooni/data as closely as possible.

The advantage of using this architecture is that we can neatly divide
the data analysis process into three phases:

1. ingestion converts an OONI measurement into observations, which are
flat data structures;

2. analysis converts observations into an analysis structure, which is
significantly simpler than observations;

3. scoring is the process (implemented by each experiment) to turn the
analysis into top-level keys.

That is:

```mermaid
stateDiagram-v2
  state "{ingestion}" as ingestion
  MEASUREMENT --> ingestion
  ingestion --> OBSERVATIONS
  state "{analysis}" as analysis
  OBSERVATIONS --> analysis
  analysis --> ANALYSIS
  state "{scoring}" as scoring
  ANALYSIS --> scoring
  scoring --> TOP_LEVEL_KEYS
```

This diff in particular tackles points 1 and 2. We'll implement 3 for
Web Connectivity LTE in a separate diff.

The diff itself commits several observations and analysis files along
with the measurements from which they originate, which we all
collectively use to write regression tests for the `minipipeline`
package.

The motivation for implementing these changes is that I need a more
clearly defined model to address the differences between Web
Connectivity LTE and v0.4 (see also
ooni#1392). Because there is already
an architectural model for implementing these changes in ooni/data, it
seems logical to borrow liberally from ooni/data.

Incidentally, this change has the benefit of making it easier, in the
future, to align ooni/probe-cli and ooni/data.

The reference issue is ooni/probe#2634.
Implementing this change seems now a precondition for implementing extra
changes that would address such an issue for good, by changing LTE's
scoring.

Unlike the ooni/data pipeline, this pipeline is minimal (hence the
name): it only aims to process measurements collected by a recent
version of ooniprobe, rather than all measurements from the beginning of
time.

While there, disable signal integration tests because of
ooni/probe#2636.
  • Loading branch information
bassosimone authored and Murphy-OrangeMud committed Feb 13, 2024
1 parent e01e0c5 commit 9920cff
Show file tree
Hide file tree
Showing 116 changed files with 44,824 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
/*.csv
/*.deb
/*.exe
/*.json
/*.jsonl
/*.pprof
/*.sqlite3
Expand Down
62 changes: 62 additions & 0 deletions internal/cmd/minipipeline/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package main

import (
"flag"
"fmt"
"os"
"path/filepath"

"github.com/ooni/probe-cli/v3/internal/minipipeline"
"github.com/ooni/probe-cli/v3/internal/must"
"github.com/ooni/probe-cli/v3/internal/runtimex"
)

var (
// destdir is the -destdir flag
destdir = flag.String("destdir", ".", "destination directory to use")

// measurement is the -measurement flag
measurement = flag.String("measurement", "", "measurement file to analyze")

// mustWriteFileLn allows overwriting must.WriteFile in tests
mustWriteFileFn = must.WriteFile

// prefix is the -prefix flag
prefix = flag.String("prefix", "", "prefix to add to generated files")

// osExit allows overwriting os.Exit in tests
osExit = os.Exit
)

func main() {
flag.Parse()
if *measurement == "" {
fmt.Fprintf(os.Stderr, "\n")
fmt.Fprintf(os.Stderr, "usage: %s -measurement <file> [-prefix <prefix>]\n", filepath.Base(os.Args[0]))
fmt.Fprintf(os.Stderr, "\n")
fmt.Fprintf(os.Stderr, "Mini measurement processing pipeline to reprocess recent probe measurements\n")
fmt.Fprintf(os.Stderr, "and align results calculation with ooni/data.\n")
fmt.Fprintf(os.Stderr, "\n")
fmt.Fprintf(os.Stderr, "Analyzes the <file> provided using -measurement <file> and writes the\n")
fmt.Fprintf(os.Stderr, "observations.json and analysis.json files in the -destdir <destdir> directory,\n")
fmt.Fprintf(os.Stderr, "which must already exist.\n")
fmt.Fprintf(os.Stderr, "\n")
fmt.Fprintf(os.Stderr, "Use -prefix <prefix> to add <prefix> in front of the generated files names.\n")
fmt.Fprintf(os.Stderr, "\n")
osExit(1)
}

// parse the measurement file
var parsed minipipeline.WebMeasurement
must.UnmarshalJSON(must.ReadFile(*measurement), &parsed)

// generate and write observations
observationsPath := filepath.Join(*destdir, *prefix+"observations.json")
container := runtimex.Try1(minipipeline.IngestWebMeasurement(&parsed))
mustWriteFileFn(observationsPath, must.MarshalAndIndentJSON(container, "", " "), 0600)

// generate and write observations analysis
analysisPath := filepath.Join(*destdir, *prefix+"analysis.json")
analysis := minipipeline.AnalyzeWebObservations(container)
mustWriteFileFn(analysisPath, must.MarshalAndIndentJSON(analysis, "", " "), 0600)
}
101 changes: 101 additions & 0 deletions internal/cmd/minipipeline/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package main

import (
"errors"
"fmt"
"io/fs"
"os"
"path/filepath"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/ooni/probe-cli/v3/internal/must"
)

func mustloadfile(filename string) (object map[string]any) {
data := must.ReadFile(filename)
must.UnmarshalJSON(data, &object)
return
}

func mustloaddata(contentmap map[string][]byte, key string) (object map[string]any) {
data := contentmap[key]
must.UnmarshalJSON(data, &object)
return
}

func TestMainSuccess(t *testing.T) {
// make sure we set the destination directory
*destdir = "xo"

// make sure we're reading from the expected input
*measurement = filepath.Join("testdata", "measurement.json")

// make sure we store the expected output
contentmap := make(map[string][]byte)
mustWriteFileFn = func(filename string, content []byte, mode fs.FileMode) {
contentmap[filename] = content
}

// make sure osExit is correct
osExit = os.Exit

// also check whether we can add a prefix
*prefix = "y-"

// run the main function
main()

// make sure the generated observations are good
expectedObservations := mustloadfile(filepath.Join("testdata", "observations.json"))
gotObservations := mustloaddata(contentmap, filepath.Join("xo", "y-observations.json"))
if diff := cmp.Diff(expectedObservations, gotObservations); diff != "" {
t.Fatal(diff)
}

// make sure the generated analysis is good
expectedAnalysis := mustloadfile(filepath.Join("testdata", "analysis.json"))
gotAnalysis := mustloaddata(contentmap, filepath.Join("xo", "y-analysis.json"))
if diff := cmp.Diff(expectedAnalysis, gotAnalysis); diff != "" {
t.Fatal(diff)
}
}

func TestMainUsage(t *testing.T) {
// make sure we clear the destination directory
*destdir = ""

// make sure the expected input file is empty
*measurement = ""

// make sure we panic if we try to write on disk
mustWriteFileFn = func(filename string, content []byte, mode fs.FileMode) {
panic(errors.New("mustWriteFileFn"))
}

// make sure osExit is correct
osExit = func(code int) {
panic(fmt.Errorf("osExit: %d", code))
}

// make sure the prefix is also clean
*prefix = ""

var err error
func() {
// intercept panic caused by osExit or other panics
defer func() {
if r := recover(); r != nil {
err = r.(error)
}
}()

// run the main function with the given args
main()
}()

// make sure we've got the expected error
if err == nil || err.Error() != "osExit: 1" {
t.Fatal("expected", "os.Exit: 1", "got", err)
}
}
23 changes: 23 additions & 0 deletions internal/cmd/minipipeline/testdata/analysis.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"DNSExperimentFailure": null,
"DNSTransactionsWithBogons": {},
"DNSTransactionsWithUnexpectedFailures": {},
"DNSPossiblyInvalidAddrs": {},
"HTTPDiffBodyProportionFactor": 1,
"HTTPDiffStatusCodeMatch": true,
"HTTPDiffTitleDifferentLongWords": {},
"HTTPDiffUncommonHeadersIntersection": {
"x-drupal-cache": true,
"x-generator": true
},
"HTTPFinalResponses": {
"4": true
},
"HTTPFinalResponsesWithTLS": {
"4": true
},
"TCPTransactionsWithUnexpectedTCPConnectFailures": {},
"TCPTransactionsWithUnexpectedTLSHandshakeFailures": {},
"TCPTransactionsWithUnexpectedHTTPFailures": {},
"TCPTransactionsWithUnexplainedUnexpectedFailures": {}
}
Loading

0 comments on commit 9920cff

Please sign in to comment.