Skip to content

Commit

Permalink
refactor(dslx): collect observations using runtime (#1383)
Browse files Browse the repository at this point in the history
  • Loading branch information
bassosimone authored Oct 25, 2023
1 parent e701ddb commit 966a45d
Show file tree
Hide file tree
Showing 12 changed files with 142 additions and 141 deletions.
6 changes: 6 additions & 0 deletions internal/dslx/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ func DNSLookupGetaddrinfo(rt Runtime) Func[*DomainToResolve, *ResolvedAddresses]
// stop the operation logger
ol.Stop(err)

// save the observations
rt.SaveObservations(maybeTraceToObservations(trace)...)

state := &ResolvedAddresses{
Addresses: addrs, // maybe empty
Domain: input.Domain,
Expand Down Expand Up @@ -145,6 +148,9 @@ func DNSLookupUDP(rt Runtime, endpoint string) Func[*DomainToResolve, *ResolvedA
// stop the operation logger
ol.Stop(err)

// save the observations
rt.SaveObservations(maybeTraceToObservations(trace)...)

state := &ResolvedAddresses{
Addresses: addrs, // maybe empty
Domain: input.Domain,
Expand Down
140 changes: 61 additions & 79 deletions internal/dslx/dns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,12 @@ func TestGetaddrinfo(t *testing.T) {
}

t.Run("with nil resolver", func(t *testing.T) {
f := DNSLookupGetaddrinfo(
NewMinimalRuntime(model.DiscardLogger, time.Now()),
)
rt := NewRuntimeMeasurexLite(model.DiscardLogger, time.Now())
f := DNSLookupGetaddrinfo(rt)
ctx, cancel := context.WithCancel(context.Background())
cancel() // immediately cancel the lookup
res := f.Apply(ctx, NewMaybeWithValue(domain))
if res.Observations == nil || len(res.Observations) <= 0 {
if obs := rt.Observations(); obs == nil || len(obs.Queries) <= 0 {
t.Fatal("unexpected empty observations")
}
if res.Error == nil {
Expand All @@ -77,21 +76,17 @@ func TestGetaddrinfo(t *testing.T) {

t.Run("with lookup error", func(t *testing.T) {
mockedErr := errors.New("mocked")
f := DNSLookupGetaddrinfo(
NewMinimalRuntime(model.DiscardLogger, time.Now(), MinimalRuntimeOptionMeasuringNetwork(&mocks.MeasuringNetwork{
MockNewStdlibResolver: func(logger model.DebugLogger) model.Resolver {
return &mocks.Resolver{
MockLookupHost: func(ctx context.Context, domain string) ([]string, error) {
return nil, mockedErr
},
}
},
})),
)
rt := NewRuntimeMeasurexLite(model.DiscardLogger, time.Now(), RuntimeMeasurexLiteOptionMeasuringNetwork(&mocks.MeasuringNetwork{
MockNewStdlibResolver: func(logger model.DebugLogger) model.Resolver {
return &mocks.Resolver{
MockLookupHost: func(ctx context.Context, domain string) ([]string, error) {
return nil, mockedErr
},
}
},
}))
f := DNSLookupGetaddrinfo(rt)
res := f.Apply(context.Background(), NewMaybeWithValue(domain))
if res.Observations == nil || len(res.Observations) <= 0 {
t.Fatal("unexpected empty observations")
}
if res.Error != mockedErr {
t.Fatalf("unexpected error type: %s", res.Error)
}
Expand All @@ -104,21 +99,17 @@ func TestGetaddrinfo(t *testing.T) {
})

t.Run("with success", func(t *testing.T) {
f := DNSLookupGetaddrinfo(
NewRuntimeMeasurexLite(model.DiscardLogger, time.Now(), RuntimeMeasurexLiteOptionMeasuringNetwork(&mocks.MeasuringNetwork{
MockNewStdlibResolver: func(logger model.DebugLogger) model.Resolver {
return &mocks.Resolver{
MockLookupHost: func(ctx context.Context, domain string) ([]string, error) {
return []string{"93.184.216.34"}, nil
},
}
},
})),
)
rt := NewRuntimeMeasurexLite(model.DiscardLogger, time.Now(), RuntimeMeasurexLiteOptionMeasuringNetwork(&mocks.MeasuringNetwork{
MockNewStdlibResolver: func(logger model.DebugLogger) model.Resolver {
return &mocks.Resolver{
MockLookupHost: func(ctx context.Context, domain string) ([]string, error) {
return []string{"93.184.216.34"}, nil
},
}
},
}))
f := DNSLookupGetaddrinfo(rt)
res := f.Apply(context.Background(), NewMaybeWithValue(domain))
if res.Observations == nil || len(res.Observations) <= 0 {
t.Fatal("unexpected empty observations")
}
if res.Error != nil {
t.Fatalf("unexpected error: %s", res.Error)
}
Expand All @@ -144,18 +135,19 @@ Test cases:
- with success
*/
func TestLookupUDP(t *testing.T) {
t.Run("Apply dnsLookupGetaddrinfoFunc", func(t *testing.T) {
t.Run("Apply dnsLookupUDPFunc", func(t *testing.T) {
domain := &DomainToResolve{
Domain: "example.com",
Tags: []string{"antani"},
}

t.Run("with nil resolver", func(t *testing.T) {
f := DNSLookupUDP(NewMinimalRuntime(model.DiscardLogger, time.Now()), "1.1.1.1:53")
rt := NewRuntimeMeasurexLite(model.DiscardLogger, time.Now())
f := DNSLookupUDP(rt, "1.1.1.1:53")
ctx, cancel := context.WithCancel(context.Background())
cancel()
res := f.Apply(ctx, NewMaybeWithValue(domain))
if res.Observations == nil || len(res.Observations) <= 0 {
if obs := rt.Observations(); obs == nil || len(obs.Queries) <= 0 {
t.Fatal("unexpected empty observations")
}
if res.Error == nil {
Expand All @@ -165,29 +157,24 @@ func TestLookupUDP(t *testing.T) {

t.Run("with lookup error", func(t *testing.T) {
mockedErr := errors.New("mocked")
f := DNSLookupUDP(
NewMinimalRuntime(model.DiscardLogger, time.Now(), MinimalRuntimeOptionMeasuringNetwork(&mocks.MeasuringNetwork{
MockNewParallelUDPResolver: func(logger model.DebugLogger, dialer model.Dialer, endpoint string) model.Resolver {
return &mocks.Resolver{
MockLookupHost: func(ctx context.Context, domain string) ([]string, error) {
return nil, mockedErr
},
}
},
MockNewDialerWithoutResolver: func(dl model.DebugLogger, w ...model.DialerWrapper) model.Dialer {
return &mocks.Dialer{
MockDialContext: func(ctx context.Context, network, address string) (net.Conn, error) {
panic("should not be called")
},
}
},
})),
"1.1.1.1:53",
)
rt := NewRuntimeMeasurexLite(model.DiscardLogger, time.Now(), RuntimeMeasurexLiteOptionMeasuringNetwork(&mocks.MeasuringNetwork{
MockNewParallelUDPResolver: func(logger model.DebugLogger, dialer model.Dialer, endpoint string) model.Resolver {
return &mocks.Resolver{
MockLookupHost: func(ctx context.Context, domain string) ([]string, error) {
return nil, mockedErr
},
}
},
MockNewDialerWithoutResolver: func(dl model.DebugLogger, w ...model.DialerWrapper) model.Dialer {
return &mocks.Dialer{
MockDialContext: func(ctx context.Context, network, address string) (net.Conn, error) {
panic("should not be called")
},
}
},
}))
f := DNSLookupUDP(rt, "1.1.1.1:53")
res := f.Apply(context.Background(), NewMaybeWithValue(domain))
if res.Observations == nil || len(res.Observations) <= 0 {
t.Fatal("unexpected empty observations")
}
if res.Error != mockedErr {
t.Fatalf("unexpected error type: %s", res.Error)
}
Expand All @@ -200,29 +187,24 @@ func TestLookupUDP(t *testing.T) {
})

t.Run("with success", func(t *testing.T) {
f := DNSLookupUDP(
NewRuntimeMeasurexLite(model.DiscardLogger, time.Now(), RuntimeMeasurexLiteOptionMeasuringNetwork(&mocks.MeasuringNetwork{
MockNewParallelUDPResolver: func(logger model.DebugLogger, dialer model.Dialer, address string) model.Resolver {
return &mocks.Resolver{
MockLookupHost: func(ctx context.Context, domain string) ([]string, error) {
return []string{"93.184.216.34"}, nil
},
}
},
MockNewDialerWithoutResolver: func(dl model.DebugLogger, w ...model.DialerWrapper) model.Dialer {
return &mocks.Dialer{
MockDialContext: func(ctx context.Context, network, address string) (net.Conn, error) {
panic("should not be called")
},
}
},
})),
"1.1.1.1:53",
)
rt := NewRuntimeMeasurexLite(model.DiscardLogger, time.Now(), RuntimeMeasurexLiteOptionMeasuringNetwork(&mocks.MeasuringNetwork{
MockNewParallelUDPResolver: func(logger model.DebugLogger, dialer model.Dialer, address string) model.Resolver {
return &mocks.Resolver{
MockLookupHost: func(ctx context.Context, domain string) ([]string, error) {
return []string{"93.184.216.34"}, nil
},
}
},
MockNewDialerWithoutResolver: func(dl model.DebugLogger, w ...model.DialerWrapper) model.Dialer {
return &mocks.Dialer{
MockDialContext: func(ctx context.Context, network, address string) (net.Conn, error) {
panic("should not be called")
},
}
},
}))
f := DNSLookupUDP(rt, "1.1.1.1:53")
res := f.Apply(context.Background(), NewMaybeWithValue(domain))
if res.Observations == nil || len(res.Observations) <= 0 {
t.Fatal("unexpected empty observations")
}
if res.Error != nil {
t.Fatalf("unexpected error: %s", res.Error)
}
Expand Down
22 changes: 3 additions & 19 deletions internal/dslx/fxcore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/ooni/probe-cli/v3/internal/mocks"
"github.com/ooni/probe-cli/v3/internal/model"
"github.com/ooni/probe-cli/v3/internal/netxlite"
"github.com/ooni/probe-cli/v3/internal/runtimex"
)

func getFn(err error, name string) Func[int, int] {
Expand All @@ -22,7 +21,9 @@ type fn struct {
}

func (f *fn) Apply(ctx context.Context, i *Maybe[int]) *Maybe[int] {
runtimex.Assert(i.Error == nil, "did not expect to see an error here")
if i.Error != nil {
return i
}
return &Maybe[int]{
Error: f.err,
State: i.State + 1,
Expand Down Expand Up @@ -112,20 +113,3 @@ func TestGen(t *testing.T) {
}
})
}

func TestObservations(t *testing.T) {
t.Run("Extract observations", func(t *testing.T) {
fn1 := getFn(nil, "succeed")
fn2 := getFn(nil, "succeed")
composit := Compose2(fn1, fn2)
r1 := composit.Apply(context.Background(), NewMaybeWithValue(3))
r2 := composit.Apply(context.Background(), NewMaybeWithValue(42))
if len(r1.Observations) != 2 || len(r2.Observations) != 2 {
t.Fatalf("unexpected number of observations")
}
mergedObservations := ExtractObservations(r1, r2)
if len(mergedObservations) != 4 {
t.Fatalf("unexpected number of merged observations")
}
})
}
44 changes: 17 additions & 27 deletions internal/dslx/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package dslx
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"strings"
Expand Down Expand Up @@ -296,27 +295,20 @@ func TestHTTPRequest(t *testing.T) {

// makeSureObservationsContainTags ensures the observations you can extract from
// the given HTTPResponse contain the tags we configured when testing
makeSureObservationsContainTags := func(res *Maybe[*HTTPResponse]) error {
// exclude the case where there was an error
if res.Error != nil {
return fmt.Errorf("unexpected error: %w", res.Error)
}

// obtain the observations
for _, obs := range ExtractObservations(res) {
makeSureObservationsContainTags := func(rt Runtime) error {
obs := rt.Observations()

// check the network events
for _, ev := range obs.NetworkEvents {
if diff := cmp.Diff([]string{"antani"}, ev.Tags); diff != "" {
return errors.New(diff)
}
// check the network events
for _, ev := range obs.NetworkEvents {
if diff := cmp.Diff([]string{"antani"}, ev.Tags); diff != "" {
return errors.New(diff)
}
}

// check the HTTP events
for _, ev := range obs.Requests {
if diff := cmp.Diff([]string{"antani"}, ev.Tags); diff != "" {
return errors.New(diff)
}
// check the HTTP events
for _, ev := range obs.Requests {
if diff := cmp.Diff([]string{"antani"}, ev.Tags); diff != "" {
return errors.New(diff)
}
}

Expand All @@ -331,17 +323,16 @@ func TestHTTPRequest(t *testing.T) {
Trace: trace,
Transport: goodTransport,
}
httpRequest := HTTPRequest(
NewMinimalRuntime(model.DiscardLogger, time.Now()),
)
rt := NewRuntimeMeasurexLite(model.DiscardLogger, time.Now())
httpRequest := HTTPRequest(rt)
res := httpRequest.Apply(context.Background(), NewMaybeWithValue(&httpTransport))
if res.Error != nil {
t.Fatal("unexpected error")
}
if res.State.HTTPResponse == nil || res.State.HTTPResponse.Status != "expected" {
t.Fatal("unexpected request")
}
makeSureObservationsContainTags(res)
makeSureObservationsContainTags(rt)
})

t.Run("with success (http)", func(t *testing.T) {
Expand All @@ -352,17 +343,16 @@ func TestHTTPRequest(t *testing.T) {
Trace: trace,
Transport: goodTransport,
}
httpRequest := HTTPRequest(
NewMinimalRuntime(model.DiscardLogger, time.Now()),
)
rt := NewRuntimeMeasurexLite(model.DiscardLogger, time.Now())
httpRequest := HTTPRequest(rt)
res := httpRequest.Apply(context.Background(), NewMaybeWithValue(&httpTransport))
if res.Error != nil {
t.Fatal("unexpected error")
}
if res.State.HTTPResponse == nil || res.State.HTTPResponse.Status != "expected" {
t.Fatal("unexpected request")
}
makeSureObservationsContainTags(res)
makeSureObservationsContainTags(rt)
})

t.Run("with header options", func(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions internal/dslx/httpcore.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@ func HTTPRequest(rt Runtime, options ...HTTPRequestOption) Func[*HTTPConnection,
ol.Stop(err)
}

// merge and save observations
observations = append(observations, maybeTraceToObservations(input.Trace)...)
rt.SaveObservations(observations...)

state := &HTTPResponse{
Address: input.Address,
Expand Down
14 changes: 6 additions & 8 deletions internal/dslx/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,17 @@ func TestMakeSureWeCollectSpeedSamples(t *testing.T) {
}

// measure the endpoint
result := f0.Apply(context.Background(), NewMaybeWithValue(epnt))
_ = f0.Apply(context.Background(), NewMaybeWithValue(epnt))

// get observations
observations := ExtractObservations(result)
observations := rt.Observations()

// process the network events and check for summary
var foundSummary bool
for _, entry := range observations {
for _, ev := range entry.NetworkEvents {
if ev.Operation == throttling.BytesReceivedCumulativeOperation {
t.Log(ev)
foundSummary = true
}
for _, ev := range observations.NetworkEvents {
if ev.Operation == throttling.BytesReceivedCumulativeOperation {
t.Log(ev)
foundSummary = true
}
}
if !foundSummary {
Expand Down
8 changes: 0 additions & 8 deletions internal/dslx/observations.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,6 @@ func NewObservations() *Observations {
}
}

// ExtractObservations extracts observations from a list of [Maybe].
func ExtractObservations[T any](rs ...*Maybe[T]) (out []*Observations) {
for _, r := range rs {
out = append(out, r.Observations...)
}
return
}

// maybeTraceToObservations returns the observations inside the
// trace taking into account the case where trace is nil.
func maybeTraceToObservations(trace Trace) (out []*Observations) {
Expand Down
Loading

0 comments on commit 966a45d

Please sign in to comment.