diff --git a/x-pack/filebeat/input/o365audit/config.go b/x-pack/filebeat/input/o365audit/config.go index 576ad07d3a6..95bfe3ddb26 100644 --- a/x-pack/filebeat/input/o365audit/config.go +++ b/x-pack/filebeat/input/o365audit/config.go @@ -83,6 +83,11 @@ type APIConfig struct { // MaxRequestsPerMinute sets the limit on the number of API requests that // can be sent, per tenant. MaxRequestsPerMinute int `config:"max_requests_per_minute" validate:"positive"` + + // SetIDFromAuditRecord controls whether the unique "Id" field in audit + // record is used as the document id for ingestion. This helps avoiding + // duplicates. + SetIDFromAuditRecord bool `config:"set_id_from_audit_record"` } func defaultConfig() Config { @@ -122,6 +127,8 @@ func defaultConfig() Config { // According to the docs this is the max requests that are allowed // per tenant per minute. MaxRequestsPerMinute: 2000, + + SetIDFromAuditRecord: true, }, } } diff --git a/x-pack/filebeat/input/o365audit/input.go b/x-pack/filebeat/input/o365audit/input.go index 8936535286e..73758a355b1 100644 --- a/x-pack/filebeat/input/o365audit/input.go +++ b/x-pack/filebeat/input/o365audit/input.go @@ -255,7 +255,7 @@ var errTerminated = errors.New("terminated due to output closed") // Report returns an action that produces a beat.Event from the given object. func (env apiEnvironment) Report(doc common.MapStr, private interface{}) poll.Action { return func(poll.Enqueuer) error { - if !env.Callback(toBeatEvent(doc, private)) { + if !env.Callback(env.toBeatEvent(doc, private)) { return errTerminated } return nil @@ -272,7 +272,7 @@ func (env apiEnvironment) ReportAPIError(err apiError) poll.Action { } } -func toBeatEvent(doc common.MapStr, private interface{}) beat.Event { +func (env apiEnvironment) toBeatEvent(doc common.MapStr, private interface{}) beat.Event { var errs multierror.Errors ts, err := getDateKey(doc, "CreationTime", apiDateFormats) if err != nil { @@ -286,6 +286,11 @@ func toBeatEvent(doc common.MapStr, private interface{}) beat.Event { }, Private: private, } + if env.Config.SetIDFromAuditRecord { + if id, err := getString(doc, "Id"); err == nil && len(id) > 0 { + b.SetID(id) + } + } if len(errs) > 0 { msgs := make([]string, len(errs)) for idx, e := range errs {