Skip to content

Commit

Permalink
[Filebeat][httpjson] Convert httpjson input to a v2 input (elastic#20226
Browse files Browse the repository at this point in the history
)

* Convert httpjson input to a v2 cursor input

* Add CHANGELOG entry

* Fix format errors

* Convert to stateless input and refactor:
- Paginator takes care of requesting next page info
- Rate limiter takes care of rate limiting requests
- Date cursor takes care of keeping track of cursor state

* Remove python tests

* Do not fail if there is no next page

* Refactor go integration tests to work with v2 input

* Do suggested changes to input and tests

* Update time.Periodic call with error return

* Change test duration values

* Sepparate sync test case

* Create custon url config type

* Change input.Run comment

* Change input.Run to only return on context cancellation

* Remove all usages of pkg/errors
  • Loading branch information
marc-gr authored Aug 20, 2020
1 parent 38fc1ed commit 929e838
Show file tree
Hide file tree
Showing 14 changed files with 1,263 additions and 1,255 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add event.ingested to all Filebeat modules. {pull}20386[20386]
- Return error when log harvester tries to open a named pipe. {issue}18682[18682] {pull}20450[20450]
- Avoid goroutine leaks in Filebeat readers. {issue}19193[19193] {pull}20455[20455]

- Convert httpjson to v2 input {pull}20226[20226]

*Heartbeat*

Expand Down
1 change: 0 additions & 1 deletion x-pack/filebeat/include/list.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions x-pack/filebeat/input/default-inputs/inputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/x-pack/filebeat/input/cloudfoundry"
"github.com/elastic/beats/v7/x-pack/filebeat/input/http_endpoint"
"github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson"
"github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit"
)

Expand All @@ -26,6 +27,7 @@ func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2
return []v2.Plugin{
cloudfoundry.Plugin(),
http_endpoint.Plugin(),
httpjson.Plugin(),
o365audit.Plugin(log, store),
}
}
32 changes: 24 additions & 8 deletions x-pack/filebeat/input/httpjson/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
package httpjson

import (
"errors"
"fmt"
"net/url"
"regexp"
"strings"
"text/template"
"time"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
)
Expand All @@ -35,7 +36,7 @@ type config struct {
RetryWaitMin time.Duration `config:"retry.wait_min"`
RetryWaitMax time.Duration `config:"retry.wait_max"`
TLS *tlscommon.Config `config:"ssl"`
URL string `config:"url" validate:"required"`
URL *URL `config:"url" validate:"required"`
DateCursor *DateCursor `config:"date_cursor"`
}

Expand Down Expand Up @@ -92,6 +93,21 @@ func (t *Template) Unpack(in string) error {
return nil
}

type URL struct {
*url.URL
}

func (u *URL) Unpack(in string) error {
parsed, err := url.Parse(in)
if err != nil {
return err
}

*u = URL{URL: parsed}

return nil
}

