Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Filebeat][httpjson] Add date_cursor to httpjson input #19483

Merged
merged 6 commits into from
Jul 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Okta module now requires objects instead of JSON strings for the `http_headers`, `http_request_body`, `pagination`, `rate_limit`, and `ssl` variables. {pull}18953[18953]
- Adds oauth support for httpjson input. {issue}18415[18415] {pull}18892[18892]
- Adds `split_events_by` option to httpjson input. {pull}19246[19246]
- Adds `date_cursor` option to httpjson input. {pull}19483[19483]

*Heartbeat*

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ require (
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5
golang.org/x/text v0.3.2
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
golang.org/x/tools v0.0.0-20200630154851-b2d8b0336632
golang.org/x/tools v0.0.0-20200701041122-1837592efa10
google.golang.org/api v0.15.0
google.golang.org/genproto v0.0.0-20191230161307-f3c370f40bfb
google.golang.org/grpc v1.29.1
Expand Down
53 changes: 53 additions & 0 deletions x-pack/filebeat/input/httpjson/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package httpjson
import (
"regexp"
"strings"
"text/template"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -35,6 +36,7 @@ type config struct {
RetryWaitMax time.Duration `config:"retry.wait_max"`
TLS *tlscommon.Config `config:"ssl"`
URL string `config:"url" validate:"required"`
DateCursor *DateCursor `config:"date_cursor"`
}

// Pagination contains information about httpjson pagination settings
Expand Down Expand Up @@ -65,6 +67,54 @@ type RateLimit struct {
Remaining string `config:"remaining"`
}

type DateCursor struct {
Enabled *bool `config:"enabled"`
Field string `config:"field" validate:"required"`
URLField string `config:"url_field" validate:"required"`
ValueTemplate *Template `config:"value_template"`
DateFormat string `config:"date_format"`
InitialInterval time.Duration `config:"initial_interval"`
}

type Template struct {
*template.Template
}

func (t *Template) Unpack(in string) error {
tpl, err := template.New("tpl").Parse(in)
if err != nil {
return err
}

*t = Template{Template: tpl}

return nil
}

// IsEnabled returns true if the `enable` field is set to true in the yaml.
func (dc *DateCursor) IsEnabled() bool {
return dc != nil && (dc.Enabled == nil || *dc.Enabled)
}

// IsEnabled returns true if the `enable` field is set to true in the yaml.
func (dc *DateCursor) GetDateFormat() string {
if dc.DateFormat == "" {
return time.RFC3339
}
return dc.DateFormat
}

func (dc *DateCursor) Validate() error {
if dc.DateFormat == "" {
return nil
}
now := time.Now().Format(dc.DateFormat)
if _, err := time.Parse(dc.DateFormat, now); err != nil {
return errors.New("invalid configuration: date_format is not a valid date layout")
}
return nil
}

func (c *config) Validate() error {
marc-gr marked this conversation as resolved.
Show resolved Hide resolved
switch strings.ToUpper(c.HTTPMethod) {
case "GET", "POST":
Expand All @@ -81,6 +131,9 @@ func (c *config) Validate() error {
}
}
if c.Pagination != nil {
if c.DateCursor.IsEnabled() {
return errors.Errorf("invalid configuration: date_cursor cannnot be set in combination with other pagination mechanisms")
}
if c.Pagination.Header != nil {
if c.Pagination.RequestField != "" || c.Pagination.IDField != "" || len(c.Pagination.ExtraBodyContent) > 0 {
return errors.Errorf("invalid configuration: both pagination.header and pagination.req_field or pagination.id_field or pagination.extra_body_content cannot be set simultaneously")
Expand Down
27 changes: 27 additions & 0 deletions x-pack/filebeat/input/httpjson/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"os"
"testing"
"time"

"github.com/pkg/errors"
"golang.org/x/oauth2/google"
Expand Down Expand Up @@ -350,6 +351,32 @@ func TestConfigOauth2Validation(t *testing.T) {
"url": "localhost",
},
},
{
name: "date_cursor must fail in combination with pagination",
expectedErr: "invalid configuration: date_cursor cannnot be set in combination with other pagination mechanisms accessing config",
input: map[string]interface{}{
"date_cursor": map[string]interface{}{"field": "foo", "url_field": "foo"},
"pagination": map[string]interface{}{
"header": map[string]interface{}{"field_name": "foo", "regex_pattern": "bar"},
},
"url": "localhost",
},
},
{
name: "date_cursor.date_format will fail if invalid",
expectedErr: "invalid configuration: date_format is not a valid date layout accessing 'date_cursor'",
input: map[string]interface{}{
"date_cursor": map[string]interface{}{"field": "foo", "url_field": "foo", "date_format": "1234"},
"url": "localhost",
},
},
{
name: "date_cursor must work with a valid date_format",
input: map[string]interface{}{
"date_cursor": map[string]interface{}{"field": "foo", "url_field": "foo", "date_format": time.RFC3339},
"url": "localhost",
},
},
}

for _, c := range cases {
Expand Down
48 changes: 48 additions & 0 deletions x-pack/filebeat/input/httpjson/httpjson_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package httpjson
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"math/rand"
Expand Down Expand Up @@ -727,3 +728,50 @@ func TestArrayWithSplitResponse(t *testing.T) {
}
})
}

func TestCursor(t *testing.T) {
m := map[string]interface{}{
"http_method": "GET",
"date_cursor.field": "@timestamp",
"date_cursor.url_field": "$filter",
"date_cursor.value_template": "alertCreationTime ge {{.}}",
"date_cursor.initial_interval": "10m",
"date_cursor.date_format": "2006-01-02T15:04:05Z",
}

timeNow = func() time.Time {
t, _ := time.Parse("2006-01-02T15:04:05Z", "2002-10-02T15:10:00Z")
return t
}

const (
expectedQuery = "%24filter=alertCreationTime+ge+2002-10-02T15%3A00%3A00Z"
expectedNextCursorValue = "2002-10-02T15:00:01Z"
expectedNextQuery = "%24filter=alertCreationTime+ge+2002-10-02T15%3A00%3A01Z"
)
var gotQuery string
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
gotQuery = r.URL.Query().Encode()
w.Write([]byte(`[{"@timestamp":"2002-10-02T15:00:00Z"},{"@timestamp":"2002-10-02T15:00:01Z"}]`))
}))

