Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Filebeat][httpjson] Convert httpjson input to a v2 input #20226

Merged
merged 15 commits into from
Aug 20, 2020
Merged
2 changes: 1 addition & 1 deletion CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add event.ingested to all Filebeat modules. {pull}20386[20386]
- Return error when log harvester tries to open a named pipe. {issue}18682[18682] {pull}20450[20450]
- Avoid goroutine leaks in Filebeat readers. {issue}19193[19193] {pull}20455[20455]

- Convert httpjson to v2 input {pull}20226[20226]

*Heartbeat*

Expand Down
1 change: 0 additions & 1 deletion x-pack/filebeat/include/list.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions x-pack/filebeat/input/default-inputs/inputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/x-pack/filebeat/input/cloudfoundry"
"github.com/elastic/beats/v7/x-pack/filebeat/input/http_endpoint"
"github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson"
"github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit"
)

Expand All @@ -26,6 +27,7 @@ func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2
return []v2.Plugin{
cloudfoundry.Plugin(),
http_endpoint.Plugin(),
httpjson.Plugin(),
o365audit.Plugin(log, store),
}
}
32 changes: 24 additions & 8 deletions x-pack/filebeat/input/httpjson/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
package httpjson

import (
"errors"
"fmt"
"net/url"
"regexp"
"strings"
"text/template"
"time"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
)
Expand All @@ -35,7 +36,7 @@ type config struct {
RetryWaitMin time.Duration `config:"retry.wait_min"`
RetryWaitMax time.Duration `config:"retry.wait_max"`
TLS *tlscommon.Config `config:"ssl"`
URL string `config:"url" validate:"required"`
URL *URL `config:"url" validate:"required"`
DateCursor *DateCursor `config:"date_cursor"`
}

Expand Down Expand Up @@ -92,6 +93,21 @@ func (t *Template) Unpack(in string) error {
return nil
}

type URL struct {
*url.URL
}

func (u *URL) Unpack(in string) error {
parsed, err := url.Parse(in)
if err != nil {
return err
}

*u = URL{URL: parsed}

return nil
}

// IsEnabled returns true if the `enable` field is set to true in the yaml.
func (dc *DateCursor) IsEnabled() bool {
return dc != nil && (dc.Enabled == nil || *dc.Enabled)
Expand Down Expand Up @@ -121,26 +137,26 @@ func (c *config) Validate() error {
case "GET", "POST":
break
default:
return errors.Errorf("httpjson input: Invalid http_method, %s", c.HTTPMethod)
return fmt.Errorf("httpjson input: Invalid http_method, %s", c.HTTPMethod)
}
if c.NoHTTPBody {
if len(c.HTTPRequestBody) > 0 {
return errors.Errorf("invalid configuration: both no_http_body and http_request_body cannot be set simultaneously")
return errors.New("invalid configuration: both no_http_body and http_request_body cannot be set simultaneously")
}
if c.Pagination != nil && (len(c.Pagination.ExtraBodyContent) > 0 || c.Pagination.RequestField != "") {
return errors.Errorf("invalid configuration: both no_http_body and pagination.extra_body_content or pagination.req_field cannot be set simultaneously")
return errors.New("invalid configuration: both no_http_body and pagination.extra_body_content or pagination.req_field cannot be set simultaneously")
}
}
if c.Pagination != nil {
if c.Pagination.Header != nil {
if c.Pagination.RequestField != "" || c.Pagination.IDField != "" || len(c.Pagination.ExtraBodyContent) > 0 {
return errors.Errorf("invalid configuration: both pagination.header and pagination.req_field or pagination.id_field or pagination.extra_body_content cannot be set simultaneously")
return errors.New("invalid configuration: both pagination.header and pagination.req_field or pagination.id_field or pagination.extra_body_content cannot be set simultaneously")
}
}
}
if c.OAuth2.IsEnabled() {
if c.APIKey != "" || c.AuthenticationScheme != "" {
return errors.Errorf("invalid configuration: oauth2 and api_key or authentication_scheme cannot be set simultaneously")
return errors.New("invalid configuration: oauth2 and api_key or authentication_scheme cannot be set simultaneously")
}
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/httpjson/config_oauth.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ package httpjson
import (
"context"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"os"
"strings"

"github.com/pkg/errors"
"golang.org/x/oauth2"
"golang.org/x/oauth2/clientcredentials"
"golang.org/x/oauth2/endpoints"
Expand Down
13 changes: 12 additions & 1 deletion x-pack/filebeat/input/httpjson/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ package httpjson

import (
"context"
"errors"
"os"
"testing"
"time"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"golang.org/x/oauth2/google"

"github.com/elastic/beats/v7/libbeat/common"
Expand Down Expand Up @@ -110,6 +111,16 @@ func TestConfigValidationCase7(t *testing.T) {
}
}

func TestConfigMustFailWithInvalidURL(t *testing.T) {
m := map[string]interface{}{
"url": "::invalid::",
}
cfg := common.MustNewConfigFrom(m)
conf := defaultConfig()
err := cfg.Unpack(&conf)
assert.EqualError(t, err, `parse "::invalid::": missing protocol scheme accessing 'url'`)
}

func TestConfigOauth2Validation(t *testing.T) {
cases := []struct {
name string
Expand Down
105 changes: 105 additions & 0 deletions x-pack/filebeat/input/httpjson/date_cursor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package httpjson

import (
"bytes"
"net/url"
"time"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
)

type dateCursor struct {
log *logp.Logger
enabled bool
field string
url url.URL
urlField string
initialInterval time.Duration
dateFormat string

value string
valueTpl *Template
}

func newDateCursorFromConfig(config config, log *logp.Logger) *dateCursor {
c := &dateCursor{
enabled: config.DateCursor.IsEnabled(),
url: *config.URL.URL,
}

if !c.enabled {
return c
}

c.log = log
c.field = config.DateCursor.Field
c.urlField = config.DateCursor.URLField
c.initialInterval = config.DateCursor.InitialInterval
c.dateFormat = config.DateCursor.GetDateFormat()
c.valueTpl = config.DateCursor.ValueTemplate

return c
}

func (c *dateCursor) getURL() string {
if !c.enabled {
return c.url.String()
}

var dateStr string
if c.value == "" {
t := timeNow().UTC().Add(-c.initialInterval)
dateStr = t.Format(c.dateFormat)
} else {
dateStr = c.value
}

q := c.url.Query()

var value string
if c.valueTpl == nil {
value = dateStr
} else {
buf := new(bytes.Buffer)
if err := c.valueTpl.Template.Execute(buf, dateStr); err != nil {
return c.url.String()
}
value = buf.String()
}

q.Set(c.urlField, value)

c.url.RawQuery = q.Encode()

return c.url.String()
}

func (c *dateCursor) advance(m common.MapStr) {
if c.field == "" {
c.value = time.Now().UTC().Format(c.dateFormat)
return
}

v, err := m.GetValue(c.field)
if err != nil {
c.log.Warnf("date_cursor field: %q", err)
return
}
switch t := v.(type) {
case string:
_, err := time.Parse(c.dateFormat, t)
if err != nil {
c.log.Warn("date_cursor field does not have the expected layout")
return
}
c.value = t
default:
c.log.Warn("date_cursor field must be a string, cursor will not advance")
return
}
}
Loading