From 84e1ad6e62aa8df7fbb6357be0b12e6d1691be82 Mon Sep 17 00:00:00 2001 From: Simone Basso Date: Wed, 25 Oct 2023 19:34:59 +0200 Subject: [PATCH] feat(dslx): introduce MeasureResolvedAddresses (#1388) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This diff introduces MeasureResolvedAddresses and other ancillary functions that allow us to express measuring endpoints in a very simple way and with less code than with previous dslx code. As a result, we can deprecate a bunch of dslx functions (we will actually clean them up at a later time). While there, make StreamList simpler and more efficient: we can stream w/o creating a goroutine. It suffices to create a sufficiently buffered channel and fill it, and we can do all the work in the current goroutine. While there, fix a bug where QUICHandshake was mistakenly using the host network rather than the measuring network to create UDP listening sockets, which, well..., generated lots of confusion for me 😅 🤦. While there, make sure the netemx.MustNewScenario function creates HTTP/3 listeners for DNS servers. Previously, we did not create such listeners, which however is necessary because we very often use DoH3. While there, make sure we have issues for upcoming work. Closes https://github.com/ooni/probe/issues/2618. --- internal/dslx/address.go | 2 + internal/dslx/dns.go | 35 ++++-- internal/dslx/endpoint.go | 41 ++++++++ internal/dslx/fxasync.go | 60 +++++++++++ internal/dslx/fxasync_test.go | 12 +++ internal/dslx/fxcore.go | 75 +++++++++++++ internal/dslx/fxstream.go | 12 +-- internal/dslx/httpcore.go | 4 +- internal/dslx/qa_test.go | 181 +++++++++++++++++++++++++++++++- internal/dslx/quic.go | 3 +- internal/dslx/quic_test.go | 4 + internal/dslx/runtimeminimal.go | 5 + internal/dslx/trace.go | 3 + internal/measurexlite/udp.go | 1 + internal/netemx/scenario.go | 6 ++ 15 files changed, 423 insertions(+), 21 deletions(-) diff --git a/internal/dslx/address.go b/internal/dslx/address.go index c0b3076cdc..7d06ac7148 100644 --- a/internal/dslx/address.go +++ b/internal/dslx/address.go @@ -71,6 +71,8 @@ type EndpointPort uint16 // ToEndpoints transforms this set of IP addresses to a list of endpoints. We will // combine each IP address with the network and the port to construct an endpoint and // we will also apply any additional option to each endpoint. +// +// Deprecated: use MakeEndpoint instead. func (as *AddressSet) ToEndpoints( network EndpointNetwork, port EndpointPort, options ...EndpointOption) (v []*Endpoint) { for addr := range as.M { diff --git a/internal/dslx/dns.go b/internal/dslx/dns.go index 38249a87f9..c6f4172891 100644 --- a/internal/dslx/dns.go +++ b/internal/dslx/dns.go @@ -64,6 +64,26 @@ type ResolvedAddresses struct { Domain string } +// Flatten transforms a [ResolvedAddresses] into a slice of zero or more [ResolvedAddress]. +func (ra *ResolvedAddresses) Flatten() (out []*ResolvedAddress) { + for _, ipAddr := range ra.Addresses { + out = append(out, &ResolvedAddress{ + Address: ipAddr, + Domain: ra.Domain, + }) + } + return +} + +// ResolvedAddress is a single address resolved using a DNS lookup function. +type ResolvedAddress struct { + // Address is the address that was resolved. + Address string + + // Domain is the domain from which we resolved the address. + Domain string +} + // DNSLookupGetaddrinfo returns a function that resolves a domain name to // IP addresses using libc's getaddrinfo function. func DNSLookupGetaddrinfo(rt Runtime) Func[*DomainToResolve, *ResolvedAddresses] { @@ -90,18 +110,17 @@ func DNSLookupGetaddrinfo(rt Runtime) Func[*DomainToResolve, *ResolvedAddresses] // lookup addrs, err := resolver.LookupHost(ctx, input.Domain) - // stop the operation logger - ol.Stop(err) - // save the observations rt.SaveObservations(maybeTraceToObservations(trace)...) // handle error case if err != nil { + ol.Stop(err) return nil, err } // handle success + ol.Stop(addrs) state := &ResolvedAddresses{ Addresses: addrs, Domain: input.Domain, @@ -141,18 +160,17 @@ func DNSLookupUDP(rt Runtime, endpoint string) Func[*DomainToResolve, *ResolvedA // lookup addrs, err := resolver.LookupHost(ctx, input.Domain) - // stop the operation logger - ol.Stop(err) - // save the observations rt.SaveObservations(maybeTraceToObservations(trace)...) // handle error case if err != nil { + ol.Stop(err) return nil, err } // handle success + ol.Stop(addrs) state := &ResolvedAddresses{ Addresses: addrs, Domain: input.Domain, @@ -170,8 +188,11 @@ var ErrDNSLookupParallel = errors.New("dslx: DNSLookupParallel failed") // processing observations or by creating a per-DNS-resolver pipeline. func DNSLookupParallel(fxs ...Func[*DomainToResolve, *ResolvedAddresses]) Func[*DomainToResolve, *ResolvedAddresses] { return Operation[*DomainToResolve, *ResolvedAddresses](func(ctx context.Context, domain *DomainToResolve) (*ResolvedAddresses, error) { + // TODO(https://github.com/ooni/probe/issues/2619): we may want to configure this + const parallelism = Parallelism(3) + // run all the DNS resolvers in parallel - results := Parallel(ctx, Parallelism(2), domain, fxs...) + results := Parallel(ctx, parallelism, domain, fxs...) // reduce addresses addressSet := NewAddressSet() diff --git a/internal/dslx/endpoint.go b/internal/dslx/endpoint.go index ea725f5d58..c9f5b0c4a5 100644 --- a/internal/dslx/endpoint.go +++ b/internal/dslx/endpoint.go @@ -4,6 +4,12 @@ package dslx // Manipulate endpoints // +import ( + "context" + "net" + "strconv" +) + type ( // EndpointNetwork is the network of the endpoint EndpointNetwork string @@ -70,3 +76,38 @@ func NewEndpoint( } return epnt } + +// MakeEndpoint returns a [Func] that creates an [*Endpoint] given [*ResolvedAddress]. +func MakeEndpoint(network EndpointNetwork, port EndpointPort, options ...EndpointOption) Func[*ResolvedAddress, *Endpoint] { + return Operation[*ResolvedAddress, *Endpoint](func(ctx context.Context, addr *ResolvedAddress) (*Endpoint, error) { + // create the destination endpoint address + addrport := EndpointAddress(net.JoinHostPort(addr.Address, strconv.Itoa(int(port)))) + + // make sure we include the proper domain name first but allow the caller + // to potentially override the domain name using options + allOptions := []EndpointOption{ + EndpointOptionDomain(addr.Domain), + } + allOptions = append(allOptions, options...) + + // build and return the endpoint + endpoint := NewEndpoint(network, addrport, allOptions...) + return endpoint, nil + }) +} + +// MeasureResolvedAddresses returns a [Func] that measures the resolved addresses provided +// as the input argument using each of the provided functions. +func MeasureResolvedAddresses(fxs ...Func[*ResolvedAddress, Void]) Func[*ResolvedAddresses, Void] { + return Operation[*ResolvedAddresses, Void](func(ctx context.Context, addrs *ResolvedAddresses) (Void, error) { + // TODO(https://github.com/ooni/probe/issues/2619): we may want to configure this + const parallelism = Parallelism(3) + + // run the matrix until the output is drained + for range Matrix(ctx, parallelism, addrs.Flatten(), fxs) { + // nothing + } + + return Void{}, nil + }) +} diff --git a/internal/dslx/fxasync.go b/internal/dslx/fxasync.go index cfb428ec7e..c11c553d64 100644 --- a/internal/dslx/fxasync.go +++ b/internal/dslx/fxasync.go @@ -30,6 +30,8 @@ type Parallelism int // The return value is the channel generating fx(a) // for every a in inputs. This channel will also be closed // to signal EOF to the consumer. +// +// Deprecated: use Matrix instead. func Map[A, B any]( ctx context.Context, parallelism Parallelism, @@ -77,6 +79,8 @@ func Map[A, B any]( // - fn is the list of functions. // // The return value is the list [fx(a)] for every fx in fn. +// +// Deprecated: use Matrix instead. func Parallel[A, B any]( ctx context.Context, parallelism Parallelism, @@ -90,6 +94,8 @@ func Parallel[A, B any]( // ParallelAsync is like Parallel but deals with channels. We assume the // input channel will be closed to signal EOF. We will close the output // channel to signal EOF to the consumer. +// +// Deprecated: use Matrix instead. func ParallelAsync[A, B any]( ctx context.Context, parallelism Parallelism, @@ -124,6 +130,8 @@ func ParallelAsync[A, B any]( } // ApplyAsync is equivalent to calling Apply but returns a channel. +// +// Deprecated: use Matrix instead. func ApplyAsync[A, B any]( ctx context.Context, fx Func[A, B], @@ -131,3 +139,55 @@ func ApplyAsync[A, B any]( ) <-chan *Maybe[B] { return Map(ctx, Parallelism(1), fx, StreamList(input)) } + +// matrixPoint is a point within the matrix used by [Matrix]. +type matrixPoint[A, B any] struct { + f Func[A, B] + in A +} + +// matrixMin can be replaced with the built-in min when we switch to go1.21. +func matrixMin(a, b Parallelism) Parallelism { + if a < b { + return a + } + return b +} + +// Matrix invokes each function on each input using N goroutines and streams the results to a channel. +func Matrix[A, B any](ctx context.Context, N Parallelism, inputs []A, functions []Func[A, B]) <-chan *Maybe[B] { + // make output + output := make(chan *Maybe[B]) + + // stream all the possible points + points := make(chan *matrixPoint[A, B]) + go func() { + defer close(points) + for _, input := range inputs { + for _, fx := range functions { + points <- &matrixPoint[A, B]{f: fx, in: input} + } + } + }() + + // spawn goroutines + wg := &sync.WaitGroup{} + N = matrixMin(1, N) + for i := Parallelism(0); i < N; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for p := range points { + output <- p.f.Apply(ctx, NewMaybeWithValue(p.in)) + } + }() + } + + // close output channel when done + go func() { + defer close(output) + wg.Wait() + }() + + return output +} diff --git a/internal/dslx/fxasync_test.go b/internal/dslx/fxasync_test.go index 0e98242053..2893d6d3ac 100644 --- a/internal/dslx/fxasync_test.go +++ b/internal/dslx/fxasync_test.go @@ -101,3 +101,15 @@ func TestParallel(t *testing.T) { } }) } + +func TestMatrixMin(t *testing.T) { + if v := matrixMin(1, 7); v != 1 { + t.Fatal("expected to see 1, got", v) + } + if v := matrixMin(7, 4); v != 4 { + t.Fatal("expected to see 4, got", v) + } + if v := matrixMin(11, 11); v != 11 { + t.Fatal("expected to see 11, got", v) + } +} diff --git a/internal/dslx/fxcore.go b/internal/dslx/fxcore.go index 8074a4b18d..4dfcade417 100644 --- a/internal/dslx/fxcore.go +++ b/internal/dslx/fxcore.go @@ -6,6 +6,8 @@ package dslx import ( "context" + "errors" + "sync" ) // Func is a function f: (context.Context, A) -> B. @@ -13,6 +15,14 @@ type Func[A, B any] interface { Apply(ctx context.Context, a *Maybe[A]) *Maybe[B] } +// FuncAdapter adapts a func to be a [Func]. +type FuncAdapter[A, B any] func(ctx context.Context, a *Maybe[A]) *Maybe[B] + +// Apply implements Func. +func (fa FuncAdapter[A, B]) Apply(ctx context.Context, a *Maybe[A]) *Maybe[B] { + return fa(ctx, a) +} + // Operation adapts a golang function to behave like a Func. type Operation[A, B any] func(ctx context.Context, a A) (B, error) @@ -73,3 +83,68 @@ type compose2Func[A, B, C any] struct { func (h *compose2Func[A, B, C]) Apply(ctx context.Context, a *Maybe[A]) *Maybe[C] { return h.g.Apply(ctx, h.f.Apply(ctx, a)) } + +// Void is the empty data structure. +type Void struct{} + +// Discard transforms any type to [Void]. +func Discard[T any]() Func[T, Void] { + return Operation[T, Void](func(ctx context.Context, input T) (Void, error) { + return Void{}, nil + }) +} + +// ErrSkip is an error that indicates that we already processed an error emitted +// by a previous stage, so we are using this error to avoid counting the original +// error more than once when computing statistics, e.g., in [*Stats]. +var ErrSkip = errors.New("dslx: error already processed by a previous stage") + +// Stats measures the number of successes and failures. +// +// The zero value is invalid; use [NewStats]. +type Stats[T any] struct { + m map[string]int64 + mu sync.Mutex +} + +// NewStats creates a [*Stats] instance. +func NewStats[T any]() *Stats[T] { + return &Stats[T]{ + m: map[string]int64{}, + mu: sync.Mutex{}, + } +} + +// Observer returns a Func that observes the results of the previous pipeline stage. This function +// converts any error that it sees to [ErrSkip]. This function does not account for [ErrSkip], meaning +// that you will never see [ErrSkip] in the stats returned by [Stats.Export]. +func (s *Stats[T]) Observer() Func[T, T] { + return FuncAdapter[T, T](func(ctx context.Context, minput *Maybe[T]) *Maybe[T] { + defer s.mu.Unlock() + s.mu.Lock() + var r string + if err := minput.Error; err != nil { + if errors.Is(err, ErrSkip) { + return NewMaybeWithError[T](ErrSkip) // as documented + } + r = err.Error() + } + s.m[r]++ + if r != "" { + return NewMaybeWithError[T](ErrSkip) // as documented + } + return minput + }) +} + +// Export exports the current stats without clearing the internally used map such that +// statistics accumulate over time and never reset for the [*Stats] lifecycle. +func (s *Stats[T]) Export() (out map[string]int64) { + out = make(map[string]int64) + defer s.mu.Unlock() + s.mu.Lock() + for r, cnt := range s.m { + out[r] = cnt + } + return +} diff --git a/internal/dslx/fxstream.go b/internal/dslx/fxstream.go index 7b15efe459..21d0672266 100644 --- a/internal/dslx/fxstream.go +++ b/internal/dslx/fxstream.go @@ -18,13 +18,11 @@ func Collect[T any](c <-chan T) (v []T) { // StreamList creates a channel out of static values. This function will // close the channel when it has streamed all the available elements. func StreamList[T any](ts ...T) <-chan T { - c := make(chan T) - go func() { - defer close(c) // as documented - for _, t := range ts { - c <- t - } - }() + c := make(chan T, len(ts)) // buffer so writing does not block + defer close(c) // as documented + for _, t := range ts { + c <- t + } return c } diff --git a/internal/dslx/httpcore.go b/internal/dslx/httpcore.go index 56bd6241e4..22e87363b4 100644 --- a/internal/dslx/httpcore.go +++ b/internal/dslx/httpcore.go @@ -221,7 +221,7 @@ func httpRoundTrip( input *HTTPConnection, req *http.Request, ) (*http.Response, []byte, []*Observations, error) { - const maxbody = 1 << 19 // TODO(bassosimone): allow to configure this value? + const maxbody = 1 << 19 // TODO(https://github.com/ooni/probe/issues/2621): allow to configure this value started := input.Trace.TimeSince(input.Trace.ZeroTime()) // manually create a single 1-length observations structure because @@ -249,7 +249,7 @@ func httpRoundTrip( // read a snapshot of the response body reader := io.LimitReader(resp.Body, maxbody) - body, err = netxlite.ReadAllContext(ctx, reader) // TODO: enable streaming and measure speed + body, err = netxlite.ReadAllContext(ctx, reader) // TODO(https://github.com/ooni/probe/issues/2622) // collect and save download speed samples samples := sampler.ExtractSamples() diff --git a/internal/dslx/qa_test.go b/internal/dslx/qa_test.go index f329482d2b..24fd1c8d04 100644 --- a/internal/dslx/qa_test.go +++ b/internal/dslx/qa_test.go @@ -9,6 +9,7 @@ import ( "github.com/apex/log" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + "github.com/google/gopacket/layers" "github.com/ooni/netem" "github.com/ooni/probe-cli/v3/internal/dslx" "github.com/ooni/probe-cli/v3/internal/model" @@ -28,7 +29,7 @@ func TestDNSLookupQA(t *testing.T) { // name is the test case name name string - // newRuntime is the function that creates a new runtime + // newRuntime is the function creating a new runtime newRuntime func(netx model.MeasuringNetwork) dslx.Runtime // configureDPI configures DPI @@ -45,7 +46,7 @@ func TestDNSLookupQA(t *testing.T) { } cases := []testcase{{ - name: "successful case with minimal runtime", + name: "success with minimal runtime", newRuntime: func(netx model.MeasuringNetwork) dslx.Runtime { return dslx.NewMinimalRuntime(log.Log, time.Now(), dslx.MinimalRuntimeOptionMeasuringNetwork(netx)) }, @@ -130,7 +131,181 @@ func TestDNSLookupQA(t *testing.T) { t.Fatal(diff) } - // TODO(bassosimone): make sure the observations are OK + // TODO(https://github.com/ooni/probe/issues/2620): make sure the observations are OK + }) + } +} + +func TestMeasureResolvedAddressesQA(t *testing.T) { + // testcase is a test case implemented by this function + type testcase struct { + // name is the test case name + name string + + // newRuntime is the function creating a new runtime + newRuntime func(netx model.MeasuringNetwork) dslx.Runtime + + // configureDPI configures DPI + configureDPI func(dpi *netem.DPIEngine) + + // expectTCP contains the expected TCP connect stats + expectTCP map[string]int64 + + // expectTLS contains the expected TLS handshake stats + expectTLS map[string]int64 + + // expectQUIC contains the expected QUIC handshake stats + expectQUIC map[string]int64 + } + + cases := []testcase{{ + name: "success with minimal runtime", + newRuntime: func(netx model.MeasuringNetwork) dslx.Runtime { + return dslx.NewMinimalRuntime(log.Log, time.Now(), dslx.MinimalRuntimeOptionMeasuringNetwork(netx)) + }, + configureDPI: func(dpi *netem.DPIEngine) { + // nothing + }, + expectTCP: map[string]int64{"": 2}, + expectTLS: map[string]int64{"": 2}, + expectQUIC: map[string]int64{"": 2}, + }, { + name: "TCP connection refused with minimal runtime", + newRuntime: func(netx model.MeasuringNetwork) dslx.Runtime { + return dslx.NewMinimalRuntime(log.Log, time.Now(), dslx.MinimalRuntimeOptionMeasuringNetwork(netx)) + }, + configureDPI: func(dpi *netem.DPIEngine) { + dpi.AddRule(&netem.DPICloseConnectionForServerEndpoint{ + Logger: log.Log, + ServerIPAddress: "8.8.8.8", + ServerPort: 443, + }) + dpi.AddRule(&netem.DPICloseConnectionForServerEndpoint{ + Logger: log.Log, + ServerIPAddress: "8.8.4.4", + ServerPort: 443, + }) + }, + expectTCP: map[string]int64{ + "connection_refused": 2, + }, + expectTLS: map[string]int64{}, + expectQUIC: map[string]int64{"": 2}, + }, { + name: "TLS handshake reset with minimal runtime", + newRuntime: func(netx model.MeasuringNetwork) dslx.Runtime { + return dslx.NewMinimalRuntime(log.Log, time.Now(), dslx.MinimalRuntimeOptionMeasuringNetwork(netx)) + }, + configureDPI: func(dpi *netem.DPIEngine) { + dpi.AddRule(&netem.DPIResetTrafficForTLSSNI{ + Logger: log.Log, + SNI: "dns.google", + }) + }, + expectTCP: map[string]int64{"": 2}, + expectTLS: map[string]int64{ + "connection_reset": 2, + }, + expectQUIC: map[string]int64{"": 2}, + }, { + name: "QUIC handshake timeout with minimal runtime", + newRuntime: func(netx model.MeasuringNetwork) dslx.Runtime { + return dslx.NewMinimalRuntime(log.Log, time.Now(), dslx.MinimalRuntimeOptionMeasuringNetwork(netx)) + }, + configureDPI: func(dpi *netem.DPIEngine) { + dpi.AddRule(&netem.DPIDropTrafficForServerEndpoint{ + Logger: log.Log, + ServerIPAddress: "8.8.8.8", + ServerPort: 443, + ServerProtocol: layers.IPProtocolUDP, + }) + dpi.AddRule(&netem.DPIDropTrafficForServerEndpoint{ + Logger: log.Log, + ServerIPAddress: "8.8.4.4", + ServerPort: 443, + ServerProtocol: layers.IPProtocolUDP, + }) + }, + expectTCP: map[string]int64{"": 2}, + expectTLS: map[string]int64{"": 2}, + expectQUIC: map[string]int64{ + "generic_timeout_error": 2, + }, + }} + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + // create an internet testing scenario + env := netemx.MustNewScenario(netemx.InternetScenario) + defer env.Close() + + // create a dslx.Runtime using the client stack + rt := tc.newRuntime(&netxlite.Netx{ + Underlying: &netxlite.NetemUnderlyingNetworkAdapter{UNet: env.ClientStack}, + }) + defer rt.Close() + + // configure the DPI engine + tc.configureDPI(env.DPIEngine()) + + // create stats + var ( + tcpConnectStats = dslx.NewStats[*dslx.TCPConnection]() + tlsHandshakeStats = dslx.NewStats[*dslx.TLSConnection]() + quicHandshakeStats = dslx.NewStats[*dslx.QUICConnection]() + ) + + // create endpoint measurement function + function := dslx.MeasureResolvedAddresses( + // measure 443/tcp + dslx.Compose7( + dslx.MakeEndpoint("tcp", 443), + dslx.TCPConnect(rt), + tcpConnectStats.Observer(), + dslx.TLSHandshake(rt), + tlsHandshakeStats.Observer(), + dslx.HTTPRequestOverTLS(rt), + dslx.Discard[*dslx.HTTPResponse](), + ), + + // measure 443/udp + dslx.Compose5( + dslx.MakeEndpoint("udp", 443), + dslx.QUICHandshake(rt), + quicHandshakeStats.Observer(), + dslx.HTTPRequestOverQUIC(rt), + dslx.Discard[*dslx.HTTPResponse](), + ), + ) + + // create context + ctx := context.Background() + + // fake out the resolved addresses + resolvedAddrs := &dslx.ResolvedAddresses{ + Addresses: []string{"8.8.8.8", "8.8.4.4"}, + Domain: "dns.google", + } + + // measure the endpoints + _ = function.Apply(ctx, dslx.NewMaybeWithValue(resolvedAddrs)) + + // make sure the TCP connect results are consistent + if diff := cmp.Diff(tc.expectTCP, tcpConnectStats.Export()); diff != "" { + t.Fatal(diff) + } + + // make sure the TLS handshake results are consistent + if diff := cmp.Diff(tc.expectTLS, tlsHandshakeStats.Export()); diff != "" { + t.Fatal(diff) + } + + // make sure the QUIC handshake results are consistent + if diff := cmp.Diff(tc.expectQUIC, quicHandshakeStats.Export()); diff != "" { + t.Fatal(diff) + } + + // TODO(https://github.com/ooni/probe/issues/2620): make sure the observations are OK }) } } diff --git a/internal/dslx/quic.go b/internal/dslx/quic.go index 3ad65241c6..362f052500 100644 --- a/internal/dslx/quic.go +++ b/internal/dslx/quic.go @@ -11,7 +11,6 @@ import ( "time" "github.com/ooni/probe-cli/v3/internal/logx" - "github.com/ooni/probe-cli/v3/internal/netxlite" "github.com/quic-go/quic-go" ) @@ -35,7 +34,7 @@ func QUICHandshake(rt Runtime, options ...TLSHandshakeOption) Func[*Endpoint, *Q ) // setup - udpListener := netxlite.NewUDPListener() + udpListener := trace.NewUDPListener() quicDialer := trace.NewQUICDialerWithoutResolver(udpListener, rt.Logger()) const timeout = 10 * time.Second ctx, cancel := context.WithTimeout(ctx, timeout) diff --git a/internal/dslx/quic_test.go b/internal/dslx/quic_test.go index d798adc179..8272055a94 100644 --- a/internal/dslx/quic_test.go +++ b/internal/dslx/quic_test.go @@ -10,6 +10,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/ooni/probe-cli/v3/internal/mocks" "github.com/ooni/probe-cli/v3/internal/model" + "github.com/ooni/probe-cli/v3/internal/netxlite" "github.com/quic-go/quic-go" ) @@ -85,6 +86,9 @@ func TestQUICHandshake(t *testing.T) { MockNewQUICDialerWithoutResolver: func(listener model.UDPListener, logger model.DebugLogger, w ...model.QUICDialerWrapper) model.QUICDialer { return tt.dialer }, + MockNewUDPListener: func() model.UDPListener { + return netxlite.NewUDPListener() + }, })) quicHandshake := QUICHandshake(rt, TLSHandshakeOptionServerName(tt.sni)) endpoint := &Endpoint{ diff --git a/internal/dslx/runtimeminimal.go b/internal/dslx/runtimeminimal.go index d0fd4aefd9..1e73276937 100644 --- a/internal/dslx/runtimeminimal.go +++ b/internal/dslx/runtimeminimal.go @@ -170,6 +170,11 @@ func (tx *minimalTrace) NewTLSHandshakerStdlib(dl model.DebugLogger) model.TLSHa return tx.netx.NewTLSHandshakerStdlib(dl) } +// NewUDPListener implements Trace +func (tx *minimalTrace) NewUDPListener() model.UDPListener { + return tx.netx.NewUDPListener() +} + // QUICHandshakes implements Trace. func (tx *minimalTrace) QUICHandshakes() (out []*model.ArchivalTLSOrQUICHandshakeResult) { return []*model.ArchivalTLSOrQUICHandshakeResult{} diff --git a/internal/dslx/trace.go b/internal/dslx/trace.go index 09094712ac..8cc9f4030f 100644 --- a/internal/dslx/trace.go +++ b/internal/dslx/trace.go @@ -51,6 +51,9 @@ type Trace interface { // NewStdlibResolver returns a possibly-trace-ware system resolver. NewStdlibResolver(logger model.DebugLogger) model.Resolver + // NewUDPListener implements model.MeasuringNetwork. + NewUDPListener() model.UDPListener + // QUICHandshakes collects all the QUIC handshake results collected so far. QUICHandshakes() (out []*model.ArchivalTLSOrQUICHandshakeResult) diff --git a/internal/measurexlite/udp.go b/internal/measurexlite/udp.go index ee9d1aaf05..72f2cb2fcf 100644 --- a/internal/measurexlite/udp.go +++ b/internal/measurexlite/udp.go @@ -2,6 +2,7 @@ package measurexlite import "github.com/ooni/probe-cli/v3/internal/model" +// NewUDPListener implements model.Measuring Network. func (tx *Trace) NewUDPListener() model.UDPListener { return tx.Netx.NewUDPListener() } diff --git a/internal/netemx/scenario.go b/internal/netemx/scenario.go index 1d9f6c7ced..b5b5a0f7cf 100644 --- a/internal/netemx/scenario.go +++ b/internal/netemx/scenario.go @@ -209,6 +209,12 @@ func MustNewScenario(config []*ScenarioDomainAddresses) *QAEnv { ServerNameMain: sad.ServerNameMain, ServerNameExtras: sad.ServerNameExtras, }, + &HTTP3ServerFactory{ + Factory: &DNSOverHTTPSHandlerFactory{}, + Ports: []int{443}, + ServerNameMain: sad.ServerNameMain, + ServerNameExtras: sad.ServerNameExtras, + }, )) }