From b51640c543e43456336c4cb9be14857d3961a686 Mon Sep 17 00:00:00 2001 From: Simone Basso Date: Thu, 21 Dec 2023 19:44:33 +0100 Subject: [PATCH] feat(oohelperd): protect against overload and add metrics (#1442) ## Checklist - [x] I have read the [contribution guidelines](https://github.com/ooni/probe-cli/blob/master/CONTRIBUTING.md) - [x] reference issue for this pull request: https://github.com/ooni/probe/issues/2649 - [x] if you changed anything related to how experiments work and you need to reflect these changes in the ooni/spec repository, please link to the related ooni/spec pull request: N/A - [x] if you changed code inside an experiment, make sure you bump its version number: N/A ## Description This diff makes oohelperd more robust against overload and introduces additional prometheus metrics. --- internal/oohelperd/dns.go | 7 ++++ internal/oohelperd/handler.go | 37 ++++++++++++++++++- internal/oohelperd/handler_test.go | 43 ++++++++++++++++++++++ internal/oohelperd/http.go | 32 +++++++++++++++- internal/oohelperd/metrics.go | 59 +++++++++++++++++++++++------- internal/oohelperd/tcptls.go | 40 +++++++++++++++++++- 6 files changed, 202 insertions(+), 16 deletions(-) diff --git a/internal/oohelperd/dns.go b/internal/oohelperd/dns.go index 982b3241e5..a67db4ff60 100644 --- a/internal/oohelperd/dns.go +++ b/internal/oohelperd/dns.go @@ -54,11 +54,18 @@ func dnsDo(ctx context.Context, config *dnsConfig) { reso := config.NewResolver(config.Logger) defer reso.CloseIdleConnections() + // take the time before running this micro-measurement + started := time.Now() + // perform and log the actual DNS lookup ol := logx.NewOperationLogger(config.Logger, "DNSLookup %s", config.Domain) addrs, err := reso.LookupHost(ctx, config.Domain) ol.Stop(err) + // publish the time required for running this micro-measurement + elapsed := time.Since(started) + metricDNSTaskDurationSeconds.Observe(elapsed.Seconds()) + // make sure we return an empty slice on failure because this // is what the legacy TH would have done. if addrs == nil { diff --git a/internal/oohelperd/handler.go b/internal/oohelperd/handler.go index ace6709b5a..a18a2da113 100644 --- a/internal/oohelperd/handler.go +++ b/internal/oohelperd/handler.go @@ -11,6 +11,7 @@ import ( "io" "net/http" "net/http/cookiejar" + "strings" "sync/atomic" "time" @@ -32,6 +33,10 @@ type Handler struct { // BaseLogger is the MANDATORY logger to use. BaseLogger model.Logger + // CountRequests is the MANDATORY count of the number of + // requests that are currently in flight. + CountRequests *atomic.Int64 + // Indexer is the MANDATORY atomic integer used to assign an index to requests. Indexer *atomic.Int64 @@ -67,6 +72,7 @@ var _ http.Handler = &Handler{} func NewHandler() *Handler { return &Handler{ BaseLogger: log.Log, + CountRequests: &atomic.Int64{}, Indexer: &atomic.Int64{}, MaxAcceptableBody: MaxAcceptableBodySize, Measure: measure, @@ -103,6 +109,26 @@ func NewHandler() *Handler { } } +// handlerShouldThrottleClient returns true if the handler should throttle +// the current client depending on the instantaneous load. +// +// See https://github.com/ooni/probe/issues/2649 for context. +func handlerShouldThrottleClient(inflight int64, userAgent string) bool { + switch { + // With less than 25 inflight requests we allow all clients + case inflight < 25: + return false + + // With less than 50 inflight requests we give priority to official clients + case inflight < 50 && strings.HasPrefix(userAgent, "ooniprobe-"): + return false + + // Otherwise, we're very sorry + default: + return true + } +} + // ServeHTTP implements http.Handler. func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) { // track the number of in-flight requests @@ -123,6 +149,15 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) { return } + // protect against too many requests in flight + if handlerShouldThrottleClient(h.CountRequests.Load(), req.Header.Get("user-agent")) { + metricRequestsCount.WithLabelValues("503", "service_unavailable").Inc() + w.WriteHeader(503) + return + } + h.CountRequests.Add(1) + defer h.CountRequests.Add(-1) + // read and parse request body reader := io.LimitReader(req.Body, h.MaxAcceptableBody) data, err := netxlite.ReadAllContext(req.Context(), reader) @@ -144,7 +179,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) { elapsed := time.Since(started) // track the time required to produce a response - metricWCTaskDurationSeconds.Observe(float64(elapsed.Seconds())) + metricWCTaskDurationSeconds.Observe(elapsed.Seconds()) // handle the case of fundamental failure if err != nil { diff --git a/internal/oohelperd/handler_test.go b/internal/oohelperd/handler_test.go index 48a284e237..3b30fbb3c8 100644 --- a/internal/oohelperd/handler_test.go +++ b/internal/oohelperd/handler_test.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "io" "net/http" "strings" @@ -11,6 +12,7 @@ import ( "github.com/ooni/probe-cli/v3/internal/mocks" "github.com/ooni/probe-cli/v3/internal/model" + "github.com/ooni/probe-cli/v3/internal/version" ) // simpleRequestForHandler is a simple request for the [handler]. @@ -67,11 +69,19 @@ func TestHandlerWorkingAsIntended(t *testing.T) { // reqContentType is the content-type for the HTTP request reqContentType string + // reqUserAgent is the optional user-agent to use + // when preparing the HTTP request + reqUserAgent string + // measureFn optionally allows overriding the default // value of the handler.Measure function measureFn func( ctx context.Context, config *Handler, creq *model.THRequest) (*model.THResponse, error) + // initialCountRequests is the initial value to + // use for the CountRequests field. + initialCountRequests int64 + // reqBody is the request body to use reqBody io.Reader @@ -146,6 +156,31 @@ func TestHandlerWorkingAsIntended(t *testing.T) { respStatusCode: 200, respContentType: "application/json", parseBody: true, + }, { + name: "we throttle miniooni with 25+ requests inflight", + reqMethod: "POST", + reqContentType: "application/json", + reqUserAgent: fmt.Sprintf("miniooni/%s ooniprobe-engine/%s", version.Version, version.Version), + measureFn: measure, + initialCountRequests: 25, + reqBody: strings.NewReader(simpleRequestForHandler), + respStatusCode: 503, + respContentType: "", + parseBody: false, + }, { + name: "we do not throttle ooniprobe-cli with <= 49 requests inflight", + reqMethod: "POST", + reqContentType: "application/json", + reqUserAgent: fmt.Sprintf("ooniprobe-cli/%s ooniprobe-engine/%s", version.Version, version.Version), + measureFn: func(ctx context.Context, config *Handler, creq *model.THRequest) (*model.THResponse, error) { + cresp := &model.THResponse{} + return cresp, nil + }, + initialCountRequests: 49, + reqBody: strings.NewReader(simpleRequestForHandler), + respStatusCode: 200, + respContentType: "application/json", + parseBody: true, }} for _, expect := range expectations { @@ -156,6 +191,11 @@ func TestHandlerWorkingAsIntended(t *testing.T) { handler.Measure = expect.measureFn } + // configure the CountRequests field if needed + if expect.initialCountRequests > 0 { + handler.CountRequests.Add(expect.initialCountRequests) // 0 + value = value :-) + } + // create request req, err := http.NewRequestWithContext( context.Background(), @@ -169,6 +209,9 @@ func TestHandlerWorkingAsIntended(t *testing.T) { if expect.reqContentType != "" { req.Header.Add("content-type", expect.reqContentType) } + if expect.reqUserAgent != "" { + req.Header.Add("user-agent", expect.reqUserAgent) + } // create response writer var ( diff --git a/internal/oohelperd/http.go b/internal/oohelperd/http.go index e6867187ee..fe1ac71283 100644 --- a/internal/oohelperd/http.go +++ b/internal/oohelperd/http.go @@ -57,11 +57,18 @@ type httpConfig struct { // httpDo performs the HTTP check. func httpDo(ctx context.Context, config *httpConfig) { + // make sure we log about the operation ol := logx.NewOperationLogger(config.Logger, "GET %s", config.URL) + + // we want to limit the maximum amount of time we spend here const timeout = 15 * time.Second ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() + + // we want the caller to know when we're done running defer config.Wg.Done() + + // now let's create an HTTP request req, err := http.NewRequestWithContext(ctx, "GET", config.URL, nil) if err != nil { // fix: emit -1 like the old test helper does @@ -75,8 +82,9 @@ func httpDo(ctx context.Context, config *httpConfig) { ol.Stop(err) return } + // The original test helper failed with extra headers while here - // we're implementing (for now?) a more liberal approach. + // we're implementing a more liberal approach. for k, vs := range config.Headers { switch strings.ToLower(k) { case "user-agent", "accept", "accept-language": @@ -85,9 +93,22 @@ func httpDo(ctx context.Context, config *httpConfig) { } } } + + // we need a client because we want to follow redirects clnt := config.NewClient(config.Logger) defer clnt.CloseIdleConnections() + + // take the time before starting the HTTP task + t0 := time.Now() + + // fetch the webpage following redirects resp, err := clnt.Do(req) + + // publish the elapsed time required for measuring HTTP + elapsed := time.Since(t0) + metricHTTPTaskDurationSeconds.Observe(elapsed.Seconds()) + + // handle the case of failure if err != nil { // fix: emit -1 like the old test helper does config.Out <- ctrlHTTPResponse{ @@ -100,20 +121,29 @@ func httpDo(ctx context.Context, config *httpConfig) { ol.Stop(err) return } + + // make sure we eventually close the body defer resp.Body.Close() + + // copy headers headers := make(map[string]string) for k := range resp.Header { headers[k] = resp.Header.Get(k) } + + // read the body up within a given maximum limit + // TODO(bassosimone): do we need to compute whether the body was truncated? reader := &io.LimitedReader{R: resp.Body, N: config.MaxAcceptableBody} data, err := netxlite.ReadAllContext(ctx, reader) ol.Stop(err) + // optionally check whether there's an HTTP3 endpoint h3Endpoint := "" if config.searchForH3 { h3Endpoint = discoverH3Endpoint(resp, req) } + // we're good and we can emit a final response now config.Out <- ctrlHTTPResponse{ BodyLength: int64(len(data)), DiscoveredH3Endpoint: h3Endpoint, diff --git a/internal/oohelperd/metrics.go b/internal/oohelperd/metrics.go index 37e99b6e01..277321d2b2 100644 --- a/internal/oohelperd/metrics.go +++ b/internal/oohelperd/metrics.go @@ -11,6 +11,21 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" ) +// metricsSummaryObjectives returns the summary objectives for promauto.NewSummary. +func metricsSummaryObjectives() map[float64]float64 { + // See https://grafana.com/blog/2022/03/01/how-summary-metrics-work-in-prometheus/ + // + // TODO(bassosimone,FedericoCeratto): investigate whether using + // a shorter-than-10m observation interval is better for us + return map[float64]float64{ + 0.25: 0.010, // 0.240 <= φ <= 0.260 + 0.5: 0.010, // 0.490 <= φ <= 0.510 + 0.75: 0.010, // 0.740 <= φ <= 0.760 + 0.9: 0.010, // 0.899 <= φ <= 0.901 + 0.99: 0.001, // 0.989 <= φ <= 0.991 + } +} + var ( // metricRequestsCount counts the number of requests we served. metricRequestsCount = promauto.NewCounterVec(prometheus.CounterOpts{ @@ -26,18 +41,36 @@ var ( // metricWCTaskDurationSeconds summarizes the duration of the web connectivity measurement task. metricWCTaskDurationSeconds = promauto.NewSummary(prometheus.SummaryOpts{ - Name: "oohelperd_wctask_duration_seconds", - Help: "Summarizes the time to complete the Web Connectivity measurement task (in seconds)", - // See https://grafana.com/blog/2022/03/01/how-summary-metrics-work-in-prometheus/ - // - // TODO(bassosimone,FedericoCeratto): investigate whether using - // a shorter-than-10m observation interval is better for us - Objectives: map[float64]float64{ - 0.25: 0.010, // 0.240 <= φ <= 0.260 - 0.5: 0.010, // 0.490 <= φ <= 0.510 - 0.75: 0.010, // 0.740 <= φ <= 0.760 - 0.9: 0.010, // 0.899 <= φ <= 0.901 - 0.99: 0.001, // 0.989 <= φ <= 0.991 - }, + Name: "oohelperd_wctask_duration_seconds", + Help: "Summarizes the time to complete the Web Connectivity measurement task (in seconds)", + Objectives: metricsSummaryObjectives(), + }) + + // metricDNSTaskDurationSeconds summarizes the duration of the DNS task. + metricDNSTaskDurationSeconds = promauto.NewSummary(prometheus.SummaryOpts{ + Name: "oohelperd_dnstask_duration_seconds", + Help: "Summarizes the time to complete the DNS measurement task (in seconds)", + Objectives: metricsSummaryObjectives(), + }) + + // metricTCPTaskDurationSeconds summarizes the duration of the TCP task. + metricTCPTaskDurationSeconds = promauto.NewSummary(prometheus.SummaryOpts{ + Name: "oohelperd_tcptask_duration_seconds", + Help: "Summarizes the time to complete the TCP measurement task (in seconds)", + Objectives: metricsSummaryObjectives(), + }) + + // metricTLSTaskDurationSeconds summarizes the duration of the TLS task. + metricTLSTaskDurationSeconds = promauto.NewSummary(prometheus.SummaryOpts{ + Name: "oohelperd_tlstask_duration_seconds", + Help: "Summarizes the time to complete the TLS measurement task (in seconds)", + Objectives: metricsSummaryObjectives(), + }) + + // metricHTTPTaskDurationSeconds summarizes the duration of the HTTP task. + metricHTTPTaskDurationSeconds = promauto.NewSummary(prometheus.SummaryOpts{ + Name: "oohelperd_httptask_duration_seconds", + Help: "Summarizes the time to complete the HTTP measurement task (in seconds)", + Objectives: metricsSummaryObjectives(), }) ) diff --git a/internal/oohelperd/tcptls.go b/internal/oohelperd/tcptls.go index 073e90e92f..d39dc73be3 100644 --- a/internal/oohelperd/tcptls.go +++ b/internal/oohelperd/tcptls.go @@ -69,10 +69,16 @@ type tcpTLSConfig struct { // tcpTLSDo performs the TCP and (possibly) TLS checks. func tcpTLSDo(ctx context.Context, config *tcpTLSConfig) { + + // add an overall timeout for this task const timeout = 15 * time.Second ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() + + // tell the parent when we're done defer config.Wg.Done() + + // assemble result and arrange for it to be written back out := &tcpResultPair{ Address: config.Address, Endpoint: config.Endpoint, @@ -82,6 +88,9 @@ func tcpTLSDo(ctx context.Context, config *tcpTLSConfig) { defer func() { config.Out <- out }() + + // 1: TCP dial + ol := logx.NewOperationLogger( config.Logger, "TCPConnect %s EnableTLS=%v SNI=%s", @@ -89,16 +98,32 @@ func tcpTLSDo(ctx context.Context, config *tcpTLSConfig) { config.EnableTLS, config.URLHostname, ) + dialer := config.NewDialer(config.Logger) defer dialer.CloseIdleConnections() + + // save the time before we start connecting + tcpT0 := time.Now() + + // establish a TCP connection conn, err := dialer.DialContext(ctx, "tcp", config.Endpoint) + + // publish the time required to connect + tcpElapsed := time.Since(tcpT0) + metricTCPTaskDurationSeconds.Observe(tcpElapsed.Seconds()) + + // make sure we fill the TCP stanza out.TCP.Failure = tcpMapFailure(newfailure(err)) out.TCP.Status = err == nil defer measurexlite.MaybeClose(conn) + if err != nil || !config.EnableTLS { ol.Stop(err) return } + + // 2: TLS handshake (if needed) + // See https://github.com/ooni/probe/issues/2413 to understand // why we're using nil to force netxlite to use the cached // default Mozilla cert pool. @@ -107,15 +132,28 @@ func tcpTLSDo(ctx context.Context, config *tcpTLSConfig) { RootCAs: nil, ServerName: config.URLHostname, } + thx := config.NewTSLHandshaker(config.Logger) + + // save time before handshake + tlsT0 := time.Now() + + // perform the handshake tlsConn, err := thx.Handshake(ctx, conn, tlsConfig) + measurexlite.MaybeClose(tlsConn) + + // publish time required to handshake + tlsElapsed := time.Since(tlsT0) + metricTLSTaskDurationSeconds.Observe(tlsElapsed.Seconds()) + ol.Stop(err) + + // we're good and we can fill the result out.TLS = &ctrlTLSResult{ ServerName: config.URLHostname, Status: err == nil, Failure: newfailure(err), } - measurexlite.MaybeClose(tlsConn) } // tcpMapFailure attempts to map netxlite failures to the strings