Skip to content

Commit

Permalink
poc: rewrite urlgetter using step-by-step
Browse files Browse the repository at this point in the history
This diff shows how we could incrementally rewrite urlgetter
using a step-by-step measurement style.

Additionally, this diff modifies the facebook_messanger experiment
to show what changes are required to upgrade it.

The general idea of these changes is to incrementally move
experiments away from depending on ./internal/experiment/urlgetter,
and instead use a near drop-in replacement implementation, implemented
in ./internal/urlgetter, which uses step-by-step to measure.

Because ./internal/experiment/urlgetter depends on
./internal/legacy/netx and, instead, ./internal/urlgetter
depends on ./internal/measurexlite, by performing this
kind of migration we make ./internal/legacy/netx unnecessary.

Also, because most users of ./internal/experiment/urlgetter only
use limited functionality, incremental refactoring would be possible.

Closes ooni/probe#2751 because we have
investigated the matter, understood that it is possible, and produced
an initial diff for further design and code review.
  • Loading branch information
bassosimone committed Jun 18, 2024
1 parent 5be3a9a commit 01649b1
Show file tree
Hide file tree
Showing 16 changed files with 1,397 additions and 39 deletions.
64 changes: 42 additions & 22 deletions internal/experiment/fbmessenger/fbmessenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ package fbmessenger

import (
"context"
"log"
"math/rand"
"time"

"github.com/ooni/probe-cli/v3/internal/experiment/urlgetter"
"github.com/ooni/probe-cli/v3/internal/model"
"github.com/ooni/probe-cli/v3/internal/netxlite"
"github.com/ooni/probe-cli/v3/internal/urlgetter"
)

