[filebeat][gcs] - Added support for new features and removed partial save mechanism (#36713)

* removed partial save mechanism

* added support for timestamp filtering, expand event list & fileselectors. Updated docs

* resolved linter issues

* updated changelog

* fixed asciidoc errors

* addressed PR comments

* removed redundant nil check for fileselector regex

* addressed PR suggestions

* updated comments and resolved pr suggestions

* updated logs
ShourieG authored Oct 3, 2023
1 parent 25624ab commit 8818aca
Showing 15 changed files with 476 additions and 161 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -225,6 +225,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Added support for new features & removed partial save mechanism in the Azure Blob Storage input. {issue}35126[35126] {pull}36690[36690]
- Improve template evaluation logging for HTTPJSON input. {pull}36668[36668]
- Add CEL partial value debug function. {pull}36652[36652]
- Added support for new features and removed partial save mechanism in the GCS input. {issue}35847[35847] {pull}36713[36713]

*Auditbeat*

98 changes: 94 additions & 4 deletions x-pack/filebeat/docs/inputs/input-gcs.asciidoc
@@ -76,8 +76,7 @@ which will in turn use the `max_workers` value to initialize an in-memory worker
one per bucket, each having 3 workers. These `workers` will be responsible for performing `jobs` that process a file (in this case read and output the contents of a file).

NOTE: The scheduler is responsible for scheduling jobs, and uses the `maximum available workers` in the pool, at each iteration, to decide the number of files to retrieve and
process. This keeps work distribution efficient. The scheduler uses the `poll_interval` attribute value to decide how long to wait after each iteration. The `bucket_timeout` value is used to time out calls to the bucket list API if they exceed the given value. Each iteration consists of processing a certain number of files, decided by the `maximum available workers` value.
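
For illustration, a minimal configuration exercising these scheduling attributes might look like the following sketch; the bucket name and credentials path are placeholders:

[source, yml]
----
filebeat.inputs:
- type: gcs
  project_id: my_project_id
  auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json
  max_workers: 3
  poll: true
  poll_interval: 30s
  bucket_timeout: 60s
  buckets:
  - name: obs-bucket
----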

*A Sample Response :-*
["source","json"]
@@ -167,6 +166,9 @@ Now let's explore the configuration attributes a bit more elaborately.
8. <<attrib-poll-gcs,poll>>
9. <<attrib-poll_interval-gcs,poll_interval>>
10. <<attrib-parse_json,parse_json>>
11. <<attrib-file_selectors-gcs,file_selectors>>
12. <<attrib-expand_event_list_from_field-gcs,expand_event_list_from_field>>
13. <<attrib-timestamp_epoch-gcs,timestamp_epoch>>


[id="attrib-project-id"]
Expand Down Expand Up @@ -252,6 +254,94 @@ highly nested json data. If this is set to `false` the *gcs.storage.object.json_
applicable for json objects and has no effect on other types of objects. This attribute can be specified both at the root level of the configuration as well as at the bucket level.
The bucket level values will always take priority and override the root level values if both are specified.
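
For example, a sketch of `parse_json` set at the root level and overridden for a single bucket might look like this; the bucket name and credentials path are placeholders:

[source, yml]
----
filebeat.inputs:
- type: gcs
  project_id: my_project_id
  auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json
  parse_json: false
  buckets:
  - name: obs-bucket
    max_workers: 3
    poll: true
    poll_interval: 15s
    parse_json: true
----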

[id="attrib-file_selectors-gcs"]
[float]
==== `file_selectors`

If the GCS buckets have objects that correspond to files that {beatname_uc} shouldn't process, `file_selectors` can be used to limit the files that are downloaded. This is a list of selectors, each based on a regular expression pattern. The regular expression should match the object name or be a part of the object name (ideally a prefix). The regular expression syntax used is https://github.com/google/re2/wiki/Syntax[RE2]. Files that don't match any configured expression won't be processed. This attribute can be specified both at the root level of the configuration as well as at the bucket level. The bucket level values will always take priority and override the root level values if both are specified.

[source, yml]
----
filebeat.inputs:
- type: gcs
  project_id: my_project_id
  auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json
  buckets:
  - name: obs-bucket
    max_workers: 3
    poll: true
    poll_interval: 15s
    bucket_timeout: 60s
    file_selectors:
    - regex: '/Monitoring/'
    - regex: 'docs/'
    - regex: '/Security-Logs/'
----

[id="attrib-expand_event_list_from_field-gcs"]
[float]
==== `expand_event_list_from_field`

If the file-set using this input expects to receive multiple messages bundled under a specific field, or an array of objects, then the config option `expand_event_list_from_field` can be specified. This setting splits the messages under the given field into separate events. For example, if your logs are in JSON format and the events are found under the JSON object "Records", setting `expand_event_list_from_field` to "Records" splits the events into separate events. This attribute can be specified both at the root level of the configuration as well as at the bucket level. The bucket level values will always take priority and override the root level values if both are specified.

[source, json]
----
{
    "Records": [
        {
            "eventVersion": "1.07",
            "eventTime": "2019-11-14T00:51:00Z",
            "region": "us-east-1",
            "eventID": "EXAMPLE8-9621-4d00-b913-beca2EXAMPLE"
        },
        {
            "eventVersion": "1.07",
            "eventTime": "2019-11-14T00:52:00Z",
            "region": "us-east-1",
            "eventID": "EXAMPLEc-28be-486c-8928-49ce6EXAMPLE"
        }
    ]
}
----

[source,yml]
----
filebeat.inputs:
- type: gcs
  project_id: my_project_id
  auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json
  buckets:
  - name: obs-bucket
    max_workers: 3
    poll: true
    poll_interval: 15s
    bucket_timeout: 60s
    expand_event_list_from_field: Records
----

NOTE: The `parse_json` setting is incompatible with `expand_event_list_from_field`; if both are set, `parse_json` is ignored. This attribute is only applicable to JSON file formats. You do not need to specify this attribute if the file has an array of objects at the root level; a root level array of objects is automatically split into separate events. If failures occur or the input crashes due to an unexpected error, processing resumes from the last successfully processed file or object.


[id="attrib-timestamp_epoch-gcs"]
[float]
==== `timestamp_epoch`

This attribute can be used to filter out objects that have a timestamp older than the specified value. The value of this attribute should be in unix `epoch` (seconds) format. The timestamp value is compared with the `object.Updated` field obtained from the object metadata. This attribute can be specified both at the root level of the configuration as well as at the bucket level. The bucket level values will always take priority and override the root level values if both are specified.

[source, yml]
----
filebeat.inputs:
- type: gcs
  project_id: my_project_id
  auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json
  buckets:
  - name: obs-bucket
    max_workers: 3
    poll: true
    poll_interval: 15s
    bucket_timeout: 60s
    timestamp_epoch: 1630444800
----
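
These attributes can also be combined in a single bucket definition. The following sketch is illustrative only; the bucket name, credentials path, epoch value and regex are placeholders:

[source, yml]
----
filebeat.inputs:
- type: gcs
  project_id: my_project_id
  auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json
  buckets:
  - name: obs-bucket
    max_workers: 3
    poll: true
    poll_interval: 15s
    bucket_timeout: 60s
    timestamp_epoch: 1630444800
    expand_event_list_from_field: Records
    file_selectors:
    - regex: 'docs/'
----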

[id="bucket-overrides"]
*The sample configs below further explain bucket level overriding of attributes :-*
@@ -260,7 +350,7 @@ The bucket level values will always take priority and override the root level va

Here `bucket_1` is using root level attributes while `bucket_2` overrides the values :

["source","yaml",subs="attributes"]
[source, yml]
----
filebeat.inputs:
- type: gcs
@@ -288,7 +378,7 @@ of using the root values.

Here both `bucket_1` and `bucket_2` overrides the root values :

["source","yaml",subs="attributes"]
[source, yml]
----
filebeat.inputs:
- type: gcs
63 changes: 41 additions & 22 deletions x-pack/filebeat/input/gcs/config.go
@@ -6,39 +6,58 @@ package gcs

import (
"time"

"github.com/elastic/beats/v7/libbeat/common/match"
)

// MaxWorkers - Defines the maximum number of go routines that will be spawned.
// Poll - Defines if polling should be performed on the input bucket source.
// PollInterval - Defines the maximum amount of time to wait before polling for the
// next batch of objects from the bucket.
// BucketTimeOut - Defines the maximum time that the sdk will wait for a bucket api response before timing out.
// ParseJSON - Informs the publisher whether to parse & objectify json data or not. By default this is set to
// false, since it can get expensive dealing with highly nested json data.
// MaxWorkers, Poll, PollInterval, BucketTimeOut, ParseJSON can be configured at a global level,
// which applies to all buckets, as well as at the bucket level.
// MaxWorkers, Poll, PollInterval, BucketTimeOut, ParseJSON, FileSelectors, TimeStampEpoch & ExpandEventListFromField
// can be configured at a global level, which applies to all buckets, as well as at the bucket level.
// Bucket level configurations will always override global level values.
type config struct {
ProjectId string `config:"project_id" validate:"required"`
Auth authConfig `config:"auth" validate:"required"`
MaxWorkers *int `config:"max_workers,omitempty" validate:"max=5000"`
Poll *bool `config:"poll,omitempty"`
PollInterval *time.Duration `config:"poll_interval,omitempty"`
ParseJSON *bool `config:"parse_json,omitempty"`
// ProjectId - Defines the project id of the concerned gcs bucket in Google Cloud.
ProjectId string `config:"project_id" validate:"required"`
// Auth - Defines the authentication mechanism to be used for accessing the gcs bucket.
Auth authConfig `config:"auth" validate:"required"`
// MaxWorkers - Defines the maximum number of go routines that will be spawned.
MaxWorkers *int `config:"max_workers,omitempty" validate:"max=5000"`
// Poll - Defines if polling should be performed on the input bucket source.
Poll *bool `config:"poll,omitempty"`
// PollInterval - Defines the maximum amount of time to wait before polling for the next batch of objects from the bucket.
PollInterval *time.Duration `config:"poll_interval,omitempty"`
// ParseJSON - Informs the publisher whether to parse & objectify json data or not. By default this is set to
// false, since it can get expensive dealing with highly nested json data.
ParseJSON *bool `config:"parse_json,omitempty"`
// BucketTimeOut - Defines the maximum time that the sdk will wait for a bucket api response before timing out.
BucketTimeOut *time.Duration `config:"bucket_timeout,omitempty"`
Buckets []bucket `config:"buckets" validate:"required"`
// Buckets - Defines a list of buckets that will be polled for objects.
Buckets []bucket `config:"buckets" validate:"required"`
// FileSelectors - Defines a list of regex patterns that can be used to filter out objects from the bucket.
FileSelectors []fileSelectorConfig `config:"file_selectors"`
// TimeStampEpoch - Defines the epoch time in seconds, which is used to filter out objects that are older than the specified timestamp.
TimeStampEpoch *int64 `config:"timestamp_epoch"`
// ExpandEventListFromField - Defines the field name that will be used to expand the event into separate events.
ExpandEventListFromField string `config:"expand_event_list_from_field"`
// This field is only used for system test purposes, to override the HTTP endpoint.
AlternativeHost string `config:"alternative_host,omitempty"`
}

// bucket contains the config for each specific object storage bucket in the root account
type bucket struct {
Name string `config:"name" validate:"required"`
MaxWorkers *int `config:"max_workers,omitempty" validate:"max=5000"`
BucketTimeOut *time.Duration `config:"bucket_timeout,omitempty"`
Poll *bool `config:"poll,omitempty"`
PollInterval *time.Duration `config:"poll_interval,omitempty"`
ParseJSON *bool `config:"parse_json,omitempty"`
Name string `config:"name" validate:"required"`
MaxWorkers *int `config:"max_workers,omitempty" validate:"max=5000"`
BucketTimeOut *time.Duration `config:"bucket_timeout,omitempty"`
Poll *bool `config:"poll,omitempty"`
PollInterval *time.Duration `config:"poll_interval,omitempty"`
ParseJSON *bool `config:"parse_json,omitempty"`
FileSelectors []fileSelectorConfig `config:"file_selectors"`
TimeStampEpoch *int64 `config:"timestamp_epoch"`
ExpandEventListFromField string `config:"expand_event_list_from_field"`
}

// fileSelectorConfig helps filter out gcs objects based on a regex pattern
type fileSelectorConfig struct {
Regex *match.Matcher `config:"regex" validate:"required"`
// TODO: Add support for reader config in future
}

type authConfig struct {
49 changes: 36 additions & 13 deletions x-pack/filebeat/input/gcs/input.go
@@ -6,6 +6,7 @@ package gcs

import (
"context"
"fmt"
"time"

"cloud.google.com/go/storage"
@@ -22,6 +23,12 @@ type gcsInput struct {
config config
}

// defines the accepted range for Unix timestamps (1970-01-01 through 3000-01-01 UTC)
var (
minTimestamp = time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC).Unix()
maxTimestamp = time.Date(3000, time.January, 1, 0, 0, 0, 0, time.UTC).Unix()
)

const (
inputName = "gcs"
)
@@ -47,18 +54,23 @@ func configure(cfg *conf.C) ([]cursor.Source, cursor.Input, error) {
if err := cfg.Unpack(&config); err != nil {
return nil, nil, err
}

var sources []cursor.Source
sources := make([]cursor.Source, 0, len(config.Buckets))
for _, b := range config.Buckets {
bucket := tryOverrideOrDefault(config, b)
if bucket.TimeStampEpoch != nil && !isValidUnixTimestamp(*bucket.TimeStampEpoch) {
return nil, nil, fmt.Errorf("invalid timestamp epoch: %d", *bucket.TimeStampEpoch)
}
sources = append(sources, &Source{
ProjectId: config.ProjectId,
BucketName: bucket.Name,
BucketTimeOut: *bucket.BucketTimeOut,
MaxWorkers: *bucket.MaxWorkers,
Poll: *bucket.Poll,
PollInterval: *bucket.PollInterval,
ParseJSON: *bucket.ParseJSON,
ProjectId: config.ProjectId,
BucketName: bucket.Name,
BucketTimeOut: *bucket.BucketTimeOut,
MaxWorkers: *bucket.MaxWorkers,
Poll: *bucket.Poll,
PollInterval: *bucket.PollInterval,
ParseJSON: *bucket.ParseJSON,
TimeStampEpoch: bucket.TimeStampEpoch,
ExpandEventListFromField: bucket.ExpandEventListFromField,
FileSelectors: bucket.FileSelectors,
})
}

@@ -76,42 +88,53 @@ func tryOverrideOrDefault(cfg config, b bucket) bucket {
}
b.MaxWorkers = &maxWorkers
}

if b.Poll == nil {
var poll bool
if cfg.Poll != nil {
poll = *cfg.Poll
}
b.Poll = &poll
}

if b.PollInterval == nil {
interval := time.Second * 300
if cfg.PollInterval != nil {
interval = *cfg.PollInterval
}
b.PollInterval = &interval
}

if b.ParseJSON == nil {
parse := false
if cfg.ParseJSON != nil {
parse = *cfg.ParseJSON
}
b.ParseJSON = &parse
}

if b.BucketTimeOut == nil {
timeOut := time.Second * 50
if cfg.BucketTimeOut != nil {
timeOut = *cfg.BucketTimeOut
}
b.BucketTimeOut = &timeOut
}
if b.TimeStampEpoch == nil {
b.TimeStampEpoch = cfg.TimeStampEpoch
}
if b.ExpandEventListFromField == "" {
b.ExpandEventListFromField = cfg.ExpandEventListFromField
}
if len(b.FileSelectors) == 0 && len(cfg.FileSelectors) != 0 {
b.FileSelectors = cfg.FileSelectors
}

return b
}

// isValidUnixTimestamp checks if the timestamp is a valid Unix timestamp
func isValidUnixTimestamp(timestamp int64) bool {
// checks if the timestamp is within the valid range
return minTimestamp <= timestamp && timestamp <= maxTimestamp
}

func (input *gcsInput) Name() string {
return inputName
}
29 changes: 19 additions & 10 deletions x-pack/filebeat/input/gcs/input_stateless.go
@@ -10,6 +10,7 @@ import (

"cloud.google.com/go/storage"
gax "github.com/googleapis/gax-go/v2"
"golang.org/x/sync/errgroup"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
@@ -47,16 +48,20 @@ func (pub statelessPublisher) Publish(event beat.Event, _ interface{}) error {
func (in *statelessInput) Run(inputCtx v2.Context, publisher stateless.Publisher, client *storage.Client) error {
pub := statelessPublisher{wrapped: publisher}
var source cursor.Source
var g errgroup.Group
for _, b := range in.config.Buckets {
bucket := tryOverrideOrDefault(in.config, b)
source = &Source{
ProjectId: in.config.ProjectId,
BucketName: bucket.Name,
BucketTimeOut: *bucket.BucketTimeOut,
MaxWorkers: *bucket.MaxWorkers,
Poll: *bucket.Poll,
PollInterval: *bucket.PollInterval,
ParseJSON: *bucket.ParseJSON,
ProjectId: in.config.ProjectId,
BucketName: bucket.Name,
BucketTimeOut: *bucket.BucketTimeOut,
MaxWorkers: *bucket.MaxWorkers,
Poll: *bucket.Poll,
PollInterval: *bucket.PollInterval,
ParseJSON: *bucket.ParseJSON,
TimeStampEpoch: bucket.TimeStampEpoch,
ExpandEventListFromField: bucket.ExpandEventListFromField,
FileSelectors: bucket.FileSelectors,
}

st := newState()
@@ -80,8 +85,12 @@ func (in *statelessInput) Run(inputCtx v2.Context, publisher stateless.Publisher
)

scheduler := newScheduler(pub, bkt, currentSource, &in.config, st, log)

return scheduler.schedule(ctx)
// allows multiple buckets to be scheduled concurrently while testing.
// The stateless input is triggered only during testing and, until now, it did not mimic
// the real-world concurrent execution of multiple buckets. This fix allows it to do so.
g.Go(func() error {
return scheduler.schedule(ctx)
})
}
return nil
return g.Wait()
}
