From e2eb867df08edccee7f880a20184861da5c91e41 Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Wed, 30 Sep 2020 15:10:38 +0200 Subject: [PATCH] Cherry-pick #21258 to 7.9: o365input: Restart after fatal error (#21386) Update the o365input to restart the input after a fatal error is encountered, for example an authentication token refresh error or a parsing error. This enables the input to be more resilient against transient errors. Before this patch, the input would index an error document and terminate. Now it will index an error and restart after a fixed timeout of 5 minutes. (cherry picked from commit c723c1e51c42fa7e77724f84d561f1cef216ce3c) --- CHANGELOG.next.asciidoc | 1 + x-pack/filebeat/input/o365audit/input.go | 44 +++++++++++++++++------- 2 files changed, 33 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 092d20286561..d85eaf585d48 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -50,6 +50,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix `setup.dashboards.index` setting not working. {pull}17749[17749] - Fix Elasticsearch license endpoint URL referenced in error message. {issue}17880[17880] {pull}18030[18030] - Change `decode_json_fields` processor, to merge parsed json objects with existing objects in the event instead of fully replacing them. {pull}17958[17958] +- The `o365input` and `o365` module now recover from an authentication problem or other fatal errors, instead of terminating. {pull}21258[21258] *Auditbeat* diff --git a/x-pack/filebeat/input/o365audit/input.go b/x-pack/filebeat/input/o365audit/input.go index 1ced85ce3370..1a97768c1562 100644 --- a/x-pack/filebeat/input/o365audit/input.go +++ b/x-pack/filebeat/input/o365audit/input.go @@ -26,6 +26,9 @@ import ( const ( pluginName = "o365audit" fieldsPrefix = pluginName + + // How long to wait before restarting the input after a fatal error is encountered. 
+ failureRetryInterval = time.Minute * 5 ) type o365input struct { @@ -107,6 +110,34 @@ func (inp *o365input) Run( src cursor.Source, cursor cursor.Cursor, publisher cursor.Publisher, +) error { + for ctx.Cancelation.Err() == nil { + err := inp.runOnce(ctx, src, cursor, publisher) + if err == nil { + break + } + if ctx.Cancelation.Err() != err && err != context.Canceled { + msg := common.MapStr{} + msg.Put("error.message", err.Error()) + msg.Put("event.kind", "pipeline_error") + event := beat.Event{ + Timestamp: time.Now(), + Fields: msg, + } + publisher.Publish(event, nil) + ctx.Logger.Errorf("Input failed: %v", err) + ctx.Logger.Infof("Restarting in %v", failureRetryInterval) + time.Sleep(failureRetryInterval) + } + } + return nil +} + +func (inp *o365input) runOnce( + ctx v2.Context, + src cursor.Source, + cursor cursor.Cursor, + publisher cursor.Publisher, ) error { stream := src.(*stream) tenantID, contentType := stream.tenantID, stream.contentType @@ -156,18 +187,7 @@ func (inp *o365input) Run( } log.Infow("Start fetching events", "cursor", start) - err = poller.Run(action) - if err != nil && ctx.Cancelation.Err() != err && err != context.Canceled { - msg := common.MapStr{} - msg.Put("error.message", err.Error()) - msg.Put("event.kind", "pipeline_error") - event := beat.Event{ - Timestamp: time.Now(), - Fields: msg, - } - publisher.Publish(event, nil) - } - return err + return poller.Run(action) } func initCheckpoint(log *logp.Logger, c cursor.Cursor, maxRetention time.Duration) checkpoint {