runTest(t, ts, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
group, _ := errgroup.WithContext(context.Background())
group.Go(input.run)

events, ok := out.waitForEvents(2)
if !ok {
t.Fatalf("Expected 2 events, but got %d.", len(events))
}
input.Stop()

if err := group.Wait(); err != nil {
t.Fatal(err)
}

assert.Equal(t, expectedQuery, gotQuery)
assert.Equal(t, expectedNextCursorValue, input.nextCursorValue)
assert.Equal(t, fmt.Sprintf("%s?%s", ts.URL, expectedNextQuery), input.getURL())
})
}
73 changes: 69 additions & 4 deletions x-pack/filebeat/input/httpjson/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"io/ioutil"
"net"
"net/http"
"net/url"
"regexp"
"strconv"
"sync"
Expand All @@ -37,6 +38,9 @@ const (

var userAgent = useragent.UserAgent("Filebeat")

// for testing
var timeNow = time.Now
marc-gr marked this conversation as resolved.
Show resolved Hide resolved

func init() {
err := input.Register(inputName, NewInput)
if err != nil {
Expand All @@ -55,6 +59,8 @@ type HttpjsonInput struct {
workerCancel context.CancelFunc // Used to signal that the worker should stop.
workerOnce sync.Once // Guarantees that the worker goroutine is only started once.
workerWg sync.WaitGroup // Waits on worker goroutine.

nextCursorValue string
}

// RequestInfo struct has the information for generating an HTTP request
Expand Down Expand Up @@ -343,6 +349,7 @@ func createRequestInfoFromBody(m common.MapStr, idField string, requestField str

// processHTTPRequest processes HTTP request, and handles pagination if enabled
func (in *HttpjsonInput) processHTTPRequest(ctx context.Context, client *http.Client, ri *RequestInfo) error {
ri.URL = in.getURL()
for {
req, err := in.createHTTPRequest(ctx, ri)
if err != nil {
Expand Down Expand Up @@ -407,8 +414,7 @@ func (in *HttpjsonInput) processHTTPRequest(ctx context.Context, client *http.Cl
in.log.Debug("http.response.body is not a valid JSON object", string(responseData))
return errors.Errorf("http.response.body is not a valid JSON object, but a %T", obj)
}

if mm != nil && in.config.Pagination != nil && in.config.Pagination.IsEnabled() {
if mm != nil && in.config.Pagination.IsEnabled() {
if in.config.Pagination.Header != nil {
// Pagination control using HTTP Header
url, err := getNextLinkFromHeader(header, in.config.Pagination.Header.FieldName, in.config.Pagination.Header.RegexPattern)
Expand All @@ -427,7 +433,7 @@ func (in *HttpjsonInput) processHTTPRequest(ctx context.Context, client *http.Cl
continue
} else {
// Pagination control using HTTP Body fields
ri, err := createRequestInfoFromBody(common.MapStr(mm), in.config.Pagination.IDField, in.config.Pagination.RequestField, common.MapStr(in.config.Pagination.ExtraBodyContent), in.config.Pagination.URL, ri)
ri, err = createRequestInfoFromBody(common.MapStr(mm), in.config.Pagination.IDField, in.config.Pagination.RequestField, common.MapStr(in.config.Pagination.ExtraBodyContent), in.config.Pagination.URL, ri)
if err != nil {
return err
}
Expand All @@ -441,10 +447,70 @@ func (in *HttpjsonInput) processHTTPRequest(ctx context.Context, client *http.Cl
continue
}
}
if mm != nil && in.config.DateCursor.IsEnabled() {
in.advanceCursor(common.MapStr(mm))
}
return nil
}
}

func (in *HttpjsonInput) getURL() string {
if !in.config.DateCursor.IsEnabled() {
return in.config.URL
}

var dateStr string
if in.nextCursorValue == "" {
t := timeNow().UTC().Add(-in.config.DateCursor.InitialInterval)
dateStr = t.Format(in.config.DateCursor.GetDateFormat())
} else {
dateStr = in.nextCursorValue
}

url, err := url.Parse(in.config.URL)
if err != nil {
return in.config.URL
}

q := url.Query()

var value string
if in.config.DateCursor.ValueTemplate == nil {
value = dateStr
} else {
buf := new(bytes.Buffer)
if err := in.config.DateCursor.ValueTemplate.Execute(buf, dateStr); err != nil {
return in.config.URL
}
value = buf.String()
}

q.Set(in.config.DateCursor.URLField, value)

url.RawQuery = q.Encode()

return url.String()
}

func (in *HttpjsonInput) advanceCursor(m common.MapStr) {
v, err := m.GetValue(in.config.DateCursor.Field)
if err != nil {
in.log.Warnf("date_cursor field: %q", err)
return
}
switch t := v.(type) {
case string:
_, err := time.Parse(in.config.DateCursor.GetDateFormat(), t)
if err != nil {
return
}
in.nextCursorValue = t
default:
in.log.Warn("date_cursor field must be a string, cursor will not advance")
return
}
}

func (in *HttpjsonInput) run() error {
ctx, cancel := context.WithCancel(in.workerCtx)
defer cancel()
Expand All @@ -455,7 +521,6 @@ func (in *HttpjsonInput) run() error {
}

ri := &RequestInfo{
URL: in.URL,
ContentMap: common.MapStr{},
Headers: in.HTTPHeaders,
}
Expand Down