From 0f1887406e2903697abaadca0300ad2d64487fac Mon Sep 17 00:00:00 2001 From: Simone Basso Date: Tue, 18 Jun 2024 14:12:23 +0200 Subject: [PATCH 1/2] poc: rewrite urlgetter using step-by-step This diff shows how we could incrementally rewrite urlgetter using a step-by-step measurement style. Additionally, this diff modifies the facebook_messenger experiment to show what changes are required to upgrade it. The general idea of these changes is to incrementally move experiments away from depending on ./internal/experiment/urlgetter, and instead use a near drop-in replacement implementation, implemented in ./internal/urlgetter, which uses step-by-step to measure. Because ./internal/experiment/urlgetter depends on ./internal/legacy/netx and, instead, ./internal/urlgetter depends on ./internal/measurexlite, by performing this kind of migration we make ./internal/legacy/netx unnecessary. Also, because most users of ./internal/experiment/urlgetter only use limited functionality, incremental refactoring would be possible. Reference issue: https://github.com/ooni/probe/issues/2751. 
--- .../experiment/fbmessenger/fbmessenger.go | 61 ++-- .../fbmessenger/fbmessenger_test.go | 6 +- internal/measurexlite/conn.go | 22 +- internal/measurexlite/conn_test.go | 8 +- internal/urlgetter/config.go | 34 ++ internal/urlgetter/dnslookup.go | 110 ++++++ internal/urlgetter/doc.go | 2 + internal/urlgetter/easyhandle.go | 92 +++++ internal/urlgetter/http.go | 315 ++++++++++++++++++ internal/urlgetter/httpredirect.go | 41 +++ internal/urlgetter/indexgen.go | 17 + internal/urlgetter/multihandle.go | 183 ++++++++++ internal/urlgetter/runner.go | 97 ++++++ internal/urlgetter/tcpconnect.go | 173 ++++++++++ internal/urlgetter/testkeys.go | 85 +++++ internal/urlgetter/tlshandshake.go | 187 +++++++++++ 16 files changed, 1394 insertions(+), 39 deletions(-) create mode 100644 internal/urlgetter/config.go create mode 100644 internal/urlgetter/dnslookup.go create mode 100644 internal/urlgetter/doc.go create mode 100644 internal/urlgetter/easyhandle.go create mode 100644 internal/urlgetter/http.go create mode 100644 internal/urlgetter/httpredirect.go create mode 100644 internal/urlgetter/indexgen.go create mode 100644 internal/urlgetter/multihandle.go create mode 100644 internal/urlgetter/runner.go create mode 100644 internal/urlgetter/tcpconnect.go create mode 100644 internal/urlgetter/testkeys.go create mode 100644 internal/urlgetter/tlshandshake.go diff --git a/internal/experiment/fbmessenger/fbmessenger.go b/internal/experiment/fbmessenger/fbmessenger.go index 5b18073bac..7145a16fc9 100644 --- a/internal/experiment/fbmessenger/fbmessenger.go +++ b/internal/experiment/fbmessenger/fbmessenger.go @@ -8,9 +8,9 @@ import ( "math/rand" "time" - "github.com/ooni/probe-cli/v3/internal/experiment/urlgetter" "github.com/ooni/probe-cli/v3/internal/model" "github.com/ooni/probe-cli/v3/internal/netxlite" + "github.com/ooni/probe-cli/v3/internal/urlgetter" ) const ( @@ -78,15 +78,21 @@ type Analysis struct { } // Update updates the TestKeys using the given MultiOutput result. 
-func (tk *TestKeys) Update(v urlgetter.MultiOutput) { +func (tk *TestKeys) Update(v *urlgetter.MultiResult) { + // handle the case where there are no test keys + if v.TestKeys.Err != nil { + return + } + // Update the easy to update entries first - tk.NetworkEvents = append(tk.NetworkEvents, v.TestKeys.NetworkEvents...) - tk.Queries = append(tk.Queries, v.TestKeys.Queries...) - tk.Requests = append(tk.Requests, v.TestKeys.Requests...) - tk.TCPConnect = append(tk.TCPConnect, v.TestKeys.TCPConnect...) - tk.TLSHandshakes = append(tk.TLSHandshakes, v.TestKeys.TLSHandshakes...) + tk.NetworkEvents = append(tk.NetworkEvents, v.TestKeys.Value.NetworkEvents...) + tk.Queries = append(tk.Queries, v.TestKeys.Value.Queries...) + tk.Requests = append(tk.Requests, v.TestKeys.Value.Requests...) + tk.TCPConnect = append(tk.TCPConnect, v.TestKeys.Value.TCPConnect...) + tk.TLSHandshakes = append(tk.TLSHandshakes, v.TestKeys.Value.TLSHandshakes...) + // Set the status of endpoints - switch v.Input.Target { + switch v.Target.URL { case ServiceSTUN: var ignored *bool tk.ComputeEndpointStatus(v, &tk.FacebookSTUNDNSConsistent, &ignored) @@ -117,16 +123,22 @@ var ( ) // ComputeEndpointStatus computes the DNS and TCP status of a specific endpoint. 
-func (tk *TestKeys) ComputeEndpointStatus(v urlgetter.MultiOutput, dns, tcp **bool) { +func (tk *TestKeys) ComputeEndpointStatus(v *urlgetter.MultiResult, dns, tcp **bool) { // start where all is unknown *dns, *tcp = nil, nil + + // handle the case where there are no test keys + if v.TestKeys.Err != nil { + return + } + // process DNS first - if v.TestKeys.FailedOperation != nil && *v.TestKeys.FailedOperation == netxlite.ResolveOperation { + if v.TestKeys.Value.FailedOperation.UnwrapOr("") == netxlite.ResolveOperation { tk.FacebookDNSBlocking = &trueValue *dns = &falseValue return // we know that the DNS has failed } - for _, query := range v.TestKeys.Queries { + for _, query := range v.TestKeys.Value.Queries { for _, ans := range query.Answers { if ans.ASN != FacebookASN { tk.FacebookDNSBlocking = &trueValue @@ -137,7 +149,7 @@ func (tk *TestKeys) ComputeEndpointStatus(v urlgetter.MultiOutput, dns, tcp **bo } *dns = &trueValue // now process connect - if v.TestKeys.FailedOperation != nil && *v.TestKeys.FailedOperation == netxlite.ConnectOperation { + if v.TestKeys.Value.FailedOperation.UnwrapOr("") == netxlite.ConnectOperation { tk.FacebookTCPBlocking = &trueValue *tcp = &falseValue return // because connect failed @@ -151,9 +163,6 @@ type Measurer struct { // Config contains the experiment settings. If empty we // will be using default settings. Config Config - - // Getter is an optional getter to be used for testing. 
- Getter urlgetter.MultiGetter } // ExperimentName implements ExperimentMeasurer.ExperimentName @@ -173,24 +182,32 @@ func (m Measurer) Run(ctx context.Context, args *model.ExperimentArgs) error { sess := args.Session ctx, cancel := context.WithTimeout(ctx, 60*time.Second) defer cancel() - urlgetter.RegisterExtensions(measurement) + //urlgetter.RegisterExtensions(measurement) // TODO(bassosimone) + // generate targets - var inputs []urlgetter.MultiInput + var inputs []*urlgetter.EasyTarget for _, service := range Services { - inputs = append(inputs, urlgetter.MultiInput{Target: service}) + inputs = append(inputs, &urlgetter.EasyTarget{URL: service}) } - rnd := rand.New(rand.NewSource(time.Now().UnixNano())) // #nosec G404 -- not really important - rnd.Shuffle(len(inputs), func(i, j int) { + rand.Shuffle(len(inputs), func(i, j int) { inputs[i], inputs[j] = inputs[j], inputs[i] }) + // measure in parallel - multi := urlgetter.Multi{Begin: time.Now(), Getter: m.Getter, Session: sess} + multi := &urlgetter.MultiHandle{ + Begin: time.Now(), + IndexGen: &urlgetter.IndexGen{}, + Session: sess, + } testkeys := new(TestKeys) testkeys.Agent = "redirect" measurement.TestKeys = testkeys - for entry := range multi.Collect(ctx, inputs, "facebook_messenger", callbacks) { + results := urlgetter.MultiCollect(callbacks, 0, len(inputs), + "facebook_messenger", multi.Run(ctx, inputs...)) + for entry := range results { testkeys.Update(entry) } + // if we haven't yet determined the status of DNS blocking and TCP blocking // then no blocking has been detected and we can set them if testkeys.FacebookDNSBlocking == nil { diff --git a/internal/experiment/fbmessenger/fbmessenger_test.go b/internal/experiment/fbmessenger/fbmessenger_test.go index f5f21efdb0..d88ce8eeef 100644 --- a/internal/experiment/fbmessenger/fbmessenger_test.go +++ b/internal/experiment/fbmessenger/fbmessenger_test.go @@ -2,7 +2,6 @@ package fbmessenger_test import ( "context" - "io" "net/url" "testing" @@ -10,12 +9,9 @@ 
import ( "github.com/google/gopacket/layers" "github.com/ooni/netem" "github.com/ooni/probe-cli/v3/internal/experiment/fbmessenger" - "github.com/ooni/probe-cli/v3/internal/experiment/urlgetter" - "github.com/ooni/probe-cli/v3/internal/legacy/tracex" "github.com/ooni/probe-cli/v3/internal/mocks" "github.com/ooni/probe-cli/v3/internal/model" "github.com/ooni/probe-cli/v3/internal/netemx" - "github.com/ooni/probe-cli/v3/internal/netxlite" "github.com/ooni/probe-cli/v3/internal/runtimex" ) @@ -326,6 +322,7 @@ func TestMeasurerRun(t *testing.T) { }) } +/* func TestComputeEndpointStatsTCPBlocking(t *testing.T) { failure := io.EOF.Error() operation := netxlite.ConnectOperation @@ -385,6 +382,7 @@ func TestComputeEndpointStatsDNSIsLying(t *testing.T) { t.Fatal("invalid FacebookTCPBlocking") } } +*/ func TestSummaryKeysWithNils(t *testing.T) { measurement := &model.Measurement{TestKeys: &fbmessenger.TestKeys{}} diff --git a/internal/measurexlite/conn.go b/internal/measurexlite/conn.go index ae107e88b8..1ec435dbf2 100644 --- a/internal/measurexlite/conn.go +++ b/internal/measurexlite/conn.go @@ -6,6 +6,7 @@ package measurexlite import ( "fmt" + "io" "net" "time" @@ -13,8 +14,8 @@ import ( "github.com/ooni/probe-cli/v3/internal/netxlite" ) -// MaybeClose is a convenience function for closing a [net.Conn] when it is not nil. -func MaybeClose(conn net.Conn) (err error) { +// MaybeClose is a convenience function for closing a [io.Closer] when it is not nil. +func MaybeClose(conn io.Closer) (err error) { if conn != nil { err = conn.Close() } @@ -39,18 +40,21 @@ type connTrace struct { var _ net.Conn = &connTrace{} -type remoteAddrProvider interface { +// RemoteAddrProvider is something returning the remote address. +type RemoteAddrProvider interface { RemoteAddr() net.Addr } -func safeRemoteAddrNetwork(rap remoteAddrProvider) (result string) { +// SafeRemoteAddrNetwork is a safe accessor to get the remote addr network. 
+func SafeRemoteAddrNetwork(rap RemoteAddrProvider) (result string) { if addr := rap.RemoteAddr(); addr != nil { result = addr.Network() } return result } -func safeRemoteAddrString(rap remoteAddrProvider) (result string) { +// SafeRemoteAddrString is a safe accessor to get the remote addr string representation. +func SafeRemoteAddrString(rap RemoteAddrProvider) (result string) { if addr := rap.RemoteAddr(); addr != nil { result = addr.String() } @@ -60,8 +64,8 @@ func safeRemoteAddrString(rap remoteAddrProvider) (result string) { // Read implements net.Conn.Read and saves network events. func (c *connTrace) Read(b []byte) (int, error) { // collect preliminary stats when the connection is surely active - network := safeRemoteAddrNetwork(c) - addr := safeRemoteAddrString(c) + network := SafeRemoteAddrNetwork(c) + addr := SafeRemoteAddrString(c) started := c.tx.TimeSince(c.tx.ZeroTime()) // perform the underlying network operation @@ -117,8 +121,8 @@ func (tx *Trace) CloneBytesReceivedMap() (out map[string]int64) { // Write implements net.Conn.Write and saves network events. 
func (c *connTrace) Write(b []byte) (int, error) { - network := safeRemoteAddrNetwork(c) - addr := safeRemoteAddrString(c) + network := SafeRemoteAddrNetwork(c) + addr := SafeRemoteAddrString(c) started := c.tx.TimeSince(c.tx.ZeroTime()) count, err := c.Conn.Write(b) diff --git a/internal/measurexlite/conn_test.go b/internal/measurexlite/conn_test.go index d69f52780b..c4dcfeacb0 100644 --- a/internal/measurexlite/conn_test.go +++ b/internal/measurexlite/conn_test.go @@ -19,10 +19,10 @@ func TestRemoteAddrProvider(t *testing.T) { return nil }, } - if safeRemoteAddrNetwork(conn) != "" { + if SafeRemoteAddrNetwork(conn) != "" { t.Fatal("expected empty network") } - if safeRemoteAddrString(conn) != "" { + if SafeRemoteAddrString(conn) != "" { t.Fatal("expected empty string") } }) @@ -40,10 +40,10 @@ func TestRemoteAddrProvider(t *testing.T) { } }, } - if safeRemoteAddrNetwork(conn) != "tcp" { + if SafeRemoteAddrNetwork(conn) != "tcp" { t.Fatal("unexpected network") } - if safeRemoteAddrString(conn) != "1.1.1.1:443" { + if SafeRemoteAddrString(conn) != "1.1.1.1:443" { t.Fatal("unexpected string") } }) diff --git a/internal/urlgetter/config.go b/internal/urlgetter/config.go new file mode 100644 index 0000000000..ee97c28871 --- /dev/null +++ b/internal/urlgetter/config.go @@ -0,0 +1,34 @@ +package urlgetter + +// Config contains the configuration. +type Config struct { + // HTTPHost allows overriding the default HTTP host. + HTTPHost string `ooni:"Force using specific HTTP Host header"` + + // HTTPReferer sets the HTTP referer value. + HTTPReferer string `ooni:"Force using the specific HTTP Referer header"` + + // Method selects the HTTP method to use. + Method string `ooni:"Force HTTP method different than GET"` + + // NoFollowRedirects disables following redirects. + NoFollowRedirects bool `ooni:"Disable following redirects"` + + // TLSNextProtos is an OPTIONAL comma separated ALPN list. 
+ TLSNextProtos string `ooni:"Comma-separated list of next protocols for ALPN"` + + // TLSServerName is the OPTIONAL SNI value. + TLSServerName string `ooni:"SNI value to use"` +} + +// Clone returns a deep copy of the given [*Config]. +func (cx *Config) Clone() *Config { + return &Config{ + HTTPHost: cx.HTTPHost, + HTTPReferer: cx.HTTPReferer, + Method: cx.Method, + NoFollowRedirects: cx.NoFollowRedirects, + TLSNextProtos: cx.TLSNextProtos, + TLSServerName: cx.TLSServerName, + } +} diff --git a/internal/urlgetter/dnslookup.go b/internal/urlgetter/dnslookup.go new file mode 100644 index 0000000000..2db387a150 --- /dev/null +++ b/internal/urlgetter/dnslookup.go @@ -0,0 +1,110 @@ +package urlgetter + +import ( + "context" + "net" + "net/url" + "time" + + "github.com/ooni/probe-cli/v3/internal/logx" + "github.com/ooni/probe-cli/v3/internal/measurexlite" + "github.com/ooni/probe-cli/v3/internal/netxlite" +) + +// DNSLookup measures a dnslookup://domain/ URL. +func (rx *Runner) DNSLookup(ctx context.Context, config *Config, URL *url.URL) error { + _, err := rx.DNSLookupOp(ctx, config, URL) + return err +} + +// DNSLookupResult contains the results of a DNS lookup. +type DNSLookupResult struct { + // Address is the resolved address. + Address string + + // Config is the original config. + Config *Config + + // URL is the original URL. + URL *url.URL +} + +// endpoint returns an endpoint given the address and the URL scheme. 
+func (rx *DNSLookupResult) endpoint() (string, error) { + // handle the case where there is an explicit port + if port := rx.URL.Port(); port != "" { + return net.JoinHostPort(rx.Address, port), nil + } + + // use the scheme to deduce the port + switch rx.URL.Scheme { + case "http": + return net.JoinHostPort(rx.Address, "80"), nil + case "https": + return net.JoinHostPort(rx.Address, "443"), nil + case "dot": + return net.JoinHostPort(rx.Address, "853"), nil + default: + return "", ErrUnknownURLScheme + } +} + +// DNSLookupOp resolves a domain name using the configured resolver. +func (rx *Runner) DNSLookupOp(ctx context.Context, config *Config, URL *url.URL) ([]*DNSLookupResult, error) { + // TODO(bassosimone): choose the proper DNS resolver depending on configuration. + return rx.DNSLookupGetaddrinfoOp(ctx, config, URL) +} + +// DNSLookupGetaddrinfoOp performs a DNS lookup using getaddrinfo. +func (rx *Runner) DNSLookupGetaddrinfoOp(ctx context.Context, config *Config, URL *url.URL) ([]*DNSLookupResult, error) { + // enforce timeout + const timeout = 4 * time.Second + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + // obtain the next trace index + index := rx.IndexGen.Next() + + // create trace using the given underlying network + trace := measurexlite.NewTrace(index, rx.Begin) + trace.Netx = &netxlite.Netx{Underlying: rx.UNet} + + // obtain logger + logger := rx.Session.Logger() + + // create resolver + reso := trace.NewStdlibResolver(logger) + + // the domain to resolve is the URL's hostname + domain := URL.Hostname() + + // start operation logger + ol := logx.NewOperationLogger(logger, "[#%d] lookup %s using getaddrinfo", index, domain) + + // perform the lookup + addrs, err := reso.LookupHost(ctx, domain) + + // append the DNS lookup results + rx.TestKeys.AppendQueries(trace.DNSLookupsFromRoundTrip()...) 
+ + // stop the operation logger + ol.Stop(err) + + // manually set the failure and failed operation + if err != nil { + rx.TestKeys.MaybeSetFailedOperation(netxlite.DNSRoundTripOperation) + rx.TestKeys.MaybeSetFailure(err.Error()) + return nil, err + } + + // emit results + var results []*DNSLookupResult + for _, addr := range addrs { + results = append(results, &DNSLookupResult{ + Address: addr, + Config: config, + URL: URL, + }) + } + return results, nil +} diff --git a/internal/urlgetter/doc.go b/internal/urlgetter/doc.go new file mode 100644 index 0000000000..982ccb0dfa --- /dev/null +++ b/internal/urlgetter/doc.go @@ -0,0 +1,2 @@ +// Package urlgetter contains a step-by-step urlgetter implementation. +package urlgetter diff --git a/internal/urlgetter/easyhandle.go b/internal/urlgetter/easyhandle.go new file mode 100644 index 0000000000..0a9193727a --- /dev/null +++ b/internal/urlgetter/easyhandle.go @@ -0,0 +1,92 @@ +package urlgetter + +import ( + "context" + "fmt" + "net/url" + "time" + + "github.com/ooni/probe-cli/v3/internal/model" + "github.com/ooni/probe-cli/v3/internal/optional" +) + +// EasyHandle measures URLs sequentially. +// +// The zero value is invalid. Please, initialize the MANDATORY fields. +type EasyHandle struct { + // Begin is the OPTIONAL time when the experiment begun. If you do not + // set this field, every target is measured independently. + Begin time.Time + + // IndexGen is the MANDATORY index generator. + IndexGen RunnerTraceIndexGenerator + + // Session is the MANDATORY session to use. If this is nil, the Run + // method will panic with a nil pointer error. + Session RunnerSession + + // UNet is the OPTIONAL underlying network to use. + UNet model.UnderlyingNetwork +} + +// EasyTarget is a target for [*EasyHandle]. +type EasyTarget struct { + // Config contains the target configuration. + Config *Config + + // URL contains the URL to measure. 
+ URL string +} + +// Run gets the target URL and returns either the [*TestKeys] or an error. +func (hx *EasyHandle) Run(ctx context.Context, target *EasyTarget) (*TestKeys, error) { + // parse the target URL + URL, err := url.Parse(target.URL) + + // handle the case where we cannot parse the URL. + if err != nil { + return nil, fmt.Errorf("urlgetter: invalid target URL: %w", err) + } + + // obtain the measurement zero time + begin := hx.Begin + if begin.IsZero() { + begin = time.Now() + } + + // create the test keys + tk := &TestKeys{ + Agent: "", + BootstrapTime: 0, + DNSCache: []string{}, + FailedOperation: optional.None[string](), + Failure: optional.None[string](), + NetworkEvents: []*model.ArchivalNetworkEvent{}, + Queries: []*model.ArchivalDNSLookupResult{}, + Requests: []*model.ArchivalHTTPRequestResult{}, + SOCKSProxy: "", + TCPConnect: []*model.ArchivalTCPConnectResult{}, + TLSHandshakes: []*model.ArchivalTLSOrQUICHandshakeResult{}, + Tunnel: "", + } + + // create the runner + runner := &Runner{ + Begin: begin, + IndexGen: hx.IndexGen, + Session: hx.Session, + TestKeys: tk, + UNet: hx.UNet, + } + + // measure using the runner + err = runner.Run(ctx, target.Config, URL) + + // handle the case of fundamental measurement failure + if err != nil { + return nil, err + } + + // otherwise return the test keys + return tk, nil +} diff --git a/internal/urlgetter/http.go b/internal/urlgetter/http.go new file mode 100644 index 0000000000..7c04a995cf --- /dev/null +++ b/internal/urlgetter/http.go @@ -0,0 +1,315 @@ +package urlgetter + +import ( + "context" + "errors" + "io" + "net" + "net/http" + "net/url" + "time" + + "github.com/ooni/probe-cli/v3/internal/logx" + "github.com/ooni/probe-cli/v3/internal/measurexlite" + "github.com/ooni/probe-cli/v3/internal/model" + "github.com/ooni/probe-cli/v3/internal/netxlite" +) + +// HTTPTransaction performs an HTTP transaction using the given URL. 
+func (rx *Runner) HTTPTransaction(ctx context.Context, config *Config, URL *url.URL) error { + // TODO(bassosimone): honor the case where the user disabled HTTP redirection + + // TODO(bassosimone): we should also make sure we're correctly dealing with + // errors and we always wrap the errors we return here + + for idx := 0; idx < 10; idx++ { + // perform the round trip + resp, err := rx.HTTPRoundTrip(ctx, config, URL) + + // handle the case of failure + if err != nil { + return err + } + + // close the connection + resp.Close() + + // handle the case of redirect + if !httpRedirectIsRedirect(resp) { + return nil + } + // TODO(bassosimone): not 100% convinced whether what follows + // should be implemented as such or be different + if err := httpValidateRedirect(resp); err != nil { + return err + } + + // TODO(bassosimone): this code is broken because it does not handle + // cookies and we need cookies for some HTTP redirects to work + + // clone the original configuration + config = config.Clone() + + // set the referer header to be the original URL + config.HTTPReferer = URL.String() + + // replace the original URL with the location + URL = resp.Location + } + + // TODO(bassosimone): make sure the error we're using here is correct + return errors.New("too many HTTP redirects") +} + +// HTTPRoundTrip measures an HTTP or HTTPS URL. +func (rx *Runner) HTTPRoundTrip(ctx context.Context, config *Config, URL *url.URL) (*HTTPResponse, error) { + switch URL.Scheme { + case "http": + return rx.HTTPRoundTripCleartext(ctx, config, URL) + + case "https": + return rx.HTTPRoundTripSecure(ctx, config, URL) + + default: + return nil, ErrUnknownURLScheme + } +} + +// HTTPRoundTripCleartext measures an HTTP URL. 
+func (rx *Runner) HTTPRoundTripCleartext(ctx context.Context, config *Config, URL *url.URL) (*HTTPResponse, error) { + // establish a TCP connection + conn, err := rx.tcpConnect(ctx, config, URL) + if err != nil { + return nil, err + } + + // perform round trip + return rx.HTTPRoundTripOp(ctx, conn.AsHTTPConn(rx.Session.Logger())) +} + +// HTTPRoundTripSecure measures an HTTPS URL. +func (rx *Runner) HTTPRoundTripSecure(ctx context.Context, config *Config, URL *url.URL) (*HTTPResponse, error) { + // establish a TLS connection + conn, err := rx.tlsHandshake(ctx, config, URL) + if err != nil { + return nil, err + } + + // perform round trip + return rx.HTTPRoundTripOp(ctx, conn.AsHTTPConn(rx.Session.Logger())) +} + +// HTTPResponse summarizes an HTTP response. +type HTTPResponse struct { + // BodyReader allows to read the already read body snapshot + // followed by zero or more bytes after the snapshot, depending + // on whether the body is larger than the snapshot size. + BodyReader io.Reader + + // Conn is the conn. + Conn net.Conn + + // Location may contain the location if we're redirected. + Location *url.URL + + // Status contains the status code. + Status int +} + +// Close implements io.Closer. +func (rx *HTTPResponse) Close() error { + return rx.Conn.Close() +} + +// HTTPConn is an established HTTP connection. +type HTTPConn struct { + // Config is the original config. + Config *Config + + // Conn is the conn. + Conn net.Conn + + // Network is the network we're using. + Network string + + // RemoteAddress is the remote address we're using. + RemoteAddress string + + // TLSNegotiatedProtocol is the negotiated TLS protocol. + TLSNegotiatedProtocol string + + // Trace is the trace. + Trace *measurexlite.Trace + + // Transport is the single-use HTTP transport. + Transport model.HTTPTransport + + // URL is the original URL. + URL *url.URL +} + +var _ io.Closer = &TLSConn{} + +// Close implements io.Closer. 
+func (tx *HTTPConn) Close() error { + return tx.Conn.Close() +} + +func (cx *Config) method() (method string) { + method = "GET" + if cx.Method != "" { + method = cx.Method + } + return +} + +// HTTPRoundTripOp performs an HTTP round trip. +func (rx *Runner) HTTPRoundTripOp(ctx context.Context, input *HTTPConn) (*HTTPResponse, error) { + // enforce timeout + const timeout = 10 * time.Second + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + // create HTTP request + req, err := rx.newHTTPRequest(ctx, input) + if err != nil { + // make sure we close the conn + input.Close() + return nil, err + } + + // start operation logger + ol := logx.NewOperationLogger( + rx.Session.Logger(), + "[#%d] %s %s with %s/%s host=%s", + input.Trace.Index(), + input.Config.method(), + req.URL.String(), + input.RemoteAddress, + input.Network, + req.Host, + ) + + // perform HTTP round trip + resp, err := rx.httpRoundTripOp(ctx, input, req) + + // stop the operation logger + ol.Stop(err) + + // handle failures + if err != nil { + // make sure we close the conn + input.Close() + + // attempt to set top level failure and failed operation + rx.TestKeys.MaybeSetFailedOperation(netxlite.HTTPRoundTripOperation) + rx.TestKeys.MaybeSetFailure(err.Error()) + + return nil, err + } + + // otherwise return the response + return resp, nil +} + +func (rx *Runner) newHTTPRequest(ctx context.Context, conn *HTTPConn) (*http.Request, error) { + // create the default HTTP request + req, err := http.NewRequestWithContext(ctx, conn.Config.method(), conn.URL.String(), nil) + if err != nil { + return nil, err + } + + // Go would use URL.Host as "Host" header anyways in case we leave req.Host empty. + // We already set it here so that we can use req.Host for logging. 
+ req.Host = conn.URL.Host + + // apply headers + req.Header.Set("Accept", model.HTTPHeaderAccept) + req.Header.Set("Accept-Language", model.HTTPHeaderAcceptLanguage) + req.Header.Set("Referer", conn.Config.HTTPReferer) + req.Header.Set("User-Agent", model.HTTPHeaderUserAgent) + + // req.Header["Host"] is ignored by Go but we want to have it in the measurement + // to reflect what we think has been sent as HTTP headers. + req.Header.Set("Host", req.Host) + return req, nil +} + +func (rx *Runner) httpRoundTripOp(ctx context.Context, conn *HTTPConn, req *http.Request) (*HTTPResponse, error) { + // define the maximum body snapshot size + const snapSize = 1 << 19 + + // register when we started + started := conn.Trace.TimeSince(conn.Trace.ZeroTime()) + + // emit the beginning of the HTTP transaction + rx.TestKeys.AppendNetworkEvents(measurexlite.NewAnnotationArchivalNetworkEvent( + conn.Trace.Index(), + started, + "http_transaction_start", + conn.Trace.Tags()..., + )) + + // perform the round trip + resp, err := conn.Transport.RoundTrip(req) + + // on success also read a snapshot of the response body + var body []byte + if err == nil { + // read a snapshot of the response body + reader := io.LimitReader(resp.Body, snapSize) + body, err = netxlite.StreamAllContext(ctx, reader) + } + + // register when we finished + finished := conn.Trace.TimeSince(conn.Trace.ZeroTime()) + + // emit the end of the HTTP transaction + rx.TestKeys.AppendNetworkEvents(measurexlite.NewAnnotationArchivalNetworkEvent( + conn.Trace.Index(), + started, + "http_transaction_done", + conn.Trace.Tags()..., + )) + + // emit the HTTP request event + rx.TestKeys.PrependRequests(measurexlite.NewArchivalHTTPRequestResult( + conn.Trace.Index(), + started, + conn.Network, + conn.RemoteAddress, + conn.TLSNegotiatedProtocol, + conn.Transport.Network(), + req, + resp, + snapSize, + body, + err, + finished, + conn.Trace.Tags()..., + )) + + // produce a response or an error + return rx.httpFinish(conn.Conn, 
resp, err) +} + +func (rx *Runner) httpFinish(conn net.Conn, resp *http.Response, err error) (*HTTPResponse, error) { + // handle the case of failure first + if err != nil { + // make sure we do not leak the conn + conn.Close() + return nil, err + } + + // get the location + loc, _ := resp.Location() + + // fill and return the minimal HTTP response + hresp := &HTTPResponse{ + BodyReader: resp.Body, // TODO(bassosimone): not consistent with docs: should use io.MultiReader + Conn: conn, + Location: loc, + Status: resp.StatusCode, + } + return hresp, nil +} diff --git a/internal/urlgetter/httpredirect.go b/internal/urlgetter/httpredirect.go new file mode 100644 index 0000000000..78bfef575a --- /dev/null +++ b/internal/urlgetter/httpredirect.go @@ -0,0 +1,41 @@ +package urlgetter + +import ( + "errors" + + "github.com/ooni/probe-cli/v3/internal/netxlite" +) + +// TODO(bassosimone): this duplicates code in webconnectivitylte and we should +// instead share this code and avoid creating duplication. +// +// However, this code is slightly changed, so it's not 100% clear what to do. + +// httpRedirectIsRedirect returns whether this response is a redirect +func httpRedirectIsRedirect(resp *HTTPResponse) bool { + switch resp.Status { + case 301, 302, 307, 308: + return true + default: + return false + } + +} + +// httpValidateRedirect validates a redirect. In case of failure, the +// returned error is a [*netxlite.ErrWrapper] instance. +// +// See https://github.com/ooni/probe/issues/2628 for context. 
+func httpValidateRedirect(resp *HTTPResponse) error { + if resp.Location == nil { + return errors.New("missing location header") + } + if resp.Location.Host == "" { + return &netxlite.ErrWrapper{ + Failure: netxlite.FailureHTTPInvalidRedirectLocationHost, + Operation: netxlite.HTTPRoundTripOperation, + WrappedErr: nil, + } + } + return nil +} diff --git a/internal/urlgetter/indexgen.go b/internal/urlgetter/indexgen.go new file mode 100644 index 0000000000..65b166c52f --- /dev/null +++ b/internal/urlgetter/indexgen.go @@ -0,0 +1,17 @@ +package urlgetter + +import "sync/atomic" + +// IndexGen generates new trace indexes. +// +// The zero value is ready to use. +type IndexGen struct { + idx atomic.Int64 +} + +var _ RunnerTraceIndexGenerator = &IndexGen{} + +// Next implements [RunnerTraceIndexGenerator]. +func (ig *IndexGen) Next() int64 { + return ig.idx.Add(1) +} diff --git a/internal/urlgetter/multihandle.go b/internal/urlgetter/multihandle.go new file mode 100644 index 0000000000..eb3e72f588 --- /dev/null +++ b/internal/urlgetter/multihandle.go @@ -0,0 +1,183 @@ +package urlgetter + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/ooni/probe-cli/v3/internal/erroror" + "github.com/ooni/probe-cli/v3/internal/model" +) + +// MultiResult is a measurement result returned by [*MultiHandle]. +type MultiResult struct { + // Target is the target we measured. + Target *EasyTarget + + // TestKeys contains the [*TestKeys] or an error. + TestKeys erroror.Value[*TestKeys] +} + +// MultiHandle allows to run several measurements in paraller. +// +// The zero value is invalid. Please, initialize the MANDATORY fields. +type MultiHandle struct { + // Begin is the OPTIONAL time when the experiment begun. If you do not + // set this field, every target is measured independently. + Begin time.Time + + // IndexGen is the MANDATORY index generator. + IndexGen RunnerTraceIndexGenerator + + // Parallelism is the OPTIONAL parallelism to use. 
If this is + // zero, or negative, we use a reasonable default. + Parallelism int + + // Session is the MANDATORY session to use. If this is nil, the Run + // method will panic with a nil pointer error. + Session RunnerSession + + // UNet is the OPTIONAL underlying networ to be use. + UNet model.UnderlyingNetwork +} + +// Run measures the given targets in parallel using background goroutines and +// returns a channel where we post the measurement results. +func (hx *MultiHandle) Run(ctx context.Context, targets ...*EasyTarget) <-chan *MultiResult { + // determine the parallelism to use + const defaultParallelism = 3 + parallelism := max(hx.Parallelism, defaultParallelism) + + // create output channel + output := make(chan *MultiResult) + + // create input channel + input := make(chan *EasyTarget, len(targets)) + + // emit the targets + go func() { + defer close(input) + for _, target := range targets { + input <- target + } + }() + + // create wait group for awaiting for workers to be done + wg := &sync.WaitGroup{} + + // run workers in parallel + for idx := 0; idx < parallelism; idx++ { + wg.Add(1) + go func() { + defer wg.Done() + hx.worker(ctx, input, output) + }() + } + + // close the output channel when all workers are done + go func() { + defer close(output) + wg.Wait() + }() + + // return the output channel to the caller + return output +} + +// worker is a worker used by [*MultiHandle] to perform parallel work. 
+func (hx *MultiHandle) worker(ctx context.Context, input <-chan *EasyTarget, output chan<- *MultiResult) { + // process each target in the input stream + for target := range input { + + // initialize the result with empty TestKeys field + res := &MultiResult{ + Target: target, + TestKeys: erroror.Value[*TestKeys]{}, + } + + // create the easy handle + easy := &EasyHandle{ + Begin: hx.Begin, + IndexGen: hx.IndexGen, + Session: hx.Session, + UNet: hx.UNet, + } + + // perform the actual measurement + testkeys, err := easy.Run(ctx, target) + + // assign either the error or the test keys + if err != nil { + res.TestKeys.Err = err + } else { + res.TestKeys.Value = testkeys + } + + // emit the measurement result + output <- res + } +} + +// MultiCollect is a filter that reads measurements from the results channel, prints +// progress using the given callbacks, and posts measurements on the returned channel. +// +// # Arguments +// +// - callbacks contains the experiment callbacks used to print progress. +// +// - overallStartIndex is the index from which we should start +// counting for printing progress, typically 0. +// +// - overallTotal is the total number of entries we're measuring. If this +// value is zero or negative, we assume a total count of 1. +// +// - prefix is the prefix to use for printing progress. +// +// - results is the channel from which to read measurement results.
+func MultiCollect( + callbacks model.ExperimentCallbacks, + overallStartIndex int, + overallTotal int, + prefix string, + results <-chan *MultiResult, +) <-chan *MultiResult { + // create output channel + output := make(chan *MultiResult) + + // process the results channel in the background + go func() { + // make sure we close output when done + defer close(output) + + // initialize count to be the specified start index + count := overallStartIndex + + // process each entry + for result := range results { + // increment the number of results seen + count++ + + // emit progress information + prog := multiComputeProgress(count, overallTotal) + callbacks.OnProgress(prog, fmt.Sprintf( + "%s: measure %s: %+v", + prefix, + result.Target.URL, + model.ErrorToStringOrOK(result.TestKeys.Err), + )) + + // emit the result entry + output <- result + } + }() + + // return the output channel to the caller + return output +} + +// multiComputeProgress computes the progress while avoiding division by zero and +// never returning a value greater than 1.0, the maximum progress value. +func multiComputeProgress(count, overallTotal int) float64 { + return min(1.0, float64(count)/float64(max(overallTotal, 1))) +} diff --git a/internal/urlgetter/runner.go b/internal/urlgetter/runner.go new file mode 100644 index 0000000000..36ce07d7da --- /dev/null +++ b/internal/urlgetter/runner.go @@ -0,0 +1,97 @@ +package urlgetter + +import ( + "context" + "errors" + "fmt" + "net/url" + "time" + + "github.com/ooni/probe-cli/v3/internal/model" +) + +// RunnerTestKeys is the [*Runner] view of the [*TestKeys]. +type RunnerTestKeys interface { + // AppendNetworkEvents appends network events to the test keys. + AppendNetworkEvents(values ...*model.ArchivalNetworkEvent) + + // AppendQueries appends DNS lookup results to the test keys. + AppendQueries(values ...*model.ArchivalDNSLookupResult) + + // AppendTCPConnect appends TCP connect results to the test keys.
+ AppendTCPConnect(values ...*model.ArchivalTCPConnectResult) + + // AppendTLSHandshakes appends TLS handshake results to the test keys. + AppendTLSHandshakes(values ...*model.ArchivalTLSOrQUICHandshakeResult) + + // MaybeSetFailedOperation sets the failed operation field if it's not already set. + MaybeSetFailedOperation(operation string) + + // MaybeSetFailure sets the failure string field if it's not already set. + MaybeSetFailure(failure string) + + // PrependRequests prepends HTTP request results to the test keys. + PrependRequests(values ...*model.ArchivalHTTPRequestResult) +} + +// RunnerTraceIndexGenerator generates trace indexes. +type RunnerTraceIndexGenerator interface { + Next() int64 +} + +// RunnerSession is the measurement session as seen by a [*Runner]. +type RunnerSession interface { + // Logger returns the logger to use. + Logger() model.Logger +} + +// Runner performs measurements. +// +// The zero value is invalid; init all the fields marked as MANDATORY. +type Runner struct { + // Begin is the MANDATORY time when we started measuring. + Begin time.Time + + // IndexGen is the MANDATORY index generator. + IndexGen RunnerTraceIndexGenerator + + // Session is the MANDATORY session. + Session RunnerSession + + // TestKeys contains the MANDATORY test keys. + TestKeys RunnerTestKeys + + // UNet is the OPTIONAL underlying network. + UNet model.UnderlyingNetwork +} + +// ErrUnknownURLScheme indicates that we don't know how to handle a given target URL scheme. +var ErrUnknownURLScheme = errors.New("unknown URL scheme") + +// Run measures the given [*url.URL] using the given [*Config].
+func (rx *Runner) Run(ctx context.Context, config *Config, URL *url.URL) error { + switch scheme := URL.Scheme; scheme { + case "http", "https": + // Implementation note: only report error for fundamental failures + _ = rx.HTTPTransaction(ctx, config, URL) + return nil + + case "tlshandshake": + // Implementation note: only report error for fundamental failures + _ = rx.TLSHandshake(ctx, config, URL) + return nil + + case "tcpconnect": + // Implementation note: only report error for fundamental failures + _ = rx.TCPConnect(ctx, config, URL) + return nil + + case "dnslookup": + // Implementation note: only report error for fundamental failures + _ = rx.DNSLookup(ctx, config, URL) + return nil + + default: + return fmt.Errorf("%w: %s", ErrUnknownURLScheme, scheme) + } +} diff --git a/internal/urlgetter/tcpconnect.go b/internal/urlgetter/tcpconnect.go new file mode 100644 index 0000000000..6505d1b851 --- /dev/null +++ b/internal/urlgetter/tcpconnect.go @@ -0,0 +1,173 @@ +package urlgetter + +import ( + "context" + "errors" + "io" + "net" + "net/url" + "sync" + "time" + + "github.com/ooni/probe-cli/v3/internal/logx" + "github.com/ooni/probe-cli/v3/internal/measurexlite" + "github.com/ooni/probe-cli/v3/internal/model" + "github.com/ooni/probe-cli/v3/internal/netxlite" + "github.com/ooni/probe-cli/v3/internal/throttling" +) + +// TCPConnect measures a tcpconnect://<domain>:<port>/ URL.
+func (rx *Runner) TCPConnect(ctx context.Context, config *Config, URL *url.URL) error { + conn, err := rx.tcpConnect(ctx, config, URL) + measurexlite.MaybeClose(conn) + return err +} + +func (rx *Runner) tcpConnect(ctx context.Context, config *Config, URL *url.URL) (*TCPConn, error) { + // resolve the URL's domain using DNS + addrs, err := rx.DNSLookupOp(ctx, config, URL) + if err != nil { + return nil, err + } + + // loop until we establish a single TCP connection + var errv []error + for _, addr := range addrs { + conn, err := rx.TCPConnectOp(ctx, addr) + if err != nil { + errv = append(errv, err) + continue + } + return conn, nil + } + + // either return a joined error or nil + return nil, errors.Join(errv...) +} + +// TCPConn is an established TCP connection. +type TCPConn struct { + // Config is the original config. + Config *Config + + // Conn is the conn. + Conn net.Conn + + // Trace is the trace. + Trace *measurexlite.Trace + + // URL is the original URL. + URL *url.URL +} + +var _ io.Closer = &TCPConn{} + +// AsHTTPConn converts a [*TCPConn] to an [*HTTPConn]. +func (cx *TCPConn) AsHTTPConn(logger model.Logger) *HTTPConn { + return &HTTPConn{ + Config: cx.Config, + Conn: cx.Conn, + Network: "tcp", + RemoteAddress: measurexlite.SafeRemoteAddrString(cx.Conn), + TLSNegotiatedProtocol: "", + Trace: cx.Trace, + Transport: netxlite.NewHTTPTransportWithOptions( + logger, + netxlite.NewSingleUseDialer(cx.Conn), + netxlite.NewNullTLSDialer(), + ), + URL: cx.URL, + } +} + +// Close implements io.Closer. +func (tx *TCPConn) Close() error { + return tx.Conn.Close() +} + +// TCPConnectOp establishes a TCP connection. 
+func (rx *Runner) TCPConnectOp(ctx context.Context, input *DNSLookupResult) (*TCPConn, error) { + // enforce timeout + const timeout = 15 * time.Second + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + // obtain the next trace index + index := rx.IndexGen.Next() + + // create trace using the given underlying network + trace := measurexlite.NewTrace(index, rx.Begin) + trace.Netx = &netxlite.Netx{Underlying: rx.UNet} + + // obtain logger + logger := rx.Session.Logger() + + // create dialer + dialer := trace.NewDialerWithoutResolver(logger) + + // the endpoint to use depends on the DNS lookup results + endpoint, err := input.endpoint() + if err != nil { + return nil, err + } + + // start operation logger + ol := logx.NewOperationLogger(logger, "[#%d] TCP connect %s", trace.Index(), endpoint) + + // establish the TCP connection + conn, err := dialer.DialContext(ctx, "tcp", endpoint) + + // stop the operation logger + ol.Stop(err) + + // append the TCP connect results + rx.TestKeys.AppendTCPConnect(trace.TCPConnects()...) + + // append the network events caused by TCP connect + rx.TestKeys.AppendNetworkEvents(trace.NetworkEvents()...) + + // in case of failure, set failed operation and failure + if err != nil { + rx.TestKeys.MaybeSetFailedOperation(netxlite.ConnectOperation) + rx.TestKeys.MaybeSetFailure(err.Error()) + return nil, err + } + + // start measuring throttling using a sampler + sampler := throttling.NewSampler(trace) + + // return the result + result := &TCPConn{ + Config: input.Config, + Conn: &tcpConnWrapper{ + Conn: conn, + Once: &sync.Once{}, + Sampler: sampler, + TestKeys: rx.TestKeys, + Trace: trace, + }, + Trace: trace, + URL: input.URL, + } + return result, nil +} + +// tcpConnWrapper wraps a connection and saves network events. +type tcpConnWrapper struct { + net.Conn + Once *sync.Once + Sampler *throttling.Sampler + TestKeys RunnerTestKeys + Trace *measurexlite.Trace +} + +// Close implements [io.Closer]. 
+func (c *tcpConnWrapper) Close() (err error) { + c.Once.Do(func() { + err = c.Conn.Close() + c.TestKeys.AppendNetworkEvents(c.Trace.NetworkEvents()...) + c.TestKeys.AppendNetworkEvents(c.Sampler.ExtractSamples()...) + _ = c.Sampler.Close() + }) + return +} diff --git a/internal/urlgetter/testkeys.go b/internal/urlgetter/testkeys.go new file mode 100644 index 0000000000..e20bfe438f --- /dev/null +++ b/internal/urlgetter/testkeys.go @@ -0,0 +1,85 @@ +package urlgetter + +import ( + "sync" + + "github.com/ooni/probe-cli/v3/internal/model" + "github.com/ooni/probe-cli/v3/internal/optional" +) + +// TestKeys contains the experiment test keys. +type TestKeys struct { + // The following fields are part of the typical JSON emitted by OONI. + Agent string `json:"agent"` + BootstrapTime float64 `json:"bootstrap_time,omitempty"` + DNSCache []string `json:"dns_cache,omitempty"` + FailedOperation optional.Value[string] `json:"failed_operation"` + Failure optional.Value[string] `json:"failure"` + NetworkEvents []*model.ArchivalNetworkEvent `json:"network_events"` + Queries []*model.ArchivalDNSLookupResult `json:"queries"` + Requests []*model.ArchivalHTTPRequestResult `json:"requests"` + SOCKSProxy string `json:"socksproxy,omitempty"` + TCPConnect []*model.ArchivalTCPConnectResult `json:"tcp_connect"` + TLSHandshakes []*model.ArchivalTLSOrQUICHandshakeResult `json:"tls_handshakes"` + Tunnel string `json:"tunnel,omitempty"` + + // mu provides mutual exclusion. + mu sync.Mutex +} + +var _ RunnerTestKeys = &TestKeys{} + +// AppendNetworkEvents implements RunnerTestKeys. +func (tk *TestKeys) AppendNetworkEvents(values ...*model.ArchivalNetworkEvent) { + tk.mu.Lock() + tk.NetworkEvents = append(tk.NetworkEvents, values...) + tk.mu.Unlock() +} + +// AppendQueries implements RunnerTestKeys. +func (tk *TestKeys) AppendQueries(values ...*model.ArchivalDNSLookupResult) { + tk.mu.Lock() + tk.Queries = append(tk.Queries, values...) 
+ tk.mu.Unlock() +} + +// PrependRequests implements RunnerTestKeys. +func (tk *TestKeys) PrependRequests(values ...*model.ArchivalHTTPRequestResult) { + tk.mu.Lock() + // Implementation note: append at the front since the most recent request must be at the beginning + // of the list; cap values' capacity so that append never writes into the caller's backing array. + tk.Requests = append(values[:len(values):len(values)], tk.Requests...) + tk.mu.Unlock() +} + +// AppendTCPConnect implements RunnerTestKeys. +func (tk *TestKeys) AppendTCPConnect(values ...*model.ArchivalTCPConnectResult) { + tk.mu.Lock() + tk.TCPConnect = append(tk.TCPConnect, values...) + tk.mu.Unlock() +} + +// AppendTLSHandshakes implements RunnerTestKeys. +func (tk *TestKeys) AppendTLSHandshakes(values ...*model.ArchivalTLSOrQUICHandshakeResult) { + tk.mu.Lock() + tk.TLSHandshakes = append(tk.TLSHandshakes, values...) + tk.mu.Unlock() +} + +// MaybeSetFailedOperation implements RunnerTestKeys. +func (tk *TestKeys) MaybeSetFailedOperation(operation string) { + tk.mu.Lock() + if tk.FailedOperation.IsNone() { + tk.FailedOperation = optional.Some(operation) + } + tk.mu.Unlock() +} + +// MaybeSetFailure implements RunnerTestKeys. +func (tk *TestKeys) MaybeSetFailure(failure string) { + tk.mu.Lock() + if tk.Failure.IsNone() { + tk.Failure = optional.Some(failure) + } + tk.mu.Unlock() +} diff --git a/internal/urlgetter/tlshandshake.go b/internal/urlgetter/tlshandshake.go new file mode 100644 index 0000000000..65f400af0b --- /dev/null +++ b/internal/urlgetter/tlshandshake.go @@ -0,0 +1,187 @@ +package urlgetter + +import ( + "context" + "crypto/tls" + "errors" + "io" + "net/url" + "strings" + "time" + + "github.com/ooni/probe-cli/v3/internal/logx" + "github.com/ooni/probe-cli/v3/internal/measurexlite" + "github.com/ooni/probe-cli/v3/internal/model" + "github.com/ooni/probe-cli/v3/internal/netxlite" +) + +// TLSHandshake measures a tlshandshake://<domain>:<port>/ URL.
+func (rx *Runner) TLSHandshake(ctx context.Context, config *Config, URL *url.URL) error { + conn, err := rx.tlsHandshake(ctx, config, URL) + measurexlite.MaybeClose(conn) + return err +} + +func (rx *Runner) tlsHandshake(ctx context.Context, config *Config, URL *url.URL) (*TLSConn, error) { + // resolve the URL's domain using DNS + addrs, err := rx.DNSLookupOp(ctx, config, URL) + if err != nil { + return nil, err + } + + // loop until we establish a single TLS connection + var errv []error + for _, addr := range addrs { + conn, err := rx.TCPConnectOp(ctx, addr) + if err != nil { + errv = append(errv, err) + continue + } + tlsconn, err := rx.TLSHandshakeOp(ctx, conn) + if err != nil { + conn.Close() + errv = append(errv, err) + continue + } + return tlsconn, nil + } + + // either return a joined error or nil + return nil, errors.Join(errv...) +} + +// TLSConn is an established TLS connection. +type TLSConn struct { + // Config is the original config. + Config *Config + + // Conn is the conn. + Conn model.TLSConn + + // Trace is the trace. + Trace *measurexlite.Trace + + // URL is the original URL. + URL *url.URL +} + +var _ io.Closer = &TLSConn{} + +// AsHTTPConn converts a [*TLSConn] to an [*HTTPConn]. +func (cx *TLSConn) AsHTTPConn(logger model.Logger) *HTTPConn { + return &HTTPConn{ + Config: cx.Config, + Conn: cx.Conn, + Network: "tcp", + RemoteAddress: measurexlite.SafeRemoteAddrString(cx.Conn), + TLSNegotiatedProtocol: cx.Conn.ConnectionState().NegotiatedProtocol, + Trace: cx.Trace, + Transport: netxlite.NewHTTPTransportWithOptions( + logger, + netxlite.NewNullDialer(), + netxlite.NewSingleUseTLSDialer(cx.Conn), + ), + URL: cx.URL, + } +} + +// Close implements io.Closer.
+func (tx *TLSConn) Close() error { + return tx.Conn.Close() +} + +func (cx *Config) alpns(URL *url.URL, httpsValues []string) []string { + // handle the case where the user explicitly provided ALPNs + if len(cx.TLSNextProtos) > 0 { + return strings.Split(cx.TLSNextProtos, ",") + } + + // otherwise try to use the scheme. + switch URL.Scheme { + case "https": + return httpsValues + case "dot": + return []string{"dot"} + default: + return nil + } +} + +func (cx *Config) sni(URL *url.URL) string { + // handle the case where there's an explicit SNI + if len(cx.TLSServerName) > 0 { + return cx.TLSServerName + } + + // otherwise use the URL's hostname. + return URL.Hostname() +} + +// TLSHandshakeOp performs TLS handshakes. +func (rx *Runner) TLSHandshakeOp(ctx context.Context, input *TCPConn) (*TLSConn, error) { + // enforce timeout + const timeout = 10 * time.Second + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + // obtain logger + logger := rx.Session.Logger() + + // obtain the ALPNs + alpns := input.Config.alpns(input.URL, []string{"h2", "http/1.1"}) + + // obtain the SNI + serverName := input.Config.sni(input.URL) + + // start operation logger + ol := logx.NewOperationLogger( + logger, + "[#%d] TLS handshake ALPN=%v SNI=%s", + input.Trace.Index(), + alpns, + serverName, + ) + + // create the TLS handshaker to use + tlsHandshaker := input.Trace.NewTLSHandshakerStdlib(logger) + + // create the TLS config to use + // + // See https://github.com/ooni/probe/issues/2413 to understand + // why we're using nil to force netxlite to use the cached + // default Mozilla cert pool.
+ tlsConfig := &tls.Config{ // #nosec G402 - we need to use a large TLS versions range for measuring + NextProtos: alpns, + RootCAs: nil, + ServerName: serverName, + } + + // perform the handshake + tlsConn, err := tlsHandshaker.Handshake(ctx, input.Conn, tlsConfig) + + // stop the operation logger + ol.Stop(err) + + // append the TLS handshake results + rx.TestKeys.AppendTLSHandshakes(input.Trace.TLSHandshakes()...) + + // handle the case of failure + if err != nil { + // make sure we close the connection + input.Conn.Close() + + // make sure we set failure and failed operation + rx.TestKeys.MaybeSetFailedOperation(netxlite.TLSHandshakeOperation) + rx.TestKeys.MaybeSetFailure(err.Error()) + return nil, err + } + + // handle the case of success + result := &TLSConn{ + Config: input.Config, + Conn: tlsConn, + Trace: input.Trace, + URL: input.URL, + } + return result, nil +} From 282363ceb59e21434c7506c98d78de2b82ff0c56 Mon Sep 17 00:00:00 2001 From: Simone Basso Date: Tue, 18 Jun 2024 14:38:28 +0200 Subject: [PATCH 2/2] poc: also convert the telegram experiment --- .../experiment/fbmessenger/fbmessenger.go | 3 +- internal/experiment/telegram/telegram.go | 69 +++++++++++-------- internal/experiment/telegram/telegram_test.go | 14 +--- 3 files changed, 42 insertions(+), 44 deletions(-) diff --git a/internal/experiment/fbmessenger/fbmessenger.go b/internal/experiment/fbmessenger/fbmessenger.go index 7145a16fc9..d28400a597 100644 --- a/internal/experiment/fbmessenger/fbmessenger.go +++ b/internal/experiment/fbmessenger/fbmessenger.go @@ -180,8 +180,7 @@ func (m Measurer) Run(ctx context.Context, args *model.ExperimentArgs) error { callbacks := args.Callbacks measurement := args.Measurement sess := args.Session - ctx, cancel := context.WithTimeout(ctx, 60*time.Second) - defer cancel() + //urlgetter.RegisterExtensions(measurement) // TODO(bassosimone) // generate targets diff --git a/internal/experiment/telegram/telegram.go b/internal/experiment/telegram/telegram.go index 
adf470b25e..4c8a883629 100644 --- a/internal/experiment/telegram/telegram.go +++ b/internal/experiment/telegram/telegram.go @@ -7,9 +7,10 @@ import ( "context" "time" - "github.com/ooni/probe-cli/v3/internal/experiment/urlgetter" "github.com/ooni/probe-cli/v3/internal/model" "github.com/ooni/probe-cli/v3/internal/netxlite" + "github.com/ooni/probe-cli/v3/internal/optional" + "github.com/ooni/probe-cli/v3/internal/urlgetter" ) const ( @@ -23,10 +24,10 @@ type Config struct{} // TestKeys contains telegram test keys. type TestKeys struct { urlgetter.TestKeys - TelegramHTTPBlocking bool `json:"telegram_http_blocking"` - TelegramTCPBlocking bool `json:"telegram_tcp_blocking"` - TelegramWebFailure *string `json:"telegram_web_failure"` - TelegramWebStatus string `json:"telegram_web_status"` + TelegramHTTPBlocking bool `json:"telegram_http_blocking"` + TelegramTCPBlocking bool `json:"telegram_tcp_blocking"` + TelegramWebFailure optional.Value[string] `json:"telegram_web_failure"` + TelegramWebStatus string `json:"telegram_web_status"` } // NewTestKeys creates new telegram TestKeys. @@ -34,34 +35,41 @@ func NewTestKeys() *TestKeys { return &TestKeys{ TelegramHTTPBlocking: true, TelegramTCPBlocking: true, - TelegramWebFailure: nil, + TelegramWebFailure: optional.None[string](), TelegramWebStatus: "ok", } } // Update updates the TestKeys using the given MultiOutput result. -func (tk *TestKeys) Update(v urlgetter.MultiOutput) { +func (tk *TestKeys) Update(v *urlgetter.MultiResult) { + // handle the case where there are no test keys + if v.TestKeys.Err != nil { + return + } + // update the easy to update entries first - tk.NetworkEvents = append(tk.NetworkEvents, v.TestKeys.NetworkEvents...) - tk.Queries = append(tk.Queries, v.TestKeys.Queries...) - tk.Requests = append(tk.Requests, v.TestKeys.Requests...) - tk.TCPConnect = append(tk.TCPConnect, v.TestKeys.TCPConnect...) - tk.TLSHandshakes = append(tk.TLSHandshakes, v.TestKeys.TLSHandshakes...) 
+ tk.NetworkEvents = append(tk.NetworkEvents, v.TestKeys.Value.NetworkEvents...) + tk.Queries = append(tk.Queries, v.TestKeys.Value.Queries...) + tk.Requests = append(tk.Requests, v.TestKeys.Value.Requests...) + tk.TCPConnect = append(tk.TCPConnect, v.TestKeys.Value.TCPConnect...) + tk.TLSHandshakes = append(tk.TLSHandshakes, v.TestKeys.Value.TLSHandshakes...) + // then process access points - if v.Input.Config.Method != "GET" { - if v.TestKeys.Failure == nil { + if v.Target.Config.Method != "GET" { + if v.TestKeys.Value.Failure.IsNone() { tk.TelegramHTTPBlocking = false tk.TelegramTCPBlocking = false return // found successful access point connection } - if v.TestKeys.FailedOperation == nil || *v.TestKeys.FailedOperation != netxlite.ConnectOperation { + if v.TestKeys.Value.FailedOperation.UnwrapOr("") != netxlite.ConnectOperation { tk.TelegramTCPBlocking = false } return } - if v.TestKeys.Failure != nil { + + if !v.TestKeys.Value.Failure.IsNone() { tk.TelegramWebStatus = "blocked" - tk.TelegramWebFailure = v.TestKeys.Failure + tk.TelegramWebFailure = v.TestKeys.Value.Failure return } } @@ -71,9 +79,6 @@ type Measurer struct { // Config contains the experiment settings. If empty we // will be using default settings. Config Config - - // Getter is an optional getter to be used for testing. - Getter urlgetter.MultiGetter } // ExperimentName implements ExperimentMeasurer.ExperimentName @@ -102,13 +107,12 @@ func (m Measurer) Run(ctx context.Context, args *model.ExperimentArgs) error { measurement := args.Measurement sess := args.Session - ctx, cancel := context.WithTimeout(ctx, 60*time.Second) - defer cancel() - urlgetter.RegisterExtensions(measurement) - inputs := []urlgetter.MultiInput{ + //urlgetter.RegisterExtensions(measurement) // TODO(bassosimone) + + inputs := []*urlgetter.EasyTarget{ // Here we need to provide the method explicitly. See // https://github.com/ooni/probe-engine/issues/827. 
- {Target: "https://web.telegram.org/", Config: urlgetter.Config{ + {URL: "https://web.telegram.org/", Config: &urlgetter.Config{ Method: "GET", }}, } @@ -116,15 +120,20 @@ func (m Measurer) Run(ctx context.Context, args *model.ExperimentArgs) error { // We need to measure each address twice. Once using port 80 and once using port 443. In both // cases, the protocol MUST be HTTP. The DCs do not support access on port 443 using TLS. for _, dc := range DatacenterIPAddrs { - inputs = append(inputs, urlgetter.MultiInput{Target: "http://" + dc + "/", Config: urlgetter.Config{Method: "POST"}}) - inputs = append(inputs, urlgetter.MultiInput{Target: "http://" + dc + ":443/", Config: urlgetter.Config{Method: "POST"}}) + inputs = append(inputs, &urlgetter.EasyTarget{URL: "http://" + dc + "/", Config: &urlgetter.Config{Method: "POST"}}) + inputs = append(inputs, &urlgetter.EasyTarget{URL: "http://" + dc + ":443/", Config: &urlgetter.Config{Method: "POST"}}) } - multi := urlgetter.Multi{Begin: time.Now(), Getter: m.Getter, Session: sess} + multi := &urlgetter.MultiHandle{ + Begin: time.Now(), + IndexGen: &urlgetter.IndexGen{}, + Session: sess, + } testkeys := NewTestKeys() testkeys.Agent = "redirect" measurement.TestKeys = testkeys - for entry := range multi.Collect(ctx, inputs, "telegram", callbacks) { + results := urlgetter.MultiCollect(callbacks, 0, len(inputs), "telegram", multi.Run(ctx, inputs...)) + for entry := range results { testkeys.Update(entry) } return nil @@ -150,7 +159,7 @@ func (tk *TestKeys) MeasurementSummaryKeys() model.MeasurementSummaryKeys { sk := &SummaryKeys{IsAnomaly: false} tcpBlocking := tk.TelegramTCPBlocking httpBlocking := tk.TelegramHTTPBlocking - webBlocking := tk.TelegramWebFailure != nil + webBlocking := !tk.TelegramWebFailure.IsNone() sk.TCPBlocking = tcpBlocking sk.HTTPBlocking = httpBlocking sk.WebBlocking = webBlocking diff --git a/internal/experiment/telegram/telegram_test.go b/internal/experiment/telegram/telegram_test.go index 
767d286f70..064f56a133 100644 --- a/internal/experiment/telegram/telegram_test.go +++ b/internal/experiment/telegram/telegram_test.go @@ -1,21 +1,9 @@ package telegram_test import ( - "context" - "fmt" - "io" - "net/http" "testing" - "github.com/apex/log" - "github.com/google/gopacket/layers" - "github.com/ooni/netem" "github.com/ooni/probe-cli/v3/internal/experiment/telegram" - "github.com/ooni/probe-cli/v3/internal/experiment/urlgetter" - "github.com/ooni/probe-cli/v3/internal/mocks" - "github.com/ooni/probe-cli/v3/internal/model" - "github.com/ooni/probe-cli/v3/internal/netemx" - "github.com/ooni/probe-cli/v3/internal/netxlite" ) func TestNewExperimentMeasurer(t *testing.T) { @@ -28,6 +16,7 @@ func TestNewExperimentMeasurer(t *testing.T) { } } +/* func TestUpdateWithNoAccessPointsBlocking(t *testing.T) { tk := telegram.NewTestKeys() tk.Update(urlgetter.MultiOutput{ @@ -496,3 +485,4 @@ func TestMeasurerRun(t *testing.T) { }) }) } +*/