const (
Expand Down Expand Up @@ -78,15 +79,22 @@ type Analysis struct {
}

// Update updates the TestKeys using the given MultiOutput result.
func (tk *TestKeys) Update(v urlgetter.MultiOutput) {
func (tk *TestKeys) Update(v *urlgetter.MultiResult) {
// handle the case where there are no test keys
if v.TestKeys.Err != nil {
log.Printf("ELLIOT: %s", v.TestKeys.Err)
return
}

// Update the easy to update entries first
tk.NetworkEvents = append(tk.NetworkEvents, v.TestKeys.NetworkEvents...)
tk.Queries = append(tk.Queries, v.TestKeys.Queries...)
tk.Requests = append(tk.Requests, v.TestKeys.Requests...)
tk.TCPConnect = append(tk.TCPConnect, v.TestKeys.TCPConnect...)
tk.TLSHandshakes = append(tk.TLSHandshakes, v.TestKeys.TLSHandshakes...)
tk.NetworkEvents = append(tk.NetworkEvents, v.TestKeys.Value.NetworkEvents...)
tk.Queries = append(tk.Queries, v.TestKeys.Value.Queries...)
tk.Requests = append(tk.Requests, v.TestKeys.Value.Requests...)
tk.TCPConnect = append(tk.TCPConnect, v.TestKeys.Value.TCPConnect...)
tk.TLSHandshakes = append(tk.TLSHandshakes, v.TestKeys.Value.TLSHandshakes...)

// Set the status of endpoints
switch v.Input.Target {
switch v.Target.URL {
case ServiceSTUN:
var ignored *bool
tk.ComputeEndpointStatus(v, &tk.FacebookSTUNDNSConsistent, &ignored)
Expand Down Expand Up @@ -117,16 +125,23 @@ var (
)

// ComputeEndpointStatus computes the DNS and TCP status of a specific endpoint.
func (tk *TestKeys) ComputeEndpointStatus(v urlgetter.MultiOutput, dns, tcp **bool) {
func (tk *TestKeys) ComputeEndpointStatus(v *urlgetter.MultiResult, dns, tcp **bool) {
// start where all is unknown
*dns, *tcp = nil, nil

// handle the case where there are no test keys
if v.TestKeys.Err != nil {
log.Printf("ELLIOT: %s", v.TestKeys.Err)
return
}

// process DNS first
if v.TestKeys.FailedOperation != nil && *v.TestKeys.FailedOperation == netxlite.ResolveOperation {
if v.TestKeys.Value.FailedOperation.UnwrapOr("") == netxlite.ResolveOperation {
tk.FacebookDNSBlocking = &trueValue
*dns = &falseValue
return // we know that the DNS has failed
}
for _, query := range v.TestKeys.Queries {
for _, query := range v.TestKeys.Value.Queries {
for _, ans := range query.Answers {
if ans.ASN != FacebookASN {
tk.FacebookDNSBlocking = &trueValue
Expand All @@ -137,7 +152,7 @@ func (tk *TestKeys) ComputeEndpointStatus(v urlgetter.MultiOutput, dns, tcp **bo
}
*dns = &trueValue
// now process connect
if v.TestKeys.FailedOperation != nil && *v.TestKeys.FailedOperation == netxlite.ConnectOperation {
if v.TestKeys.Value.FailedOperation.UnwrapOr("") == netxlite.ConnectOperation {
tk.FacebookTCPBlocking = &trueValue
*tcp = &falseValue
return // because connect failed
Expand All @@ -151,9 +166,6 @@ type Measurer struct {
// Config contains the experiment settings. If empty we
// will be using default settings.
Config Config

// Getter is an optional getter to be used for testing.
Getter urlgetter.MultiGetter
}

// ExperimentName implements ExperimentMeasurer.ExperimentName
Expand All @@ -173,24 +185,32 @@ func (m Measurer) Run(ctx context.Context, args *model.ExperimentArgs) error {
sess := args.Session
ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
defer cancel()
urlgetter.RegisterExtensions(measurement)
//urlgetter.RegisterExtensions(measurement) // TODO(bassosimone)

// generate targets
var inputs []urlgetter.MultiInput
var inputs []*urlgetter.EasyTarget
for _, service := range Services {
inputs = append(inputs, urlgetter.MultiInput{Target: service})
inputs = append(inputs, &urlgetter.EasyTarget{URL: service})
}
rnd := rand.New(rand.NewSource(time.Now().UnixNano())) // #nosec G404 -- not really important
rnd.Shuffle(len(inputs), func(i, j int) {
rand.Shuffle(len(inputs), func(i, j int) {
inputs[i], inputs[j] = inputs[j], inputs[i]
})

// measure in parallel
multi := urlgetter.Multi{Begin: time.Now(), Getter: m.Getter, Session: sess}
multi := &urlgetter.MultiHandle{
Begin: time.Now(),
IndexGen: &urlgetter.IndexGen{},
Session: sess,
}
testkeys := new(TestKeys)
testkeys.Agent = "redirect"
measurement.TestKeys = testkeys
for entry := range multi.Collect(ctx, inputs, "facebook_messenger", callbacks) {
results := urlgetter.MultiCollect(callbacks, 0, len(inputs),
"facebook_messenger", multi.Run(ctx, inputs...))
for entry := range results {
testkeys.Update(entry)
}

// if we haven't yet determined the status of DNS blocking and TCP blocking
// then no blocking has been detected and we can set them
if testkeys.FacebookDNSBlocking == nil {
Expand Down
6 changes: 2 additions & 4 deletions internal/experiment/fbmessenger/fbmessenger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,16 @@ package fbmessenger_test

import (
"context"
"io"
"net/url"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/gopacket/layers"
"github.com/ooni/netem"
"github.com/ooni/probe-cli/v3/internal/experiment/fbmessenger"
"github.com/ooni/probe-cli/v3/internal/experiment/urlgetter"
"github.com/ooni/probe-cli/v3/internal/legacy/tracex"
"github.com/ooni/probe-cli/v3/internal/mocks"
"github.com/ooni/probe-cli/v3/internal/model"
"github.com/ooni/probe-cli/v3/internal/netemx"
"github.com/ooni/probe-cli/v3/internal/netxlite"
"github.com/ooni/probe-cli/v3/internal/runtimex"
)

Expand Down Expand Up @@ -326,6 +322,7 @@ func TestMeasurerRun(t *testing.T) {
})
}

/*
func TestComputeEndpointStatsTCPBlocking(t *testing.T) {
failure := io.EOF.Error()
operation := netxlite.ConnectOperation
Expand Down Expand Up @@ -385,6 +382,7 @@ func TestComputeEndpointStatsDNSIsLying(t *testing.T) {
t.Fatal("invalid FacebookTCPBlocking")
}
}
*/

func TestSummaryKeysWithNils(t *testing.T) {
measurement := &model.Measurement{TestKeys: &fbmessenger.TestKeys{}}
Expand Down
22 changes: 13 additions & 9 deletions internal/measurexlite/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@ package measurexlite

import (
"fmt"
"io"
"net"
"time"

"github.com/ooni/probe-cli/v3/internal/model"
"github.com/ooni/probe-cli/v3/internal/netxlite"
)

// MaybeClose is a convenience function for closing a [net.Conn] when it is not nil.
func MaybeClose(conn net.Conn) (err error) {
// MaybeClose is a convenience function for closing a [io.Closer] when it is not nil.
func MaybeClose(conn io.Closer) (err error) {
if conn != nil {
err = conn.Close()
}
Expand All @@ -39,18 +40,21 @@ type connTrace struct {

var _ net.Conn = &connTrace{}

type remoteAddrProvider interface {
// RemoteAddrProvider is something returning the remote address.
type RemoteAddrProvider interface {
RemoteAddr() net.Addr
}

func safeRemoteAddrNetwork(rap remoteAddrProvider) (result string) {
// SafeRemoteAddrNetwork is a safe accessor to get the remote addr network.
func SafeRemoteAddrNetwork(rap RemoteAddrProvider) (result string) {
if addr := rap.RemoteAddr(); addr != nil {
result = addr.Network()
}
return result
}

func safeRemoteAddrString(rap remoteAddrProvider) (result string) {
// SafeRemoteAddrString is a safe accessor to get the remote addr string representation.
func SafeRemoteAddrString(rap RemoteAddrProvider) (result string) {
if addr := rap.RemoteAddr(); addr != nil {
result = addr.String()
}
Expand All @@ -60,8 +64,8 @@ func safeRemoteAddrString(rap remoteAddrProvider) (result string) {
// Read implements net.Conn.Read and saves network events.
func (c *connTrace) Read(b []byte) (int, error) {
// collect preliminary stats when the connection is surely active
network := safeRemoteAddrNetwork(c)
addr := safeRemoteAddrString(c)
network := SafeRemoteAddrNetwork(c)
addr := SafeRemoteAddrString(c)
started := c.tx.TimeSince(c.tx.ZeroTime())

// perform the underlying network operation
Expand Down Expand Up @@ -117,8 +121,8 @@ func (tx *Trace) CloneBytesReceivedMap() (out map[string]int64) {

// Write implements net.Conn.Write and saves network events.
func (c *connTrace) Write(b []byte) (int, error) {
network := safeRemoteAddrNetwork(c)
addr := safeRemoteAddrString(c)
network := SafeRemoteAddrNetwork(c)
addr := SafeRemoteAddrString(c)
started := c.tx.TimeSince(c.tx.ZeroTime())

count, err := c.Conn.Write(b)
Expand Down
8 changes: 4 additions & 4 deletions internal/measurexlite/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ func TestRemoteAddrProvider(t *testing.T) {
return nil
},
}
if safeRemoteAddrNetwork(conn) != "" {
if SafeRemoteAddrNetwork(conn) != "" {
t.Fatal("expected empty network")
}
if safeRemoteAddrString(conn) != "" {
if SafeRemoteAddrString(conn) != "" {
t.Fatal("expected empty string")
}
})
Expand All @@ -40,10 +40,10 @@ func TestRemoteAddrProvider(t *testing.T) {
}
},
}
if safeRemoteAddrNetwork(conn) != "tcp" {
if SafeRemoteAddrNetwork(conn) != "tcp" {
t.Fatal("unexpected network")
}
if safeRemoteAddrString(conn) != "1.1.1.1:443" {
if SafeRemoteAddrString(conn) != "1.1.1.1:443" {
t.Fatal("unexpected string")
}
})
Expand Down
34 changes: 34 additions & 0 deletions internal/urlgetter/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package urlgetter

// Config contains the configuration.
type Config struct {
// HTTPHost allows overriding the default HTTP host.
HTTPHost string `ooni:"Force using specific HTTP Host header"`

// HTTPReferer sets the HTTP referer value.
HTTPReferer string `ooni:"Force using the specific HTTP Referer header"`

// Method selects the HTTP method to use.
Method string `ooni:"Force HTTP method different than GET"`

// NoFollowRedirects disables following redirects.
NoFollowRedirects bool `ooni:"Disable following redirects"`

// TLSNextProtos is an OPTIONAL comma separated ALPN list.
TLSNextProtos string `ooni:"Comma-separated list of next protocols for ALPN"`

// TLSServerName is the OPTIONAL SNI value.
TLSServerName string `ooni:"SNI value to use"`
}

// Clone returns a deep copy of the given [*Config].
func (cx *Config) Clone() *Config {
return &Config{
HTTPHost: cx.HTTPHost,
HTTPReferer: cx.HTTPReferer,
Method: cx.Method,
NoFollowRedirects: cx.NoFollowRedirects,
TLSNextProtos: cx.TLSNextProtos,
TLSServerName: cx.TLSServerName,
}
}
Loading

0 comments on commit 01649b1

Please sign in to comment.