Skip to content

Commit

Permalink
x-pack/filebeat/input/http_endpoint: add e2e acking
Browse files Browse the repository at this point in the history
  • Loading branch information
efd6 committed May 9, 2024
1 parent 6cb0d1d commit ef8bd51
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 3 deletions.
56 changes: 53 additions & 3 deletions x-pack/filebeat/input/http_endpoint/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ package http_endpoint

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/url"
"reflect"
"strconv"
"time"
Expand Down Expand Up @@ -43,6 +45,8 @@ var (
)

type handler struct {
ctx context.Context

metrics *inputMetrics
publish func(beat.Event)
log *logp.Logger
Expand All @@ -69,10 +73,22 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

wait := getTimeoutWait(r.URL, h.log)
var (
acked chan struct{}
timeout *time.Timer
)
if wait != 0 {
acked = make(chan struct{})
timeout = time.NewTimer(wait)
}
start := time.Now()
acker := newBatchACKTracker(func() {
h.metrics.batchACKTime.Update(time.Since(start).Nanoseconds())
h.metrics.batchesACKedTotal.Inc()
if acked != nil {
close(acked)
}
})
defer acker.Ready()
h.metrics.batchesReceived.Add(1)
Expand Down Expand Up @@ -138,14 +154,48 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
respCode, respBody = h.responseCode, h.responseBody
}

h.sendResponse(w, respCode, respBody)
if h.reqLogger != nil {
h.logRequest(r, respCode, nil)
if acked == nil {
h.sendResponse(w, respCode, respBody)
} else {
select {
case <-acked:
if !timeout.Stop() {
<-timeout.C
}
h.sendResponse(w, respCode, respBody)
case <-timeout.C:
h.sendAPIErrorResponse(w, r, h.log, http.StatusGatewayTimeout, errTookTooLong)
case <-h.ctx.Done():
h.sendAPIErrorResponse(w, r, h.log, http.StatusGatewayTimeout, errTookTooLong)
}
if h.reqLogger != nil {
h.logRequest(r, respCode, nil)
}
}
h.metrics.batchProcessingTime.Update(time.Since(start).Nanoseconds())
h.metrics.batchesPublished.Add(1)
}

var errTookTooLong = errors.New("could not publish event within timeout")

func getTimeoutWait(u *url.URL, log *logp.Logger) time.Duration {
p := u.Query().Get("wait_for_completion_timeout")
if p == "" {
return 0
}
log.Debugw("wait_for_completion_timeout parameter", "value", p)
t, err := time.ParseDuration(p)
if err != nil {
log.Warnw("could not parse wait_for_completion_timeout parameter", "error", err)
return 0
}
if t < 0 {
log.Warnw("negative wait_for_completion_timeout parameter", "error", err)
return 0
}
return t
}

func (h *handler) sendAPIErrorResponse(w http.ResponseWriter, r *http.Request, log *logp.Logger, status int, apiError error) {
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(status)
Expand Down
1 change: 1 addition & 0 deletions x-pack/filebeat/input/http_endpoint/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ func (s *server) getErr() error {

func newHandler(ctx context.Context, c config, prg *program, pub func(beat.Event), log *logp.Logger, metrics *inputMetrics) http.Handler {
h := &handler{
ctx: ctx,
log: log,
txBaseID: newID(),
txIDCounter: atomic.NewUint64(0),
Expand Down

0 comments on commit ef8bd51

Please sign in to comment.