diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 5bc854185af..1778eace862 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -531,7 +531,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add event.ingested to all Filebeat modules. {pull}20386[20386] - Return error when log harvester tries to open a named pipe. {issue}18682[18682] {pull}20450[20450] - Avoid goroutine leaks in Filebeat readers. {issue}19193[19193] {pull}20455[20455] - +- Convert httpjson to v2 input {pull}20226[20226] *Heartbeat* diff --git a/x-pack/filebeat/include/list.go b/x-pack/filebeat/include/list.go index 3cc9adb51d0..78262d08012 100644 --- a/x-pack/filebeat/include/list.go +++ b/x-pack/filebeat/include/list.go @@ -11,7 +11,6 @@ import ( _ "github.com/elastic/beats/v7/x-pack/filebeat/input/awscloudwatch" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/azureeventhub" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/googlepubsub" - _ "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/s3" _ "github.com/elastic/beats/v7/x-pack/filebeat/module/activemq" diff --git a/x-pack/filebeat/input/default-inputs/inputs.go b/x-pack/filebeat/input/default-inputs/inputs.go index da27367a109..1fe245b80f7 100644 --- a/x-pack/filebeat/input/default-inputs/inputs.go +++ b/x-pack/filebeat/input/default-inputs/inputs.go @@ -12,6 +12,7 @@ import ( "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/x-pack/filebeat/input/cloudfoundry" "github.com/elastic/beats/v7/x-pack/filebeat/input/http_endpoint" + "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson" "github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit" ) @@ -26,6 +27,7 @@ func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2 return []v2.Plugin{ cloudfoundry.Plugin(), http_endpoint.Plugin(), + httpjson.Plugin(), o365audit.Plugin(log, store), } } diff --git a/x-pack/filebeat/input/httpjson/config.go b/x-pack/filebeat/input/httpjson/config.go index 2be0eb6f211..95ca205be0d 100644 --- a/x-pack/filebeat/input/httpjson/config.go +++ b/x-pack/filebeat/input/httpjson/config.go @@ -5,13 +5,14 @@ package httpjson import ( + "errors" + "fmt" + "net/url" "regexp" "strings" "text/template" "time" - "github.com/pkg/errors" - "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" ) @@ -35,7 +36,7 @@ type config struct { RetryWaitMin time.Duration `config:"retry.wait_min"` RetryWaitMax time.Duration `config:"retry.wait_max"` TLS *tlscommon.Config `config:"ssl"` - URL string `config:"url" validate:"required"` + URL *URL `config:"url" validate:"required"` DateCursor *DateCursor `config:"date_cursor"` } @@ -92,6 +93,21 @@ func (t *Template) Unpack(in string) error { return nil } +type URL struct { + *url.URL +} + +func (u *URL) Unpack(in string) error { + parsed, err := url.Parse(in) + if err != nil { + return err + } + + *u = URL{URL: parsed} + + return nil +} + // IsEnabled returns true if the `enable` field is set to true in the yaml. func (dc *DateCursor) IsEnabled() bool { return dc != nil && (dc.Enabled == nil || *dc.Enabled) @@ -121,26 +137,26 @@ func (c *config) Validate() error { case "GET", "POST": break default: - return errors.Errorf("httpjson input: Invalid http_method, %s", c.HTTPMethod) + return fmt.Errorf("httpjson input: Invalid http_method, %s", c.HTTPMethod) } if c.NoHTTPBody { if len(c.HTTPRequestBody) > 0 { - return errors.Errorf("invalid configuration: both no_http_body and http_request_body cannot be set simultaneously") + return errors.New("invalid configuration: both no_http_body and http_request_body cannot be set simultaneously") } if c.Pagination != nil && (len(c.Pagination.ExtraBodyContent) > 0 || c.Pagination.RequestField != "") { - return errors.Errorf("invalid configuration: both no_http_body and pagination.extra_body_content or pagination.req_field cannot be set simultaneously") + return errors.New("invalid configuration: both no_http_body and pagination.extra_body_content or pagination.req_field cannot be set simultaneously") } } if c.Pagination != nil { if c.Pagination.Header != nil { if c.Pagination.RequestField != "" || c.Pagination.IDField != "" || len(c.Pagination.ExtraBodyContent) > 0 { - return errors.Errorf("invalid configuration: both pagination.header and pagination.req_field or pagination.id_field or pagination.extra_body_content cannot be set simultaneously") + return errors.New("invalid configuration: both pagination.header and pagination.req_field or pagination.id_field or pagination.extra_body_content cannot be set simultaneously") } } } if c.OAuth2.IsEnabled() { if c.APIKey != "" || c.AuthenticationScheme != "" { - return errors.Errorf("invalid configuration: oauth2 and api_key or authentication_scheme cannot be set simultaneously") + return errors.New("invalid configuration: oauth2 and api_key or authentication_scheme cannot be set simultaneously") } } return nil diff --git a/x-pack/filebeat/input/httpjson/config_oauth.go b/x-pack/filebeat/input/httpjson/config_oauth.go index 6a09cf2fb92..0ff55dcbc33 100644 --- a/x-pack/filebeat/input/httpjson/config_oauth.go +++ b/x-pack/filebeat/input/httpjson/config_oauth.go @@ -7,13 +7,13 @@ package httpjson import ( "context" "encoding/json" + "errors" "fmt" "io/ioutil" "net/http" "os" "strings" - "github.com/pkg/errors" "golang.org/x/oauth2" "golang.org/x/oauth2/clientcredentials" "golang.org/x/oauth2/endpoints" diff --git a/x-pack/filebeat/input/httpjson/config_test.go b/x-pack/filebeat/input/httpjson/config_test.go index c3486aedda4..0de07311239 100644 --- a/x-pack/filebeat/input/httpjson/config_test.go +++ b/x-pack/filebeat/input/httpjson/config_test.go @@ -6,11 +6,12 @@ package httpjson import ( "context" + "errors" "os" "testing" "time" - "github.com/pkg/errors" + "github.com/stretchr/testify/assert" "golang.org/x/oauth2/google" "github.com/elastic/beats/v7/libbeat/common" @@ -110,6 +111,16 @@ func TestConfigValidationCase7(t *testing.T) { } } +func TestConfigMustFailWithInvalidURL(t *testing.T) { + m := map[string]interface{}{ + "url": "::invalid::", + } + cfg := common.MustNewConfigFrom(m) + conf := defaultConfig() + err := cfg.Unpack(&conf) + assert.EqualError(t, err, `parse "::invalid::": missing protocol scheme accessing 'url'`) +} + func TestConfigOauth2Validation(t *testing.T) { cases := []struct { name string diff --git a/x-pack/filebeat/input/httpjson/date_cursor.go b/x-pack/filebeat/input/httpjson/date_cursor.go new file mode 100644 index 00000000000..2a9db44bd2a --- /dev/null +++ b/x-pack/filebeat/input/httpjson/date_cursor.go @@ -0,0 +1,105 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package httpjson + +import ( + "bytes" + "net/url" + "time" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" +) + +type dateCursor struct { + log *logp.Logger + enabled bool + field string + url url.URL + urlField string + initialInterval time.Duration + dateFormat string + + value string + valueTpl *Template +} + +func newDateCursorFromConfig(config config, log *logp.Logger) *dateCursor { + c := &dateCursor{ + enabled: config.DateCursor.IsEnabled(), + url: *config.URL.URL, + } + + if !c.enabled { + return c + } + + c.log = log + c.field = config.DateCursor.Field + c.urlField = config.DateCursor.URLField + c.initialInterval = config.DateCursor.InitialInterval + c.dateFormat = config.DateCursor.GetDateFormat() + c.valueTpl = config.DateCursor.ValueTemplate + + return c +} + +func (c *dateCursor) getURL() string { + if !c.enabled { + return c.url.String() + } + + var dateStr string + if c.value == "" { + t := timeNow().UTC().Add(-c.initialInterval) + dateStr = t.Format(c.dateFormat) + } else { + dateStr = c.value + } + + q := c.url.Query() + + var value string + if c.valueTpl == nil { + value = dateStr + } else { + buf := new(bytes.Buffer) + if err := c.valueTpl.Template.Execute(buf, dateStr); err != nil { + return c.url.String() + } + value = buf.String() + } + + q.Set(c.urlField, value) + + c.url.RawQuery = q.Encode() + + return c.url.String() +} + +func (c *dateCursor) advance(m common.MapStr) { + if c.field == "" { + c.value = time.Now().UTC().Format(c.dateFormat) + return + } + + v, err := m.GetValue(c.field) + if err != nil { + c.log.Warnf("date_cursor field: %q", err) + return + } + switch t := v.(type) { + case string: + _, err := time.Parse(c.dateFormat, t) + if err != nil { + c.log.Warn("date_cursor field does not have the expected layout") + return + } + c.value = t + default: + c.log.Warn("date_cursor field must be a string, cursor will not advance") + return + } +} diff --git a/x-pack/filebeat/input/httpjson/httpjson_test.go b/x-pack/filebeat/input/httpjson/httpjson_test.go index a6ebd16ad5d..b541c16002e 100644 --- a/x-pack/filebeat/input/httpjson/httpjson_test.go +++ b/x-pack/filebeat/input/httpjson/httpjson_test.go @@ -6,783 +6,427 @@ package httpjson import ( "context" - "encoding/json" "fmt" "io/ioutil" - "log" "math/rand" "net/http" "net/http/httptest" - "reflect" - "regexp" - "strconv" - "sync" "testing" "time" - "golang.org/x/sync/errgroup" - "github.com/stretchr/testify/assert" + "golang.org/x/sync/errgroup" - "github.com/elastic/beats/v7/filebeat/channel" - "github.com/elastic/beats/v7/filebeat/input" - "github.com/elastic/beats/v7/libbeat/beat" + v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + beattest "github.com/elastic/beats/v7/libbeat/publisher/testing" ) -const ( - HTTPTestServer int = iota - TLSTestServer - RateLimitRetryServer - ErrorRetryServer - ArrayResponseServer -) - -var ( - once sync.Once -) - -func testSetup(t *testing.T) { - t.Helper() - once.Do(func() { - logp.TestingSetup() - }) -} - -func createTestServer(testServer int) *httptest.Server { - var ts *httptest.Server - newServer := httptest.NewServer - switch testServer { - case HTTPTestServer: - ts = createServer(newServer) - case TLSTestServer: - ts = createServer(httptest.NewTLSServer) - case RateLimitRetryServer: - ts = createCustomServer(newServer) - case ErrorRetryServer: - ts = createCustomRetryServer(newServer) - case ArrayResponseServer: - ts = createCustomServerWithArrayResponse(newServer) - default: - ts = createServer(newServer) - } - return ts -} - -func createServer(newServer func(handler http.Handler) *httptest.Server) *httptest.Server { - return newServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.Method == http.MethodPost { - req, err := ioutil.ReadAll(r.Body) - defer r.Body.Close() - if err != nil { - log.Fatalln(err) - } - var m interface{} - err = json.Unmarshal(req, &m) - w.Header().Set("Content-Type", "application/json") - if err != nil { - w.WriteHeader(http.StatusBadRequest) - } else { - w.WriteHeader(http.StatusOK) - w.Write(req) - } - } else { - message := map[string]interface{}{ - "hello": "world", - "embedded": map[string]string{ - "hello": "world", - }, - "list": []map[string]interface{}{ - {"foo": "bar"}, - {"hello": "world"}, - }, - } - b, _ := json.Marshal(message) - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - w.Write(b) - } - })) -} - -func createCustomServer(newServer func(handler http.Handler) *httptest.Server) *httptest.Server { - var isRetry bool - return newServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - if !isRetry { - w.Header().Set("X-Rate-Limit-Limit", "0") - w.Header().Set("X-Rate-Limit-Remaining", "0") - w.Header().Set("X-Rate-Limit-Reset", strconv.FormatInt(time.Now().Unix(), 10)) - w.WriteHeader(http.StatusTooManyRequests) - w.Write([]byte{}) - isRetry = true - } else { - message := map[string]interface{}{ - "hello": "world", - "embedded": map[string]string{ - "hello": "world", - }, - } - b, _ := json.Marshal(message) - w.WriteHeader(http.StatusOK) - w.Write(b) - isRetry = false - } - })) -} - -func createCustomRetryServer(newServer func(handler http.Handler) *httptest.Server) *httptest.Server { - retryCount := 0 - statusCodes := []int{http.StatusInternalServerError, http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout, http.StatusHTTPVersionNotSupported, http.StatusVariantAlsoNegotiates, http.StatusInsufficientStorage, http.StatusLoopDetected, http.StatusNotExtended, http.StatusNetworkAuthenticationRequired} - return newServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - // Test retry for two times - if retryCount < 2 { - rand.Seed(time.Now().Unix()) - code := statusCodes[rand.Intn(len(statusCodes))] - w.WriteHeader(code) - w.Write([]byte{}) - retryCount++ - } else { - message := map[string]interface{}{ - "hello": "world", - "embedded": map[string]string{ - "hello": "world", - }, - } - b, _ := json.Marshal(message) - w.WriteHeader(http.StatusOK) - w.Write(b) - retryCount = 0 - } - })) -} - -func createCustomServerWithArrayResponse(newServer func(handler http.Handler) *httptest.Server) *httptest.Server { - return newServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - message := map[string]interface{}{ - "hello": []map[string]interface{}{ - { - "foo": "bar", - "list": []map[string]interface{}{ - {"foo": "bar"}, - {"hello": "world"}, - }, - }, - { - "foo": "bar", - "list": []map[string]interface{}{ - {"foo": "bar"}, - }, +func TestHTTPJSONInput(t *testing.T) { + testCases := []struct { + name string + setupServer func(*testing.T, http.HandlerFunc, map[string]interface{}) + baseConfig map[string]interface{} + handler http.HandlerFunc + expected []string + }{ + { + name: "Test simple GET request", + setupServer: newTestServer(httptest.NewServer), + baseConfig: map[string]interface{}{ + "http_method": "GET", + "interval": 0, + }, + handler: defaultHandler("GET", ""), + expected: []string{`{"hello":[{"world":"moon"},{"space":[{"cake":"pumpkin"}]}]}`}, + }, + { + name: "Test simple HTTPS GET request", + setupServer: newTestServer(httptest.NewTLSServer), + baseConfig: map[string]interface{}{ + "http_method": "GET", + "interval": 0, + "ssl.verification_mode": "none", + }, + handler: defaultHandler("GET", ""), + expected: []string{`{"hello":[{"world":"moon"},{"space":[{"cake":"pumpkin"}]}]}`}, + }, + { + name: "Test request honors rate limit", + setupServer: newTestServer(httptest.NewServer), + baseConfig: map[string]interface{}{ + "http_method": "GET", + "interval": 0, + "rate_limit.limit": "X-Rate-Limit-Limit", + "rate_limit.remaining": "X-Rate-Limit-Remaining", + "rate_limit.reset": "X-Rate-Limit-Reset", + }, + handler: rateLimitHandler(), + expected: []string{`{"hello":"world"}`}, + }, + { + name: "Test request retries when failed", + setupServer: newTestServer(httptest.NewServer), + baseConfig: map[string]interface{}{ + "http_method": "GET", + "interval": 0, + }, + handler: retryHandler(), + expected: []string{`{"hello":"world"}`}, + }, + { + name: "Test POST request with body", + setupServer: newTestServer(httptest.NewServer), + baseConfig: map[string]interface{}{ + "http_method": "POST", + "interval": 0, + "http_request_body": map[string]interface{}{ + "test": "abc", }, - { - "bar": "foo", - "list": []map[string]interface{}{}, + }, + handler: defaultHandler("POST", `{"test":"abc"}`), + expected: []string{`{"hello":[{"world":"moon"},{"space":[{"cake":"pumpkin"}]}]}`}, + }, + { + name: "Test repeated POST requests", + setupServer: newTestServer(httptest.NewServer), + baseConfig: map[string]interface{}{ + "http_method": "POST", + "interval": "100ms", + }, + handler: defaultHandler("POST", ""), + expected: []string{ + `{"hello":[{"world":"moon"},{"space":[{"cake":"pumpkin"}]}]}`, + `{"hello":[{"world":"moon"},{"space":[{"cake":"pumpkin"}]}]}`, + }, + }, + { + name: "Test json objects array", + setupServer: newTestServer(httptest.NewServer), + baseConfig: map[string]interface{}{ + "http_method": "GET", + "interval": 0, + "json_objects_array": "hello", + }, + handler: defaultHandler("GET", ""), + expected: []string{`{"world":"moon"}`, `{"space":[{"cake":"pumpkin"}]}`}, + }, + { + name: "Test split events by", + setupServer: newTestServer(httptest.NewServer), + baseConfig: map[string]interface{}{ + "http_method": "GET", + "interval": 0, + "split_events_by": "hello", + }, + handler: defaultHandler("GET", ""), + expected: []string{ + `{"hello":{"world":"moon"}}`, + `{"hello":{"space":[{"cake":"pumpkin"}]}}`, + }, + }, + { + name: "Test split events by with array", + setupServer: newTestServer(httptest.NewServer), + baseConfig: map[string]interface{}{ + "http_method": "GET", + "interval": 0, + "split_events_by": "space", + "json_objects_array": "hello", + }, + handler: defaultHandler("GET", ""), + expected: []string{ + `{"world":"moon"}`, + `{"space":{"cake":"pumpkin"}}`, + }, + }, + { + name: "Test split events by not found", + setupServer: newTestServer(httptest.NewServer), + baseConfig: map[string]interface{}{ + "http_method": "GET", + "interval": 0, + "split_events_by": "unknwown", + }, + handler: defaultHandler("GET", ""), + expected: []string{`{"hello":[{"world":"moon"},{"space":[{"cake":"pumpkin"}]}]}`}, + }, + { + name: "Test date cursor", + setupServer: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) { + // mock timeNow func to return a fixed value + timeNow = func() time.Time { + t, _ := time.Parse(time.RFC3339, "2002-10-02T15:00:00Z") + return t + } + + server := httptest.NewServer(h) + config["url"] = server.URL + t.Cleanup(server.Close) + }, + baseConfig: map[string]interface{}{ + "http_method": "GET", + "interval": "100ms", + "date_cursor.field": "@timestamp", + "date_cursor.url_field": "$filter", + "date_cursor.value_template": "alertCreationTime ge {{.}}", + "date_cursor.initial_interval": "10m", + "date_cursor.date_format": "2006-01-02T15:04:05Z", + }, + handler: dateCursorHandler(), + expected: []string{ + `{"@timestamp":"2002-10-02T15:00:00Z","foo":"bar"}`, + `{"@timestamp":"2002-10-02T15:00:01Z","foo":"bar"}`, + `{"@timestamp":"2002-10-02T15:00:02Z","foo":"bar"}`, + }, + }, + { + name: "Test pagination", + setupServer: newTestServer(httptest.NewServer), + baseConfig: map[string]interface{}{ + "http_method": "GET", + "interval": 0, + "pagination.id_field": "nextPageToken", + "pagination.url_field": "page", + "json_objects_array": "items", + }, + handler: paginationHandler(), + expected: []string{`{"foo":"bar"}`, `{"foo":"bar"}`}, + }, + { + name: "Test oauth2", + setupServer: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) { + server := httptest.NewServer(h) + config["url"] = server.URL + config["oauth2.token_url"] = server.URL + "/token" + t.Cleanup(server.Close) + }, + baseConfig: map[string]interface{}{ + "http_method": "POST", + "interval": "0", + "oauth2.client.id": "a_client_id", + "oauth2.client.secret": "a_client_secret", + "oauth2.endpoint_params": map[string]interface{}{ + "param1": "v1", }, - {"bar": "foo"}, + "oauth2.scopes": []string{"scope1", "scope2"}, }, - } - b, _ := json.Marshal(message) - w.WriteHeader(http.StatusOK) - w.Write(b) - })) -} - -func runTest(t *testing.T, ts *httptest.Server, m map[string]interface{}, run func(input *HttpjsonInput, out *stubOutleter, t *testing.T)) { - testSetup(t) - defer ts.Close() - m["url"] = ts.URL - cfg := common.MustNewConfigFrom(m) - // Simulate input.Context from Filebeat input runner. - inputCtx := newInputContext() - defer close(inputCtx.Done) - - // Stub outlet for receiving events generated by the input. - eventOutlet := newStubOutlet() - defer eventOutlet.Close() - - connector := channel.ConnectorFunc(func(_ *common.Config, _ beat.ClientConfig) (channel.Outleter, error) { - return eventOutlet, nil - }) - - in, err := NewInput(cfg, connector, inputCtx) - if err != nil { - t.Fatal(err) - } - input := in.(*HttpjsonInput) - defer input.Stop() - - run(input, eventOutlet, t) -} - -func newInputContext() input.Context { - return input.Context{ - Done: make(chan struct{}), - } -} - -type stubOutleter struct { - sync.Mutex - cond *sync.Cond - done bool - Events []beat.Event -} - -func newStubOutlet() *stubOutleter { - o := &stubOutleter{} - o.cond = sync.NewCond(o) - return o -} - -func (o *stubOutleter) waitForEvents(numEvents int) ([]beat.Event, bool) { - o.Lock() - defer o.Unlock() - - for len(o.Events) < numEvents && !o.done { - o.cond.Wait() - } - - size := numEvents - if size >= len(o.Events) { - size = len(o.Events) - } - - out := make([]beat.Event, size) - copy(out, o.Events) - return out, len(out) == numEvents -} - -func (o *stubOutleter) Close() error { - o.Lock() - defer o.Unlock() - o.done = true - return nil -} - -func (o *stubOutleter) Done() <-chan struct{} { return nil } - -func (o *stubOutleter) OnEvent(event beat.Event) bool { - o.Lock() - defer o.Unlock() - o.Events = append(o.Events, event) - o.cond.Broadcast() - return !o.done -} - -func newOAuth2TestServer(t *testing.T) *httptest.Server { - return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - defer r.Body.Close() - - if r.Method != "POST" { - t.Errorf("expected POST request, got %v", r.Method) - return - } - - if err := r.ParseForm(); err != nil { - t.Errorf("no error expected, got %q", err) - return - } - - if gt := r.FormValue("grant_type"); gt != "client_credentials" { - t.Errorf("expected grant_type was client_credentials, got %q", gt) - return - } - - clientID := r.FormValue("client_id") - clientSecret := r.FormValue("client_secret") - if clientID == "" || clientSecret == "" { - clientID, clientSecret, _ = r.BasicAuth() - } - if clientID != "a_client_id" || clientSecret != "a_client_secret" { - t.Errorf("expected client credentials \"a_client_id:a_client_secret\", got \"%s:%s\"", clientID, clientSecret) - } - - if s := r.FormValue("scope"); s != "scope1 scope2" { - t.Errorf("expected scope was scope1+scope2, got %q", s) - return - } - - expectedParams := []string{"v1", "v2"} - if p := r.Form["param1"]; !reflect.DeepEqual(expectedParams, p) { - t.Errorf("expected params were %q, but got %q", expectedParams, p) - return - } - - w.Header().Set("content-type", "application/json") - w.Write([]byte(`{"token_type":"Bearer","expires_in":"3599","access_token":"abcdef1234567890"}`)) - })) -} - -// --- Test Cases - -func TestGetNextLinkFromHeader(t *testing.T) { - header := make(http.Header) - header.Add("Link", "; rel=\"self\"") - header.Add("Link", "; rel=\"next\"") - re, _ := regexp.Compile("<([^>]+)>; *rel=\"next\"(?:,|$)") - url, err := getNextLinkFromHeader(header, "Link", re) - if url != "https://dev-168980.okta.com/api/v1/logs?after=1581658181086_1" { - t.Fatal("Failed to test getNextLinkFromHeader. URL " + url + " is not expected") - } - if err != nil { - t.Fatal("Failed to test getNextLinkFromHeader with error:", err) - } -} - -func TestCreateRequestInfoFromBody(t *testing.T) { - m := map[string]interface{}{ - "id": 100, - } - extraBodyContent := common.MapStr{"extra_body": "abc"} - config := &Pagination{ - IDField: "id", - RequestField: "pagination_id", - ExtraBodyContent: extraBodyContent, - URL: "https://test-123", - } - ri, err := createRequestInfoFromBody( - config, - common.MapStr(m), - common.MapStr(m), - &RequestInfo{ - URL: "", - ContentMap: common.MapStr{}, - Headers: common.MapStr{}, + handler: oauth2Handler, + expected: []string{`{"hello": "world"}`}, }, - ) - if ri.URL != "https://test-123" { - t.Fatal("Failed to test createRequestInfoFromBody. URL should be https://test-123.") - } - p, err := ri.ContentMap.GetValue("pagination_id") - if err != nil { - t.Fatal("Failed to test createRequestInfoFromBody with error", err) - } - switch pt := p.(type) { - case int: - if pt != 100 { - t.Fatalf("Failed to test createRequestInfoFromBody. pagination_id value %d should be 100.", pt) - } - default: - t.Fatalf("Failed to test createRequestInfoFromBody. pagination_id value %T should be int.", pt) - } - b, err := ri.ContentMap.GetValue("extra_body") - if err != nil { - t.Fatal("Failed to test createRequestInfoFromBody with error", err) - } - switch bt := b.(type) { - case string: - if bt != "abc" { - t.Fatalf("Failed to test createRequestInfoFromBody. extra_body value %s does not match \"abc\".", bt) - } - default: - t.Fatalf("Failed to test createRequestInfoFromBody. extra_body type %T should be string.", bt) } -} -// Test getRateLimit function with a remaining quota, expect to receive 0, nil. -func TestGetRateLimitCase1(t *testing.T) { - header := make(http.Header) - header.Add("X-Rate-Limit-Limit", "120") - header.Add("X-Rate-Limit-Remaining", "118") - header.Add("X-Rate-Limit-Reset", "1581658643") - rateLimit := &RateLimit{ - Limit: "X-Rate-Limit-Limit", - Reset: "X-Rate-Limit-Reset", - Remaining: "X-Rate-Limit-Remaining", - } - epoch, err := getRateLimit(header, rateLimit) - if err != nil || epoch != 0 { - t.Fatal("Failed to test getRateLimit.") - } -} - -// Test getRateLimit function with a past time, expect to receive 0, nil. -func TestGetRateLimitCase2(t *testing.T) { - header := make(http.Header) - header.Add("X-Rate-Limit-Limit", "10") - header.Add("X-Rate-Limit-Remaining", "0") - header.Add("X-Rate-Limit-Reset", "1581658643") - rateLimit := &RateLimit{ - Limit: "X-Rate-Limit-Limit", - Reset: "X-Rate-Limit-Reset", - Remaining: "X-Rate-Limit-Remaining", - } - epoch, err := getRateLimit(header, rateLimit) - if err != nil || epoch != 0 { - t.Fatal("Failed to test getRateLimit.") + for _, testCase := range testCases { + tc := testCase + t.Run(tc.name, func(t *testing.T) { + tc.setupServer(t, tc.handler, tc.baseConfig) + + cfg := common.MustNewConfigFrom(tc.baseConfig) + + input, err := configure(cfg) + + assert.NoError(t, err) + assert.Equal(t, "httpjson", input.Name()) + assert.NoError(t, input.Test(v2.TestContext{})) + + pub := beattest.NewChanClient(len(tc.expected)) + t.Cleanup(func() { _ = pub.Close() }) + + ctx, cancel := newV2Context() + t.Cleanup(cancel) + + var g errgroup.Group + g.Go(func() error { return input.Run(ctx, pub) }) + + timeout := time.NewTimer(5 * time.Second) + t.Cleanup(func() { _ = timeout.Stop() }) + + var receivedCount int + wait: + for { + select { + case <-timeout.C: + t.Errorf("timed out waiting for %d events", len(tc.expected)) + return + case got := <-pub.Channel: + val, err := got.Fields.GetValue("message") + assert.NoError(t, err) + assert.JSONEq(t, tc.expected[receivedCount], val.(string)) + receivedCount += 1 + if receivedCount == len(tc.expected) { + cancel() + break wait + } + } + } + assert.NoError(t, g.Wait()) + }) } } -// Test getRateLimit function with a time yet to come, expect to receive , nil. -func TestGetRateLimitCase3(t *testing.T) { - epoch := time.Now().Unix() + 100 - header := make(http.Header) - header.Add("X-Rate-Limit-Limit", "10") - header.Add("X-Rate-Limit-Remaining", "0") - header.Add("X-Rate-Limit-Reset", strconv.FormatInt(epoch, 10)) - rateLimit := &RateLimit{ - Limit: "X-Rate-Limit-Limit", - Reset: "X-Rate-Limit-Reset", - Remaining: "X-Rate-Limit-Remaining", - } - epoch2, err := getRateLimit(header, rateLimit) - if err != nil || epoch2 != epoch { - t.Fatal("Failed to test getRateLimit.") +func newTestServer( + newServer func(http.Handler) *httptest.Server, +) func(*testing.T, http.HandlerFunc, map[string]interface{}) { + return func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) { + server := newServer(h) + config["url"] = server.URL + t.Cleanup(server.Close) } } -func TestGET(t *testing.T) { - m := map[string]interface{}{ - "http_method": "GET", - "interval": 0, - } - ts := createTestServer(HTTPTestServer) - runTest(t, ts, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { - group, _ := errgroup.WithContext(context.Background()) - group.Go(input.run) - - events, ok := out.waitForEvents(1) - if !ok { - t.Fatalf("Expected 1 events, but got %d.", len(events)) - } - input.Stop() - - if err := group.Wait(); err != nil { - t.Fatal(err) - } - }) +func newV2Context() (v2.Context, func()) { + ctx, cancel := context.WithCancel(context.Background()) + return v2.Context{ + Logger: logp.NewLogger("httpjson_test"), + ID: "test_id", + Cancelation: ctx, + }, cancel } -func TestGetHTTPS(t *testing.T) { - m := map[string]interface{}{ - "http_method": "GET", - "interval": 0, - "ssl.verification_mode": "none", - } - ts := createTestServer(HTTPTestServer) - runTest(t, ts, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { - group, _ := errgroup.WithContext(context.Background()) - group.Go(input.run) - - events, ok := out.waitForEvents(1) - if !ok { - t.Fatalf("Expected 1 events, but got %d.", len(events)) - } - input.Stop() - - if err := group.Wait(); err != nil { - t.Fatal(err) +func defaultHandler(expectedMethod, expectedBody string) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("content-type", "application/json") + msg := `{"hello":[{"world":"moon"},{"space":[{"cake":"pumpkin"}]}]}` + switch { + case r.Method != expectedMethod: + w.WriteHeader(http.StatusBadRequest) + msg = fmt.Sprintf(`{"error":"expected method was %q"}`, expectedMethod) + case expectedBody != "": + body, _ := ioutil.ReadAll(r.Body) + r.Body.Close() + if expectedBody != string(body) { + w.WriteHeader(http.StatusBadRequest) + msg = fmt.Sprintf(`{"error":"expected body was %q"}`, expectedBody) + } } - }) -} -func TestRateLimitRetry(t *testing.T) { - m := map[string]interface{}{ - "http_method": "GET", - "interval": 0, + _, _ = w.Write([]byte(msg)) } - ts := createTestServer(RateLimitRetryServer) - runTest(t, ts, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { - group, _ := errgroup.WithContext(context.Background()) - group.Go(input.run) - - events, ok := out.waitForEvents(1) - if !ok { - t.Fatalf("Expected 1 events, but got %d.", len(events)) - } - input.Stop() - - if err := group.Wait(); err != nil { - t.Fatal(err) - } - }) } -func TestErrorRetry(t *testing.T) { - m := map[string]interface{}{ - "http_method": "GET", - "interval": 0, - } - ts := createTestServer(ErrorRetryServer) - runTest(t, ts, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { - group, _ := errgroup.WithContext(context.Background()) - group.Go(input.run) - - events, ok := out.waitForEvents(1) - if !ok { - t.Fatalf("Expected 1 events, but got %d.", len(events)) - } - input.Stop() - - if err := group.Wait(); err != nil { - t.Fatal(err) +func rateLimitHandler() http.HandlerFunc { + var isRetry bool + return func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("content-type", "application/json") + if isRetry { + _, _ = w.Write([]byte(`{"hello":"world"}`)) + return } - }) -} - -func TestArrayResponse(t *testing.T) { - m := map[string]interface{}{ - "http_method": "GET", - "json_objects_array": "hello", - "interval": 0, + w.Header().Set("X-Rate-Limit-Limit", "0") + w.Header().Set("X-Rate-Limit-Remaining", "0") + w.Header().Set("X-Rate-Limit-Reset", fmt.Sprint(time.Now().Unix())) + w.WriteHeader(http.StatusTooManyRequests) + isRetry = true + _, _ = w.Write([]byte(`{"error":"too many requests"}`)) } - ts := createTestServer(ArrayResponseServer) - runTest(t, ts, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { - group, _ := errgroup.WithContext(context.Background()) - group.Go(input.run) - - events, ok := out.waitForEvents(2) - if !ok { - t.Fatalf("Expected 2 events, but got %d.", len(events)) - } - input.Stop() - - if err := group.Wait(); err != nil { - t.Fatal(err) - } - }) } -func TestPOST(t *testing.T) { - m := map[string]interface{}{ - "http_method": "POST", - "http_request_body": map[string]interface{}{"test": "abc", "testNested": map[string]interface{}{"testNested1": 123}}, - "interval": 0, - } - ts := createTestServer(HTTPTestServer) - runTest(t, ts, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { - group, _ := errgroup.WithContext(context.Background()) - group.Go(input.run) - - events, ok := out.waitForEvents(1) - if !ok { - t.Fatalf("Expected 1 events, but got %d.", len(events)) - } - input.Stop() - - if err := group.Wait(); err != nil { - t.Fatal(err) - } - }) -} - -func TestRepeatedPOST(t *testing.T) { - m := map[string]interface{}{ - "http_method": "POST", - "http_request_body": map[string]interface{}{"test": "abc", "testNested": map[string]interface{}{"testNested1": 123}}, - "interval": 10 ^ 9, - } - ts := createTestServer(HTTPTestServer) - runTest(t, ts, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { - group, _ := errgroup.WithContext(context.Background()) - group.Go(input.run) - - events, ok := out.waitForEvents(3) - if !ok { - t.Fatalf("Expected 3 events, but got %d.", len(events)) - } - input.Stop() - - if err := group.Wait(); err != nil { - t.Fatal(err) +func retryHandler() http.HandlerFunc { + count := 0 + return func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("content-type", "application/json") + if count == 2 { + _, _ = w.Write([]byte(`{"hello":"world"}`)) + return } - }) -} - -func TestRunStop(t *testing.T) { - m := map[string]interface{}{ - "http_method": "GET", - "interval": 0, + w.WriteHeader(rand.Intn(100) + 500) + count += 1 + } +} + +func oauth2TokenHandler(w http.ResponseWriter, r *http.Request) { + w.Header().Set("content-type", "application/json") + _ = r.ParseForm() + switch { + case r.Method != "POST": + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte(`{"error":"wrong method"}`)) + case r.FormValue("grant_type") != "client_credentials": + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte(`{"error":"wrong grant_type"}`)) + case r.FormValue("client_id") != "a_client_id": + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte(`{"error":"wrong client_id"}`)) + case r.FormValue("client_secret") != "a_client_secret": + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte(`{"error":"wrong client_secret"}`)) + case r.FormValue("scope") != "scope1 scope2": + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte(`{"error":"wrong scope"}`)) + case r.FormValue("param1") != "v1": + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte(`{"error":"wrong param1"}`)) + default: + _, _ = w.Write([]byte(`{"token_type": "Bearer", "expires_in": "60", "access_token": "abcd"}`)) } - ts := createTestServer(HTTPTestServer) - runTest(t, ts, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { - input.Run() - input.Stop() - input.Run() - input.Stop() - }) } -func TestOAuth2(t *testing.T) { - oAuth2Server := newOAuth2TestServer(t) - defer oAuth2Server.Close() - ts := createTestServer(HTTPTestServer) - defer ts.Close() - m := map[string]interface{}{ - "http_method": "GET", - "oauth2.client.id": "a_client_id", - "oauth2.client.secret": "a_client_secret", - "oauth2.token_url": oAuth2Server.URL, - "oauth2.endpoint_params": map[string][]string{ - "param1": {"v1", "v2"}, - }, - "oauth2.scopes": []string{"scope1", "scope2"}, - "interval": 0, +func oauth2Handler(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/token" { + oauth2TokenHandler(w, r) + return } - runTest(t, ts, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { - group, _ := errgroup.WithContext(context.Background()) - group.Go(input.run) - - events, ok := out.waitForEvents(1) - if !ok { - t.Fatalf("Expected 1 events, but got %d.", len(events)) - } - input.Stop() - - if err := group.Wait(); err != nil { - t.Fatal(err) - } - }) -} - -func TestSplitResponseWithKey(t *testing.T) { - m := map[string]interface{}{ - "http_method": "GET", - "split_events_by": "list", - "interval": 0, + w.Header().Set("content-type", "application/json") + switch { + case r.Method != "POST": + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte(`{"error":"wrong method"}`)) + case r.Header.Get("Authorization") != "Bearer abcd": + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte(`{"error":"wrong bearer"}`)) + default: + _, _ = w.Write([]byte(`{"hello":"world"}`)) } - ts := createTestServer(HTTPTestServer) - runTest(t, ts, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { - group, _ := errgroup.WithContext(context.Background()) - group.Go(input.run) - - events, ok := out.waitForEvents(2) - if !ok { - t.Fatalf("Expected 2 events, but got %d.", len(events)) - } - input.Stop() - - if err := group.Wait(); err != nil { - t.Fatal(err) - } - }) } -func TestSplitResponseWithoutKey(t *testing.T) { - m := map[string]interface{}{ - "http_method": "GET", - "split_events_by": "not_found", - "interval": 0, - } - ts := createTestServer(HTTPTestServer) - runTest(t, ts, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { - group, _ := errgroup.WithContext(context.Background()) - group.Go(input.run) - - events, ok := out.waitForEvents(1) - if !ok { - t.Fatalf("Expected 1 events, but got %d.", len(events)) - } - input.Stop() - - if err := group.Wait(); err != nil { - t.Fatal(err) - } - }) -} - -func TestArrayWithSplitResponse(t *testing.T) { - m := map[string]interface{}{ - "http_method": "GET", - "json_objects_array": "hello", - "split_events_by": "list", - "interval": 0, - } - - expectedFields := []string{ - `{ - "foo": "bar", - "list": { - "foo": "bar" +func dateCursorHandler() http.HandlerFunc { + var count int + return func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("content-type", "application/json") + switch count { + case 0: + if r.URL.Query().Get("$filter") != "alertCreationTime ge 2002-10-02T14:50:00Z" { + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte(`{"error":"wrong initial cursor value"`)) + return } - }`, - `{ - "foo": "bar", - "list": { - "hello": "world" + _, _ = w.Write([]byte(`{"@timestamp":"2002-10-02T15:00:00Z","foo":"bar"}`)) + case 1: + if r.URL.Query().Get("$filter") != "alertCreationTime ge 2002-10-02T15:00:00Z" { + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte(`{"error":"wrong cursor value"`)) + return } - }`, - `{ - "foo": "bar", - "list": { - "foo": "bar" + _, _ = w.Write([]byte(`{"@timestamp":"2002-10-02T15:00:01Z","foo":"bar"}`)) + case 2: + if r.URL.Query().Get("$filter") != "alertCreationTime ge 2002-10-02T15:00:01Z" { + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte(`{"error":"wrong cursor value"`)) + return } - }`, - `{ - "bar": "foo", - "list": [] - }`, - `{"bar": "foo"}`, - } - - ts := createTestServer(ArrayResponseServer) - runTest(t, ts, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { - group, _ := errgroup.WithContext(context.Background()) - group.Go(input.run) - - events, ok := out.waitForEvents(5) - if !ok { - t.Fatalf("Expected 5 events, but got %d.", len(events)) - } - input.Stop() - - if err := group.Wait(); err != nil { - t.Fatal(err) - } - - for i, e := range events { - message, _ := e.GetValue("message") - assert.JSONEq(t, expectedFields[i], message.(string)) + _, _ = w.Write([]byte(`{"@timestamp":"2002-10-02T15:00:02Z","foo":"bar"}`)) } - }) -} - -func TestCursor(t *testing.T) { - m := map[string]interface{}{ - "http_method": "GET", - "date_cursor.field": "@timestamp", - "date_cursor.url_field": "$filter", - "date_cursor.value_template": "alertCreationTime ge {{.}}", - "date_cursor.initial_interval": "10m", - "date_cursor.date_format": "2006-01-02T15:04:05Z", - } - - timeNow = func() time.Time { - t, _ := time.Parse("2006-01-02T15:04:05Z", "2002-10-02T15:10:00Z") - return t + count += 1 } +} - const ( - expectedQuery = "%24filter=alertCreationTime+ge+2002-10-02T15%3A00%3A00Z" - expectedNextCursorValue = "2002-10-02T15:00:01Z" - expectedNextQuery = "%24filter=alertCreationTime+ge+2002-10-02T15%3A00%3A01Z" - ) - var gotQuery string - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - gotQuery = r.URL.Query().Encode() - w.Write([]byte(`[{"@timestamp":"2002-10-02T15:00:00Z"},{"@timestamp":"2002-10-02T15:00:01Z"}]`)) - })) - - runTest(t, ts, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { - group, _ := errgroup.WithContext(context.Background()) - group.Go(input.run) - - events, ok := out.waitForEvents(2) - if !ok { - t.Fatalf("Expected 2 events, but got %d.", len(events)) - } - input.Stop() - - if err := group.Wait(); err != nil { - t.Fatal(err) +func paginationHandler() http.HandlerFunc { + var count int + return func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("content-type", "application/json") + switch count { + case 0: + _, _ = w.Write([]byte(`{"@timestamp":"2002-10-02T15:00:00Z","nextPageToken":"bar","items":[{"foo":"bar"}]}`)) + case 1: + if r.URL.Query().Get("page") != "bar" { + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte(`{"error":"wrong page token value"}`)) + return + } + _, _ = w.Write([]byte(`{"@timestamp":"2002-10-02T15:00:01Z","items":[{"foo":"bar"}]}`)) } - - assert.Equal(t, expectedQuery, gotQuery) - assert.Equal(t, expectedNextCursorValue, input.nextCursorValue) - assert.Equal(t, fmt.Sprintf("%s?%s", ts.URL, expectedNextQuery), input.getURL()) - }) + count += 1 + } } diff --git a/x-pack/filebeat/input/httpjson/input.go b/x-pack/filebeat/input/httpjson/input.go index 50677876b1f..766fa364864 100644 --- a/x-pack/filebeat/input/httpjson/input.go +++ b/x-pack/filebeat/input/httpjson/input.go @@ -5,71 +5,37 @@ package httpjson import ( - "bytes" "context" - "encoding/json" "fmt" - "io" - "io/ioutil" "net" "net/http" - "net/url" - "regexp" - "strconv" - "sync" "time" - "github.com/pkg/errors" + "github.com/hashicorp/go-retryablehttp" + "go.uber.org/zap" - "github.com/elastic/beats/v7/filebeat/channel" - "github.com/elastic/beats/v7/filebeat/input" + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" "github.com/elastic/beats/v7/libbeat/common/useragent" + "github.com/elastic/beats/v7/libbeat/feature" "github.com/elastic/beats/v7/libbeat/logp" - - "github.com/hashicorp/go-retryablehttp" - "go.uber.org/zap" + "github.com/elastic/go-concert/ctxtool" + "github.com/elastic/go-concert/timed" ) const ( inputName = "httpjson" ) -var userAgent = useragent.UserAgent("Filebeat") - -// for testing -var timeNow = time.Now - -func init() { - err := input.Register(inputName, NewInput) - if err != nil { - panic(errors.Wrapf(err, "failed to register %v input", inputName)) - } -} - -// HttpjsonInput struct has the HttpJsonInput configuration and other userful info. -type HttpjsonInput struct { - config - log *logp.Logger - outlet channel.Outleter // Output of received messages. - inputCtx context.Context // Wraps the Done channel from parent input.Context. - - workerCtx context.Context // Worker goroutine context. It's cancelled when the input stops or the worker exits. - workerCancel context.CancelFunc // Used to signal that the worker should stop. - workerOnce sync.Once // Guarantees that the worker goroutine is only started once. - workerWg sync.WaitGroup // Waits on worker goroutine. +var ( + userAgent = useragent.UserAgent("Filebeat") - nextCursorValue string -} - -// RequestInfo struct has the information for generating an HTTP request -type RequestInfo struct { - URL string - ContentMap common.MapStr - Headers common.MapStr -} + // for testing + timeNow = time.Now +) type retryLogger struct { log *logp.Logger @@ -81,532 +47,137 @@ func newRetryLogger() *retryLogger { } } -func (l *retryLogger) Printf(s string, args ...interface{}) { - l.log.Debugf(s, args...) +func (log *retryLogger) Error(format string, args ...interface{}) { + log.log.Errorf(format, args...) } -// NewInput creates a new httpjson input -func NewInput( - cfg *common.Config, - connector channel.Connector, - inputContext input.Context, -) (input.Input, error) { - // Extract and validate the input's configuration. - conf := defaultConfig() - if err := cfg.Unpack(&conf); err != nil { - return nil, err - } - // Build outlet for events. - out, err := connector.Connect(cfg) - if err != nil { - return nil, err - } - - // Wrap input.Context's Done channel with a context.Context. This goroutine - // stops with the parent closes the Done channel. - inputCtx, cancelInputCtx := context.WithCancel(context.Background()) - go func() { - defer cancelInputCtx() - select { - case <-inputContext.Done: - case <-inputCtx.Done(): - } - }() - - // If the input ever needs to be made restartable, then context would need - // to be recreated with each restart. - workerCtx, workerCancel := context.WithCancel(inputCtx) - - in := &HttpjsonInput{ - config: conf, - log: logp.NewLogger("httpjson").With( - "url", conf.URL), - outlet: out, - inputCtx: inputCtx, - workerCtx: workerCtx, - workerCancel: workerCancel, - } - - in.log.Info("Initialized httpjson input.") - return in, nil +func (log *retryLogger) Info(format string, args ...interface{}) { + log.log.Infof(format, args...) } -// Run starts the input worker then returns. Only the first invocation -// will ever start the worker. -func (in *HttpjsonInput) Run() { - in.workerOnce.Do(func() { - in.workerWg.Add(1) - go func() { - in.log.Info("httpjson input worker has started.") - defer in.log.Info("httpjson input worker has stopped.") - defer in.workerWg.Done() - defer in.workerCancel() - if err := in.run(); err != nil { - in.log.Error(err) - return - } - }() - }) +func (log *retryLogger) Debug(format string, args ...interface{}) { + log.log.Debugf(format, args...) } -// createHTTPRequest creates an HTTP/HTTPs request for the input -func (in *HttpjsonInput) createHTTPRequest(ctx context.Context, ri *RequestInfo) (*http.Request, error) { - var body io.Reader - if len(ri.ContentMap) == 0 || in.config.NoHTTPBody { - body = nil - } else { - b, err := json.Marshal(ri.ContentMap) - if err != nil { - return nil, err - } - body = bytes.NewReader(b) - } - req, err := http.NewRequest(in.config.HTTPMethod, ri.URL, body) - if err != nil { - return nil, err - } - req = req.WithContext(ctx) - req.Header.Set("Accept", "application/json") - req.Header.Set("Content-Type", "application/json") - req.Header.Set("User-Agent", userAgent) - if in.config.APIKey != "" { - if in.config.AuthenticationScheme != "" { - req.Header.Set("Authorization", in.config.AuthenticationScheme+" "+in.config.APIKey) - } else { - req.Header.Set("Authorization", in.config.APIKey) - } - } - for k, v := range ri.Headers { - switch vv := v.(type) { - case string: - req.Header.Set(k, vv) - default: - } - } - return req, nil +func (log *retryLogger) Warn(format string, args ...interface{}) { + log.log.Warnf(format, args...) } -// processEventArray publishes an event for each object contained in the array. It returns the last object in the array and an error if any. -func (in *HttpjsonInput) processEventArray(events []interface{}) (map[string]interface{}, error) { - var last map[string]interface{} - for _, t := range events { - switch v := t.(type) { - case map[string]interface{}: - for _, e := range in.splitEvent(v) { - last = e - d, err := json.Marshal(e) - if err != nil { - return nil, errors.Wrapf(err, "failed to marshal %+v", e) - } - ok := in.outlet.OnEvent(makeEvent(string(d))) - if !ok { - return nil, errors.New("function OnEvent returned false") - } - } - default: - return nil, errors.Errorf("expected only JSON objects in the array but got a %T", v) - } - } - return last, nil +type httpJSONInput struct { + config config + tlsConfig *tlscommon.TLSConfig } -func (in *HttpjsonInput) splitEvent(event map[string]interface{}) []map[string]interface{} { - m := common.MapStr(event) - - hasSplitKey, _ := m.HasKey(in.config.SplitEventsBy) - if in.config.SplitEventsBy == "" || !hasSplitKey { - return []map[string]interface{}{event} - } - - splitOnIfc, _ := m.GetValue(in.config.SplitEventsBy) - splitOn, ok := splitOnIfc.([]interface{}) - // if not an array or is empty, we do nothing - if !ok || len(splitOn) == 0 { - return []map[string]interface{}{event} - } - - var events []map[string]interface{} - for _, split := range splitOn { - s, ok := split.(map[string]interface{}) - // if not an object, we do nothing - if !ok { - return []map[string]interface{}{event} - } - - mm := m.Clone() - _, err := mm.Put(in.config.SplitEventsBy, s) - if err != nil { - return []map[string]interface{}{event} - } - - events = append(events, mm) +func Plugin() v2.Plugin { + return v2.Plugin{ + Name: inputName, + Stability: feature.Beta, + Deprecated: false, + Manager: stateless.NewInputManager(configure), } - - return events } -// getNextLinkFromHeader retrieves the next URL for pagination from the HTTP Header of the response -func getNextLinkFromHeader(header http.Header, fieldName string, re *regexp.Regexp) (string, error) { - links, ok := header[fieldName] - if !ok { - return "", errors.Errorf("field %s does not exist in the HTTP Header", fieldName) - } - for _, link := range links { - matchArray := re.FindAllStringSubmatch(link, -1) - if len(matchArray) == 1 { - return matchArray[0][1], nil - } +func configure(cfg *common.Config) (stateless.Input, error) { + conf := defaultConfig() + if err := cfg.Unpack(&conf); err != nil { + return nil, err } - return "", nil -} -// getRateLimit get the rate limit value if specified in the HTTP Header of the response, -// and returns an init64 value in seconds since unix epoch for rate limit reset time. -// When there is a remaining rate limit quota, or when the rate limit reset time has expired, it -// returns 0 for the epoch value. -func getRateLimit(header http.Header, rateLimit *RateLimit) (int64, error) { - if rateLimit != nil { - if rateLimit.Remaining != "" { - remaining := header.Get(rateLimit.Remaining) - if remaining == "" { - return 0, errors.Errorf("field %s does not exist in the HTTP Header, or is empty", rateLimit.Remaining) - } - m, err := strconv.ParseInt(remaining, 10, 64) - if err != nil { - return 0, errors.Wrapf(err, "failed to parse rate-limit remaining value") - } - if m == 0 { - reset := header.Get(rateLimit.Reset) - if reset == "" { - return 0, errors.Errorf("field %s does not exist in the HTTP Header, or is empty", rateLimit.Reset) - } - epoch, err := strconv.ParseInt(reset, 10, 64) - if err != nil { - return 0, errors.Wrapf(err, "failed to parse rate-limit reset value") - } - if time.Unix(epoch, 0).Sub(time.Now()) <= 0 { - return 0, nil - } - return epoch, nil - } - } - } - return 0, nil + return newHTTPJSONInput(conf) } -// applyRateLimit applies appropriate rate limit if specified in the HTTP Header of the response -func (in *HttpjsonInput) applyRateLimit(ctx context.Context, header http.Header, rateLimit *RateLimit) error { - epoch, err := getRateLimit(header, rateLimit) - if err != nil { - return err - } - t := time.Unix(epoch, 0) - w := time.Until(t) - if epoch == 0 || w <= 0 { - in.log.Debugf("Rate Limit: No need to apply rate limit.") - return nil - } - in.log.Debugf("Rate Limit: Wait until %v for the rate limit to reset.", t) - ticker := time.NewTicker(w) - defer ticker.Stop() - select { - case <-ctx.Done(): - in.log.Info("Context done.") - return nil - case <-ticker.C: - in.log.Debug("Rate Limit: time is up.") - return nil - } -} - -// createRequestInfoFromBody creates a new RequestInfo for a new HTTP request in pagination based on HTTP response body -func createRequestInfoFromBody(config *Pagination, response, last common.MapStr, ri *RequestInfo) (*RequestInfo, error) { - // we try to get it from last element, if not found, from the original response - v, err := last.GetValue(config.IDField) - if err == common.ErrKeyNotFound { - v, err = response.GetValue(config.IDField) - } - - if err == common.ErrKeyNotFound { - return nil, nil +func newHTTPJSONInput(config config) (*httpJSONInput, error) { + if err := config.Validate(); err != nil { + return nil, err } + tlsConfig, err := tlscommon.LoadTLSConfig(config.TLS) if err != nil { - return nil, errors.Wrapf(err, "failed to retrieve id_field for pagination") + return nil, err } - if config.RequestField != "" { - ri.ContentMap.Put(config.RequestField, v) - if config.URL != "" { - ri.URL = config.URL - } - } else if config.URLField != "" { - url, err := url.Parse(ri.URL) - if err == nil { - q := url.Query() - q.Set(config.URLField, fmt.Sprint(v)) - url.RawQuery = q.Encode() - ri.URL = url.String() - } - } else { - switch vt := v.(type) { - case string: - ri.URL = vt - default: - return nil, errors.New("pagination ID is not of string type") - } - } - if len(config.ExtraBodyContent) > 0 { - ri.ContentMap.Update(common.MapStr(config.ExtraBodyContent)) - } - return ri, nil + return &httpJSONInput{ + config: config, + tlsConfig: tlsConfig, + }, nil } -// processHTTPRequest processes HTTP request, and handles pagination if enabled -func (in *HttpjsonInput) processHTTPRequest(ctx context.Context, client *http.Client, ri *RequestInfo) error { - ri.URL = in.getURL() - - var ( - m, v interface{} - response, mm map[string]interface{} - ) +func (*httpJSONInput) Name() string { return inputName } - for { - req, err := in.createHTTPRequest(ctx, ri) - if err != nil { - return errors.Wrapf(err, "failed to create http request") +func (in *httpJSONInput) Test(v2.TestContext) error { + port := func() string { + if in.config.URL.Port() != "" { + return in.config.URL.Port() } - msg, err := client.Do(req) - if err != nil { - return errors.Wrapf(err, "failed to execute http client.Do") - } - responseData, err := ioutil.ReadAll(msg.Body) - header := msg.Header - msg.Body.Close() - if err != nil { - return errors.Wrapf(err, "failed to read http.response.body") - } - if msg.StatusCode != http.StatusOK { - in.log.Debugw("HTTP request failed", "http.response.status_code", msg.StatusCode, "http.response.body", string(responseData)) - if msg.StatusCode == http.StatusTooManyRequests { - if err = in.applyRateLimit(ctx, header, in.config.RateLimit); err != nil { - return err - } - continue - } - return errors.Errorf("http request was unsuccessful with a status code %d", msg.StatusCode) - } - - err = json.Unmarshal(responseData, &m) - if err != nil { - in.log.Debug("failed to unmarshal http.response.body", string(responseData)) - return errors.Wrapf(err, "failed to unmarshal http.response.body") + switch in.config.URL.Scheme { + case "https": + return "443" } - switch obj := m.(type) { - // Top level Array - case []interface{}: - mm, err = in.processEventArray(obj) - if err != nil { - return err - } - case map[string]interface{}: - response = obj - if in.config.JSONObjects == "" { - mm, err = in.processEventArray([]interface{}{obj}) - if err != nil { - return err - } - } else { - v, err = common.MapStr(obj).GetValue(in.config.JSONObjects) - if err != nil { - if err == common.ErrKeyNotFound { - break - } - return err - } - switch ts := v.(type) { - case []interface{}: - mm, err = in.processEventArray(ts) - if err != nil { - return err - } - default: - return errors.Errorf("content of %s is not a valid array", in.config.JSONObjects) - } - } - default: - in.log.Debug("http.response.body is not a valid JSON object", string(responseData)) - return errors.Errorf("http.response.body is not a valid JSON object, but a %T", obj) - } - - if mm != nil && in.config.Pagination.IsEnabled() { - if in.config.Pagination.Header != nil { - // Pagination control using HTTP Header - url, err := getNextLinkFromHeader(header, in.config.Pagination.Header.FieldName, in.config.Pagination.Header.RegexPattern) - if err != nil { - return errors.Wrapf(err, "failed to retrieve the next URL for pagination") - } - if ri.URL == url || url == "" { - in.log.Info("Pagination finished.") - break - } - ri.URL = url - if err = in.applyRateLimit(ctx, header, in.config.RateLimit); err != nil { - return err - } - in.log.Info("Continuing with pagination to URL: ", ri.URL) - continue - } else { - // Pagination control using HTTP Body fields - ri, err = createRequestInfoFromBody(in.config.Pagination, common.MapStr(response), common.MapStr(mm), ri) - if err != nil { - return err - } - if ri == nil { - break - } - if err = in.applyRateLimit(ctx, header, in.config.RateLimit); err != nil { - return err - } - in.log.Info("Continuing with pagination to URL: ", ri.URL) - continue - } - } - break - } + return "80" + }() - if mm != nil && in.config.DateCursor.IsEnabled() { - in.advanceCursor(common.MapStr(mm)) + _, err := net.DialTimeout("tcp", net.JoinHostPort(in.config.URL.Hostname(), port), time.Second) + if err != nil { + return fmt.Errorf("url %q is unreachable", in.config.URL) } return nil } -func (in *HttpjsonInput) getURL() string { - if !in.config.DateCursor.IsEnabled() { - return in.config.URL - } +// Run starts the input and blocks until it ends the execution. +// It will return on context cancellation, any other error will be retried. +func (in *httpJSONInput) Run(ctx v2.Context, publisher stateless.Publisher) error { + log := ctx.Logger.With("url", in.config.URL) - var dateStr string - if in.nextCursorValue == "" { - t := timeNow().UTC().Add(-in.config.DateCursor.InitialInterval) - dateStr = t.Format(in.config.DateCursor.GetDateFormat()) - } else { - dateStr = in.nextCursorValue - } + stdCtx := ctxtool.FromCanceller(ctx.Cancelation) - url, err := url.Parse(in.config.URL) + httpClient, err := in.newHTTPClient(stdCtx) if err != nil { - return in.config.URL - } - - q := url.Query() - - var value string - if in.config.DateCursor.ValueTemplate == nil { - value = dateStr - } else { - buf := new(bytes.Buffer) - if err := in.config.DateCursor.ValueTemplate.Execute(buf, dateStr); err != nil { - return in.config.URL - } - value = buf.String() + return err } - q.Set(in.config.DateCursor.URLField, value) + dateCursor := newDateCursorFromConfig(in.config, log) - url.RawQuery = q.Encode() - - return url.String() -} + rateLimiter := newRateLimiterFromConfig(in.config, log) -func (in *HttpjsonInput) advanceCursor(m common.MapStr) { - if in.config.DateCursor.Field == "" { - in.nextCursorValue = time.Now().UTC().Format(in.config.DateCursor.GetDateFormat()) - return - } - - v, err := m.GetValue(in.config.DateCursor.Field) - if err != nil { - in.log.Warnf("date_cursor field: %q", err) - return - } - switch t := v.(type) { - case string: - _, err := time.Parse(in.config.DateCursor.GetDateFormat(), t) - if err != nil { - in.log.Warn("date_cursor field does not have the expected layout") - return - } - in.nextCursorValue = t - default: - in.log.Warn("date_cursor field must be a string, cursor will not advance") - return - } -} + pagination := newPaginationFromConfig(in.config) -func (in *HttpjsonInput) run() error { - ctx, cancel := context.WithCancel(in.workerCtx) - defer cancel() + requester := newRequester( + in.config, + rateLimiter, + dateCursor, + pagination, + httpClient, + log, + ) - client, err := in.newHTTPClient(ctx) - if err != nil { - return err + // TODO: disallow passing interval = 0 as a mean to run once. + if in.config.Interval == 0 { + return requester.processHTTPRequest(stdCtx, publisher) } - ri := &RequestInfo{ - ContentMap: common.MapStr{}, - Headers: in.HTTPHeaders, - } - if in.config.HTTPMethod == "POST" && in.config.HTTPRequestBody != nil { - ri.ContentMap.Update(common.MapStr(in.config.HTTPRequestBody)) - } - err = in.processHTTPRequest(ctx, client, ri) - if err == nil && in.Interval > 0 { - ticker := time.NewTicker(in.Interval) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - in.log.Info("Context done.") - return nil - case <-ticker.C: - in.log.Info("Process another repeated request.") - err = in.processHTTPRequest(ctx, client, ri) - if err != nil { - return err - } - } + err = timed.Periodic(stdCtx, in.config.Interval, func() error { + log.Info("Process another repeated request.") + if err := requester.processHTTPRequest(stdCtx, publisher); err != nil { + log.Error(err) } - } - return err -} + return nil + }) -// Stop stops the misp input and waits for it to fully stop. -func (in *HttpjsonInput) Stop() { - in.workerCancel() - in.workerWg.Wait() -} + log.Infof("Context done: %v", err) -// Wait is an alias for Stop. -func (in *HttpjsonInput) Wait() { - in.Stop() + return nil } -func (in *HttpjsonInput) newHTTPClient(ctx context.Context) (*http.Client, error) { - tlsConfig, err := tlscommon.LoadTLSConfig(in.config.TLS) - if err != nil { - return nil, err - } - +func (in *httpJSONInput) newHTTPClient(ctx context.Context) (*http.Client, error) { // Make retryable HTTP client - var client *retryablehttp.Client = &retryablehttp.Client{ + client := &retryablehttp.Client{ HTTPClient: &http.Client{ Transport: &http.Transport{ DialContext: (&net.Dialer{ Timeout: in.config.HTTPClientTimeout, }).DialContext, - TLSClientConfig: tlsConfig.ToConfig(), + TLSClientConfig: in.tlsConfig.ToConfig(), DisableKeepAlives: true, }, Timeout: in.config.HTTPClientTimeout, diff --git a/x-pack/filebeat/input/httpjson/pagination.go b/x-pack/filebeat/input/httpjson/pagination.go new file mode 100644 index 00000000000..9a7bf82b2b4 --- /dev/null +++ b/x-pack/filebeat/input/httpjson/pagination.go @@ -0,0 +1,125 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package httpjson + +import ( + "errors" + "fmt" + "net/http" + "net/url" + "regexp" + + "github.com/elastic/beats/v7/libbeat/common" +) + +type pagination struct { + extraBodyContent common.MapStr + header *Header + idField string + requestField string + urlField string + url string +} + +func newPaginationFromConfig(config config) *pagination { + if !config.Pagination.IsEnabled() { + return nil + } + return &pagination{ + extraBodyContent: config.Pagination.ExtraBodyContent.Clone(), + header: config.Pagination.Header, + idField: config.Pagination.IDField, + requestField: config.Pagination.RequestField, + urlField: config.Pagination.URLField, + url: config.Pagination.URL, + } +} + +func (p *pagination) nextRequestInfo(ri *requestInfo, response response, lastObj common.MapStr) (*requestInfo, bool, error) { + if p == nil { + return ri, false, nil + } + + if p.header == nil { + var err error + // Pagination control using HTTP Body fields + if err = p.setRequestInfoFromBody(response.body, lastObj, ri); err != nil { + // if the field is not found, there is no next page + if errors.Is(err, common.ErrKeyNotFound) { + return ri, false, nil + } + return ri, false, err + } + + return ri, true, nil + } + + // Pagination control using HTTP Header + url, err := getNextLinkFromHeader(response.header, p.header.FieldName, p.header.RegexPattern) + if err != nil { + return ri, false, fmt.Errorf("failed to retrieve the next URL for pagination: %w", err) + } + if ri.url == url || url == "" { + return ri, false, nil + } + + ri.url = url + + return ri, true, nil +} + +// getNextLinkFromHeader retrieves the next URL for pagination from the HTTP Header of the response +func getNextLinkFromHeader(header http.Header, fieldName string, re *regexp.Regexp) (string, error) { + links, ok := header[fieldName] + if !ok { + return "", fmt.Errorf("field %s does not exist in the HTTP Header", fieldName) + } + for _, link := range links { + matchArray := re.FindAllStringSubmatch(link, -1) + if len(matchArray) == 1 { + return matchArray[0][1], nil + } + } + return "", nil +} + +// createRequestInfoFromBody creates a new RequestInfo for a new HTTP request in pagination based on HTTP response body +func (p *pagination) setRequestInfoFromBody(response, last common.MapStr, ri *requestInfo) error { + // we try to get it from last element, if not found, from the original response + v, err := last.GetValue(p.idField) + if err == common.ErrKeyNotFound { + v, err = response.GetValue(p.idField) + } + + if err != nil { + return fmt.Errorf("failed to retrieve id_field for pagination: %w", err) + } + + if p.requestField != "" { + _, _ = ri.contentMap.Put(p.requestField, v) + if p.url != "" { + ri.url = p.url + } + } else if p.urlField != "" { + url, err := url.Parse(ri.url) + if err == nil { + q := url.Query() + q.Set(p.urlField, fmt.Sprint(v)) + url.RawQuery = q.Encode() + ri.url = url.String() + } + } else { + switch vt := v.(type) { + case string: + ri.url = vt + default: + return errors.New("pagination ID is not of string type") + } + } + if len(p.extraBodyContent) > 0 { + ri.contentMap.Update(common.MapStr(p.extraBodyContent)) + } + return nil +} diff --git a/x-pack/filebeat/input/httpjson/pagination_test.go b/x-pack/filebeat/input/httpjson/pagination_test.go new file mode 100644 index 00000000000..9b04de75819 --- /dev/null +++ b/x-pack/filebeat/input/httpjson/pagination_test.go @@ -0,0 +1,77 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package httpjson + +import ( + "net/http" + "regexp" + "testing" + + "github.com/elastic/beats/v7/libbeat/common" +) + +func TestGetNextLinkFromHeader(t *testing.T) { + header := make(http.Header) + header.Add("Link", "; rel=\"self\"") + header.Add("Link", "; rel=\"next\"") + re, _ := regexp.Compile("<([^>]+)>; *rel=\"next\"(?:,|$)") + url, err := getNextLinkFromHeader(header, "Link", re) + if url != "https://dev-168980.okta.com/api/v1/logs?after=1581658181086_1" { + t.Fatal("Failed to test getNextLinkFromHeader. URL " + url + " is not expected") + } + if err != nil { + t.Fatal("Failed to test getNextLinkFromHeader with error:", err) + } +} + +func TestCreateRequestInfoFromBody(t *testing.T) { + m := map[string]interface{}{ + "id": 100, + } + extraBodyContent := common.MapStr{"extra_body": "abc"} + pagination := &pagination{ + idField: "id", + requestField: "pagination_id", + extraBodyContent: extraBodyContent, + url: "https://test-123", + } + ri := &requestInfo{ + url: "", + contentMap: common.MapStr{}, + headers: common.MapStr{}, + } + err := pagination.setRequestInfoFromBody( + common.MapStr(m), + common.MapStr(m), + ri, + ) + if ri.url != "https://test-123" { + t.Fatal("Failed to test createRequestInfoFromBody. URL should be https://test-123.") + } + p, err := ri.contentMap.GetValue("pagination_id") + if err != nil { + t.Fatal("Failed to test createRequestInfoFromBody with error", err) + } + switch pt := p.(type) { + case int: + if pt != 100 { + t.Fatalf("Failed to test createRequestInfoFromBody. pagination_id value %d should be 100.", pt) + } + default: + t.Fatalf("Failed to test createRequestInfoFromBody. pagination_id value %T should be int.", pt) + } + b, err := ri.contentMap.GetValue("extra_body") + if err != nil { + t.Fatal("Failed to test createRequestInfoFromBody with error", err) + } + switch bt := b.(type) { + case string: + if bt != "abc" { + t.Fatalf("Failed to test createRequestInfoFromBody. extra_body value %s does not match \"abc\".", bt) + } + default: + t.Fatalf("Failed to test createRequestInfoFromBody. extra_body type %T should be string.", bt) + } +} diff --git a/x-pack/filebeat/input/httpjson/rate_limiter.go b/x-pack/filebeat/input/httpjson/rate_limiter.go new file mode 100644 index 00000000000..57d206224ac --- /dev/null +++ b/x-pack/filebeat/input/httpjson/rate_limiter.go @@ -0,0 +1,130 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package httpjson + +import ( + "context" + "fmt" + "net/http" + "strconv" + "time" + + "github.com/elastic/beats/v7/libbeat/logp" +) + +type rateLimiter struct { + log *logp.Logger + + limit string + reset string + remaining string +} + +func newRateLimiterFromConfig(config config, log *logp.Logger) *rateLimiter { + if config.RateLimit == nil { + return nil + } + + return &rateLimiter{ + log: log, + limit: config.RateLimit.Limit, + reset: config.RateLimit.Reset, + remaining: config.RateLimit.Remaining, + } +} + +func (r *rateLimiter) execute(ctx context.Context, f func(context.Context) (*http.Response, error)) (*http.Response, error) { + for { + resp, err := f(ctx) + if err != nil { + return nil, err + } + + header := resp.Header + if err != nil { + return nil, fmt.Errorf("failed to read http.response.body: %w", err) + } + + if r == nil || resp.StatusCode == http.StatusOK { + return resp, nil + } + + if resp.StatusCode != http.StatusTooManyRequests { + return nil, fmt.Errorf("http request was unsuccessful with a status code %d", resp.StatusCode) + } + + if err := r.applyRateLimit(ctx, header); err != nil { + return nil, err + } + } +} + +// applyRateLimit applies appropriate rate limit if specified in the HTTP Header of the response +func (r *rateLimiter) applyRateLimit(ctx context.Context, header http.Header) error { + epoch, err := r.getRateLimit(header) + if err != nil { + return err + } + + t := time.Unix(epoch, 0) + w := time.Until(t) + if epoch == 0 || w <= 0 { + r.log.Debugf("Rate Limit: No need to apply rate limit.") + return nil + } + r.log.Debugf("Rate Limit: Wait until %v for the rate limit to reset.", t) + ticker := time.NewTicker(w) + defer ticker.Stop() + + select { + case <-ctx.Done(): + r.log.Info("Context done.") + return nil + case <-ticker.C: + r.log.Debug("Rate Limit: time is up.") + return nil + } +} + +// getRateLimit gets the rate limit value if specified in the HTTP Header of the response, +// and returns an int64 value in seconds since unix epoch for rate limit reset time. +// When there is a remaining rate limit quota, or when the rate limit reset time has expired, it +// returns 0 for the epoch value. +func (r *rateLimiter) getRateLimit(header http.Header) (int64, error) { + if r == nil { + return 0, nil + } + + if r.remaining == "" { + return 0, nil + } + + remaining := header.Get(r.remaining) + if remaining == "" { + return 0, fmt.Errorf("field %s does not exist in the HTTP Header, or is empty", r.remaining) + } + m, err := strconv.ParseInt(remaining, 10, 64) + if err != nil { + return 0, fmt.Errorf("failed to parse rate-limit remaining value: %w", err) + } + + if m != 0 { + return 0, nil + } + + reset := header.Get(r.reset) + if reset == "" { + return 0, fmt.Errorf("field %s does not exist in the HTTP Header, or is empty", r.reset) + } + epoch, err := strconv.ParseInt(reset, 10, 64) + if err != nil { + return 0, fmt.Errorf("failed to parse rate-limit reset value: %w", err) + } + if time.Unix(epoch, 0).Sub(time.Now()) <= 0 { + return 0, nil + } + + return epoch, nil +} diff --git a/x-pack/filebeat/input/httpjson/rate_limiter_test.go b/x-pack/filebeat/input/httpjson/rate_limiter_test.go new file mode 100644 index 00000000000..e349e725f2f --- /dev/null +++ b/x-pack/filebeat/input/httpjson/rate_limiter_test.go @@ -0,0 +1,64 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package httpjson + +import ( + "net/http" + "strconv" + "testing" + "time" +) + +// Test getRateLimit function with a remaining quota, expect to receive 0, nil. +func TestGetRateLimitCase1(t *testing.T) { + header := make(http.Header) + header.Add("X-Rate-Limit-Limit", "120") + header.Add("X-Rate-Limit-Remaining", "118") + header.Add("X-Rate-Limit-Reset", "1581658643") + rateLimit := &rateLimiter{ + limit: "X-Rate-Limit-Limit", + reset: "X-Rate-Limit-Reset", + remaining: "X-Rate-Limit-Remaining", + } + epoch, err := rateLimit.getRateLimit(header) + if err != nil || epoch != 0 { + t.Fatal("Failed to test getRateLimit.") + } +} + +// Test getRateLimit function with a past time, expect to receive 0, nil. +func TestGetRateLimitCase2(t *testing.T) { + header := make(http.Header) + header.Add("X-Rate-Limit-Limit", "10") + header.Add("X-Rate-Limit-Remaining", "0") + header.Add("X-Rate-Limit-Reset", "1581658643") + rateLimit := &rateLimiter{ + limit: "X-Rate-Limit-Limit", + reset: "X-Rate-Limit-Reset", + remaining: "X-Rate-Limit-Remaining", + } + epoch, err := rateLimit.getRateLimit(header) + if err != nil || epoch != 0 { + t.Fatal("Failed to test getRateLimit.") + } +} + +// Test getRateLimit function with a time yet to come, expect to receive , nil. +func TestGetRateLimitCase3(t *testing.T) { + epoch := time.Now().Unix() + 100 + header := make(http.Header) + header.Add("X-Rate-Limit-Limit", "10") + header.Add("X-Rate-Limit-Remaining", "0") + header.Add("X-Rate-Limit-Reset", strconv.FormatInt(epoch, 10)) + rateLimit := &rateLimiter{ + limit: "X-Rate-Limit-Limit", + reset: "X-Rate-Limit-Reset", + remaining: "X-Rate-Limit-Remaining", + } + epoch2, err := rateLimit.getRateLimit(header) + if err != nil || epoch2 != epoch { + t.Fatal("Failed to test getRateLimit.") + } +} diff --git a/x-pack/filebeat/input/httpjson/requester.go b/x-pack/filebeat/input/httpjson/requester.go new file mode 100644 index 00000000000..579e5e26756 --- /dev/null +++ b/x-pack/filebeat/input/httpjson/requester.go @@ -0,0 +1,264 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package httpjson + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net/http" + + stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" +) + +type requestInfo struct { + url string + contentMap common.MapStr + headers common.MapStr +} + +type requester struct { + log *logp.Logger + client *http.Client + dateCursor *dateCursor + rateLimiter *rateLimiter + pagination *pagination + + method string + reqBody common.MapStr + headers common.MapStr + noHTTPBody bool + apiKey string + authScheme string + jsonObjects string + splitEventsBy string +} + +func newRequester( + config config, + rateLimiter *rateLimiter, + dateCursor *dateCursor, + pagination *pagination, + client *http.Client, + log *logp.Logger) *requester { + return &requester{ + log: log, + client: client, + rateLimiter: rateLimiter, + dateCursor: dateCursor, + pagination: pagination, + method: config.HTTPMethod, + reqBody: config.HTTPRequestBody.Clone(), + headers: config.HTTPHeaders.Clone(), + noHTTPBody: config.NoHTTPBody, + apiKey: config.APIKey, + authScheme: config.AuthenticationScheme, + splitEventsBy: config.SplitEventsBy, + jsonObjects: config.JSONObjects, + } +} + +type response struct { + header http.Header + body common.MapStr +} + +// processHTTPRequest processes HTTP request, and handles pagination if enabled +func (r *requester) processHTTPRequest(ctx context.Context, publisher stateless.Publisher) error { + ri := &requestInfo{ + url: r.dateCursor.getURL(), + contentMap: common.MapStr{}, + headers: r.headers, + } + + if r.method == "POST" && r.reqBody != nil { + ri.contentMap.Update(common.MapStr(r.reqBody)) + } + + var ( + m, v interface{} + response response + lastObj common.MapStr + ) + + // always request at least once + hasNext := true + + for hasNext { + resp, err := r.rateLimiter.execute( + ctx, + func(ctx context.Context) (*http.Response, error) { + req, err := r.createHTTPRequest(ctx, ri) + if err != nil { + return nil, fmt.Errorf("failed to create http request: %w", err) + } + msg, err := r.client.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to execute http client.Do: %w", err) + } + return msg, nil + }, + ) + if err != nil { + return err + } + + responseData, err := ioutil.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("failed to read http response: %w", err) + } + _ = resp.Body.Close() + + if err = json.Unmarshal(responseData, &m); err != nil { + r.log.Debug("failed to unmarshal http.response.body", string(responseData)) + return fmt.Errorf("failed to unmarshal http.response.body: %w", err) + } + + switch obj := m.(type) { + // Top level Array + case []interface{}: + lastObj, err = r.processEventArray(publisher, obj) + if err != nil { + return err + } + case map[string]interface{}: + response.body = obj + if r.jsonObjects == "" { + lastObj, err = r.processEventArray(publisher, []interface{}{obj}) + if err != nil { + return err + } + } else { + v, err = common.MapStr(obj).GetValue(r.jsonObjects) + if err != nil { + if err == common.ErrKeyNotFound { + break + } + return err + } + switch ts := v.(type) { + case []interface{}: + lastObj, err = r.processEventArray(publisher, ts) + if err != nil { + return err + } + default: + return fmt.Errorf("content of %s is not a valid array", r.jsonObjects) + } + } + default: + r.log.Debug("http.response.body is not a valid JSON object", string(responseData)) + return fmt.Errorf("http.response.body is not a valid JSON object, but a %T", obj) + } + + ri, hasNext, err = r.pagination.nextRequestInfo(ri, response, lastObj) + if err != nil { + return err + } + } + + if lastObj != nil && r.dateCursor.enabled { + r.dateCursor.advance(common.MapStr(lastObj)) + } + + return nil +} + +// createHTTPRequest creates an HTTP/HTTPs request for the input +func (r *requester) createHTTPRequest(ctx context.Context, ri *requestInfo) (*http.Request, error) { + var body io.Reader + if len(ri.contentMap) == 0 || r.noHTTPBody { + body = nil + } else { + b, err := json.Marshal(ri.contentMap) + if err != nil { + return nil, err + } + body = bytes.NewReader(b) + } + req, err := http.NewRequest(r.method, ri.url, body) + if err != nil { + return nil, err + } + req = req.WithContext(ctx) + req.Header.Set("Accept", "application/json") + req.Header.Set("Content-Type", "application/json") + req.Header.Set("User-Agent", userAgent) + if r.apiKey != "" { + if r.authScheme != "" { + req.Header.Set("Authorization", r.authScheme+" "+r.apiKey) + } else { + req.Header.Set("Authorization", r.apiKey) + } + } + for k, v := range ri.headers { + switch vv := v.(type) { + case string: + req.Header.Set(k, vv) + default: + } + } + return req, nil +} + +// processEventArray publishes an event for each object contained in the array. It returns the last object in the array and an error if any. +func (r *requester) processEventArray(publisher stateless.Publisher, events []interface{}) (map[string]interface{}, error) { + var last map[string]interface{} + for _, t := range events { + switch v := t.(type) { + case map[string]interface{}: + for _, e := range r.splitEvent(v) { + last = e + d, err := json.Marshal(e) + if err != nil { + return nil, fmt.Errorf("failed to marshal %+v: %w", e, err) + } + publisher.Publish(makeEvent(string(d))) + } + default: + return nil, fmt.Errorf("expected only JSON objects in the array but got a %T", v) + } + } + return last, nil +} + +func (r *requester) splitEvent(event map[string]interface{}) []map[string]interface{} { + m := common.MapStr(event) + + hasSplitKey, _ := m.HasKey(r.splitEventsBy) + if r.splitEventsBy == "" || !hasSplitKey { + return []map[string]interface{}{event} + } + + splitOnIfc, _ := m.GetValue(r.splitEventsBy) + splitOn, ok := splitOnIfc.([]interface{}) + // if not an array or is empty, we do nothing + if !ok || len(splitOn) == 0 { + return []map[string]interface{}{event} + } + + var events []map[string]interface{} + for _, split := range splitOn { + s, ok := split.(map[string]interface{}) + // if not an object, we do nothing + if !ok { + return []map[string]interface{}{event} + } + + mm := m.Clone() + if _, err := mm.Put(r.splitEventsBy, s); err != nil { + return []map[string]interface{}{event} + } + + events = append(events, mm) + } + + return events +}