Skip to content

Commit

Permalink
feat(oohelperd): protect against overload and add metrics (#1442)
Browse files Browse the repository at this point in the history
## Checklist

- [x] I have read the [contribution
guidelines](https://github.com/ooni/probe-cli/blob/master/CONTRIBUTING.md)
- [x] reference issue for this pull request:
ooni/probe#2649
- [x] if you changed anything related to how experiments work and you
need to reflect these changes in the ooni/spec repository, please link
to the related ooni/spec pull request: N/A
- [x] if you changed code inside an experiment, make sure you bump its
version number: N/A

## Description

This diff makes oohelperd more robust against overload and introduces
additional prometheus metrics.
  • Loading branch information
bassosimone committed Dec 21, 2023
1 parent 2d3173a commit b51640c
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 16 deletions.
7 changes: 7 additions & 0 deletions internal/oohelperd/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,18 @@ func dnsDo(ctx context.Context, config *dnsConfig) {
reso := config.NewResolver(config.Logger)
defer reso.CloseIdleConnections()

// take the time before running this micro-measurement
started := time.Now()

// perform and log the actual DNS lookup
ol := logx.NewOperationLogger(config.Logger, "DNSLookup %s", config.Domain)
addrs, err := reso.LookupHost(ctx, config.Domain)
ol.Stop(err)

// publish the time required for running this micro-measurement
elapsed := time.Since(started)
metricDNSTaskDurationSeconds.Observe(elapsed.Seconds())

// make sure we return an empty slice on failure because this
// is what the legacy TH would have done.
if addrs == nil {
Expand Down
37 changes: 36 additions & 1 deletion internal/oohelperd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"io"
"net/http"
"net/http/cookiejar"
"strings"
"sync/atomic"
"time"

Expand All @@ -32,6 +33,10 @@ type Handler struct {
// BaseLogger is the MANDATORY logger to use.
BaseLogger model.Logger

// CountRequests is the MANDATORY count of the number of
// requests that are currently in flight.
CountRequests *atomic.Int64

// Indexer is the MANDATORY atomic integer used to assign an index to requests.
Indexer *atomic.Int64

Expand Down Expand Up @@ -67,6 +72,7 @@ var _ http.Handler = &Handler{}
func NewHandler() *Handler {
return &Handler{
BaseLogger: log.Log,
CountRequests: &atomic.Int64{},
Indexer: &atomic.Int64{},
MaxAcceptableBody: MaxAcceptableBodySize,
Measure: measure,
Expand Down Expand Up @@ -103,6 +109,26 @@ func NewHandler() *Handler {
}
}

// handlerShouldThrottleClient returns true if the handler should throttle
// the current client depending on the instantaneous load.
//
// See https://github.com/ooni/probe/issues/2649 for context.
func handlerShouldThrottleClient(inflight int64, userAgent string) bool {
switch {
// With less than 25 inflight requests we allow all clients
case inflight < 25:
return false

// With less than 50 inflight requests we give priority to official clients
case inflight < 50 && strings.HasPrefix(userAgent, "ooniprobe-"):
return false

// Otherwise, we're very sorry
default:
return true
}
}

// ServeHTTP implements http.Handler.
func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// track the number of in-flight requests
Expand All @@ -123,6 +149,15 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return
}

// protect against too many requests in flight
if handlerShouldThrottleClient(h.CountRequests.Load(), req.Header.Get("user-agent")) {
metricRequestsCount.WithLabelValues("503", "service_unavailable").Inc()
w.WriteHeader(503)
return
}
h.CountRequests.Add(1)
defer h.CountRequests.Add(-1)

// read and parse request body
reader := io.LimitReader(req.Body, h.MaxAcceptableBody)
data, err := netxlite.ReadAllContext(req.Context(), reader)
Expand All @@ -144,7 +179,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
elapsed := time.Since(started)

// track the time required to produce a response
metricWCTaskDurationSeconds.Observe(float64(elapsed.Seconds()))
metricWCTaskDurationSeconds.Observe(elapsed.Seconds())

// handle the case of fundamental failure
if err != nil {
Expand Down
43 changes: 43 additions & 0 deletions internal/oohelperd/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strings"
"testing"

"github.com/ooni/probe-cli/v3/internal/mocks"
"github.com/ooni/probe-cli/v3/internal/model"
"github.com/ooni/probe-cli/v3/internal/version"
)

// simpleRequestForHandler is a simple request for the [handler].
Expand Down Expand Up @@ -67,11 +69,19 @@ func TestHandlerWorkingAsIntended(t *testing.T) {
// reqContentType is the content-type for the HTTP request
reqContentType string

// reqUserAgent is the optional user-agent to use
// when preparing the HTTP request
reqUserAgent string

// measureFn optionally allows overriding the default
// value of the handler.Measure function
measureFn func(
ctx context.Context, config *Handler, creq *model.THRequest) (*model.THResponse, error)

// initialCountRequests is the initial value to
// use for the CountRequests field.
initialCountRequests int64

// reqBody is the request body to use
reqBody io.Reader

Expand Down Expand Up @@ -146,6 +156,31 @@ func TestHandlerWorkingAsIntended(t *testing.T) {
respStatusCode: 200,
respContentType: "application/json",
parseBody: true,
}, {
name: "we throttle miniooni with 25+ requests inflight",
reqMethod: "POST",
reqContentType: "application/json",
reqUserAgent: fmt.Sprintf("miniooni/%s ooniprobe-engine/%s", version.Version, version.Version),
measureFn: measure,
initialCountRequests: 25,
reqBody: strings.NewReader(simpleRequestForHandler),
respStatusCode: 503,
respContentType: "",
parseBody: false,
}, {
name: "we do not throttle ooniprobe-cli with <= 49 requests inflight",
reqMethod: "POST",
reqContentType: "application/json",
reqUserAgent: fmt.Sprintf("ooniprobe-cli/%s ooniprobe-engine/%s", version.Version, version.Version),
measureFn: func(ctx context.Context, config *Handler, creq *model.THRequest) (*model.THResponse, error) {
cresp := &model.THResponse{}
return cresp, nil
},
initialCountRequests: 49,
reqBody: strings.NewReader(simpleRequestForHandler),
respStatusCode: 200,
respContentType: "application/json",
parseBody: true,
}}

for _, expect := range expectations {
Expand All @@ -156,6 +191,11 @@ func TestHandlerWorkingAsIntended(t *testing.T) {
handler.Measure = expect.measureFn
}

// configure the CountRequests field if needed
if expect.initialCountRequests > 0 {
handler.CountRequests.Add(expect.initialCountRequests) // 0 + value = value :-)
}

// create request
req, err := http.NewRequestWithContext(
context.Background(),
Expand All @@ -169,6 +209,9 @@ func TestHandlerWorkingAsIntended(t *testing.T) {
if expect.reqContentType != "" {
req.Header.Add("content-type", expect.reqContentType)
}
if expect.reqUserAgent != "" {
req.Header.Add("user-agent", expect.reqUserAgent)
}

// create response writer
var (
Expand Down
32 changes: 31 additions & 1 deletion internal/oohelperd/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,18 @@ type httpConfig struct {

// httpDo performs the HTTP check.
func httpDo(ctx context.Context, config *httpConfig) {
// make sure we log about the operation
ol := logx.NewOperationLogger(config.Logger, "GET %s", config.URL)

// we want to limit the maximum amount of time we spend here
const timeout = 15 * time.Second
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

// we want the caller to know when we're done running
defer config.Wg.Done()

// now let's create an HTTP request
req, err := http.NewRequestWithContext(ctx, "GET", config.URL, nil)
if err != nil {
// fix: emit -1 like the old test helper does
Expand All @@ -75,8 +82,9 @@ func httpDo(ctx context.Context, config *httpConfig) {
ol.Stop(err)
return
}

// The original test helper failed with extra headers while here
// we're implementing (for now?) a more liberal approach.
// we're implementing a more liberal approach.
for k, vs := range config.Headers {
switch strings.ToLower(k) {
case "user-agent", "accept", "accept-language":
Expand All @@ -85,9 +93,22 @@ func httpDo(ctx context.Context, config *httpConfig) {
}
}
}

// we need a client because we want to follow redirects
clnt := config.NewClient(config.Logger)
defer clnt.CloseIdleConnections()

// take the time before starting the HTTP task
t0 := time.Now()

// fetch the webpage following redirects
resp, err := clnt.Do(req)

// publish the elapsed time required for measuring HTTP
elapsed := time.Since(t0)
metricHTTPTaskDurationSeconds.Observe(elapsed.Seconds())

// handle the case of failure
if err != nil {
// fix: emit -1 like the old test helper does
config.Out <- ctrlHTTPResponse{
Expand All @@ -100,20 +121,29 @@ func httpDo(ctx context.Context, config *httpConfig) {
ol.Stop(err)
return
}

// make sure we eventually close the body
defer resp.Body.Close()

// copy headers
headers := make(map[string]string)
for k := range resp.Header {
headers[k] = resp.Header.Get(k)
}

// read the body up within a given maximum limit
// TODO(bassosimone): do we need to compute whether the body was truncated?
reader := &io.LimitedReader{R: resp.Body, N: config.MaxAcceptableBody}
data, err := netxlite.ReadAllContext(ctx, reader)
ol.Stop(err)

// optionally check whether there's an HTTP3 endpoint
h3Endpoint := ""
if config.searchForH3 {
h3Endpoint = discoverH3Endpoint(resp, req)
}

// we're good and we can emit a final response now
config.Out <- ctrlHTTPResponse{
BodyLength: int64(len(data)),
DiscoveredH3Endpoint: h3Endpoint,
Expand Down
59 changes: 46 additions & 13 deletions internal/oohelperd/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,21 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
)

// metricsSummaryObjectives returns the summary objectives for promauto.NewSummary.
func metricsSummaryObjectives() map[float64]float64 {
// See https://grafana.com/blog/2022/03/01/how-summary-metrics-work-in-prometheus/
//
// TODO(bassosimone,FedericoCeratto): investigate whether using
// a shorter-than-10m observation interval is better for us
return map[float64]float64{
0.25: 0.010, // 0.240 <= φ <= 0.260
0.5: 0.010, // 0.490 <= φ <= 0.510
0.75: 0.010, // 0.740 <= φ <= 0.760
0.9: 0.010, // 0.899 <= φ <= 0.901
0.99: 0.001, // 0.989 <= φ <= 0.991
}
}

var (
// metricRequestsCount counts the number of requests we served.
metricRequestsCount = promauto.NewCounterVec(prometheus.CounterOpts{
Expand All @@ -26,18 +41,36 @@ var (

// metricWCTaskDurationSeconds summarizes the duration of the web connectivity measurement task.
metricWCTaskDurationSeconds = promauto.NewSummary(prometheus.SummaryOpts{
Name: "oohelperd_wctask_duration_seconds",
Help: "Summarizes the time to complete the Web Connectivity measurement task (in seconds)",
// See https://grafana.com/blog/2022/03/01/how-summary-metrics-work-in-prometheus/
//
// TODO(bassosimone,FedericoCeratto): investigate whether using
// a shorter-than-10m observation interval is better for us
Objectives: map[float64]float64{
0.25: 0.010, // 0.240 <= φ <= 0.260
0.5: 0.010, // 0.490 <= φ <= 0.510
0.75: 0.010, // 0.740 <= φ <= 0.760
0.9: 0.010, // 0.899 <= φ <= 0.901
0.99: 0.001, // 0.989 <= φ <= 0.991
},
Name: "oohelperd_wctask_duration_seconds",
Help: "Summarizes the time to complete the Web Connectivity measurement task (in seconds)",
Objectives: metricsSummaryObjectives(),
})

// metricDNSTaskDurationSeconds summarizes the duration of the DNS task.
metricDNSTaskDurationSeconds = promauto.NewSummary(prometheus.SummaryOpts{
Name: "oohelperd_dnstask_duration_seconds",
Help: "Summarizes the time to complete the DNS measurement task (in seconds)",
Objectives: metricsSummaryObjectives(),
})

// metricTCPTaskDurationSeconds summarizes the duration of the TCP task.
metricTCPTaskDurationSeconds = promauto.NewSummary(prometheus.SummaryOpts{
Name: "oohelperd_tcptask_duration_seconds",
Help: "Summarizes the time to complete the TCP measurement task (in seconds)",
Objectives: metricsSummaryObjectives(),
})

// metricTLSTaskDurationSeconds summarizes the duration of the TLS task.
metricTLSTaskDurationSeconds = promauto.NewSummary(prometheus.SummaryOpts{
Name: "oohelperd_tlstask_duration_seconds",
Help: "Summarizes the time to complete the TLS measurement task (in seconds)",
Objectives: metricsSummaryObjectives(),
})

// metricHTTPTaskDurationSeconds summarizes the duration of the HTTP task.
metricHTTPTaskDurationSeconds = promauto.NewSummary(prometheus.SummaryOpts{
Name: "oohelperd_httptask_duration_seconds",
Help: "Summarizes the time to complete the HTTP measurement task (in seconds)",
Objectives: metricsSummaryObjectives(),
})
)
Loading

0 comments on commit b51640c

Please sign in to comment.