diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index e63eabc3f1d..96462205dc8 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -144,6 +144,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Upgrade azure-event-hubs-go and azure-storage-blob-go dependencies. {pull}38861[38861] - Fix concurrency/error handling bugs in the AWS S3 input that could drop data and prevent ingestion of large buckets. {pull}39131[39131] - Fix EntraID query handling. {issue}39419[39419] {pull}39420[39420] +- Fix request trace filename handling in http_endpoint input. {pull}39410[39410] *Heartbeat* @@ -273,6 +274,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Add default user-agent to CEL HTTP requests. {issue}39502[39502] {pull}39587[39587] - Improve reindexing support in security module pipelines. {issue}38224[38224] {pull}[] - Improve reindexing support in security module pipelines. {issue}38224[38224] {pull}39588[39588] +- Make HTTP Endpoint input GA. {issue}38979[38979] {pull}39410[39410] *Auditbeat* diff --git a/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc b/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc index a669eae489a..bc0ec78cdf8 100644 --- a/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc @@ -9,13 +9,12 @@ HTTP Endpoint ++++ -beta[] - The HTTP Endpoint input initializes a listening HTTP server that collects -incoming HTTP POST requests containing a JSON body. The body must be either an -object or an array of objects. Any other data types will result in an HTTP 400 -(Bad Request) response. For arrays, one document is created for each object in -the array. +incoming HTTP POST requests containing a JSON body. The body must be either +an object or an array of objects, otherwise a Common Expression Language +expression that converts the the JSON body to these types can be provided. +Any other data types will result in an HTTP 400 (Bad Request) response. For +arrays, one document is created for each object in the array. gzip encoded request bodies are supported if a `Content-Encoding: gzip` header is sent with the request. @@ -35,14 +34,27 @@ These are the possible response codes from the server. |========================================================================================================================================================= | HTTP Response Code | Name | Reason | 200 | OK | Returned on success. -| 400 | Bad Request | Returned if JSON body decoding fails. +| 400 | Bad Request | Returned if JSON body decoding fails or if `wait_for_completion_timeout` query validation fails. | 401 | Unauthorized | Returned when basic auth, secret header, or HMAC validation fails. | 405 | Method Not Allowed | Returned if methods other than POST are used. | 406 | Not Acceptable | Returned if the POST request does not contain a body. | 415 | Unsupported Media Type | Returned if the Content-Type is not application/json. Or if Content-Encoding is present and is not gzip. | 500 | Internal Server Error | Returned if an I/O error occurs reading the request. +| 504 | Gateway Timeout | Returned if a request publication cannot be ACKed within the required timeout. |========================================================================================================================================================= +The endpoint will enforce end-to-end ACK when a URL query parameter +`wait_for_completion_timeout` with a duration is provided. For example +`http://localhost:8080/?wait_for_completion_timeout=1m` will wait up +to 1 minute for the event to be published to the cluster and then return +the user-defined response message. In the case that the publication +does not complete within the timeout duration, the HTTP response will +have a 504 Gateway Timeout status code. The syntax for durations is +a number followed by units which may be h, m and s. No other HTTP query +is accepted. If another query parameter is provided or duration syntax +is incorrect, the request will fail with an HTTP 400 "Bad Request" +status. + Example configurations: Basic example: @@ -69,6 +81,17 @@ Custom response example: prefix: "json" ---- +Map request to root of document example: +["source","yaml",subs="attributes"] +---- +{beatname_lc}.inputs: +- type: http_endpoint + enabled: true + listen_address: 192.168.1.1 + listen_port: 8080 + prefix: "." +---- + Multiple endpoints example: ["source","yaml",subs="attributes"] ---- @@ -171,6 +194,40 @@ Preserving original event and including headers in document include_headers: ["TestHeader"] ---- +Common Expression Language example: +["source","yaml",subs="attributes"] +---- +{beatname_lc}.inputs: +- type: http_endpoint + enabled: true + listen_address: 192.168.1.1 + listen_port: 8080 + program: | + obj.records.map(r, { + "requestId": obj.requestId, + "timestamp": string(obj.timestamp), + "event": r, + }) +---- +This example would allow handling of a JSON body that is an object containing +more than one event that each should be ingested as separate documents with +the common timestamp and request ID: +["source","json",subs="attributes"] +---- +{ + "requestId": "ed4acda5-034f-9f42-bba1-f29aea6d7d8f", + "timestamp": 1578090901599, + "records": [ + { + "data": "event record 1" + }, + { + "data": "event record 2" + } + ] +} +---- + ==== Configuration options The `http_endpoint` input supports the following configuration options plus the @@ -230,7 +287,7 @@ In certain scenarios when the source of the request is not able to do that, it c [float] ==== `program` -The normal operation of the input treats the body either as a single event when the body is an object, or as a set of events when the body is an array. If the body should be handled differently, for example a set of events in an array field of an object to be handled as a set of events, then a https://opensource.google.com/projects/cel[Common Expression Language (CEL)] program can be provided through this configuration field. No CEL extensions are provided beyond the function in the CEL https://github.com/google/cel-spec/blob/master/doc/langdef.md#standard[standard library]. CEL https://pkg.go.dev/github.com/google/cel-go/cel#OptionalTypes[optional types] are supported. +The normal operation of the input treats the body either as a single event when the body is an object, or as a set of events when the body is an array. If the body should be handled differently, for example a set of events in an array field of an object to be handled as a set of events, then a https://opensource.google.com/projects/cel[Common Expression Language (CEL)] program can be provided through this configuration field. The name of the object in the CEL program is `obj`. No CEL extensions are provided beyond the function in the CEL https://github.com/google/cel-spec/blob/master/doc/langdef.md#standard[standard library]. CEL https://pkg.go.dev/github.com/google/cel-go/cel#OptionalTypes[optional types] are supported. [float] ==== `response_code` @@ -260,7 +317,7 @@ This options specific which URL path to accept requests on. Defaults to `/` [float] ==== `prefix` -This option specifies which prefix the incoming request will be mapped to. +This option specifies which prefix the incoming request will be mapped to. If `prefix` is "`.`", the request will be mapped to the root of the resulting document. [float] ==== `include_headers` @@ -346,10 +403,12 @@ observe the activity of the input. | `api_errors_total` | Number of API errors. | `batches_received_total` | Number of event arrays received. | `batches_published_total` | Number of event arrays published. +| `batches_acked_total` | Number of event arrays ACKed. | `events_published_total` | Number of events published. | `size` | Histogram of request content lengths. | `batch_size` | Histogram of the received event array length. | `batch_processing_time` | Histogram of the elapsed successful batch processing times in nanoseconds (time of receipt to time of ACK for non-empty batches). +| `batch_ack_time` | Histogram of the elapsed successful batch ACKing times in nanoseconds (time of handler start to time of ACK for non-empty batches). |======= [id="{beatname_lc}-input-{type}-common-options"] diff --git a/x-pack/filebeat/input/http_endpoint/ack.go b/x-pack/filebeat/input/http_endpoint/ack.go new file mode 100644 index 00000000000..9dfc5a656b5 --- /dev/null +++ b/x-pack/filebeat/input/http_endpoint/ack.go @@ -0,0 +1,77 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package http_endpoint + +import ( + "sync" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common/acker" +) + +// newEventACKHandler returns a beat ACKer that can receive callbacks when +// an event has been ACKed an output. If the event contains a private metadata +// pointing to a batchACKTracker then it will invoke the tracker's ACK() method +// to decrement the number of pending ACKs. +func newEventACKHandler() beat.EventListener { + return acker.ConnectionOnly( + acker.EventPrivateReporter(func(_ int, privates []interface{}) { + for _, private := range privates { + if ack, ok := private.(*batchACKTracker); ok { + ack.ACK() + } + } + }), + ) +} + +// batchACKTracker invokes batchACK when all events associated to the batch +// have been published and acknowledged by an output. +type batchACKTracker struct { + batchACK func() + + mu sync.Mutex + pending int64 +} + +// newBatchACKTracker returns a new batchACKTracker. The provided batchACK function +// is invoked after the full batch has been acknowledged. Ready() must be invoked +// after all events in the batch are published. +func newBatchACKTracker(fn func()) *batchACKTracker { + return &batchACKTracker{ + batchACK: fn, + pending: 1, // Ready() must be called to consume this "1". + } +} + +// Ready signals that the batch has been fully consumed. Only +// after the batch is marked as "ready" can the batch be ACKed. +// This prevents the batch from being ACKed prematurely. +func (t *batchACKTracker) Ready() { + t.ACK() +} + +// Add increments the number of pending ACKs. +func (t *batchACKTracker) Add() { + t.mu.Lock() + t.pending++ + t.mu.Unlock() +} + +// ACK decrements the number of pending event ACKs. When all pending ACKs are +// received then the event batch is ACKed. +func (t *batchACKTracker) ACK() { + t.mu.Lock() + defer t.mu.Unlock() + + if t.pending <= 0 { + panic("misuse detected: negative ACK counter") + } + + t.pending-- + if t.pending == 0 { + t.batchACK() + } +} diff --git a/x-pack/filebeat/input/http_endpoint/ack_test.go b/x-pack/filebeat/input/http_endpoint/ack_test.go new file mode 100644 index 00000000000..59b67a39fb8 --- /dev/null +++ b/x-pack/filebeat/input/http_endpoint/ack_test.go @@ -0,0 +1,50 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package http_endpoint + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestBatchACKTracker(t *testing.T) { + t.Run("empty", func(t *testing.T) { + tracker := make(ack) + + acker := newBatchACKTracker(tracker.ACK) + require.False(t, tracker.wasACKed()) + + acker.Ready() + require.True(t, tracker.wasACKed()) + }) + + t.Run("single_event", func(t *testing.T) { + tracker := make(ack) + + acker := newBatchACKTracker(tracker.ACK) + acker.Add() + acker.ACK() + require.False(t, tracker.wasACKed()) + + acker.Ready() + require.True(t, tracker.wasACKed()) + }) +} + +type ack chan struct{} + +func (a ack) ACK() { + close(a) +} + +func (a ack) wasACKed() bool { + select { + case <-a: + return true + default: + return false + } +} diff --git a/x-pack/filebeat/input/http_endpoint/handler.go b/x-pack/filebeat/input/http_endpoint/handler.go index d21ac145174..b799248a935 100644 --- a/x-pack/filebeat/input/http_endpoint/handler.go +++ b/x-pack/filebeat/input/http_endpoint/handler.go @@ -6,14 +6,18 @@ package http_endpoint import ( "bytes" + "context" "encoding/json" "errors" "fmt" "io" "net" "net/http" + "net/url" "reflect" + "sort" "strconv" + "strings" "time" "github.com/google/cel-go/cel" @@ -25,7 +29,6 @@ import ( "go.uber.org/zap/zapcore" "google.golang.org/protobuf/types/known/structpb" - stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/jsontransform" @@ -44,8 +47,10 @@ var ( ) type handler struct { + ctx context.Context + metrics *inputMetrics - publisher stateless.Publisher + publish func(beat.Event) log *logp.Logger validator apiValidator txBaseID string // Random value to make transaction IDs unique. @@ -64,18 +69,40 @@ type handler struct { } func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + txID := h.nextTxID() + h.log.Debugw("request", "url", r.URL, "tx_id", txID) status, err := h.validator.validateRequest(r) if err != nil { - h.sendAPIErrorResponse(w, r, h.log, status, err) + h.sendAPIErrorResponse(txID, w, r, h.log, status, err) return } + wait, err := getTimeoutWait(r.URL, h.log) + if err != nil { + h.sendAPIErrorResponse(txID, w, r, h.log, http.StatusBadRequest, err) + return + } + var ( + acked chan struct{} + timeout *time.Timer + ) + if wait != 0 { + acked = make(chan struct{}) + timeout = time.NewTimer(wait) + } start := time.Now() + acker := newBatchACKTracker(func() { + h.metrics.batchACKTime.Update(time.Since(start).Nanoseconds()) + h.metrics.batchesACKedTotal.Inc() + if acked != nil { + close(acked) + } + }) h.metrics.batchesReceived.Add(1) h.metrics.contentLength.Update(r.ContentLength) body, status, err := getBodyReader(r) if err != nil { - h.sendAPIErrorResponse(w, r, h.log, status, err) + h.sendAPIErrorResponse(txID, w, r, h.log, status, err) h.metrics.apiErrors.Add(1) return } @@ -94,7 +121,7 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { objs, _, status, err := httpReadJSON(body, h.program) if err != nil { - h.sendAPIErrorResponse(w, r, h.log, status, err) + h.sendAPIErrorResponse(txID, w, r, h.log, status, err) h.metrics.apiErrors.Add(1) return } @@ -119,29 +146,90 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { break } else if !errors.Is(err, errNotCRC) { h.metrics.apiErrors.Add(1) - h.sendAPIErrorResponse(w, r, h.log, http.StatusBadRequest, err) + h.sendAPIErrorResponse(txID, w, r, h.log, http.StatusBadRequest, err) return } } - if err = h.publishEvent(obj, headers); err != nil { + acker.Add() + if err = h.publishEvent(obj, headers, acker); err != nil { h.metrics.apiErrors.Add(1) - h.sendAPIErrorResponse(w, r, h.log, http.StatusInternalServerError, err) + h.sendAPIErrorResponse(txID, w, r, h.log, http.StatusInternalServerError, err) return } h.metrics.eventsPublished.Add(1) respCode, respBody = h.responseCode, h.responseBody } - h.sendResponse(w, respCode, respBody) - if h.reqLogger != nil { - h.logRequest(r, respCode, nil) + acker.Ready() + if acked == nil { + h.sendResponse(w, respCode, respBody) + } else { + select { + case <-acked: + h.log.Debugw("request acked", "tx_id", txID) + if !timeout.Stop() { + <-timeout.C + } + h.sendResponse(w, respCode, respBody) + case <-timeout.C: + h.log.Debugw("request timed out", "tx_id", txID) + h.sendAPIErrorResponse(txID, w, r, h.log, http.StatusGatewayTimeout, errTookTooLong) + case <-h.ctx.Done(): + h.log.Debugw("request context cancelled", "tx_id", txID) + h.sendAPIErrorResponse(txID, w, r, h.log, http.StatusGatewayTimeout, h.ctx.Err()) + } + if h.reqLogger != nil { + h.logRequest(txID, r, respCode, nil) + } } h.metrics.batchProcessingTime.Update(time.Since(start).Nanoseconds()) h.metrics.batchesPublished.Add(1) } -func (h *handler) sendAPIErrorResponse(w http.ResponseWriter, r *http.Request, log *logp.Logger, status int, apiError error) { +var errTookTooLong = errors.New("could not publish event within timeout") + +func getTimeoutWait(u *url.URL, log *logp.Logger) (time.Duration, error) { + q := u.Query() + switch len(q) { + case 0: + return 0, nil + case 1: + if _, ok := q["wait_for_completion_timeout"]; !ok { + // Get the only key in q. We don't know what it is, so iterate + // over the first one of one. + var k string + for k = range q { + break + } + return 0, fmt.Errorf("unexpected URL query: %s", k) + } + default: + delete(q, "wait_for_completion_timeout") + keys := make([]string, 0, len(q)) + for k := range q { + keys = append(keys, k) + } + sort.Strings(keys) + return 0, fmt.Errorf("unexpected URL query: %s", strings.Join(keys, ", ")) + } + p := q.Get("wait_for_completion_timeout") + if p == "" { + // This will never happen; it is already handled in the check switch above. + return 0, nil + } + log.Debugw("wait_for_completion_timeout parameter", "value", p) + t, err := time.ParseDuration(p) + if err != nil { + return 0, fmt.Errorf("could not parse wait_for_completion_timeout parameter: %w", err) + } + if t < 0 { + return 0, fmt.Errorf("negative wait_for_completion_timeout parameter: %w", err) + } + return t, nil +} + +func (h *handler) sendAPIErrorResponse(txID string, w http.ResponseWriter, r *http.Request, log *logp.Logger, status int, apiError error) { w.Header().Add("Content-Type", "application/json") w.WriteHeader(status) @@ -159,11 +247,11 @@ func (h *handler) sendAPIErrorResponse(w http.ResponseWriter, r *http.Request, l log.Debugw("Failed to write HTTP response.", "error", err, "client.address", r.RemoteAddr) } if h.reqLogger != nil { - h.logRequest(r, status, buf.Bytes()) + h.logRequest(txID, r, status, buf.Bytes()) } } -func (h *handler) logRequest(r *http.Request, status int, respBody []byte) { +func (h *handler) logRequest(txID string, r *http.Request, status int, respBody []byte) { // Populate and preserve scheme and host if they are missing; // they are required for httputil.DumpRequestOut. var scheme, host string @@ -189,7 +277,6 @@ func (h *handler) logRequest(r *http.Request, status int, respBody []byte) { zap.ByteString("http.response.body.content", respBody), ) } - txID := h.nextTxID() h.log.Debugw("new request trace transaction", "id", txID) // Limit request logging body size to 10kiB. const maxBodyLen = 10 * (1 << 10) @@ -219,10 +306,17 @@ func (h *handler) sendResponse(w http.ResponseWriter, status int, message string } } -func (h *handler) publishEvent(obj, headers mapstr.M) error { +func (h *handler) publishEvent(obj, headers mapstr.M, acker *batchACKTracker) error { event := beat.Event{ Timestamp: time.Now().UTC(), - Fields: mapstr.M{}, + Private: acker, + } + if h.messageField == "." { + event.Fields = obj + } else { + if _, err := event.PutValue(h.messageField, obj); err != nil { + return fmt.Errorf("failed to put data into event key %q: %w", h.messageField, err) + } } if h.preserveOriginalEvent { event.Fields["event"] = mapstr.M{ @@ -233,11 +327,7 @@ func (h *handler) publishEvent(obj, headers mapstr.M) error { event.Fields["headers"] = headers } - if _, err := event.PutValue(h.messageField, obj); err != nil { - return fmt.Errorf("failed to put data into event key %q: %w", h.messageField, err) - } - - h.publisher.Publish(event) + h.publish(event) return nil } diff --git a/x-pack/filebeat/input/http_endpoint/handler_test.go b/x-pack/filebeat/input/http_endpoint/handler_test.go index cb911f8ab18..4c464a34f50 100644 --- a/x-pack/filebeat/input/http_endpoint/handler_test.go +++ b/x-pack/filebeat/input/http_endpoint/handler_test.go @@ -239,6 +239,26 @@ func Test_apiResponse(t *testing.T) { wantStatus: http.StatusOK, wantResponse: `{"message": "success"}`, }, + { + name: "single_event_root", + conf: func() config { + c := defaultConfig() + c.Prefix = "." + return c + }(), + request: func() *http.Request { + req := httptest.NewRequest(http.MethodPost, "/", bytes.NewBufferString(`{"id":0}`)) + req.Header.Set("Content-Type", "application/json") + return req + }(), + events: []mapstr.M{ + { + "id": int64(0), + }, + }, + wantStatus: http.StatusOK, + wantResponse: `{"message": "success"}`, + }, { name: "single_event_gzip", conf: defaultConfig(), @@ -378,7 +398,7 @@ func Test_apiResponse(t *testing.T) { pub := new(publisher) metrics := newInputMetrics("") defer metrics.Close() - apiHandler := newHandler(ctx, tracerConfig(tc.name, tc.conf, *withTraces), nil, pub, logp.NewLogger("http_endpoint.test"), metrics) + apiHandler := newHandler(ctx, tracerConfig(tc.name, tc.conf, *withTraces), nil, pub.Publish, logp.NewLogger("http_endpoint.test"), metrics) // Execute handler. respRec := httptest.NewRecorder() diff --git a/x-pack/filebeat/input/http_endpoint/input.go b/x-pack/filebeat/input/http_endpoint/input.go index e9d9bfe7ba9..6737a9b9aa0 100644 --- a/x-pack/filebeat/input/http_endpoint/input.go +++ b/x-pack/filebeat/input/http_endpoint/input.go @@ -16,7 +16,9 @@ import ( "net" "net/http" "net/url" + "path/filepath" "reflect" + "strings" "sync" "time" @@ -27,7 +29,7 @@ import ( "go.uber.org/zap/zapcore" v2 "github.com/elastic/beats/v7/filebeat/input/v2" - stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" + "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/feature" "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" conf "github.com/elastic/elastic-agent-libs/config" @@ -52,13 +54,13 @@ type httpEndpoint struct { func Plugin() v2.Plugin { return v2.Plugin{ Name: inputName, - Stability: feature.Beta, + Stability: feature.Stable, Deprecated: false, - Manager: stateless.NewInputManager(configure), + Manager: v2.ConfigureWith(configure), } } -func configure(cfg *conf.C) (stateless.Input, error) { +func configure(cfg *conf.C) (v2.Input, error) { conf := defaultConfig() if err := cfg.Unpack(&conf); err != nil { return nil, err @@ -100,16 +102,39 @@ func (e *httpEndpoint) Test(_ v2.TestContext) error { return l.Close() } -func (e *httpEndpoint) Run(ctx v2.Context, publisher stateless.Publisher) error { +func (e *httpEndpoint) Run(ctx v2.Context, pipeline beat.Pipeline) error { metrics := newInputMetrics(ctx.ID) defer metrics.Close() - err := servers.serve(ctx, e, publisher, metrics) + + if e.config.Tracer != nil { + id := sanitizeFileName(ctx.ID) + e.config.Tracer.Filename = strings.ReplaceAll(e.config.Tracer.Filename, "*", id) + } + + client, err := pipeline.ConnectWith(beat.ClientConfig{ + EventListener: newEventACKHandler(), + }) + if err != nil { + return fmt.Errorf("failed to create pipeline client: %w", err) + } + defer client.Close() + + err = servers.serve(ctx, e, client.Publish, metrics) if err != nil && !errors.Is(err, http.ErrServerClosed) { return fmt.Errorf("unable to start server due to error: %w", err) } return nil } +// sanitizeFileName returns name with ":" and "/" replaced with "_", removing repeated instances. +// The request.tracer.filename may have ":" when a http_endpoint input has cursor config and +// the macOS Finder will treat this as path-separator and causes to show up strange filepaths. +func sanitizeFileName(name string) string { + name = strings.ReplaceAll(name, ":", string(filepath.Separator)) + name = filepath.Clean(name) + return strings.ReplaceAll(name, string(filepath.Separator), "_") +} + // servers is the package-level server pool. var servers = pool{servers: make(map[string]*server)} @@ -125,7 +150,7 @@ type pool struct { // cancelled or the context of another end-point sharing the same address // has had its context cancelled. If an end-point is re-registered with // the same address and mux pattern, serve will return an error. -func (p *pool) serve(ctx v2.Context, e *httpEndpoint, pub stateless.Publisher, metrics *inputMetrics) error { +func (p *pool) serve(ctx v2.Context, e *httpEndpoint, pub func(beat.Event), metrics *inputMetrics) error { log := ctx.Logger.With("address", e.addr) pattern := e.config.URL @@ -300,14 +325,15 @@ func (s *server) getErr() error { return s.err } -func newHandler(ctx context.Context, c config, prg *program, pub stateless.Publisher, log *logp.Logger, metrics *inputMetrics) http.Handler { +func newHandler(ctx context.Context, c config, prg *program, pub func(beat.Event), log *logp.Logger, metrics *inputMetrics) http.Handler { h := &handler{ + ctx: ctx, log: log, txBaseID: newID(), txIDCounter: atomic.NewUint64(0), - publisher: pub, - metrics: metrics, + publish: pub, + metrics: metrics, validator: apiValidator{ basicAuth: c.BasicAuth, username: c.Username, @@ -375,10 +401,12 @@ type inputMetrics struct { apiErrors *monitoring.Uint // number of API errors batchesReceived *monitoring.Uint // number of event arrays received batchesPublished *monitoring.Uint // number of event arrays published + batchesACKedTotal *monitoring.Uint // Number of event arrays ACKed. eventsPublished *monitoring.Uint // number of events published contentLength metrics.Sample // histogram of request content lengths. batchSize metrics.Sample // histogram of the received batch sizes. batchProcessingTime metrics.Sample // histogram of the elapsed successful batch processing times in nanoseconds (time of handler start to time of ACK for non-empty batches). + batchACKTime metrics.Sample // histogram of the elapsed successful batch acking times in nanoseconds (time of handler start to time of ACK for non-empty batches). } func newInputMetrics(id string) *inputMetrics { @@ -391,10 +419,12 @@ func newInputMetrics(id string) *inputMetrics { apiErrors: monitoring.NewUint(reg, "api_errors_total"), batchesReceived: monitoring.NewUint(reg, "batches_received_total"), batchesPublished: monitoring.NewUint(reg, "batches_published_total"), + batchesACKedTotal: monitoring.NewUint(reg, "batches_acked_total"), eventsPublished: monitoring.NewUint(reg, "events_published_total"), contentLength: metrics.NewUniformSample(1024), batchSize: metrics.NewUniformSample(1024), batchProcessingTime: metrics.NewUniformSample(1024), + batchACKTime: metrics.NewUniformSample(1024), } _ = adapter.NewGoMetrics(reg, "size", adapter.Accept). Register("histogram", metrics.NewHistogram(out.contentLength)) @@ -402,6 +432,8 @@ func newInputMetrics(id string) *inputMetrics { Register("histogram", metrics.NewHistogram(out.batchSize)) _ = adapter.NewGoMetrics(reg, "batch_processing_time", adapter.Accept). Register("histogram", metrics.NewHistogram(out.batchProcessingTime)) + _ = adapter.NewGoMetrics(reg, "batch_ack_time", adapter.Accept). + Register("histogram", metrics.NewHistogram(out.batchACKTime)) return out } diff --git a/x-pack/filebeat/input/http_endpoint/input_test.go b/x-pack/filebeat/input/http_endpoint/input_test.go index c7c1b89bf3a..3f530454e1d 100644 --- a/x-pack/filebeat/input/http_endpoint/input_test.go +++ b/x-pack/filebeat/input/http_endpoint/input_test.go @@ -280,7 +280,7 @@ func TestServerPool(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - err := servers.serve(ctx, cfg, &pub, metrics) + err := servers.serve(ctx, cfg, pub.Publish, metrics) if err != http.ErrServerClosed { select { case fails <- err: @@ -331,7 +331,7 @@ func TestServerPool(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - err := servers.serve(ctx, cfg, &pub, metrics) + err := servers.serve(ctx, cfg, pub.Publish, metrics) if err != nil && err != http.ErrServerClosed && test.wantErr == nil { t.Errorf("failed to re-register %v: %v", cfg.addr, err) } diff --git a/x-pack/filebeat/input/lumberjack/ack.go b/x-pack/filebeat/input/lumberjack/ack.go index 809a7e7d135..6e9e65cb695 100644 --- a/x-pack/filebeat/input/lumberjack/ack.go +++ b/x-pack/filebeat/input/lumberjack/ack.go @@ -62,7 +62,7 @@ func (t *batchACKTracker) ACK() { } // newEventACKHandler returns a beat ACKer that can receive callbacks when -// an event has been ACKed an output. If the event contains a private metadata +// an event has been ACKed by an output. If the event contains a private metadata // pointing to a batchACKTracker then it will invoke the tracker's ACK() method // to decrement the number of pending ACKs. func newEventACKHandler() beat.EventListener { diff --git a/x-pack/filebeat/tests/system/test_http_endpoint.py b/x-pack/filebeat/tests/system/test_http_endpoint.py index 9a42896d4ae..761b5b78977 100644 --- a/x-pack/filebeat/tests/system/test_http_endpoint.py +++ b/x-pack/filebeat/tests/system/test_http_endpoint.py @@ -90,6 +90,72 @@ def test_http_endpoint_request(self): assert output[0]["input.type"] == "http_endpoint" assert output[0]["json.{}".format(self.prefix)] == message + def test_http_endpoint_request_acked(self): + """ + Test http_endpoint input with HTTP events requiring ACK. + """ + self.get_config() + filebeat = self.start_beat() + self.wait_until(lambda: self.log_contains( + "Starting HTTP server on {}:{}".format(self.host, self.port))) + + message = "somerandommessage" + payload = {self.prefix: message} + query = {"wait_for_completion_timeout": "1m"} + headers = {"Content-Type": "application/json", + "Accept": "application/json"} + r = requests.post(self.url, params=query, headers=headers, data=json.dumps(payload)) + + self.wait_until(lambda: self.output_count(lambda x: x >= 1)) + filebeat.check_kill_and_wait() + + output = self.read_output() + + print("response:", r.status_code, r.text) + + assert r.text == '{"message": "success"}' + assert output[0]["input.type"] == "http_endpoint" + assert output[0]["json.{}".format(self.prefix)] == message + + def test_http_endpoint_cel_request(self): + """ + Test http_endpoint input with HTTP events using CEL. + """ + options = """ + content_type: application/x-ndjson + program: | + {{ + "testmessage": obj.testmessage+'_'+obj.testmessage, + }} +""" + self.get_config(options) + filebeat = self.start_beat() + self.wait_until(lambda: self.log_contains( + "Starting HTTP server on {}:{}".format(self.host, self.port))) + + N = 10 + message = "somerandommessage_{}" + payload = "\n".join( + [json.dumps({self.prefix: message.format(i)}) for i in range(N)]) + headers = {"Content-Type": "application/x-ndjson", + "Accept": "application/json"} + r = requests.post(self.url, headers=headers, data=payload) + + self.wait_until(lambda: self.output_count(lambda x: x == N)) + filebeat.check_kill_and_wait() + + output = self.read_output() + + print("response:", r.status_code, r.text) + + assert r.text == '{"message": "success"}' + + assert len(output) == N + for i in range(N): + assert output[i]["input.type"] == "http_endpoint" + assert output[i]["json.{}".format( + self.prefix)] == message.format(i)+'_'+message.format(i) + def test_http_endpoint_request_multiple_documents(self): """ Test http_endpoint input with multiple documents on a single HTTP request.