// IsEnabled returns true if the `enable` field is set to true in the yaml.
func (dc *DateCursor) IsEnabled() bool {
return dc != nil && (dc.Enabled == nil || *dc.Enabled)
Expand Down Expand Up @@ -121,26 +137,26 @@ func (c *config) Validate() error {
case "GET", "POST":
break
default:
return errors.Errorf("httpjson input: Invalid http_method, %s", c.HTTPMethod)
return fmt.Errorf("httpjson input: Invalid http_method, %s", c.HTTPMethod)
}
if c.NoHTTPBody {
if len(c.HTTPRequestBody) > 0 {
return errors.Errorf("invalid configuration: both no_http_body and http_request_body cannot be set simultaneously")
return errors.New("invalid configuration: both no_http_body and http_request_body cannot be set simultaneously")
}
if c.Pagination != nil && (len(c.Pagination.ExtraBodyContent) > 0 || c.Pagination.RequestField != "") {
return errors.Errorf("invalid configuration: both no_http_body and pagination.extra_body_content or pagination.req_field cannot be set simultaneously")
return errors.New("invalid configuration: both no_http_body and pagination.extra_body_content or pagination.req_field cannot be set simultaneously")
}
}
if c.Pagination != nil {
if c.Pagination.Header != nil {
if c.Pagination.RequestField != "" || c.Pagination.IDField != "" || len(c.Pagination.ExtraBodyContent) > 0 {
return errors.Errorf("invalid configuration: both pagination.header and pagination.req_field or pagination.id_field or pagination.extra_body_content cannot be set simultaneously")
return errors.New("invalid configuration: both pagination.header and pagination.req_field or pagination.id_field or pagination.extra_body_content cannot be set simultaneously")
}
}
}
if c.OAuth2.IsEnabled() {
if c.APIKey != "" || c.AuthenticationScheme != "" {
return errors.Errorf("invalid configuration: oauth2 and api_key or authentication_scheme cannot be set simultaneously")
return errors.New("invalid configuration: oauth2 and api_key or authentication_scheme cannot be set simultaneously")
}
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/httpjson/config_oauth.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ package httpjson
import (
"context"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"os"
"strings"

"github.com/pkg/errors"
"golang.org/x/oauth2"
"golang.org/x/oauth2/clientcredentials"
"golang.org/x/oauth2/endpoints"
Expand Down
13 changes: 12 additions & 1 deletion x-pack/filebeat/input/httpjson/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ package httpjson

import (
"context"
"errors"
"os"
"testing"
"time"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"golang.org/x/oauth2/google"

"github.com/elastic/beats/v7/libbeat/common"
Expand Down Expand Up @@ -110,6 +111,16 @@ func TestConfigValidationCase7(t *testing.T) {
}
}

func TestConfigMustFailWithInvalidURL(t *testing.T) {
m := map[string]interface{}{
"url": "::invalid::",
}
cfg := common.MustNewConfigFrom(m)
conf := defaultConfig()
err := cfg.Unpack(&conf)
assert.EqualError(t, err, `parse "::invalid::": missing protocol scheme accessing 'url'`)
}

func TestConfigOauth2Validation(t *testing.T) {
cases := []struct {
name string
Expand Down
105 changes: 105 additions & 0 deletions x-pack/filebeat/input/httpjson/date_cursor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package httpjson

import (
"bytes"
"net/url"
"time"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
)

type dateCursor struct {
log *logp.Logger
enabled bool
field string
url url.URL
urlField string
initialInterval time.Duration
dateFormat string

value string
valueTpl *Template
}

func newDateCursorFromConfig(config config, log *logp.Logger) *dateCursor {
c := &dateCursor{
enabled: config.DateCursor.IsEnabled(),
url: *config.URL.URL,
}

if !c.enabled {
return c
}

c.log = log
c.field = config.DateCursor.Field
c.urlField = config.DateCursor.URLField
c.initialInterval = config.DateCursor.InitialInterval
c.dateFormat = config.DateCursor.GetDateFormat()
c.valueTpl = config.DateCursor.ValueTemplate

return c
}

func (c *dateCursor) getURL() string {
if !c.enabled {
return c.url.String()
}

var dateStr string
if c.value == "" {
t := timeNow().UTC().Add(-c.initialInterval)
dateStr = t.Format(c.dateFormat)
} else {
dateStr = c.value
}

q := c.url.Query()

var value string
if c.valueTpl == nil {
value = dateStr
} else {
buf := new(bytes.Buffer)
if err := c.valueTpl.Template.Execute(buf, dateStr); err != nil {
return c.url.String()
}
value = buf.String()
}

q.Set(c.urlField, value)

c.url.RawQuery = q.Encode()

return c.url.String()
}

func (c *dateCursor) advance(m common.MapStr) {
if c.field == "" {
c.value = time.Now().UTC().Format(c.dateFormat)
return
}

v, err := m.GetValue(c.field)
if err != nil {
c.log.Warnf("date_cursor field: %q", err)
return
}
switch t := v.(type) {
case string:
_, err := time.Parse(c.dateFormat, t)
if err != nil {
c.log.Warn("date_cursor field does not have the expected layout")
return
}
c.value = t
default:
c.log.Warn("date_cursor field must be a string, cursor will not advance")
return
}
}
Loading

0 comments on commit 929e838

Please sign in to comment.