From c6c0a1f63ecad38941454b87710e931a1216bc3a Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Mon, 29 Jun 2020 14:04:48 +0200 Subject: [PATCH 1/6] Add date_cursor to httpjson input --- x-pack/filebeat/input/httpjson/config.go | 42 ++++++++++++ x-pack/filebeat/input/httpjson/config_test.go | 11 +++ .../filebeat/input/httpjson/httpjson_test.go | 48 +++++++++++++ x-pack/filebeat/input/httpjson/input.go | 68 +++++++++++++++++-- 4 files changed, 165 insertions(+), 4 deletions(-) diff --git a/x-pack/filebeat/input/httpjson/config.go b/x-pack/filebeat/input/httpjson/config.go index 2fcc2fc8941..2d0731b768b 100644 --- a/x-pack/filebeat/input/httpjson/config.go +++ b/x-pack/filebeat/input/httpjson/config.go @@ -7,6 +7,7 @@ package httpjson import ( "regexp" "strings" + "text/template" "time" "github.com/pkg/errors" @@ -35,6 +36,7 @@ type config struct { RetryWaitMax time.Duration `config:"retry.wait_max"` TLS *tlscommon.Config `config:"ssl"` URL string `config:"url" validate:"required"` + DateCursor *DateCursor `config:"date_cursor"` } // Pagination contains information about httpjson pagination settings @@ -65,6 +67,43 @@ type RateLimit struct { Remaining string `config:"remaining"` } +type DateCursor struct { + Enabled *bool `config:"enabled"` + Field string `config:"field" validate:"required"` + URLField string `config:"url_field" validate:"required"` + ValueTemplate *Template `config:"value_template"` + DateFormat string `config:"date_format"` + InitialInterval time.Duration `config:"initial_interval"` +} + +type Template struct { + *template.Template +} + +func (t *Template) Unpack(in string) error { + tpl, err := template.New("tpl").Parse(in) + if err != nil { + return err + } + + *t = Template{Template: tpl} + + return nil +} + +// IsEnabled returns true if the date cursor is configured and its `enabled` field is either unset or set to true in the yaml. 
+func (dc *DateCursor) IsEnabled() bool { + return dc != nil && (dc.Enabled == nil || *dc.Enabled) +} + +// GetDateFormat returns the configured `date_format`, defaulting to time.RFC3339 when none is set. +func (dc *DateCursor) GetDateFormat() string { + if dc.DateFormat == "" { + return time.RFC3339 + } + return dc.DateFormat +} + func (c *config) Validate() error { switch strings.ToUpper(c.HTTPMethod) { case "GET", "POST": @@ -81,6 +120,9 @@ func (c *config) Validate() error { } } if c.Pagination != nil { + if c.DateCursor.IsEnabled() { + return errors.Errorf("invalid configuration: date_cursor cannnot be set in combination with other pagination mechanisms") + } if c.Pagination.Header != nil { if c.Pagination.RequestField != "" || c.Pagination.IDField != "" || len(c.Pagination.ExtraBodyContent) > 0 { return errors.Errorf("invalid configuration: both pagination.header and pagination.req_field or pagination.id_field or pagination.extra_body_content cannot be set simultaneously") diff --git a/x-pack/filebeat/input/httpjson/config_test.go b/x-pack/filebeat/input/httpjson/config_test.go index cfec6a2440b..62783049631 100644 --- a/x-pack/filebeat/input/httpjson/config_test.go +++ b/x-pack/filebeat/input/httpjson/config_test.go @@ -350,6 +350,17 @@ func TestConfigOauth2Validation(t *testing.T) { "url": "localhost", }, }, + { + name: "date_cursor must fail in combination with pagination", + expectedErr: "invalid configuration: date_cursor cannnot be set in combination with other pagination mechanisms accessing config", + input: map[string]interface{}{ + "date_cursor": map[string]interface{}{"field": "foo", "url_field": "foo"}, + "pagination": map[string]interface{}{ + "header": map[string]interface{}{"field_name": "foo", "regex_pattern": "bar"}, + }, + "url": "localhost", + }, + }, } for _, c := range cases { diff --git a/x-pack/filebeat/input/httpjson/httpjson_test.go b/x-pack/filebeat/input/httpjson/httpjson_test.go index 4e70fe72472..95b8c778b47 100644 --- 
a/x-pack/filebeat/input/httpjson/httpjson_test.go +++ b/x-pack/filebeat/input/httpjson/httpjson_test.go @@ -7,6 +7,7 @@ package httpjson import ( "context" "encoding/json" + "fmt" "io/ioutil" "log" "math/rand" @@ -727,3 +728,50 @@ func TestArrayWithSplitResponse(t *testing.T) { } }) } + +func TestCursor(t *testing.T) { + m := map[string]interface{}{ + "http_method": "GET", + "date_cursor.field": "@timestamp", + "date_cursor.url_field": "$filter", + "date_cursor.value_template": "alertCreationTime ge {{.}}", + "date_cursor.initial_interval": "10m", + "date_cursor.date_format": "2006-01-02T15:04:05Z", + } + + timeNow = func() time.Time { + t, _ := time.Parse("2006-01-02T15:04:05Z", "2002-10-02T15:10:00Z") + return t + } + + const ( + expectedQuery = "%24filter=alertCreationTime+ge+2002-10-02T15%3A00%3A00Z" + expectedNextCursorValue = "2002-10-02T15:00:01Z" + expectedNextQuery = "%24filter=alertCreationTime+ge+2002-10-02T15%3A00%3A01Z" + ) + var gotQuery string + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + gotQuery = r.URL.Query().Encode() + w.Write([]byte(`[{"@timestamp":"2002-10-02T15:00:00Z"},{"@timestamp":"2002-10-02T15:00:01Z"}]`)) + })) + + runTest(t, ts, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { + group, _ := errgroup.WithContext(context.Background()) + group.Go(input.run) + + events, ok := out.waitForEvents(2) + if !ok { + t.Fatalf("Expected 2 events, but got %d.", len(events)) + } + input.Stop() + + if err := group.Wait(); err != nil { + t.Fatal(err) + } + + assert.Equal(t, expectedQuery, gotQuery) + assert.Equal(t, expectedNextCursorValue, input.cursor.nextCursorValue) + assert.Equal(t, fmt.Sprintf("%s?%s", ts.URL, expectedNextQuery), input.getURL()) + }) +} diff --git a/x-pack/filebeat/input/httpjson/input.go b/x-pack/filebeat/input/httpjson/input.go index 78f5eba5344..72d7b6f0d46 100644 --- a/x-pack/filebeat/input/httpjson/input.go +++ 
b/x-pack/filebeat/input/httpjson/input.go @@ -12,6 +12,7 @@ import ( "io/ioutil" "net" "net/http" + "net/url" "regexp" "strconv" "sync" @@ -37,6 +38,9 @@ const ( var userAgent = useragent.UserAgent("Filebeat") +// for testing +var timeNow = time.Now + func init() { err := input.Register(inputName, NewInput) if err != nil { @@ -55,6 +59,8 @@ type HttpjsonInput struct { workerCancel context.CancelFunc // Used to signal that the worker should stop. workerOnce sync.Once // Guarantees that the worker goroutine is only started once. workerWg sync.WaitGroup // Waits on worker goroutine. + + nextCursorValue string } // RequestInfo struct has the information for generating an HTTP request @@ -343,6 +349,7 @@ func createRequestInfoFromBody(m common.MapStr, idField string, requestField str // processHTTPRequest processes HTTP request, and handles pagination if enabled func (in *HttpjsonInput) processHTTPRequest(ctx context.Context, client *http.Client, ri *RequestInfo) error { + ri.URL = in.getURL() for { req, err := in.createHTTPRequest(ctx, ri) if err != nil { @@ -407,8 +414,7 @@ func (in *HttpjsonInput) processHTTPRequest(ctx context.Context, client *http.Cl in.log.Debug("http.response.body is not a valid JSON object", string(responseData)) return errors.Errorf("http.response.body is not a valid JSON object, but a %T", obj) } - - if mm != nil && in.config.Pagination != nil && in.config.Pagination.IsEnabled() { + if mm != nil && in.config.Pagination.IsEnabled() { if in.config.Pagination.Header != nil { // Pagination control using HTTP Header url, err := getNextLinkFromHeader(header, in.config.Pagination.Header.FieldName, in.config.Pagination.Header.RegexPattern) @@ -427,7 +433,7 @@ func (in *HttpjsonInput) processHTTPRequest(ctx context.Context, client *http.Cl continue } else { // Pagination control using HTTP Body fields - ri, err := createRequestInfoFromBody(common.MapStr(mm), in.config.Pagination.IDField, in.config.Pagination.RequestField, 
common.MapStr(in.config.Pagination.ExtraBodyContent), in.config.Pagination.URL, ri) + ri, err = createRequestInfoFromBody(common.MapStr(mm), in.config.Pagination.IDField, in.config.Pagination.RequestField, common.MapStr(in.config.Pagination.ExtraBodyContent), in.config.Pagination.URL, ri) if err != nil { return err } @@ -441,10 +447,65 @@ func (in *HttpjsonInput) processHTTPRequest(ctx context.Context, client *http.Cl continue } } + if mm != nil && in.config.DateCursor.IsEnabled() { + in.advanceCursor(common.MapStr(mm)) + } return nil } } +func (in *HttpjsonInput) getURL() string { + if !in.config.DateCursor.IsEnabled() { + return in.config.URL + } + + var dateStr string + if in.nextCursorValue == "" { + t := timeNow().Add(-in.config.DateCursor.InitialInterval) + dateStr = t.Format(in.config.DateCursor.GetDateFormat()) + } else { + dateStr = in.nextCursorValue + } + + url, err := url.Parse(in.config.URL) + if err != nil { + return in.config.URL + } + + q := url.Query() + + var value string + if in.config.DateCursor.ValueTemplate == nil { + value = dateStr + } else { + buf := new(bytes.Buffer) + if err := in.config.DateCursor.ValueTemplate.Execute(buf, dateStr); err != nil { + return in.config.URL + } + value = buf.String() + } + + q.Set(in.config.DateCursor.URLField, value) + + url.RawQuery = q.Encode() + + return url.String() +} + +func (in *HttpjsonInput) advanceCursor(m common.MapStr) { + switch v, _ := m.GetValue(in.config.DateCursor.Field); t := v.(type) { + case string: + _, err := time.Parse(in.config.DateCursor.GetDateFormat(), t) + if err != nil { + return + } + in.nextCursorValue = t + default: + in.log.Warn("date_cursor field must be a string, cursor will not advance") + return + } +} + func (in *HttpjsonInput) run() error { ctx, cancel := context.WithCancel(in.workerCtx) defer cancel() @@ -455,7 +516,6 @@ func (in *HttpjsonInput) run() error { } ri := &RequestInfo{ - URL: in.URL, ContentMap: common.MapStr{}, Headers: in.HTTPHeaders, } From 
655f100c0d8c914e322a7a1503aaba7c111b583e Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Mon, 29 Jun 2020 14:13:10 +0200 Subject: [PATCH 2/6] Add changelog entry --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 6bcd7474392..b8f05a254c1 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -53,6 +53,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Okta module now requires objects instead of JSON strings for the `http_headers`, `http_request_body`, `pagination`, `rate_limit`, and `ssl` variables. {pull}18953[18953] - Adds oauth support for httpjson input. {issue}18415[18415] {pull}18892[18892] - Adds `split_events_by` option to httpjson input. {pull}19246[19246] +- Adds `date_cursor` option to httpjson input. {pull}19483[19483] *Heartbeat* From 65f7d58b6fa284cfb9821efc6d62fc8c70fa8515 Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Mon, 29 Jun 2020 17:27:54 +0200 Subject: [PATCH 3/6] Fix tests --- x-pack/filebeat/input/httpjson/httpjson_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/httpjson/httpjson_test.go b/x-pack/filebeat/input/httpjson/httpjson_test.go index 95b8c778b47..75374404eea 100644 --- a/x-pack/filebeat/input/httpjson/httpjson_test.go +++ b/x-pack/filebeat/input/httpjson/httpjson_test.go @@ -771,7 +771,7 @@ func TestCursor(t *testing.T) { } assert.Equal(t, expectedQuery, gotQuery) - assert.Equal(t, expectedNextCursorValue, input.cursor.nextCursorValue) + assert.Equal(t, expectedNextCursorValue, input.nextCursorValue) assert.Equal(t, fmt.Sprintf("%s?%s", ts.URL, expectedNextQuery), input.getURL()) }) } From f435cca482a851d166a0348258104ad0ff0d3af9 Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Mon, 29 Jun 2020 17:29:28 +0200 Subject: [PATCH 4/6] Default to UTC date --- x-pack/filebeat/input/httpjson/input.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff 
--git a/x-pack/filebeat/input/httpjson/input.go b/x-pack/filebeat/input/httpjson/input.go index 72d7b6f0d46..0f36457fccb 100644 --- a/x-pack/filebeat/input/httpjson/input.go +++ b/x-pack/filebeat/input/httpjson/input.go @@ -461,7 +461,7 @@ func (in *HttpjsonInput) getURL() string { var dateStr string if in.nextCursorValue == "" { - t := timeNow().Add(-in.config.DateCursor.InitialInterval) + t := timeNow().UTC().Add(-in.config.DateCursor.InitialInterval) dateStr = t.Format(in.config.DateCursor.GetDateFormat()) } else { dateStr = in.nextCursorValue From ff81afc61efa71f45af06b28f4a2eb87deac638c Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Tue, 30 Jun 2020 12:57:36 +0200 Subject: [PATCH 5/6] Add date_cursor validations and better error message --- x-pack/filebeat/input/httpjson/config.go | 11 +++++++++++ x-pack/filebeat/input/httpjson/config_test.go | 16 ++++++++++++++++ x-pack/filebeat/input/httpjson/input.go | 7 ++++++- 3 files changed, 33 insertions(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/httpjson/config.go b/x-pack/filebeat/input/httpjson/config.go index 2d0731b768b..63d20221de4 100644 --- a/x-pack/filebeat/input/httpjson/config.go +++ b/x-pack/filebeat/input/httpjson/config.go @@ -104,6 +104,17 @@ func (dc *DateCursor) GetDateFormat() string { return dc.DateFormat } +func (dc *DateCursor) Validate() error { + if dc.DateFormat == "" { + return nil + } + now := time.Now().Format(dc.DateFormat) + if _, err := time.Parse(dc.DateFormat, now); err != nil { + return errors.New("invalid configuration: date_format is not a valid date layout") + } + return nil +} + func (c *config) Validate() error { switch strings.ToUpper(c.HTTPMethod) { case "GET", "POST": diff --git a/x-pack/filebeat/input/httpjson/config_test.go b/x-pack/filebeat/input/httpjson/config_test.go index 62783049631..a86c2aa76db 100644 --- a/x-pack/filebeat/input/httpjson/config_test.go +++ b/x-pack/filebeat/input/httpjson/config_test.go @@ -8,6 +8,7 @@ import ( "context" "os" "testing" + 
"time" "github.com/pkg/errors" "golang.org/x/oauth2/google" @@ -361,6 +362,21 @@ func TestConfigOauth2Validation(t *testing.T) { "url": "localhost", }, }, + { + name: "date_cursor.date_format will fail if invalid", + expectedErr: "invalid configuration: date_format is not a valid date layout accessing 'date_cursor'", + input: map[string]interface{}{ + "date_cursor": map[string]interface{}{"field": "foo", "url_field": "foo", "date_format": "1234"}, + "url": "localhost", + }, + }, + { + name: "date_cursor must work with a valid date_format", + input: map[string]interface{}{ + "date_cursor": map[string]interface{}{"field": "foo", "url_field": "foo", "date_format": time.RFC3339}, + "url": "localhost", + }, + }, } for _, c := range cases { diff --git a/x-pack/filebeat/input/httpjson/input.go b/x-pack/filebeat/input/httpjson/input.go index 0f36457fccb..fe65f63ebf8 100644 --- a/x-pack/filebeat/input/httpjson/input.go +++ b/x-pack/filebeat/input/httpjson/input.go @@ -493,7 +493,12 @@ func (in *HttpjsonInput) getURL() string { } func (in *HttpjsonInput) advanceCursor(m common.MapStr) { - switch v, _ := m.GetValue(in.config.DateCursor.Field); t := v.(type) { + v, err := m.GetValue(in.config.DateCursor.Field) + if err != nil { + in.log.Warnf("date_cursor field: %q", err) + return + } + switch t := v.(type) { case string: _, err := time.Parse(in.config.DateCursor.GetDateFormat(), t) if err != nil { From d04d3fefd0aa70528fcfd23fda9fdedbdb37b7e6 Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Wed, 1 Jul 2020 11:30:09 +0200 Subject: [PATCH 6/6] Run fmt update --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index f4867e4d91e..901d13a1304 100644 --- a/go.mod +++ b/go.mod @@ -163,7 +163,7 @@ require ( golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5 golang.org/x/text v0.3.2 golang.org/x/time v0.0.0-20191024005414-555d28b269f0 - golang.org/x/tools v0.0.0-20200630154851-b2d8b0336632 + golang.org/x/tools 
v0.0.0-20200701041122-1837592efa10 google.golang.org/api v0.15.0 google.golang.org/genproto v0.0.0-20191230161307-f3c370f40bfb google.golang.org/grpc v1.29.1