Skip to content

Commit

Permalink
[filebeat][streaming] - Added retry functionality to websocket connec…
Browse files Browse the repository at this point in the history
…tions (#40601)

* added websocket retry logic, added input & config tests and updated docs

* updated changelog

* fixed function name spelling error

* added a retryable error check

* addressed PR comments

* passed metrics to handleConnectionResponse to track errors

* addressed PR suggestions

* updated retry dial signature

(cherry picked from commit 0c3c9c6)

# Conflicts:
#	x-pack/filebeat/input/streaming/websocket.go
  • Loading branch information
ShourieG authored and mergify[bot] committed Dec 12, 2024
1 parent b9b912d commit e95361d
Show file tree
Hide file tree
Showing 6 changed files with 503 additions and 31 deletions.
30 changes: 30 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,36 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Parse more fields from Elasticsearch slowlogs {pull}38295[38295]
- added benchmark input {pull}37437[37437]
- added benchmark input and discard output {pull}37437[37437]
- Ensure all responses sent by HTTP Endpoint are HTML-escaped. {pull}39329[39329]
- Update CEL mito extensions to v1.11.0 to improve type checking. {pull}39460[39460]
- Improve logging of request and response with request trace logging in error conditions. {pull}39455[39455]
- Implement Elastic Agent status and health reporting for CEL Filebeat input. {pull}39209[39209]
- Add HTTP metrics to CEL input. {issue}39501[39501] {pull}39503[39503]
- Add default user-agent to CEL HTTP requests. {issue}39502[39502] {pull}39587[39587]
- Improve reindexing support in security module pipelines. {issue}38224[38224] {pull}39588[39588]
- Make HTTP Endpoint input GA. {issue}38979[38979] {pull}39410[39410]
- Update CEL mito extensions to v1.12.2. {pull}39755[39755]
- Add support for base64-encoded HMAC headers to HTTP Endpoint. {pull}39655[39655]
- Add user group membership support to Okta entity analytics provider. {issue}39814[39814] {pull}39815[39815]
- Add request trace support for Okta and EntraID entity analytics providers. {pull}39821[39821]
- Fix handling of infinite rate values in CEL rate limit handling logic. {pull}39940[39940]
- Allow elision of set and append failure logging. {issue}34544[34544] {pull}39929[39929]
- Add ability to remove request trace logs from CEL input. {pull}39969[39969]
- Add ability to remove request trace logs from HTTPJSON input. {pull}40003[40003]
- Update CEL mito extensions to v1.13.0. {pull}40035[40035]
- Add Jamf entity analytics provider. {pull}39996[39996]
- Add ability to remove request trace logs from http_endpoint input. {pull}40005[40005]
- Add ability to remove request trace logs from entityanalytics input. {pull}40004[40004]
- Relax constraint on Base DN in entity analytics Active Directory provider. {pull}40054[40054]
- Implement Elastic Agent status and health reporting for Netflow Filebeat input. {pull}40080[40080]
- Enhance input state reporting for CEL evaluations that return a single error object in events. {pull}40083[40083]
- Allow absent credentials when using GCS with Application Default Credentials. {issue}39977[39977] {pull}40072[40072]
- Add SSL and username support for Redis input, now the input includes support for Redis 6.0+. {pull}40111[40111]
- Add scaling up support for Netflow input. {issue}37761[37761] {pull}40122[40122]
- Update CEL mito extensions to v1.15.0. {pull}40294[40294]
- Allow cross-region bucket configuration in s3 input. {issue}22161[22161] {pull}40309[40309]
- Improve logging in Okta Entity Analytics provider. {issue}40106[40106] {pull}40347[40347]
- Added retry logic to websocket connections in the streaming input. {issue}40271[40271] {pull}40601[40601]

*Auditbeat*

Expand Down
37 changes: 37 additions & 0 deletions x-pack/filebeat/docs/inputs/input-websocket.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,43 @@ This specifies fields in the `state` to be redacted prior to debug logging. Fiel

This specifies whether fields should be replaced with a `*` or deleted entirely from messages sent to debug logs. If delete is `true`, fields will be deleted rather than replaced.

[[retry-streaming]]
[float]
==== `retry`

The `retry` configuration allows the user to specify the number of times the input should attempt to reconnect to the streaming data source in the event of a connection failure. The default value is `nil` which means no retries will be attempted. It has a `wait_min` and `wait_max` configuration which specifies the minimum and maximum time to wait between retries.

["source","yaml",subs="attributes"]
----
filebeat.inputs:
- type: streaming
url: ws://localhost:443/_stream
program: |
bytes(state.response).decode_json().as(inner_body,{
"events": {
"message": inner_body.encode_json(),
}
})
retry:
max_attempts: 5
wait_min: 1s
wait_max: 10s
----
[float]
==== `retry.max_attempts`

The maximum number of times the input should attempt to reconnect to the streaming data source in the event of a connection failure. The default value is `nil` which means no retries will be attempted.

[float]
==== `retry.wait_min`

The minimum time to wait between retries. This ensures that retries are spaced out enough to give the system time to recover or resolve transient issues, rather than bombarding the system with rapid retries. For example, `wait_min` might be set to 1 second, meaning that even if the calculated backoff is less than this, the client will wait at least 1 second before retrying.

[float]
==== `retry.wait_max`

The maximum time to wait between retries. This prevents the retry mechanism from becoming too slow, ensuring that the client does not wait indefinitely between retries. This is crucial in systems where timeouts or user experience are critical. For example, `wait_max` might be set to 10 seconds, meaning that even if the calculated backoff is greater than this, the client will wait at most 10 seconds before retrying.

[float]
=== Metrics

Expand Down
267 changes: 267 additions & 0 deletions x-pack/filebeat/input/streaming/websocket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package streaming

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"math"
"math/rand/v2"
"net"
"net/http"
"strings"
"time"

"github.com/gorilla/websocket"
"go.uber.org/zap/zapcore"

inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
"github.com/elastic/elastic-agent-libs/logp"
)

