From c5c79804f4638a44d5a7035d89b850d20effbb22 Mon Sep 17 00:00:00 2001 From: Dan Kortschak <90160302+efd6@users.noreply.github.com> Date: Tue, 12 Sep 2023 12:59:20 +0930 Subject: [PATCH] x-pack/filebeat/input/http_endpoint: add input metrics (#36427) --- CHANGELOG.next.asciidoc | 1 + .../docs/inputs/input-http-endpoint.asciidoc | 22 ++++ .../filebeat/input/http_endpoint/handler.go | 12 ++ .../input/http_endpoint/handler_test.go | 4 +- x-pack/filebeat/input/http_endpoint/input.go | 106 ++++++++++++++++-- .../input/http_endpoint/input_test.go | 6 +- 6 files changed, 140 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 1a616d1d097..4eb9f5073da 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -225,6 +225,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - For request tracer logging in CEL and httpjson the request and response body are no longer included in `event.original`. The body is still present in `http.{request,response}.body.content`. {pull}36531[36531] - Added support for Okta OAuth2 provider in the CEL input. {issue}36336[36336] {pull}36521[36521] - Improve error logging in HTTPJSON input. {pull}36529[36529] +- Add input metrics to http_endpoint input. {issue}36402[36402] {pull}36427[36427] *Auditbeat* diff --git a/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc b/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc index 98bb76c125a..ed25c5f719d 100644 --- a/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc @@ -279,6 +279,28 @@ This option defines the provider of the webhook that uses CRC (Challenge-Respons The secret token provided by the webhook owner for the CRC validation. It is required when a `crc.provider` is set. +[float] +=== Metrics + +This input exposes metrics under the <>. +These metrics are exposed under the `/inputs` path. They can be used to +observe the activity of the input. + +[options="header"] +|======= +| Metric | Description +| `bind_address` | Bind address of input. +| `route` | HTTP request route of the input. +| `is_tls_connection` | Whether the input is listening on a TLS connection. +| `api_errors_total` | Number of API errors. +| `batches_received_total` | Number of event arrays received. +| `batches_published_total` | Number of event arrays published. +| `events_published_total` | Number of events published. +| `size` | Histogram of request content lengths. +| `batch_size` | Histogram of the received event array length. +| `batch_processing_time` | Histogram of the elapsed successful batch processing times in nanoseconds (time of receipt to time of ACK for non-empty batches). +|======= + [id="{beatname_lc}-input-{type}-common-options"] include::../../../../filebeat/docs/inputs/input-common-options.asciidoc[] diff --git a/x-pack/filebeat/input/http_endpoint/handler.go b/x-pack/filebeat/input/http_endpoint/handler.go index 112eabb2c9c..af3c17fdd2a 100644 --- a/x-pack/filebeat/input/http_endpoint/handler.go +++ b/x-pack/filebeat/input/http_endpoint/handler.go @@ -32,6 +32,7 @@ var ( type httpHandler struct { log *logp.Logger publisher stateless.Publisher + metrics *inputMetrics messageField string responseCode int @@ -43,9 +44,13 @@ type httpHandler struct { // Triggers if middleware validation returns successful func (h *httpHandler) apiResponse(w http.ResponseWriter, r *http.Request) { + start := time.Now() + h.metrics.batchesReceived.Add(1) + h.metrics.contentLength.Update(r.ContentLength) body, status, err := getBodyReader(r) if err != nil { sendAPIErrorResponse(w, r, h.log, status, err) + h.metrics.apiErrors.Add(1) return } defer body.Close() @@ -53,6 +58,7 @@ func (h *httpHandler) apiResponse(w http.ResponseWriter, r *http.Request) { objs, _, status, err := httpReadJSON(body) if err != nil { sendAPIErrorResponse(w, r, h.log, status, err) + h.metrics.apiErrors.Add(1) return } @@ -66,6 +72,7 @@ func (h *httpHandler) apiResponse(w http.ResponseWriter, r *http.Request) { respBody string ) + h.metrics.batchSize.Update(int64(len(objs))) for _, obj := range objs { var err error if h.crc != nil { @@ -74,19 +81,24 @@ func (h *httpHandler) apiResponse(w http.ResponseWriter, r *http.Request) { // CRC request processed break } else if !errors.Is(err, errNotCRC) { + h.metrics.apiErrors.Add(1) sendAPIErrorResponse(w, r, h.log, http.StatusBadRequest, err) return } } if err = h.publishEvent(obj, headers); err != nil { + h.metrics.apiErrors.Add(1) sendAPIErrorResponse(w, r, h.log, http.StatusInternalServerError, err) return } + h.metrics.eventsPublished.Add(1) respCode, respBody = h.responseCode, h.responseBody } h.sendResponse(w, respCode, respBody) + h.metrics.batchProcessingTime.Update(time.Since(start).Nanoseconds()) + h.metrics.batchesPublished.Add(1) } func (h *httpHandler) sendResponse(w http.ResponseWriter, status int, message string) { diff --git a/x-pack/filebeat/input/http_endpoint/handler_test.go b/x-pack/filebeat/input/http_endpoint/handler_test.go index d5250881ae0..0095aec4f25 100644 --- a/x-pack/filebeat/input/http_endpoint/handler_test.go +++ b/x-pack/filebeat/input/http_endpoint/handler_test.go @@ -320,7 +320,9 @@ func Test_apiResponse(t *testing.T) { t.Run(tc.name, func(t *testing.T) { // Setup pub := new(publisher) - apiHandler := newHandler(tc.conf, pub, logp.NewLogger("http_endpoint.test")) + metrics := newInputMetrics("") + defer metrics.Close() + apiHandler := newHandler(tc.conf, pub, logp.NewLogger("http_endpoint.test"), metrics) // Execute handler. respRec := httptest.NewRecorder() diff --git a/x-pack/filebeat/input/http_endpoint/input.go b/x-pack/filebeat/input/http_endpoint/input.go index 572d45b579d..72454b3a81c 100644 --- a/x-pack/filebeat/input/http_endpoint/input.go +++ b/x-pack/filebeat/input/http_endpoint/input.go @@ -11,16 +11,22 @@ import ( "fmt" "net" "net/http" + "net/url" "reflect" "sync" "time" + "github.com/rcrowley/go-metrics" + v2 "github.com/elastic/beats/v7/filebeat/input/v2" stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" "github.com/elastic/beats/v7/libbeat/feature" + "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/elastic-agent-libs/monitoring" + "github.com/elastic/elastic-agent-libs/monitoring/adapter" "github.com/elastic/elastic-agent-libs/transport/tlscommon" "github.com/elastic/go-concert/ctxtool" ) @@ -87,7 +93,9 @@ func (e *httpEndpoint) Test(_ v2.TestContext) error { } func (e *httpEndpoint) Run(ctx v2.Context, publisher stateless.Publisher) error { - err := servers.serve(ctx, e, publisher) + metrics := newInputMetrics(ctx.ID) + defer metrics.Close() + err := servers.serve(ctx, e, publisher, metrics) if err != nil && !errors.Is(err, http.ErrServerClosed) { return fmt.Errorf("unable to start server due to error: %w", err) } @@ -109,11 +117,17 @@ type pool struct { // cancelled or the context of another end-point sharing the same address // has had its context cancelled. If an end-point is re-registered with // the same address and mux pattern, serve will return an error. -func (p *pool) serve(ctx v2.Context, e *httpEndpoint, pub stateless.Publisher) error { +func (p *pool) serve(ctx v2.Context, e *httpEndpoint, pub stateless.Publisher, metrics *inputMetrics) error { log := ctx.Logger.With("address", e.addr) pattern := e.config.URL - var err error + u, err := url.Parse(pattern) + if err != nil { + return err + } + metrics.route.Set(u.Path) + metrics.isTLS.Set(e.tlsConfig != nil) + p.mu.Lock() s, ok := p.servers[e.addr] if ok { @@ -132,7 +146,7 @@ func (p *pool) serve(ctx v2.Context, e *httpEndpoint, pub stateless.Publisher) e return err } log.Infof("Adding %s end point to server on %s", pattern, e.addr) - s.mux.Handle(pattern, newHandler(e.config, pub, log)) + s.mux.Handle(pattern, newHandler(e.config, pub, log, metrics)) s.idOf[pattern] = ctx.ID p.mu.Unlock() <-s.ctx.Done() @@ -140,7 +154,7 @@ func (p *pool) serve(ctx v2.Context, e *httpEndpoint, pub stateless.Publisher) e } mux := http.NewServeMux() - mux.Handle(pattern, newHandler(e.config, pub, log)) + mux.Handle(pattern, newHandler(e.config, pub, log, metrics)) srv := &http.Server{Addr: e.addr, TLSConfig: e.tlsConfig, Handler: mux, ReadHeaderTimeout: 5 * time.Second} s = &server{ idOf: map[string]string{pattern: ctx.ID}, @@ -156,10 +170,10 @@ func (p *pool) serve(ctx v2.Context, e *httpEndpoint, pub stateless.Publisher) e log.Infof("Starting HTTPS server on %s with %s end point", srv.Addr, pattern) // The certificate is already loaded so we do not need // to pass the cert file and key file parameters. - err = s.srv.ListenAndServeTLS("", "") + err = listenAndServeTLS(s.srv, "", "", metrics) } else { log.Infof("Starting HTTP server on %s with %s end point", srv.Addr, pattern) - err = s.srv.ListenAndServe() + err = listenAndServe(s.srv, metrics) } p.mu.Lock() delete(p.servers, e.addr) @@ -169,6 +183,36 @@ func (p *pool) serve(ctx v2.Context, e *httpEndpoint, pub stateless.Publisher) e return err } +func listenAndServeTLS(srv *http.Server, certFile, keyFile string, metrics *inputMetrics) error { + addr := srv.Addr + if addr == "" { + addr = ":https" + } + + ln, err := net.Listen("tcp", addr) + if err != nil { + return err + } + metrics.bindAddr.Set(ln.Addr().String()) + + defer ln.Close() + + return srv.ServeTLS(ln, certFile, keyFile) +} + +func listenAndServe(srv *http.Server, metrics *inputMetrics) error { + addr := srv.Addr + if addr == "" { + addr = ":http" + } + ln, err := net.Listen("tcp", addr) + if err != nil { + return err + } + metrics.bindAddr.Set(ln.Addr().String()) + return srv.Serve(ln) +} + func checkTLSConsistency(addr string, old, new *tlscommon.ServerConfig) error { if old == nil && new == nil { return nil @@ -240,7 +284,7 @@ func (s *server) getErr() error { return s.err } -func newHandler(c config, pub stateless.Publisher, log *logp.Logger) http.Handler { +func newHandler(c config, pub stateless.Publisher, log *logp.Logger, metrics *inputMetrics) http.Handler { validator := &apiValidator{ basicAuth: c.BasicAuth, username: c.Username, @@ -258,6 +302,7 @@ func newHandler(c config, pub stateless.Publisher, log *logp.Logger) http.Handle handler := &httpHandler{ log: log, publisher: pub, + metrics: metrics, messageField: c.Prefix, responseCode: c.ResponseCode, responseBody: c.ResponseBody, @@ -268,3 +313,48 @@ func newHandler(c config, pub stateless.Publisher, log *logp.Logger) http.Handle return newAPIValidationHandler(http.HandlerFunc(handler.apiResponse), validator, log) } + +// inputMetrics handles the input's metric reporting. +type inputMetrics struct { + unregister func() + + bindAddr *monitoring.String // bind address of input + route *monitoring.String // request route + isTLS *monitoring.Bool // whether the input is listening on a TLS connection + apiErrors *monitoring.Uint // number of API errors + batchesReceived *monitoring.Uint // number of event arrays received + batchesPublished *monitoring.Uint // number of event arrays published + eventsPublished *monitoring.Uint // number of events published + contentLength metrics.Sample // histogram of request content lengths. + batchSize metrics.Sample // histogram of the received batch sizes. + batchProcessingTime metrics.Sample // histogram of the elapsed successful batch processing times in nanoseconds (time of handler start to time of ACK for non-empty batches). +} + +func newInputMetrics(id string) *inputMetrics { + reg, unreg := inputmon.NewInputRegistry(inputName, id, nil) + out := &inputMetrics{ + unregister: unreg, + bindAddr: monitoring.NewString(reg, "bind_address"), + route: monitoring.NewString(reg, "route"), + isTLS: monitoring.NewBool(reg, "is_tls_connection"), + apiErrors: monitoring.NewUint(reg, "api_errors_total"), + batchesReceived: monitoring.NewUint(reg, "batches_received_total"), + batchesPublished: monitoring.NewUint(reg, "batches_published_total"), + eventsPublished: monitoring.NewUint(reg, "events_published_total"), + contentLength: metrics.NewUniformSample(1024), + batchSize: metrics.NewUniformSample(1024), + batchProcessingTime: metrics.NewUniformSample(1024), + } + _ = adapter.NewGoMetrics(reg, "size", adapter.Accept). + Register("histogram", metrics.NewHistogram(out.contentLength)) + _ = adapter.NewGoMetrics(reg, "batch_size", adapter.Accept). + Register("histogram", metrics.NewHistogram(out.batchSize)) + _ = adapter.NewGoMetrics(reg, "batch_processing_time", adapter.Accept). + Register("histogram", metrics.NewHistogram(out.batchProcessingTime)) + + return out +} + +func (m *inputMetrics) Close() { + m.unregister() +} diff --git a/x-pack/filebeat/input/http_endpoint/input_test.go b/x-pack/filebeat/input/http_endpoint/input_test.go index ecd99961aea..d172a0eed5f 100644 --- a/x-pack/filebeat/input/http_endpoint/input_test.go +++ b/x-pack/filebeat/input/http_endpoint/input_test.go @@ -217,13 +217,15 @@ func TestServerPool(t *testing.T) { fails = make(chan error, 1) ) ctx, cancel := newCtx("server_pool_test", test.name) + metrics := newInputMetrics("") + defer metrics.Close() var wg sync.WaitGroup for _, cfg := range test.cfgs { cfg := cfg wg.Add(1) go func() { defer wg.Done() - err := servers.serve(ctx, cfg, &pub) + err := servers.serve(ctx, cfg, &pub, metrics) if err != http.ErrServerClosed { select { case fails <- err: @@ -274,7 +276,7 @@ func TestServerPool(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - err := servers.serve(ctx, cfg, &pub) + err := servers.serve(ctx, cfg, &pub, metrics) if err != nil && err != http.ErrServerClosed && test.wantErr == nil { t.Errorf("failed to re-register %v: %v", cfg.addr, err) }