Skip to content

Commit

Permalink
x-pack/filebeat/input/http_endpoint: add input metrics (#36427)
Browse files Browse the repository at this point in the history
  • Loading branch information
efd6 authored Sep 12, 2023
1 parent fb60f1a commit c5c7980
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- For request tracer logging in CEL and httpjson the request and response body are no longer included in `event.original`. The body is still present in `http.{request,response}.body.content`. {pull}36531[36531]
- Added support for Okta OAuth2 provider in the CEL input. {issue}36336[36336] {pull}36521[36521]
- Improve error logging in HTTPJSON input. {pull}36529[36529]
- Add input metrics to http_endpoint input. {issue}36402[36402] {pull}36427[36427]

*Auditbeat*

Expand Down
22 changes: 22 additions & 0 deletions x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,28 @@ This option defines the provider of the webhook that uses CRC (Challenge-Respons

The secret token provided by the webhook owner for the CRC validation. It is required when a `crc.provider` is set.

[float]
=== Metrics

This input exposes metrics under the <<http-endpoint, HTTP monitoring endpoint>>.
These metrics are exposed under the `/inputs` path. They can be used to
observe the activity of the input.

[options="header"]
|=======
| Metric | Description
| `bind_address` | Bind address of input.
| `route` | HTTP request route of the input.
| `is_tls_connection` | Whether the input is listening on a TLS connection.
| `api_errors_total` | Number of API errors.
| `batches_received_total` | Number of event arrays received.
| `batches_published_total` | Number of event arrays published.
| `events_published_total` | Number of events published.
| `size` | Histogram of request content lengths.
| `batch_size` | Histogram of the received event array length.
| `batch_processing_time` | Histogram of the elapsed successful batch processing times in nanoseconds (time of receipt to time of ACK for non-empty batches).
|=======

[id="{beatname_lc}-input-{type}-common-options"]
include::../../../../filebeat/docs/inputs/input-common-options.asciidoc[]

Expand Down
12 changes: 12 additions & 0 deletions x-pack/filebeat/input/http_endpoint/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var (
type httpHandler struct {
log *logp.Logger
publisher stateless.Publisher
metrics *inputMetrics

messageField string
responseCode int
Expand All @@ -43,16 +44,21 @@ type httpHandler struct {

// Triggers if middleware validation returns successful
func (h *httpHandler) apiResponse(w http.ResponseWriter, r *http.Request) {
start := time.Now()
h.metrics.batchesReceived.Add(1)
h.metrics.contentLength.Update(r.ContentLength)
body, status, err := getBodyReader(r)
if err != nil {
sendAPIErrorResponse(w, r, h.log, status, err)
h.metrics.apiErrors.Add(1)
return
}
defer body.Close()

objs, _, status, err := httpReadJSON(body)
if err != nil {
sendAPIErrorResponse(w, r, h.log, status, err)
h.metrics.apiErrors.Add(1)
return
}

Expand All @@ -66,6 +72,7 @@ func (h *httpHandler) apiResponse(w http.ResponseWriter, r *http.Request) {
respBody string
)

h.metrics.batchSize.Update(int64(len(objs)))
for _, obj := range objs {
var err error
if h.crc != nil {
Expand All @@ -74,19 +81,24 @@ func (h *httpHandler) apiResponse(w http.ResponseWriter, r *http.Request) {
// CRC request processed
break
} else if !errors.Is(err, errNotCRC) {
h.metrics.apiErrors.Add(1)
sendAPIErrorResponse(w, r, h.log, http.StatusBadRequest, err)
return
}
}

if err = h.publishEvent(obj, headers); err != nil {
h.metrics.apiErrors.Add(1)
sendAPIErrorResponse(w, r, h.log, http.StatusInternalServerError, err)
return
}
h.metrics.eventsPublished.Add(1)
respCode, respBody = h.responseCode, h.responseBody
}

h.sendResponse(w, respCode, respBody)
h.metrics.batchProcessingTime.Update(time.Since(start).Nanoseconds())
h.metrics.batchesPublished.Add(1)
}

func (h *httpHandler) sendResponse(w http.ResponseWriter, status int, message string) {
Expand Down
4 changes: 3 additions & 1 deletion x-pack/filebeat/input/http_endpoint/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,9 @@ func Test_apiResponse(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
// Setup
pub := new(publisher)
apiHandler := newHandler(tc.conf, pub, logp.NewLogger("http_endpoint.test"))
metrics := newInputMetrics("")
defer metrics.Close()
apiHandler := newHandler(tc.conf, pub, logp.NewLogger("http_endpoint.test"), metrics)

// Execute handler.
respRec := httptest.NewRecorder()
Expand Down
106 changes: 98 additions & 8 deletions x-pack/filebeat/input/http_endpoint/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,22 @@ import (
"fmt"
"net"
"net/http"
"net/url"
"reflect"
"sync"
"time"

"github.com/rcrowley/go-metrics"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless"
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent-libs/monitoring/adapter"
"github.com/elastic/elastic-agent-libs/transport/tlscommon"
"github.com/elastic/go-concert/ctxtool"
)
Expand Down Expand Up @@ -87,7 +93,9 @@ func (e *httpEndpoint) Test(_ v2.TestContext) error {
}

func (e *httpEndpoint) Run(ctx v2.Context, publisher stateless.Publisher) error {
err := servers.serve(ctx, e, publisher)
metrics := newInputMetrics(ctx.ID)
defer metrics.Close()
err := servers.serve(ctx, e, publisher, metrics)
if err != nil && !errors.Is(err, http.ErrServerClosed) {
return fmt.Errorf("unable to start server due to error: %w", err)
}
Expand All @@ -109,11 +117,17 @@ type pool struct {
// cancelled or the context of another end-point sharing the same address
// has had its context cancelled. If an end-point is re-registered with
// the same address and mux pattern, serve will return an error.
func (p *pool) serve(ctx v2.Context, e *httpEndpoint, pub stateless.Publisher) error {
func (p *pool) serve(ctx v2.Context, e *httpEndpoint, pub stateless.Publisher, metrics *inputMetrics) error {
log := ctx.Logger.With("address", e.addr)
pattern := e.config.URL

var err error
u, err := url.Parse(pattern)
if err != nil {
return err
}
metrics.route.Set(u.Path)
metrics.isTLS.Set(e.tlsConfig != nil)

p.mu.Lock()
s, ok := p.servers[e.addr]
if ok {
Expand All @@ -132,15 +146,15 @@ func (p *pool) serve(ctx v2.Context, e *httpEndpoint, pub stateless.Publisher) e
return err
}
log.Infof("Adding %s end point to server on %s", pattern, e.addr)
s.mux.Handle(pattern, newHandler(e.config, pub, log))
s.mux.Handle(pattern, newHandler(e.config, pub, log, metrics))
s.idOf[pattern] = ctx.ID
p.mu.Unlock()
<-s.ctx.Done()
return s.getErr()
}

mux := http.NewServeMux()
mux.Handle(pattern, newHandler(e.config, pub, log))
mux.Handle(pattern, newHandler(e.config, pub, log, metrics))
srv := &http.Server{Addr: e.addr, TLSConfig: e.tlsConfig, Handler: mux, ReadHeaderTimeout: 5 * time.Second}
s = &server{
idOf: map[string]string{pattern: ctx.ID},
Expand All @@ -156,10 +170,10 @@ func (p *pool) serve(ctx v2.Context, e *httpEndpoint, pub stateless.Publisher) e
log.Infof("Starting HTTPS server on %s with %s end point", srv.Addr, pattern)
// The certificate is already loaded so we do not need
// to pass the cert file and key file parameters.
err = s.srv.ListenAndServeTLS("", "")
err = listenAndServeTLS(s.srv, "", "", metrics)
} else {
log.Infof("Starting HTTP server on %s with %s end point", srv.Addr, pattern)
err = s.srv.ListenAndServe()
err = listenAndServe(s.srv, metrics)
}
p.mu.Lock()
delete(p.servers, e.addr)
Expand All @@ -169,6 +183,36 @@ func (p *pool) serve(ctx v2.Context, e *httpEndpoint, pub stateless.Publisher) e
return err
}

// listenAndServeTLS opens a TCP listener on srv.Addr (falling back to
// ":https" when the address is empty), records the listener's resolved
// address in metrics.bindAddr and then serves TLS connections on it via
// srv.ServeTLS. certFile and keyFile are passed through to srv.ServeTLS
// unchanged.
//
// It returns the error from net.Listen if the listener cannot be opened,
// otherwise whatever srv.ServeTLS returns.
func listenAndServeTLS(srv *http.Server, certFile, keyFile string, metrics *inputMetrics) error {
	listenAddr := ":https"
	if srv.Addr != "" {
		listenAddr = srv.Addr
	}

	listener, err := net.Listen("tcp", listenAddr)
	if err != nil {
		return err
	}
	// Make sure the listener is released however ServeTLS returns.
	defer listener.Close()

	// Report the address actually bound, not the configured one.
	bound := listener.Addr().String()
	metrics.bindAddr.Set(bound)

	return srv.ServeTLS(listener, certFile, keyFile)
}

// listenAndServe opens a TCP listener on srv.Addr (falling back to ":http"
// when the address is empty), records the listener's resolved address in
// metrics.bindAddr and then serves plain HTTP connections on it via
// srv.Serve.
//
// It returns the error from net.Listen if the listener cannot be opened,
// otherwise whatever srv.Serve returns.
func listenAndServe(srv *http.Server, metrics *inputMetrics) error {
	listenAddr := ":http"
	if srv.Addr != "" {
		listenAddr = srv.Addr
	}

	listener, err := net.Listen("tcp", listenAddr)
	if err != nil {
		return err
	}

	// Report the address actually bound, not the configured one.
	bound := listener.Addr().String()
	metrics.bindAddr.Set(bound)

	return srv.Serve(listener)
}

func checkTLSConsistency(addr string, old, new *tlscommon.ServerConfig) error {
if old == nil && new == nil {
return nil
Expand Down Expand Up @@ -240,7 +284,7 @@ func (s *server) getErr() error {
return s.err
}

func newHandler(c config, pub stateless.Publisher, log *logp.Logger) http.Handler {
func newHandler(c config, pub stateless.Publisher, log *logp.Logger, metrics *inputMetrics) http.Handler {
validator := &apiValidator{
basicAuth: c.BasicAuth,
username: c.Username,
Expand All @@ -258,6 +302,7 @@ func newHandler(c config, pub stateless.Publisher, log *logp.Logger) http.Handle
handler := &httpHandler{
log: log,
publisher: pub,
metrics: metrics,
messageField: c.Prefix,
responseCode: c.ResponseCode,
responseBody: c.ResponseBody,
Expand All @@ -268,3 +313,48 @@ func newHandler(c config, pub stateless.Publisher, log *logp.Logger) http.Handle

return newAPIValidationHandler(http.HandlerFunc(handler.apiResponse), validator, log)
}

// inputMetrics handles the input's metric reporting. It holds the monitoring
// variables and histogram samples that the HTTP handler and server set-up
// code update, together with the function used to remove them from the
// monitoring registry again.
type inputMetrics struct {
	// unregister removes the metrics from the registry; it is invoked by Close.
	unregister func()

	bindAddr *monitoring.String // bind address of input (the listener's resolved address)
	route *monitoring.String // request route
	isTLS *monitoring.Bool // whether the input is listening on a TLS connection
	apiErrors *monitoring.Uint // number of API errors
	batchesReceived *monitoring.Uint // number of event arrays received
	batchesPublished *monitoring.Uint // number of event arrays published
	eventsPublished *monitoring.Uint // number of events published
	contentLength metrics.Sample // histogram of request content lengths.
	batchSize metrics.Sample // histogram of the received batch sizes.
	// batchProcessingTime is a histogram of the elapsed successful batch
	// processing times in nanoseconds (time of handler start to time of ACK
	// for non-empty batches).
	// NOTE(review): the visible handler code records this after the response
	// has been sent on the success path — confirm the "ACK" and "non-empty
	// batches" wording against the full handler.
	batchProcessingTime metrics.Sample
}

// newInputMetrics creates the metric set for the input identified by id.
// It obtains a registry from inputmon.NewInputRegistry, creates the string,
// bool and counter variables in it, and registers three histograms ("size",
// "batch_size" and "batch_processing_time") backed by uniform samples.
//
// The returned value must be released with Close, which calls the
// unregister function handed back by inputmon.NewInputRegistry.
func newInputMetrics(id string) *inputMetrics {
	// Reservoir size shared by all histogram samples.
	const sampleSize = 1024

	reg, unreg := inputmon.NewInputRegistry(inputName, id, nil)

	m := &inputMetrics{unregister: unreg}

	// Plain monitoring variables.
	m.bindAddr = monitoring.NewString(reg, "bind_address")
	m.route = monitoring.NewString(reg, "route")
	m.isTLS = monitoring.NewBool(reg, "is_tls_connection")
	m.apiErrors = monitoring.NewUint(reg, "api_errors_total")
	m.batchesReceived = monitoring.NewUint(reg, "batches_received_total")
	m.batchesPublished = monitoring.NewUint(reg, "batches_published_total")
	m.eventsPublished = monitoring.NewUint(reg, "events_published_total")

	// Samples backing the histograms.
	m.contentLength = metrics.NewUniformSample(sampleSize)
	m.batchSize = metrics.NewUniformSample(sampleSize)
	m.batchProcessingTime = metrics.NewUniformSample(sampleSize)

	histograms := []struct {
		name   string
		sample metrics.Sample
	}{
		{"size", m.contentLength},
		{"batch_size", m.batchSize},
		{"batch_processing_time", m.batchProcessingTime},
	}
	for _, h := range histograms {
		// Registration errors are deliberately discarded.
		_ = adapter.NewGoMetrics(reg, h.name, adapter.Accept).
			Register("histogram", metrics.NewHistogram(h.sample))
	}

	return m
}

// Close releases the input's metrics by calling the unregister function
// stored on m.
func (m *inputMetrics) Close() {
	m.unregister()
}
6 changes: 4 additions & 2 deletions x-pack/filebeat/input/http_endpoint/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,13 +217,15 @@ func TestServerPool(t *testing.T) {
fails = make(chan error, 1)
)
ctx, cancel := newCtx("server_pool_test", test.name)
metrics := newInputMetrics("")
defer metrics.Close()
var wg sync.WaitGroup
for _, cfg := range test.cfgs {
cfg := cfg
wg.Add(1)
go func() {
defer wg.Done()
err := servers.serve(ctx, cfg, &pub)
err := servers.serve(ctx, cfg, &pub, metrics)
if err != http.ErrServerClosed {
select {
case fails <- err:
Expand Down Expand Up @@ -274,7 +276,7 @@ func TestServerPool(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
err := servers.serve(ctx, cfg, &pub)
err := servers.serve(ctx, cfg, &pub, metrics)
if err != nil && err != http.ErrServerClosed && test.wantErr == nil {
t.Errorf("failed to re-register %v: %v", cfg.addr, err)
}
Expand Down

0 comments on commit c5c7980

Please sign in to comment.