diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index d2c82dfbf64..9dd1ad14e21 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -225,6 +225,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Added support for new features & removed partial save mechanism in the Azure Blob Storage input. {issue}35126[35126] {pull}36690[36690] - Improve template evaluation logging for HTTPJSON input. {pull}36668[36668] - Add CEL partial value debug function. {pull}36652[36652] +- Added support for new features and removed partial save mechanism in the GCS input. {issue}35847[35847] {pull}36713[36713] *Auditbeat* diff --git a/x-pack/filebeat/docs/inputs/input-gcs.asciidoc b/x-pack/filebeat/docs/inputs/input-gcs.asciidoc index 83ba2084c89..3f8061739c7 100644 --- a/x-pack/filebeat/docs/inputs/input-gcs.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-gcs.asciidoc @@ -76,8 +76,7 @@ which will in turn use the `max_workers` value to initialize an in-memory worker one per bucket, each having 3 workers. These `workers` will be responsible for performing `jobs` that process a file (in this case read and output the contents of a file). NOTE: The scheduler is responsible for scheduling jobs, and uses the `maximum available workers` in the pool, at each iteration, to decide the number of files to retrieve and -process. This keeps work distribution efficient. The scheduler uses `poll_interval` attribute value to decide how long to wait after each iteration. The `bucket_timeout` value is used to timeout -calls to the bucket list api if it exceeds the given value. Each iteration consists of processing a certain number of files, decided by the `maximum available workers` value. +process. This keeps work distribution efficient. The scheduler uses `poll_interval` attribute value to decide how long to wait after each iteration. The `bucket_timeout` value is used to timeout calls to the bucket list api if it exceeds the given value. Each iteration consists of processing a certain number of files, decided by the `maximum available workers` value. *A Sample Response :-* ["source","json"] ---- @@ -167,6 +166,9 @@ Now let's explore the configuration attributes a bit more elaborately. 8. <> 9. <> 10. <> + 11. <> + 12. <> + 13. <> [id="attrib-project-id"] [float] @@ -252,6 +254,94 @@ highly nested json data. If this is set to `false` the *gcs.storage.object.json_ applicable for json objects and has no effect on other types of objects. This attribute can be specified both at the root level of the configuration as well at the bucket level. The bucket level values will always take priority and override the root level values if both are specified. +[id="attrib-file_selectors-gcs"] +[float] +==== `file_selectors` + +If the GCS buckets have objects that correspond to files that {beatname_uc} shouldn't process, `file_selectors` can be used to limit the files that are downloaded. This is a list of selectors based on regular expression patterns. The regular expression should match the object name or be a part of the object name (ideally a prefix). The regular expression syntax used is https://github.com/google/re2/wiki/Syntax[RE2]. Files that don't match any configured expression won't be processed. This attribute can be specified both at the root level of the configuration as well as at the bucket level. The bucket level values will always take priority and override the root level values if both are specified.
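For reference, the object-name matching performed by `file_selectors` can be approximated with Go's standard RE2-based `regexp` package. This is a minimal sketch with illustrative names: `fileSelector` and `isFileSelected` are assumptions here, although the scheduler change later in this diff adds a similar `isFileSelected` helper built on the libbeat `match.Matcher` type. The YAML example that follows shows the equivalent configuration.

[source,go]
----
package main

import (
	"fmt"
	"regexp"
)

// fileSelector mirrors a single file_selectors entry: one RE2 pattern.
type fileSelector struct {
	regex *regexp.Regexp
}

// isFileSelected reports whether an object name matches any configured selector.
// When no selectors are configured, every object is selected.
func isFileSelected(selectors []fileSelector, name string) bool {
	if len(selectors) == 0 {
		return true
	}
	for _, sel := range selectors {
		if sel.regex.MatchString(name) {
			return true
		}
	}
	return false
}

func main() {
	selectors := []fileSelector{
		{regex: regexp.MustCompile(`docs/`)},
		{regex: regexp.MustCompile(`/Security-Logs/`)},
	}
	for _, name := range []string{"docs/data_1.json", "tmp/ignored.json"} {
		fmt.Printf("%s selected: %v\n", name, isFileSelected(selectors, name))
	}
}
----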
+ + [source, yml] ---- filebeat.inputs: - type: gcs + project_id: my_project_id + auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json + buckets: + - name: obs-bucket + max_workers: 3 + poll: true + poll_interval: 15s + bucket_timeout: 60s + file_selectors: + - regex: '/Monitoring/' + - regex: 'docs/' + - regex: '/Security-Logs/' +---- + +[id="attrib-expand_event_list_from_field-gcs"] +[float] +==== `expand_event_list_from_field` + +If the fileset using this input expects to receive multiple messages bundled under a specific field or as an array of objects, the config option `expand_event_list_from_field` can be specified. This setting splits the messages under the given field into separate events. For example, if you have logs in JSON format where the events are found under the JSON object "Records", setting `expand_event_list_from_field` to "Records" splits them into separate events. This attribute can be specified both at the root level of the configuration as well as at the bucket level. The bucket level values will always take priority and override the root level values if both are specified. + +[source, json] ---- { "Records": [ { "eventVersion": "1.07", "eventTime": "2019-11-14T00:51:00Z", "region": "us-east-1", "eventID": "EXAMPLE8-9621-4d00-b913-beca2EXAMPLE" }, { "eventVersion": "1.07", "eventTime": "2019-11-14T00:52:00Z", "region": "us-east-1", "eventID": "EXAMPLEc-28be-486c-8928-49ce6EXAMPLE" } ] } ---- + +[source,yml] ---- filebeat.inputs: - type: gcs + project_id: my_project_id + auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json + buckets: + - name: obs-bucket + max_workers: 3 + poll: true + poll_interval: 15s + bucket_timeout: 60s + expand_event_list_from_field: Records +---- + +NOTE: The `parse_json` setting is incompatible with `expand_event_list_from_field`. If both are configured, `parse_json` will be ignored. This attribute is only applicable to JSON file formats. You do not need to specify this attribute if the file has an array of objects at the root level; a root level array of objects is automatically split into separate events. If failures occur or the input crashes due to some unexpected error, processing will resume from the last successfully processed file or object. + + +[id="attrib-timestamp_epoch-gcs"] +[float] +==== `timestamp_epoch` + +This attribute can be used to filter out files and objects that have a timestamp older than the specified value. The value of this attribute should be in Unix `epoch` (seconds) format. The timestamp value is compared with the `object.Updated` field obtained from the object metadata. This attribute can be specified both at the root level of the configuration as well as at the bucket level. The bucket level values will always take priority and override the root level values if both are specified.
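As a rough sketch of this behaviour: the input validates that the configured epoch falls within a sensible range and then skips any object whose `Updated` time is older than it (see `isValidUnixTimestamp` in `input.go` and the scheduler filter later in this diff). The `shouldProcess` helper below is an illustrative name, not part of the input; the YAML example that follows shows the configuration itself.

[source,go]
----
package main

import (
	"fmt"
	"time"
)

// Valid bounds for the configured epoch, mirroring the input's range check
// (1970-01-01 through 3000-01-01, in seconds).
var (
	minTimestamp = time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC).Unix()
	maxTimestamp = time.Date(3000, time.January, 1, 0, 0, 0, 0, time.UTC).Unix()
)

func isValidUnixTimestamp(ts int64) bool {
	return minTimestamp <= ts && ts <= maxTimestamp
}

// shouldProcess applies the timestamp_epoch filter: objects whose last
// update time is strictly older than the configured epoch are skipped.
func shouldProcess(updated time.Time, epoch *int64) bool {
	if epoch == nil {
		return true // no filter configured
	}
	return updated.Unix() >= *epoch
}

func main() {
	epoch := int64(1630444800)               // same example value used in the config below
	fmt.Println(isValidUnixTimestamp(epoch)) // true

	older := time.Unix(epoch-3600, 0).UTC()
	newer := time.Unix(epoch+3600, 0).UTC()
	fmt.Println(shouldProcess(older, &epoch)) // false: filtered out
	fmt.Println(shouldProcess(newer, &epoch)) // true: processed
}
----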
+ +[source, yml] +---- +filebeat.inputs: +- type: gcs + project_id: my_project_id + auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json + buckets: + - name: obs-bucket + max_workers: 3 + poll: true + poll_interval: 15s + bucket_timeout: 60s + timestamp_epoch: 1630444800 +---- [id="bucket-overrides"] *The sample configs below will explain the bucket level overriding of attributes a bit further :-* @@ -260,7 +350,7 @@ The bucket level values will always take priority and override the root level va Here `bucket_1` is using root level attributes while `bucket_2` overrides the values : -["source","yaml",subs="attributes"] +[source, yml] ---- filebeat.inputs: - type: gcs @@ -288,7 +378,7 @@ of using the root values. Here both `bucket_1` and `bucket_2` overrides the root values : -["source","yaml",subs="attributes"] +[source, yml] ---- filebeat.inputs: - type: gcs diff --git a/x-pack/filebeat/input/gcs/config.go b/x-pack/filebeat/input/gcs/config.go index a22ee5fde8b..846fe1fc94e 100644 --- a/x-pack/filebeat/input/gcs/config.go +++ b/x-pack/filebeat/input/gcs/config.go @@ -6,39 +6,58 @@ package gcs import ( "time" + + "github.com/elastic/beats/v7/libbeat/common/match" ) -// MaxWorkers - Defines the maximum number of go routines that will be spawned. -// Poll - Defines if polling should be performed on the input bucket source. -// PollInterval - Defines the maximum amount of time to wait before polling for the -// next batch of objects from the bucket. -// BucketTimeOut - Defines the maximum time that the sdk will wait for a bucket api response before timing out. -// ParseJSON - Informs the publisher whether to parse & objectify json data or not. By default this is set to -// false, since it can get expensive dealing with highly nested json data. -// MaxWorkers, Poll, PollInterval, BucketTimeOut, ParseJSON can be configured at a global level, -// which applies to all buckets, as well as at the bucket level. +// MaxWorkers, Poll, PollInterval, BucketTimeOut, ParseJSON, FileSelectors, TimeStampEpoch & ExpandEventListFromField +// can be configured at a global level, which applies to all buckets, as well as at the bucket level. // Bucket level configurations will always override global level values. type config struct { - ProjectId string `config:"project_id" validate:"required"` - Auth authConfig `config:"auth" validate:"required"` - MaxWorkers *int `config:"max_workers,omitempty" validate:"max=5000"` - Poll *bool `config:"poll,omitempty"` - PollInterval *time.Duration `config:"poll_interval,omitempty"` - ParseJSON *bool `config:"parse_json,omitempty"` + // ProjectId - Defines the project id of the concerned gcs bucket in Google Cloud. + ProjectId string `config:"project_id" validate:"required"` + // Auth - Defines the authentication mechanism to be used for accessing the gcs bucket. + Auth authConfig `config:"auth" validate:"required"` + // MaxWorkers - Defines the maximum number of go routines that will be spawned. + MaxWorkers *int `config:"max_workers,omitempty" validate:"max=5000"` + // Poll - Defines if polling should be performed on the input bucket source. + Poll *bool `config:"poll,omitempty"` + // PollInterval - Defines the maximum amount of time to wait before polling for the next batch of objects from the bucket. + PollInterval *time.Duration `config:"poll_interval,omitempty"` + // ParseJSON - Informs the publisher whether to parse & objectify json data or not. By default this is set to + // false, since it can get expensive dealing with highly nested json data. 
+ ParseJSON *bool `config:"parse_json,omitempty"` + // BucketTimeOut - Defines the maximum time that the sdk will wait for a bucket api response before timing out. BucketTimeOut *time.Duration `config:"bucket_timeout,omitempty"` - Buckets []bucket `config:"buckets" validate:"required"` + // Buckets - Defines a list of buckets that will be polled for objects. + Buckets []bucket `config:"buckets" validate:"required"` + // FileSelectors - Defines a list of regex patterns that can be used to filter out objects from the bucket. + FileSelectors []fileSelectorConfig `config:"file_selectors"` + // TimeStampEpoch - Defines the epoch time in seconds, which is used to filter out objects that are older than the specified timestamp. + TimeStampEpoch *int64 `config:"timestamp_epoch"` + // ExpandEventListFromField - Defines the field name that will be used to expand the event into separate events. + ExpandEventListFromField string `config:"expand_event_list_from_field"` // This field is only used for system test purposes, to override the HTTP endpoint. AlternativeHost string `config:"alternative_host,omitempty"` } // bucket contains the config for each specific object storage bucket in the root account type bucket struct { - Name string `config:"name" validate:"required"` - MaxWorkers *int `config:"max_workers,omitempty" validate:"max=5000"` - BucketTimeOut *time.Duration `config:"bucket_timeout,omitempty"` - Poll *bool `config:"poll,omitempty"` - PollInterval *time.Duration `config:"poll_interval,omitempty"` - ParseJSON *bool `config:"parse_json,omitempty"` + Name string `config:"name" validate:"required"` + MaxWorkers *int `config:"max_workers,omitempty" validate:"max=5000"` + BucketTimeOut *time.Duration `config:"bucket_timeout,omitempty"` + Poll *bool `config:"poll,omitempty"` + PollInterval *time.Duration `config:"poll_interval,omitempty"` + ParseJSON *bool `config:"parse_json,omitempty"` + FileSelectors []fileSelectorConfig `config:"file_selectors"` + TimeStampEpoch *int64 `config:"timestamp_epoch"` + ExpandEventListFromField string `config:"expand_event_list_from_field"` +} + +// fileSelectorConfig helps filter out gcs objects based on a regex pattern +type fileSelectorConfig struct { + Regex *match.Matcher `config:"regex" validate:"required"` + // TODO: Add support for reader config in future } type authConfig struct { diff --git a/x-pack/filebeat/input/gcs/input.go b/x-pack/filebeat/input/gcs/input.go index 81ed1b82210..ee9684d0213 100644 --- a/x-pack/filebeat/input/gcs/input.go +++ b/x-pack/filebeat/input/gcs/input.go @@ -6,6 +6,7 @@ package gcs import ( "context" + "fmt" "time" "cloud.google.com/go/storage" @@ -22,6 +23,12 @@ type gcsInput struct { config config } +// defines the valid range for Unix timestamps for 64 bit integers +var ( + minTimestamp = time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC).Unix() + maxTimestamp = time.Date(3000, time.January, 1, 0, 0, 0, 0, time.UTC).Unix() +) + const ( inputName = "gcs" ) @@ -47,18 +54,23 @@ func configure(cfg *conf.C) ([]cursor.Source, cursor.Input, error) { if err := cfg.Unpack(&config); err != nil { return nil, nil, err } - - var sources []cursor.Source + sources := make([]cursor.Source, 0, len(config.Buckets)) for _, b := range config.Buckets { bucket := tryOverrideOrDefault(config, b) + if bucket.TimeStampEpoch != nil && !isValidUnixTimestamp(*bucket.TimeStampEpoch) { + return nil, nil, fmt.Errorf("invalid timestamp epoch: %d", *bucket.TimeStampEpoch) + } sources = append(sources, &Source{ - ProjectId: config.ProjectId, - BucketName: 
bucket.Name, - BucketTimeOut: *bucket.BucketTimeOut, - MaxWorkers: *bucket.MaxWorkers, - Poll: *bucket.Poll, - PollInterval: *bucket.PollInterval, - ParseJSON: *bucket.ParseJSON, + ProjectId: config.ProjectId, + BucketName: bucket.Name, + BucketTimeOut: *bucket.BucketTimeOut, + MaxWorkers: *bucket.MaxWorkers, + Poll: *bucket.Poll, + PollInterval: *bucket.PollInterval, + ParseJSON: *bucket.ParseJSON, + TimeStampEpoch: bucket.TimeStampEpoch, + ExpandEventListFromField: bucket.ExpandEventListFromField, + FileSelectors: bucket.FileSelectors, }) } @@ -76,7 +88,6 @@ func tryOverrideOrDefault(cfg config, b bucket) bucket { } b.MaxWorkers = &maxWorkers } - if b.Poll == nil { var poll bool if cfg.Poll != nil { @@ -84,7 +95,6 @@ func tryOverrideOrDefault(cfg config, b bucket) bucket { } b.Poll = &poll } - if b.PollInterval == nil { interval := time.Second * 300 if cfg.PollInterval != nil { @@ -92,7 +102,6 @@ func tryOverrideOrDefault(cfg config, b bucket) bucket { } b.PollInterval = &interval } - if b.ParseJSON == nil { parse := false if cfg.ParseJSON != nil { @@ -100,7 +109,6 @@ func tryOverrideOrDefault(cfg config, b bucket) bucket { } b.ParseJSON = &parse } - if b.BucketTimeOut == nil { timeOut := time.Second * 50 if cfg.BucketTimeOut != nil { @@ -108,10 +116,25 @@ func tryOverrideOrDefault(cfg config, b bucket) bucket { } b.BucketTimeOut = &timeOut } + if b.TimeStampEpoch == nil { + b.TimeStampEpoch = cfg.TimeStampEpoch + } + if b.ExpandEventListFromField == "" { + b.ExpandEventListFromField = cfg.ExpandEventListFromField + } + if len(b.FileSelectors) == 0 && len(cfg.FileSelectors) != 0 { + b.FileSelectors = cfg.FileSelectors + } return b } +// isValidUnixTimestamp checks if the timestamp is a valid Unix timestamp +func isValidUnixTimestamp(timestamp int64) bool { + // checks if the timestamp is within the valid range + return minTimestamp <= timestamp && timestamp <= maxTimestamp +} + func (input *gcsInput) Name() string { return inputName } diff --git a/x-pack/filebeat/input/gcs/input_stateless.go b/x-pack/filebeat/input/gcs/input_stateless.go index d31f0875262..04ec19de5dd 100644 --- a/x-pack/filebeat/input/gcs/input_stateless.go +++ b/x-pack/filebeat/input/gcs/input_stateless.go @@ -10,6 +10,7 @@ import ( "cloud.google.com/go/storage" gax "github.com/googleapis/gax-go/v2" + "golang.org/x/sync/errgroup" v2 "github.com/elastic/beats/v7/filebeat/input/v2" cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" @@ -47,16 +48,20 @@ func (pub statelessPublisher) Publish(event beat.Event, _ interface{}) error { func (in *statelessInput) Run(inputCtx v2.Context, publisher stateless.Publisher, client *storage.Client) error { pub := statelessPublisher{wrapped: publisher} var source cursor.Source + var g errgroup.Group for _, b := range in.config.Buckets { bucket := tryOverrideOrDefault(in.config, b) source = &Source{ - ProjectId: in.config.ProjectId, - BucketName: bucket.Name, - BucketTimeOut: *bucket.BucketTimeOut, - MaxWorkers: *bucket.MaxWorkers, - Poll: *bucket.Poll, - PollInterval: *bucket.PollInterval, - ParseJSON: *bucket.ParseJSON, + ProjectId: in.config.ProjectId, + BucketName: bucket.Name, + BucketTimeOut: *bucket.BucketTimeOut, + MaxWorkers: *bucket.MaxWorkers, + Poll: *bucket.Poll, + PollInterval: *bucket.PollInterval, + ParseJSON: *bucket.ParseJSON, + TimeStampEpoch: bucket.TimeStampEpoch, + ExpandEventListFromField: bucket.ExpandEventListFromField, + FileSelectors: bucket.FileSelectors, } st := newState() @@ -80,8 +85,12 @@ func (in *statelessInput) 
Run(inputCtx v2.Context, publisher stateless.Publisher ) scheduler := newScheduler(pub, bkt, currentSource, &in.config, st, log) - - return scheduler.schedule(ctx) + // allows multiple containers to be scheduled concurrently while testing + // the stateless input is triggered only while testing and till now it did not mimic + // the real world concurrent execution of multiple containers. This fix allows it to do so. + g.Go(func() error { + return scheduler.schedule(ctx) + }) } - return nil + return g.Wait() } diff --git a/x-pack/filebeat/input/gcs/input_test.go b/x-pack/filebeat/input/gcs/input_test.go index bd9028d6bf9..41e64c031f9 100644 --- a/x-pack/filebeat/input/gcs/input_test.go +++ b/x-pack/filebeat/input/gcs/input_test.go @@ -360,6 +360,164 @@ func Test_StorageClient(t *testing.T) { mock.BeatsFilesBucket_json_array[3]: true, }, }, + { + name: "FilterByTimeStampEpoch", + baseConfig: map[string]interface{}{ + "project_id": "elastic-sa", + "auth.credentials_file.path": "/gcs_creds.json", + "max_workers": 1, + "poll": true, + "poll_interval": "5s", + "timestamp_epoch": 1661385600, + "buckets": []map[string]interface{}{ + { + "name": bucketGcsTestNew, + }, + }, + }, + mockHandler: mock.GCSServer, + expected: map[string]bool{ + mock.Gcs_test_new_object_data3_json: true, + mock.Gcs_test_new_object_docs_ata_json: true, + }, + }, + { + name: "FilterByFileSelectorRegexSingle", + baseConfig: map[string]interface{}{ + "project_id": "elastic-sa", + "auth.credentials_file.path": "/gcs_creds.json", + "max_workers": 1, + "poll": true, + "poll_interval": "5s", + "file_selectors": []map[string]interface{}{ + { + "regex": "docs/", + }, + }, + "buckets": []map[string]interface{}{ + { + "name": bucketGcsTestNew, + }, + }, + }, + mockHandler: mock.GCSServer, + expected: map[string]bool{ + mock.Gcs_test_new_object_docs_ata_json: true, + }, + }, + { + name: "FilterByFileSelectorRegexMulti", + baseConfig: map[string]interface{}{ + "project_id": "elastic-sa", + "auth.credentials_file.path": "/gcs_creds.json", + "max_workers": 1, + "poll": true, + "poll_interval": "5s", + "file_selectors": []map[string]interface{}{ + { + "regex": "docs/", + }, + { + "regex": "data", + }, + }, + "buckets": []map[string]interface{}{ + { + "name": bucketGcsTestNew, + }, + }, + }, + mockHandler: mock.GCSServer, + expected: map[string]bool{ + mock.Gcs_test_new_object_docs_ata_json: true, + mock.Gcs_test_new_object_data3_json: true, + }, + }, + { + name: "ExpandEventListFromField", + baseConfig: map[string]interface{}{ + "project_id": "elastic-sa", + "auth.credentials_file.path": "/gcs_creds.json", + "max_workers": 1, + "poll": true, + "poll_interval": "5s", + "expand_event_list_from_field": "Events", + "file_selectors": []map[string]interface{}{ + { + "regex": "events-array", + }, + }, + "buckets": []map[string]interface{}{ + { + "name": beatsJSONBucket, + }, + }, + }, + mockHandler: mock.GCSFileServer, + expected: map[string]bool{ + mock.BeatsFilesBucket_events_array_json[0]: true, + mock.BeatsFilesBucket_events_array_json[1]: true, + }, + }, + { + name: "MultiContainerWithMultiFileSelectors", + baseConfig: map[string]interface{}{ + "project_id": "elastic-sa", + "auth.credentials_file.path": "/gcs_creds.json", + "max_workers": 1, + "poll": true, + "poll_interval": "5s", + "buckets": []map[string]interface{}{ + { + "name": bucketGcsTestNew, + "file_selectors": []map[string]interface{}{ + { + "regex": "docs/", + }, + }, + }, + { + "name": bucketGcsTestLatest, + "file_selectors": []map[string]interface{}{ + { + "regex": "data_3", + 
}, + }, + }, + }, + }, + mockHandler: mock.GCSServer, + expected: map[string]bool{ + mock.Gcs_test_new_object_docs_ata_json: true, + mock.Gcs_test_latest_object_data3_json: true, + }, + }, + { + name: "FilterByFileSelectorEmptyRegex", + baseConfig: map[string]interface{}{ + "project_id": "elastic-sa", + "auth.credentials_file.path": "/gcs_creds.json", + "max_workers": 1, + "poll": true, + "poll_interval": "5s", + "file_selectors": []map[string]interface{}{ + { + "regex": "", + }, + }, + "buckets": []map[string]interface{}{ + { + "name": bucketGcsTestNew, + }, + }, + }, + mockHandler: mock.GCSServer, + expected: map[string]bool{ + mock.Gcs_test_new_object_ata_json: true, + mock.Gcs_test_new_object_data3_json: true, + mock.Gcs_test_new_object_docs_ata_json: true, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/x-pack/filebeat/input/gcs/job.go b/x-pack/filebeat/input/gcs/job.go index edcb7fe976a..63e631e39be 100644 --- a/x-pack/filebeat/input/gcs/job.go +++ b/x-pack/filebeat/input/gcs/job.go @@ -35,9 +35,6 @@ type job struct { objectURI string // object hash, used in setting event id hash string - // offset value for an object, it points to the location inside the data stream - // from where we can start processing the object. - offset int64 // flag to denote if object is gZipped compressed or not. isCompressed bool // flag to denote if object's root element is of an array type @@ -86,13 +83,10 @@ func (j *job) do(ctx context.Context, id string) { if j.object.ContentType == gzType || j.object.ContentEncoding == encodingGzip { j.isCompressed = true } - if result, ok := j.state.cp.IsRootArray[j.object.Name]; ok { - j.isRootArray = result - } err := j.processAndPublishData(ctx, id) if err != nil { j.state.updateFailedJobs(j.object.Name) - j.log.Errorw("job encountered an error", "gcs.jobId", id, "error", err) + j.log.Errorw("job encountered an error while publishing data and has been added to a failed jobs list", "gcs.jobId", id, "error", err) return } @@ -109,7 +103,7 @@ func (j *job) do(ctx context.Context, id string) { // locks while data is being saved and published to avoid concurrent map read/writes cp, done := j.state.saveForTx(j.object.Name, j.object.Updated) if err := j.publisher.Publish(event, cp); err != nil { - j.log.Errorw("job encountered an error", "gcs.jobId", id, "error", err) + j.log.Errorw("job encountered an error while publishing event", "gcs.jobId", id, "error", err) } // unlocks after data is saved and published done() @@ -129,17 +123,10 @@ func (j *job) Timestamp() time.Time { } func (j *job) processAndPublishData(ctx context.Context, id string) error { - var err error - var offset int64 ctxWithTimeout, cancel := context.WithTimeout(ctx, j.src.BucketTimeOut) defer cancel() obj := j.bucket.Object(j.object.Name) - // if object is compressed or object root element is an array, then we cannot use an - // offset to read as it will produce an erroneous data stream. 
- if !j.isCompressed && !j.isRootArray { - offset = j.offset - } - reader, err := obj.NewRangeReader(ctxWithTimeout, offset, -1) + reader, err := obj.NewReader(ctxWithTimeout) if err != nil { return fmt.Errorf("failed to open reader for object: %s, with error: %w", j.object.Name, err) } @@ -159,23 +146,19 @@ func (j *job) processAndPublishData(ctx context.Context, id string) error { } func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) error { - var err error - r, err = j.addGzipDecoderIfNeeded(bufio.NewReader(r)) + r, err := j.addGzipDecoderIfNeeded(bufio.NewReader(r)) if err != nil { return fmt.Errorf("failed to add gzip decoder to object: %s, with error: %w", j.object.Name, err) } - // if offset == 0, then this is a new stream which has not been processed previously - if j.offset == 0 { - r, j.isRootArray, err = evaluateJSON(bufio.NewReader(r)) - if err != nil { - return fmt.Errorf("failed to evaluate json for object: %s, with error: %w", j.object.Name, err) - } - if j.isRootArray { - j.state.setRootArray(j.object.Name) - } + r, j.isRootArray, err = evaluateJSON(bufio.NewReader(r)) + if err != nil { + return fmt.Errorf("failed to evaluate json for object: %s, with error: %w", j.object.Name, err) } + dec := json.NewDecoder(r) + // UseNumber causes the Decoder to unmarshal a number into an interface{} as a Number instead of as a float64. + dec.UseNumber() // If array is present at root then read json token and advance decoder if j.isRootArray { _, err := dec.Token() @@ -184,24 +167,21 @@ func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) er } } - var offset, relativeOffset int64 - // uncompressed files use the client to directly set the offset, this - // in turn causes the offset to reset to 0 for the new stream, hence why - // we need to keep relative offsets to keep track of the actual offset - if !j.isCompressed && !j.isRootArray { - relativeOffset = j.offset - } - dec.UseNumber() for dec.More() && ctx.Err() == nil { var item json.RawMessage - offset = dec.InputOffset() + offset := dec.InputOffset() if err = dec.Decode(&item); err != nil { return fmt.Errorf("failed to decode json: %w", err) } - // manually seek offset only if file is compressed or if root element is an array - if (j.isCompressed || j.isRootArray) && offset < j.offset { + + // if expand_event_list_from_field is set, then split the event list + if j.src.ExpandEventListFromField != "" { + if err := j.splitEventList(j.src.ExpandEventListFromField, item, offset, j.hash, id); err != nil { + return err + } continue } + var parsedData []mapstr.M if j.src.ParseJSON { parsedData, err = decodeJSON(bytes.NewReader(item)) @@ -209,28 +189,78 @@ func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) er j.log.Errorw("job encountered an error", "gcs.jobId", id, "error", err) } } - evt := j.createEvent(item, parsedData, offset+relativeOffset) - // updates the offset after reading the file - // this avoids duplicates for the last read when resuming operation - offset = dec.InputOffset() - // locks while data is being saved and published to avoid concurrent map read/writes - var ( - done func() - cp *Checkpoint - ) + evt := j.createEvent(item, parsedData, offset) if !dec.More() { - // if this is the last object, then peform a complete state save - cp, done = j.state.saveForTx(j.object.Name, j.object.Updated) + // if this is the last object, then perform a complete state save + cp, done := j.state.saveForTx(j.object.Name, j.object.Updated) + if err := 
j.publisher.Publish(evt, cp); err != nil { + j.log.Errorw("job encountered an error while publishing event", "gcs.jobId", id, "error", err) + } + done() } else { - // partially saves read state using offset - cp, done = j.state.savePartialForTx(j.object.Name, offset+relativeOffset) + // since we don't update the cursor checkpoint, lack of a lock here is not a problem + if err := j.publisher.Publish(evt, nil); err != nil { + j.log.Errorw("job encountered an error while publishing event", "gcs.jobId", id, "error", err) + } + } + } + return nil +} + +// splitEventList splits the event list into individual events and publishes them +func (j *job) splitEventList(key string, raw json.RawMessage, offset int64, objHash string, id string) error { + var jsonObject map[string]json.RawMessage + if err := json.Unmarshal(raw, &jsonObject); err != nil { + return fmt.Errorf("job with job id %s encountered an unmarshaling error: %w", id, err) + } + + raw, found := jsonObject[key] + if !found { + return fmt.Errorf("expand_event_list_from_field key <%v> is not in event", key) + } + + dec := json.NewDecoder(bytes.NewReader(raw)) + // UseNumber causes the Decoder to unmarshal a number into an interface{} as a Number instead of as a float64. + dec.UseNumber() + + tok, err := dec.Token() + if err != nil { + return fmt.Errorf("failed to read JSON token for object: %s, with error: %w", j.object.Name, err) + } + delim, ok := tok.(json.Delim) + if !ok || delim != '[' { + return fmt.Errorf("expand_event_list_from_field <%v> is not an array", key) + } + + for dec.More() { + arrayOffset := dec.InputOffset() + + var item json.RawMessage + if err := dec.Decode(&item); err != nil { + return fmt.Errorf("failed to decode array item at offset %d: %w", offset+arrayOffset, err) } - if err := j.publisher.Publish(evt, cp); err != nil { - j.log.Errorw("job encountered an error", "gcs.jobId", id, "error", err) + + data, err := item.MarshalJSON() + if err != nil { + return fmt.Errorf("job with job id %s encountered a marshaling error: %w", id, err) + } + evt := j.createEvent(data, nil, offset+arrayOffset) + + if !dec.More() { + // if this is the last object, then perform a complete state save + cp, done := j.state.saveForTx(j.object.Name, j.object.Updated) + if err := j.publisher.Publish(evt, cp); err != nil { + j.log.Errorw("job encountered an error while publishing event", "gcs.jobId", id, "error", err) + } + done() + } else { + // since we don't update the cursor checkpoint, lack of a lock here is not a problem + if err := j.publisher.Publish(evt, nil); err != nil { + j.log.Errorw("job encountered an error while publishing event", "gcs.jobId", id, "error", err) + } } - // unlocks after data is saved and published - done() } + return nil } diff --git a/x-pack/filebeat/input/gcs/mock/data.go b/x-pack/filebeat/input/gcs/mock/data.go index 1bbe2c92e72..6bd9d6dcfbf 100644 --- a/x-pack/filebeat/input/gcs/mock/data.go +++ b/x-pack/filebeat/input/gcs/mock/data.go @@ -130,8 +130,8 @@ var objectList = map[string]string{ "crc32c": "hHW/Qw==", "etag": "CM7Ww6q73/kCEAE=", "timeCreated": "2022-08-24T12:20:36.713Z", - "updated": "2022-08-24T12:20:36.713Z", - "timeStorageClassUpdated": "2022-08-24T12:20:36.713Z" + "updated": "2022-08-25T12:20:36.713Z", + "timeStorageClassUpdated": "2022-08-25T12:20:36.713Z" }, { "kind": "storage#object", @@ -192,8 +192,8 @@ var objectList = map[string]string{ "crc32c": "hHW/Qw==", "etag": "CM7Ww6q73/kCEAE=", "timeCreated": "2022-08-24T12:20:36.713Z", - "updated": "2022-08-24T12:20:36.713Z", - 
"timeStorageClassUpdated": "2022-08-24T12:20:36.713Z" + "updated": "2022-08-25T12:20:36.713Z", + "timeStorageClassUpdated": "2022-08-25T12:20:36.713Z" } ] }`, diff --git a/x-pack/filebeat/input/gcs/mock/data_files.go b/x-pack/filebeat/input/gcs/mock/data_files.go index c6e546d56da..b19175b477e 100644 --- a/x-pack/filebeat/input/gcs/mock/data_files.go +++ b/x-pack/filebeat/input/gcs/mock/data_files.go @@ -358,3 +358,7 @@ var BeatsFilesBucket_multiline_json_gz = []string{ "{\n \"@timestamp\": \"2021-05-25T17:25:42.806Z\",\n \"log.level\": \"error\",\n \"message\": \"error making http request\"\n}", "{\n \"@timestamp\": \"2021-05-25T17:25:51.391Z\",\n \"log.level\": \"info\",\n \"message\": \"available disk space 44.3gb\"\n}", } +var BeatsFilesBucket_events_array_json = []string{ + "{\n \"time\": \"2021-05-25 18:20:58 UTC\",\n \"msg\": \"hello\"\n }", + "{\n \"time\": \"2021-05-26 22:21:40 UTC\",\n \"msg\": \"world\"\n }", +} diff --git a/x-pack/filebeat/input/gcs/mock/testdata/array-at-root.json b/x-pack/filebeat/input/gcs/mock/testdata/array-at-root.json index 68cd2b41e7b..8d22df6aeb8 100644 --- a/x-pack/filebeat/input/gcs/mock/testdata/array-at-root.json +++ b/x-pack/filebeat/input/gcs/mock/testdata/array-at-root.json @@ -7,4 +7,4 @@ "time": "2021-05-26 22:21:40 UTC", "msg": "world" } -] \ No newline at end of file +] diff --git a/x-pack/filebeat/input/gcs/mock/testdata/log.json b/x-pack/filebeat/input/gcs/mock/testdata/log.json index f6aaf5ec64d..b88e0545284 100644 --- a/x-pack/filebeat/input/gcs/mock/testdata/log.json +++ b/x-pack/filebeat/input/gcs/mock/testdata/log.json @@ -1 +1 @@ -{"@timestamp":"2021-05-25T17:25:42.806Z","log.level":"error","message":"error making http request"}{"@timestamp":"2021-05-25T17:25:51.391Z","log.level":"info","message":"available disk space 44.3gb"} \ No newline at end of file +{"@timestamp":"2021-05-25T17:25:42.806Z","log.level":"error","message":"error making http request"}{"@timestamp":"2021-05-25T17:25:51.391Z","log.level":"info","message":"available disk space 44.3gb"} diff --git a/x-pack/filebeat/input/gcs/mock/testdata/multiline.json b/x-pack/filebeat/input/gcs/mock/testdata/multiline.json index 974d296762a..32d319af2bc 100644 --- a/x-pack/filebeat/input/gcs/mock/testdata/multiline.json +++ b/x-pack/filebeat/input/gcs/mock/testdata/multiline.json @@ -7,4 +7,4 @@ "@timestamp": "2021-05-25T17:25:51.391Z", "log.level": "info", "message": "available space 44.3gb" -} \ No newline at end of file +} diff --git a/x-pack/filebeat/input/gcs/scheduler.go b/x-pack/filebeat/input/gcs/scheduler.go index d88112979b2..c68c3123cf2 100644 --- a/x-pack/filebeat/input/gcs/scheduler.go +++ b/x-pack/filebeat/input/gcs/scheduler.go @@ -141,9 +141,17 @@ func fetchJobID(workerId int, bucketName string, objectName string) string { } func (s *scheduler) createJobs(objects []*storage.ObjectAttrs, log *logp.Logger) []*job { + //nolint:prealloc // No need to preallocate the slice var jobs []*job - for _, obj := range objects { + // if file selectors are present, then only select the files that match the regex + if len(s.src.FileSelectors) != 0 && !s.isFileSelected(obj.Name) { + continue + } + // date filter is applied on last updated time of the object + if s.src.TimeStampEpoch != nil && obj.Updated.Unix() < *s.src.TimeStampEpoch { + continue + } // check required to ignore directories & sub folders, since there is no inbuilt option to // do so. 
In gcs all the directories are emulated and represented by a prefix, we can // define specific prefix's & delimiters to ignore known directories but there is no generic @@ -180,10 +188,7 @@ func (s *scheduler) moveToLastSeenJob(jobs []*job) []*job { ignore := false for _, job := range jobs { - switch offset, isPartial := s.state.cp.LastProcessedOffset[job.object.Name]; { - case isPartial: - job.offset = offset - latestJobs = append(latestJobs, job) + switch { case job.Timestamp().After(s.state.checkpoint().LatestEntryTime): latestJobs = append(latestJobs, job) case job.Name() == s.state.checkpoint().ObjectName: @@ -205,7 +210,7 @@ func (s *scheduler) moveToLastSeenJob(jobs []*job) []*job { // in a senario where there are some jobs which have a later time stamp // but lesser lexicographic order and some jobs have greater lexicographic order - // than the current checkpoint or if partially completed jobs are present + // than the current checkpoint object name, then we append the latest jobs if len(jobsToReturn) != len(jobs) && len(latestJobs) > 0 { jobsToReturn = append(latestJobs, jobsToReturn...) } @@ -239,3 +244,12 @@ func (s *scheduler) addFailedJobs(ctx context.Context, jobs []*job) []*job { } return jobs } + +func (s *scheduler) isFileSelected(name string) bool { + for _, sel := range s.src.FileSelectors { + if sel.Regex.MatchString(name) { + return true + } + } + return false +} diff --git a/x-pack/filebeat/input/gcs/state.go b/x-pack/filebeat/input/gcs/state.go index afa20e5d52d..59f79fce471 100644 --- a/x-pack/filebeat/input/gcs/state.go +++ b/x-pack/filebeat/input/gcs/state.go @@ -30,19 +30,12 @@ type Checkpoint struct { LatestEntryTime time.Time // list of failed jobs due to unexpected errors/download errors FailedJobs map[string]int - // a mapping from object name to whether the object is having an array type as it's root. - IsRootArray map[string]bool - // a mapping from object name to an array index that contains the last processed offset for that object. - // if isRootArray == true for object, then LastProcessedOffset will treat offset as an array index - LastProcessedOffset map[string]int64 } func newState() *state { return &state{ cp: &Checkpoint{ - FailedJobs: make(map[string]int), - LastProcessedOffset: make(map[string]int64), - IsRootArray: make(map[string]bool), + FailedJobs: make(map[string]int), }, } } @@ -53,8 +46,6 @@ func newState() *state { // more than once. func (s *state) saveForTx(name string, lastModifiedOn time.Time) (cp *Checkpoint, done func()) { s.mu.Lock() - delete(s.cp.LastProcessedOffset, name) - delete(s.cp.IsRootArray, name) if _, ok := s.cp.FailedJobs[name]; !ok { if len(s.cp.ObjectName) == 0 { s.cp.ObjectName = name @@ -74,23 +65,6 @@ func (s *state) saveForTx(name string, lastModifiedOn time.Time) (cp *Checkpoint return s.cp, func() { s.mu.Unlock() } } -// savePartialForTx partially updates and returns the current state checkpoint, locks the state -// and returns an unlock function, done. The caller must call done when -// s and cp are no longer needed in a locked state. done may not be called -// more than once. 
-func (s *state) savePartialForTx(name string, offset int64) (cp *Checkpoint, done func()) { - s.mu.Lock() - s.cp.LastProcessedOffset[name] = offset - return s.cp, func() { s.mu.Unlock() } -} - -// setRootArray, sets boolean true for objects that have their roots defined as an array type -func (s *state) setRootArray(name string) { - s.mu.Lock() - s.cp.IsRootArray[name] = true - s.mu.Unlock() -} - // updateFailedJobs, adds a job name to a failedJobs map, which helps // in keeping track of failed jobs during edge cases when the state might // move ahead in timestamp & objectName due to successful operations from other workers. @@ -98,10 +72,6 @@ func (s *state) setRootArray(name string) { // entry is removed from the map func (s *state) updateFailedJobs(jobName string) { s.mu.Lock() - // we do not store partially processed jobs as failed jobs - if _, ok := s.cp.LastProcessedOffset[jobName]; ok { - return - } s.cp.FailedJobs[jobName]++ if s.cp.FailedJobs[jobName] > maxFailedJobRetries { delete(s.cp.FailedJobs, jobName) @@ -116,12 +86,6 @@ func (s *state) setCheckpoint(chkpt *Checkpoint) { if chkpt.FailedJobs == nil { chkpt.FailedJobs = make(map[string]int) } - if chkpt.IsRootArray == nil { - chkpt.IsRootArray = make(map[string]bool) - } - if chkpt.LastProcessedOffset == nil { - chkpt.LastProcessedOffset = make(map[string]int64) - } s.cp = chkpt } diff --git a/x-pack/filebeat/input/gcs/types.go b/x-pack/filebeat/input/gcs/types.go index ced80531246..5934d119979 100644 --- a/x-pack/filebeat/input/gcs/types.go +++ b/x-pack/filebeat/input/gcs/types.go @@ -11,13 +11,16 @@ import ( // Source, it is the cursor source type Source struct { - BucketName string - BucketTimeOut time.Duration - ProjectId string - MaxWorkers int - Poll bool - PollInterval time.Duration - ParseJSON bool + BucketName string + BucketTimeOut time.Duration + ProjectId string + MaxWorkers int + Poll bool + PollInterval time.Duration + ParseJSON bool + TimeStampEpoch *int64 + FileSelectors []fileSelectorConfig + ExpandEventListFromField string } func (s *Source) Name() string {
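Finally, to make the `expand_event_list_from_field` behaviour documented above concrete, here is a minimal, self-contained sketch that loosely follows the `splitEventList` helper added in `job.go`: it looks up the configured key, checks that the value is a JSON array, and emits one event per array element. The `publish` callback and the sample payload are illustrative only.

[source,go]
----
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// splitEventList decodes the array found under key and hands each element
// to publish, loosely mirroring job.splitEventList in this change.
func splitEventList(key string, raw json.RawMessage, publish func(json.RawMessage)) error {
	var obj map[string]json.RawMessage
	if err := json.Unmarshal(raw, &obj); err != nil {
		return fmt.Errorf("unmarshaling object: %w", err)
	}
	list, found := obj[key]
	if !found {
		return fmt.Errorf("expand_event_list_from_field key <%v> is not in event", key)
	}

	dec := json.NewDecoder(bytes.NewReader(list))
	dec.UseNumber() // keep numbers as json.Number instead of float64

	tok, err := dec.Token() // consume the opening '['
	if err != nil {
		return fmt.Errorf("reading JSON token: %w", err)
	}
	if delim, ok := tok.(json.Delim); !ok || delim != '[' {
		return fmt.Errorf("expand_event_list_from_field <%v> is not an array", key)
	}
	for dec.More() {
		var item json.RawMessage
		if err := dec.Decode(&item); err != nil {
			return fmt.Errorf("decoding array item: %w", err)
		}
		publish(item)
	}
	return nil
}

func main() {
	payload := json.RawMessage(`{"Records":[{"eventID":"a"},{"eventID":"b"}]}`)
	err := splitEventList("Records", payload, func(ev json.RawMessage) {
		fmt.Println(string(ev)) // one event per array element
	})
	if err != nil {
		fmt.Println("error:", err)
	}
}
----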