type websocketStream struct {
processor

id string
cfg config
cursor map[string]any

time func() time.Time
}

// NewWebsocketFollower performs environment construction including CEL
// program and regexp compilation, and input metrics set-up for a websocket
// stream follower.
func NewWebsocketFollower(ctx context.Context, id string, cfg config, cursor map[string]any, pub inputcursor.Publisher, log *logp.Logger, now func() time.Time) (StreamFollower, error) {
s := websocketStream{
id: id,
cfg: cfg,
cursor: cursor,
processor: processor{
ns: "websocket",
pub: pub,
log: log,
redact: cfg.Redact,
metrics: newInputMetrics(id),
},
}
s.metrics.url.Set(cfg.URL.String())
s.metrics.errorsTotal.Set(0)

patterns, err := regexpsFromConfig(cfg)
if err != nil {
s.metrics.errorsTotal.Inc()
s.Close()
return nil, err
}

s.prg, s.ast, err = newProgram(ctx, cfg.Program, root, patterns, log)
if err != nil {
s.metrics.errorsTotal.Inc()
s.Close()
return nil, err
}

return &s, nil
}

// FollowStream receives, processes and publishes events from the subscribed
// websocket stream.
func (s *websocketStream) FollowStream(ctx context.Context) error {
state := s.cfg.State
if state == nil {
state = make(map[string]any)
}
if s.cursor != nil {
state["cursor"] = s.cursor
}

// initialize the input url with the help of the url_program.
url, err := getURL(ctx, "websocket", s.cfg.URLProgram, s.cfg.URL.String(), state, s.cfg.Redact, s.log, s.now)
if err != nil {
s.metrics.errorsTotal.Inc()
return err
}

// websocket client
c, resp, err := connectWebSocket(ctx, s.cfg, url, s.log)
handleConnectionResponse(resp, s.metrics, s.log)
if err != nil {
s.metrics.errorsTotal.Inc()
s.log.Errorw("failed to establish websocket connection", "error", err)
return err
}

// ensures this is the last connection closed when the function returns
defer func() {
if err := c.Close(); err != nil {
s.metrics.errorsTotal.Inc()
s.log.Errorw("encountered an error while closing the websocket connection", "error", err)
}
}()

for {
select {
case <-ctx.Done():
s.log.Debugw("context cancelled, closing websocket connection")
return ctx.Err()
default:
_, message, err := c.ReadMessage()
if err != nil {
s.metrics.errorsTotal.Inc()
if isRetryableError(err) {
s.log.Debugw("websocket connection encountered an error, attempting to reconnect...", "error", err)
// close the old connection and reconnect
if err := c.Close(); err != nil {
s.metrics.errorsTotal.Inc()
s.log.Errorw("encountered an error while closing the websocket connection", "error", err)
}
// since c is already a pointer, we can reassign it to the new connection and the defer func will still handle it
c, resp, err = connectWebSocket(ctx, s.cfg, url, s.log)
handleConnectionResponse(resp, s.metrics, s.log)
if err != nil {
s.metrics.errorsTotal.Inc()
s.log.Errorw("failed to reconnect websocket connection", "error", err)
return err
}
} else {
s.log.Errorw("failed to read websocket data", "error", err)
return err
}
}
s.metrics.receivedBytesTotal.Add(uint64(len(message)))
state["response"] = message
s.log.Debugw("received websocket message", logp.Namespace("websocket"), string(message))
err = s.process(ctx, state, s.cursor, s.now().In(time.UTC))
if err != nil {
s.metrics.errorsTotal.Inc()
s.log.Errorw("failed to process and publish data", "error", err)
return err
}
}
}
}

