Skip to content

Commit

Permalink
[Filebeat] gcp-pubsub: Restart Pub/Sub client on errors (#32712)
Browse files Browse the repository at this point in the history
This modifies the gcp-pubsub input in Filebeat to include its own retry loop
rather than being entirely dependent upon the pub/sub client SDK to retry requests.

The input will only exit when the shutdown is triggered by Filebeat. Any errors
generated by the pub/sub client will be logged and then the input will restart the
pub/sub client. It will throttle restarts to at most once every 30s.

Fixes #32550

(cherry picked from commit 947c837)
  • Loading branch information
andrewkroh authored and mergify[bot] committed Aug 17, 2022
1 parent f2c09e2 commit 04eef6d
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 8 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]

- Fix counter for number of events published in `httpjson` input. {pull}31993[31993]
- Fix handling of Checkpoint event for R81. {issue}32380[32380] {pull}32458[32458]
- Fix a hang on `apt-get update` stage in packaging. {pull}32580[32580]
- gcp-pubsub input: Restart Pub/Sub client on all errors. {issue}32550[32550] {pull}32712[32712]

*Heartbeat*

Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/gcppubsub/_meta/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,6 @@ RUN \
RUN \
mkdir /data

HEALTHCHECK --interval=1s --retries=90 CMD curl -s -f http://localhost:8432/
HEALTHCHECK --interval=1s --retries=90 CMD curl -s -f --http2 http://localhost:8432/

CMD gcloud beta emulators pubsub start --data-dir /data --host-port "0.0.0.0:8432"
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
variants:
- SDK_VERSION: 293.0.0-0
- SDK_VERSION: 398.0.0-0
6 changes: 3 additions & 3 deletions x-pack/filebeat/input/gcppubsub/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ version: '2.3'

services:
googlepubsub:
image: docker.elastic.co/integrations-ci/beats-googlepubsub:emulator-${SDK_VERSION:-293.0.0-0}-1
image: docker.elastic.co/integrations-ci/beats-googlepubsub:emulator-${SDK_VERSION:-398.0.0-0}-1
build:
context: ./_meta
args:
SDK_VERSION: ${SDK_VERSION:-293.0.0-0}
SDK_VERSION: ${SDK_VERSION:-398.0.0-0}
ports:
- 8432
- '127.0.0.1:8432:8432'
29 changes: 26 additions & 3 deletions x-pack/filebeat/input/gcppubsub/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"cloud.google.com/go/pubsub"
"github.com/pkg/errors"
"golang.org/x/time/rate"
"google.golang.org/api/option"
"google.golang.org/grpc"

Expand All @@ -32,6 +33,9 @@ import (
const (
// inputName is the name of this input.
inputName = "gcp-pubsub"
// oldInputName is an alternate name for the same input.
// NOTE(review): presumably the legacy name kept for backward
// compatibility; confirm against the registration code in init().
oldInputName = "google-pubsub"

// retryInterval is the minimum duration between pub/sub client retries.
// It is used as the period of the rate limiter (burst of 1) that throttles
// how often the input worker restarts the pub/sub client.
retryInterval = 30 * time.Second
)

func init() {
Expand Down Expand Up @@ -139,9 +143,28 @@ func (in *pubsubInput) Run() {
defer in.log.Info("Pub/Sub input worker has stopped.")
defer in.workerWg.Done()
defer in.workerCancel()
if err := in.run(); err != nil {
in.log.Error(err)
return

// Throttle pubsub client restarts.
rt := rate.NewLimiter(rate.Every(retryInterval), 1)

// Watchdog to keep the worker operating after an error.
for in.workerCtx.Err() == nil {
// Rate limit.
if err := rt.Wait(in.workerCtx); err != nil {
continue
}

if err := in.run(); err != nil {
if in.workerCtx.Err() == nil {
in.log.Warnw("Restarting failed Pub/Sub input worker.", "error", err)
continue
}

// Log any non-cancellation error before stopping.
if !errors.Is(err, context.Canceled) {
in.log.Errorw("Pub/Sub input worker failed.", "error", err)
}
}
}
}()
})
Expand Down

0 comments on commit 04eef6d

Please sign in to comment.