diff --git a/internal/dslx/dns.go b/internal/dslx/dns.go index 3e9a99d13b..92c4e850d6 100644 --- a/internal/dslx/dns.go +++ b/internal/dslx/dns.go @@ -97,6 +97,9 @@ func DNSLookupGetaddrinfo(rt Runtime) Func[*DomainToResolve, *ResolvedAddresses] // stop the operation logger ol.Stop(err) + // save the observations + rt.SaveObservations(maybeTraceToObservations(trace)...) + state := &ResolvedAddresses{ Addresses: addrs, // maybe empty Domain: input.Domain, @@ -145,6 +148,9 @@ func DNSLookupUDP(rt Runtime, endpoint string) Func[*DomainToResolve, *ResolvedA // stop the operation logger ol.Stop(err) + // save the observations + rt.SaveObservations(maybeTraceToObservations(trace)...) + state := &ResolvedAddresses{ Addresses: addrs, // maybe empty Domain: input.Domain, diff --git a/internal/dslx/dns_test.go b/internal/dslx/dns_test.go index 753605408b..49c19fcf68 100644 --- a/internal/dslx/dns_test.go +++ b/internal/dslx/dns_test.go @@ -61,13 +61,12 @@ func TestGetaddrinfo(t *testing.T) { } t.Run("with nil resolver", func(t *testing.T) { - f := DNSLookupGetaddrinfo( - NewMinimalRuntime(model.DiscardLogger, time.Now()), - ) + rt := NewRuntimeMeasurexLite(model.DiscardLogger, time.Now()) + f := DNSLookupGetaddrinfo(rt) ctx, cancel := context.WithCancel(context.Background()) cancel() // immediately cancel the lookup res := f.Apply(ctx, NewMaybeWithValue(domain)) - if res.Observations == nil || len(res.Observations) <= 0 { + if obs := rt.Observations(); obs == nil || len(obs.Queries) <= 0 { t.Fatal("unexpected empty observations") } if res.Error == nil { @@ -77,21 +76,17 @@ func TestGetaddrinfo(t *testing.T) { t.Run("with lookup error", func(t *testing.T) { mockedErr := errors.New("mocked") - f := DNSLookupGetaddrinfo( - NewMinimalRuntime(model.DiscardLogger, time.Now(), MinimalRuntimeOptionMeasuringNetwork(&mocks.MeasuringNetwork{ - MockNewStdlibResolver: func(logger model.DebugLogger) model.Resolver { - return &mocks.Resolver{ - MockLookupHost: func(ctx context.Context, domain string) ([]string, error) { - return nil, mockedErr - }, - } - }, - })), - ) + rt := NewRuntimeMeasurexLite(model.DiscardLogger, time.Now(), RuntimeMeasurexLiteOptionMeasuringNetwork(&mocks.MeasuringNetwork{ + MockNewStdlibResolver: func(logger model.DebugLogger) model.Resolver { + return &mocks.Resolver{ + MockLookupHost: func(ctx context.Context, domain string) ([]string, error) { + return nil, mockedErr + }, + } + }, + })) + f := DNSLookupGetaddrinfo(rt) res := f.Apply(context.Background(), NewMaybeWithValue(domain)) - if res.Observations == nil || len(res.Observations) <= 0 { - t.Fatal("unexpected empty observations") - } if res.Error != mockedErr { t.Fatalf("unexpected error type: %s", res.Error) } @@ -104,21 +99,17 @@ func TestGetaddrinfo(t *testing.T) { }) t.Run("with success", func(t *testing.T) { - f := DNSLookupGetaddrinfo( - NewRuntimeMeasurexLite(model.DiscardLogger, time.Now(), RuntimeMeasurexLiteOptionMeasuringNetwork(&mocks.MeasuringNetwork{ - MockNewStdlibResolver: func(logger model.DebugLogger) model.Resolver { - return &mocks.Resolver{ - MockLookupHost: func(ctx context.Context, domain string) ([]string, error) { - return []string{"93.184.216.34"}, nil - }, - } - }, - })), - ) + rt := NewRuntimeMeasurexLite(model.DiscardLogger, time.Now(), RuntimeMeasurexLiteOptionMeasuringNetwork(&mocks.MeasuringNetwork{ + MockNewStdlibResolver: func(logger model.DebugLogger) model.Resolver { + return &mocks.Resolver{ + MockLookupHost: func(ctx context.Context, domain string) ([]string, error) { + return []string{"93.184.216.34"}, nil + }, + } + }, + })) + f := DNSLookupGetaddrinfo(rt) res := f.Apply(context.Background(), NewMaybeWithValue(domain)) - if res.Observations == nil || len(res.Observations) <= 0 { - t.Fatal("unexpected empty observations") - } if res.Error != nil { t.Fatalf("unexpected error: %s", res.Error) } @@ -144,18 +135,19 @@ Test cases: - with success */ func TestLookupUDP(t *testing.T) { - t.Run("Apply dnsLookupGetaddrinfoFunc", func(t *testing.T) { + t.Run("Apply dnsLookupUDPFunc", func(t *testing.T) { domain := &DomainToResolve{ Domain: "example.com", Tags: []string{"antani"}, } t.Run("with nil resolver", func(t *testing.T) { - f := DNSLookupUDP(NewMinimalRuntime(model.DiscardLogger, time.Now()), "1.1.1.1:53") + rt := NewRuntimeMeasurexLite(model.DiscardLogger, time.Now()) + f := DNSLookupUDP(rt, "1.1.1.1:53") ctx, cancel := context.WithCancel(context.Background()) cancel() res := f.Apply(ctx, NewMaybeWithValue(domain)) - if res.Observations == nil || len(res.Observations) <= 0 { + if obs := rt.Observations(); obs == nil || len(obs.Queries) <= 0 { t.Fatal("unexpected empty observations") } if res.Error == nil { @@ -165,29 +157,24 @@ func TestLookupUDP(t *testing.T) { t.Run("with lookup error", func(t *testing.T) { mockedErr := errors.New("mocked") - f := DNSLookupUDP( - NewMinimalRuntime(model.DiscardLogger, time.Now(), MinimalRuntimeOptionMeasuringNetwork(&mocks.MeasuringNetwork{ - MockNewParallelUDPResolver: func(logger model.DebugLogger, dialer model.Dialer, endpoint string) model.Resolver { - return &mocks.Resolver{ - MockLookupHost: func(ctx context.Context, domain string) ([]string, error) { - return nil, mockedErr - }, - } - }, - MockNewDialerWithoutResolver: func(dl model.DebugLogger, w ...model.DialerWrapper) model.Dialer { - return &mocks.Dialer{ - MockDialContext: func(ctx context.Context, network, address string) (net.Conn, error) { - panic("should not be called") - }, - } - }, - })), - "1.1.1.1:53", - ) + rt := NewRuntimeMeasurexLite(model.DiscardLogger, time.Now(), RuntimeMeasurexLiteOptionMeasuringNetwork(&mocks.MeasuringNetwork{ + MockNewParallelUDPResolver: func(logger model.DebugLogger, dialer model.Dialer, endpoint string) model.Resolver { + return &mocks.Resolver{ + MockLookupHost: func(ctx context.Context, domain string) ([]string, error) { + return nil, mockedErr + }, + } + }, + MockNewDialerWithoutResolver: func(dl model.DebugLogger, w ...model.DialerWrapper) model.Dialer { + return &mocks.Dialer{ + MockDialContext: func(ctx context.Context, network, address string) (net.Conn, error) { + panic("should not be called") + }, + } + }, + })) + f := DNSLookupUDP(rt, "1.1.1.1:53") res := f.Apply(context.Background(), NewMaybeWithValue(domain)) - if res.Observations == nil || len(res.Observations) <= 0 { - t.Fatal("unexpected empty observations") - } if res.Error != mockedErr { t.Fatalf("unexpected error type: %s", res.Error) } @@ -200,29 +187,24 @@ func TestLookupUDP(t *testing.T) { }) t.Run("with success", func(t *testing.T) { - f := DNSLookupUDP( - NewRuntimeMeasurexLite(model.DiscardLogger, time.Now(), RuntimeMeasurexLiteOptionMeasuringNetwork(&mocks.MeasuringNetwork{ - MockNewParallelUDPResolver: func(logger model.DebugLogger, dialer model.Dialer, address string) model.Resolver { - return &mocks.Resolver{ - MockLookupHost: func(ctx context.Context, domain string) ([]string, error) { - return []string{"93.184.216.34"}, nil - }, - } - }, - MockNewDialerWithoutResolver: func(dl model.DebugLogger, w ...model.DialerWrapper) model.Dialer { - return &mocks.Dialer{ - MockDialContext: func(ctx context.Context, network, address string) (net.Conn, error) { - panic("should not be called") - }, - } - }, - })), - "1.1.1.1:53", - ) + rt := NewRuntimeMeasurexLite(model.DiscardLogger, time.Now(), RuntimeMeasurexLiteOptionMeasuringNetwork(&mocks.MeasuringNetwork{ + MockNewParallelUDPResolver: func(logger model.DebugLogger, dialer model.Dialer, address string) model.Resolver { + return &mocks.Resolver{ + MockLookupHost: func(ctx context.Context, domain string) ([]string, error) { + return []string{"93.184.216.34"}, nil + }, + } + }, + MockNewDialerWithoutResolver: func(dl model.DebugLogger, w ...model.DialerWrapper) model.Dialer { + return &mocks.Dialer{ + MockDialContext: func(ctx context.Context, network, address string) (net.Conn, error) { + panic("should not be called") + }, + } + }, + })) + f := DNSLookupUDP(rt, "1.1.1.1:53") res := f.Apply(context.Background(), NewMaybeWithValue(domain)) - if res.Observations == nil || len(res.Observations) <= 0 { - t.Fatal("unexpected empty observations") - } if res.Error != nil { t.Fatalf("unexpected error: %s", res.Error) } diff --git a/internal/dslx/fxcore_test.go b/internal/dslx/fxcore_test.go index 702f839181..182a584120 100644 --- a/internal/dslx/fxcore_test.go +++ b/internal/dslx/fxcore_test.go @@ -9,7 +9,6 @@ import ( "github.com/ooni/probe-cli/v3/internal/mocks" "github.com/ooni/probe-cli/v3/internal/model" "github.com/ooni/probe-cli/v3/internal/netxlite" - "github.com/ooni/probe-cli/v3/internal/runtimex" ) func getFn(err error, name string) Func[int, int] { @@ -22,7 +21,9 @@ type fn struct { } func (f *fn) Apply(ctx context.Context, i *Maybe[int]) *Maybe[int] { - runtimex.Assert(i.Error == nil, "did not expect to see an error here") + if i.Error != nil { + return i + } return &Maybe[int]{ Error: f.err, State: i.State + 1, @@ -112,20 +113,3 @@ func TestGen(t *testing.T) { } }) } - -func TestObservations(t *testing.T) { - t.Run("Extract observations", func(t *testing.T) { - fn1 := getFn(nil, "succeed") - fn2 := getFn(nil, "succeed") - composit := Compose2(fn1, fn2) - r1 := composit.Apply(context.Background(), NewMaybeWithValue(3)) - r2 := composit.Apply(context.Background(), NewMaybeWithValue(42)) - if len(r1.Observations) != 2 || len(r2.Observations) != 2 { - t.Fatalf("unexpected number of observations") - } - mergedObservations := ExtractObservations(r1, r2) - if len(mergedObservations) != 4 { - t.Fatalf("unexpected number of merged observations") - } - }) -} diff --git a/internal/dslx/http_test.go b/internal/dslx/http_test.go index ade2904555..15f7feeb0c 100644 --- a/internal/dslx/http_test.go +++ b/internal/dslx/http_test.go @@ -3,7 +3,6 @@ package dslx import ( "context" "errors" - "fmt" "io" "net/http" "strings" @@ -296,27 +295,20 @@ func TestHTTPRequest(t *testing.T) { // makeSureObservationsContainTags ensures the observations you can extract from // the given HTTPResponse contain the tags we configured when testing - makeSureObservationsContainTags := func(res *Maybe[*HTTPResponse]) error { - // exclude the case where there was an error - if res.Error != nil { - return fmt.Errorf("unexpected error: %w", res.Error) - } - - // obtain the observations - for _, obs := range ExtractObservations(res) { + makeSureObservationsContainTags := func(rt Runtime) error { + obs := rt.Observations() - // check the network events - for _, ev := range obs.NetworkEvents { - if diff := cmp.Diff([]string{"antani"}, ev.Tags); diff != "" { - return errors.New(diff) - } + // check the network events + for _, ev := range obs.NetworkEvents { + if diff := cmp.Diff([]string{"antani"}, ev.Tags); diff != "" { + return errors.New(diff) } + } - // check the HTTP events - for _, ev := range obs.Requests { - if diff := cmp.Diff([]string{"antani"}, ev.Tags); diff != "" { - return errors.New(diff) - } + // check the HTTP events + for _, ev := range obs.Requests { + if diff := cmp.Diff([]string{"antani"}, ev.Tags); diff != "" { + return errors.New(diff) } } @@ -331,9 +323,8 @@ func TestHTTPRequest(t *testing.T) { Trace: trace, Transport: goodTransport, } - httpRequest := HTTPRequest( - NewMinimalRuntime(model.DiscardLogger, time.Now()), - ) + rt := NewRuntimeMeasurexLite(model.DiscardLogger, time.Now()) + httpRequest := HTTPRequest(rt) res := httpRequest.Apply(context.Background(), NewMaybeWithValue(&httpTransport)) if res.Error != nil { t.Fatal("unexpected error") @@ -341,7 +332,7 @@ func TestHTTPRequest(t *testing.T) { if res.State.HTTPResponse == nil || res.State.HTTPResponse.Status != "expected" { t.Fatal("unexpected request") } - makeSureObservationsContainTags(res) + makeSureObservationsContainTags(rt) }) t.Run("with success (http)", func(t *testing.T) { @@ -352,9 +343,8 @@ func TestHTTPRequest(t *testing.T) { Trace: trace, Transport: goodTransport, } - httpRequest := HTTPRequest( - NewMinimalRuntime(model.DiscardLogger, time.Now()), - ) + rt := NewRuntimeMeasurexLite(model.DiscardLogger, time.Now()) + httpRequest := HTTPRequest(rt) res := httpRequest.Apply(context.Background(), NewMaybeWithValue(&httpTransport)) if res.Error != nil { t.Fatal("unexpected error") @@ -362,7 +352,7 @@ func TestHTTPRequest(t *testing.T) { if res.State.HTTPResponse == nil || res.State.HTTPResponse.Status != "expected" { t.Fatal("unexpected request") } - makeSureObservationsContainTags(res) + makeSureObservationsContainTags(rt) }) t.Run("with header options", func(t *testing.T) { diff --git a/internal/dslx/httpcore.go b/internal/dslx/httpcore.go index b1d367c13e..21d853c745 100644 --- a/internal/dslx/httpcore.go +++ b/internal/dslx/httpcore.go @@ -136,7 +136,9 @@ func HTTPRequest(rt Runtime, options ...HTTPRequestOption) Func[*HTTPConnection, ol.Stop(err) } + // merge and save observations observations = append(observations, maybeTraceToObservations(input.Trace)...) + rt.SaveObservations(observations...) state := &HTTPResponse{ Address: input.Address, diff --git a/internal/dslx/integration_test.go b/internal/dslx/integration_test.go index e8fdbf37bb..81ba611221 100644 --- a/internal/dslx/integration_test.go +++ b/internal/dslx/integration_test.go @@ -50,19 +50,17 @@ func TestMakeSureWeCollectSpeedSamples(t *testing.T) { } // measure the endpoint - result := f0.Apply(context.Background(), NewMaybeWithValue(epnt)) + _ = f0.Apply(context.Background(), NewMaybeWithValue(epnt)) // get observations - observations := ExtractObservations(result) + observations := rt.Observations() // process the network events and check for summary var foundSummary bool - for _, entry := range observations { - for _, ev := range entry.NetworkEvents { - if ev.Operation == throttling.BytesReceivedCumulativeOperation { - t.Log(ev) - foundSummary = true - } + for _, ev := range observations.NetworkEvents { + if ev.Operation == throttling.BytesReceivedCumulativeOperation { + t.Log(ev) + foundSummary = true } } if !foundSummary { diff --git a/internal/dslx/observations.go b/internal/dslx/observations.go index 269e999463..10153f4c23 100644 --- a/internal/dslx/observations.go +++ b/internal/dslx/observations.go @@ -43,14 +43,6 @@ func NewObservations() *Observations { } } -// ExtractObservations extracts observations from a list of [Maybe]. -func ExtractObservations[T any](rs ...*Maybe[T]) (out []*Observations) { - for _, r := range rs { - out = append(out, r.Observations...) - } - return -} - // maybeTraceToObservations returns the observations inside the // trace taking into account the case where trace is nil. func maybeTraceToObservations(trace Trace) (out []*Observations) { diff --git a/internal/dslx/quic.go b/internal/dslx/quic.go index 75616d18d0..e5baedd0ca 100644 --- a/internal/dslx/quic.go +++ b/internal/dslx/quic.go @@ -57,6 +57,9 @@ func QUICHandshake(rt Runtime, options ...TLSHandshakeOption) Func[*Endpoint, *Q // stop the operation logger ol.Stop(err) + // save the observations + rt.SaveObservations(maybeTraceToObservations(trace)...) + state := &QUICConnection{ Address: input.Address, QUICConn: quicConn, // possibly nil diff --git a/internal/dslx/runtimecore.go b/internal/dslx/runtimecore.go index 4215431e54..ab264c225e 100644 --- a/internal/dslx/runtimecore.go +++ b/internal/dslx/runtimecore.go @@ -30,6 +30,19 @@ type Runtime interface { // that does not collect any [*Observations]. NewTrace(index int64, zeroTime time.Time, tags ...string) Trace + // Observations returns the [*Observations] saved so far and clears our + // internal copy such that the next call to this method only returns + // the [*Observations] saved since the previous call. + // + // You can safely call this method from multiple goroutine contexts. + Observations() *Observations + + // SaveObservations saves [*Observations] inside the [Runtime]. You can + // get the saved [*Observations] by calling Observations. + // + // You can safely call this method from multiple goroutine contexts. + SaveObservations(obs ...*Observations) + // ZeroTime returns the runtime's "zero" time, which is used as the // starting point to generate observation's delta times. ZeroTime() time.Time diff --git a/internal/dslx/runtimeminimal.go b/internal/dslx/runtimeminimal.go index 522505ef24..d0fd4aefd9 100644 --- a/internal/dslx/runtimeminimal.go +++ b/internal/dslx/runtimeminimal.go @@ -29,6 +29,7 @@ func NewMinimalRuntime(logger model.Logger, zeroTime time.Time, options ...Minim logger: logger, mu: sync.Mutex{}, netx: &netxlite.Netx{Underlying: nil}, // implies using the host's network + ob: NewObservations(), v: []io.Closer{}, zeroT: zeroTime, } @@ -46,10 +47,34 @@ type MinimalRuntime struct { logger model.Logger mu sync.Mutex netx model.MeasuringNetwork + ob *Observations v []io.Closer zeroT time.Time } +// Observations implements Runtime. +func (p *MinimalRuntime) Observations() *Observations { + defer p.mu.Unlock() + p.mu.Lock() + o := p.ob + p.ob = NewObservations() + return o +} + +// SaveObservations implements Runtime. +func (p *MinimalRuntime) SaveObservations(obs ...*Observations) { + defer p.mu.Unlock() + p.mu.Lock() + for _, o := range obs { + p.ob.NetworkEvents = append(p.ob.NetworkEvents, o.NetworkEvents...) + p.ob.QUICHandshakes = append(p.ob.QUICHandshakes, o.QUICHandshakes...) + p.ob.Queries = append(p.ob.Queries, o.Queries...) + p.ob.Requests = append(p.ob.Requests, o.Requests...) + p.ob.TCPConnect = append(p.ob.TCPConnect, o.TCPConnect...) + p.ob.TLSHandshakes = append(p.ob.TLSHandshakes, o.TLSHandshakes...) + } +} + // IDGenerator implements Runtime. func (p *MinimalRuntime) IDGenerator() *atomic.Int64 { return p.idg diff --git a/internal/dslx/tcp.go b/internal/dslx/tcp.go index 5a67aae992..fb6628ca16 100644 --- a/internal/dslx/tcp.go +++ b/internal/dslx/tcp.go @@ -43,6 +43,9 @@ func TCPConnect(rt Runtime) Func[*Endpoint, *TCPConnection] { // stop the operation logger ol.Stop(err) + // save the observations + rt.SaveObservations(maybeTraceToObservations(trace)...) + state := &TCPConnection{ Address: input.Address, Conn: conn, // possibly nil diff --git a/internal/dslx/tls.go b/internal/dslx/tls.go index 25590e9297..b3d9bb8150 100644 --- a/internal/dslx/tls.go +++ b/internal/dslx/tls.go @@ -83,6 +83,9 @@ func TLSHandshake(rt Runtime, options ...TLSHandshakeOption) Func[*TCPConnection // stop the operation logger ol.Stop(err) + // save the observations + rt.SaveObservations(maybeTraceToObservations(trace)...) + state := &TLSConnection{ Address: input.Address, Conn: conn, // possibly nil