// isRetryableError checks if the error is retryable based on the error type.
func isRetryableError(err error) bool {
// check for specific network errors
var netErr *net.OpError
if errors.As(err, &netErr) {
switch {
case netErr.Op == "dial" && netErr.Err.Error() == "i/o timeout",
netErr.Op == "read" && netErr.Err.Error() == "i/o timeout",
netErr.Op == "read" && netErr.Err.Error() == "connection reset by peer",
netErr.Op == "read" && netErr.Err.Error() == "connection refused",
netErr.Op == "read" && netErr.Err.Error() == "connection reset",
netErr.Op == "read" && netErr.Err.Error() == "connection closed":
return true
}
}

// check for specific websocket close errors
var closeErr *websocket.CloseError
if errors.As(err, &closeErr) {
switch closeErr.Code {
case websocket.CloseGoingAway,
websocket.CloseNormalClosure,
websocket.CloseInternalServerErr,
websocket.CloseTryAgainLater,
websocket.CloseServiceRestart,
websocket.CloseTLSHandshake:
return true
}
}

// check for common error patterns
if strings.Contains(err.Error(), "timeout") ||
strings.Contains(err.Error(), "connection reset") ||
strings.Contains(err.Error(), "temporary failure") ||
strings.Contains(err.Error(), "server is busy") {
return true
}

return false
}

// handleConnectionResponse logs the response body of the websocket connection.
func handleConnectionResponse(resp *http.Response, metrics *inputMetrics, log *logp.Logger) {
if resp != nil && resp.Body != nil {
var buf bytes.Buffer
defer resp.Body.Close()

if log.Core().Enabled(zapcore.DebugLevel) {
const limit = 1e4
if _, err := io.CopyN(&buf, resp.Body, limit); err != nil && !errors.Is(err, io.EOF) {
metrics.errorsTotal.Inc()
fmt.Fprintf(&buf, "failed to read websocket response body with error: (%s) \n", err)
}
}

// discard the remaining part of the body and check for truncation.
if n, err := io.Copy(io.Discard, resp.Body); err != nil {
metrics.errorsTotal.Inc()
fmt.Fprintf(&buf, "failed to discard remaining response body with error: (%s) ", err)
} else if n != 0 && buf.Len() != 0 {
buf.WriteString("... truncated")
}

log.Debugw("websocket connection response", "body", &buf)
}
}

// connectWebSocket attempts to connect to the websocket server with exponential backoff if retry config is available else it connects without retry.
func connectWebSocket(ctx context.Context, cfg config, url string, log *logp.Logger) (*websocket.Conn, *http.Response, error) {
var conn *websocket.Conn
var response *http.Response
var err error
headers := formHeader(cfg)

if cfg.Retry != nil {
retryConfig := cfg.Retry
for attempt := 1; attempt <= retryConfig.MaxAttempts; attempt++ {
conn, response, err = websocket.DefaultDialer.DialContext(ctx, url, headers)
if err == nil {
return conn, response, nil
}
log.Debugw("attempt %d: webSocket connection failed. retrying...\n", attempt)
waitTime := calculateWaitTime(retryConfig.WaitMin, retryConfig.WaitMax, attempt)
time.Sleep(waitTime)
}
return nil, nil, fmt.Errorf("failed to establish WebSocket connection after %d attempts with error %w", retryConfig.MaxAttempts, err)
}

return websocket.DefaultDialer.DialContext(ctx, url, headers)
}

// calculateWaitTime calculates the wait time for the next attempt based on the exponential backoff algorithm.
func calculateWaitTime(waitMin, waitMax time.Duration, attempt int) time.Duration {
// calculate exponential backoff
base := float64(waitMin)
backoff := base * math.Pow(2, float64(attempt-1))

// calculate jitter proportional to the backoff
maxJitter := float64(waitMax-waitMin) * math.Pow(2, float64(attempt-1))
jitter := rand.Float64() * maxJitter

waitTime := time.Duration(backoff + jitter)

return waitTime
}

// now is time.Now with a modifiable time source.
func (s *websocketStream) now() time.Time {
if s.time == nil {
return time.Now()
}
return s.time()
}

func (s *websocketStream) Close() error {
s.metrics.Close()
return nil
}
19 changes: 19 additions & 0 deletions x-pack/filebeat/input/websocket/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ package websocket

import (
"context"
"errors"
"fmt"
"net/url"
"regexp"
"time"

"github.com/elastic/elastic-agent-libs/logp"
)
Expand All @@ -32,6 +34,8 @@ type config struct {
URL *urlConfig `config:"url" validate:"required"`
// Redact is the debug log state redaction configuration.
Redact *redact `config:"redact"`
// Retry is the configuration for retrying failed connections.
Retry *retry `config:"retry"`
}

type redact struct {
Expand All @@ -43,6 +47,12 @@ type redact struct {
Delete bool `config:"delete"`
}

type retry struct {
MaxAttempts int `config:"max_attempts"`
WaitMin time.Duration `config:"wait_min"`
WaitMax time.Duration `config:"wait_max"`
}

type authConfig struct {
// Custom auth config to use for authentication.
CustomAuth *customAuthConfig `config:"custom"`
Expand Down Expand Up @@ -94,6 +104,15 @@ func (c config) Validate() error {
if err != nil {
return err
}

if c.Retry != nil {
switch {
case c.Retry.MaxAttempts <= 0:
return errors.New("max_attempts must be greater than zero")
case c.Retry.WaitMin > c.Retry.WaitMax:
return errors.New("wait_min must be less than or equal to wait_max")
}
}
return nil
}

Expand Down
Loading

0 comments on commit e95361d

Please sign in to comment.