Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Filebeat] Httpjson v2 changes #23069

Merged
merged 1 commit into from
Dec 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,11 @@ Valid when used with `type: map`. When not empty, defines a new field where the

Nested split operation. Split operations can be nested at will. An event won't be created until the deepest split operation is applied.

[float]
==== `response.request_body_on_pagination`

If set to true, the values in `request.body` are sent for pagination requests. Default: `false`.

[[response-pagination]]
[float]
==== `response.pagination`
Expand Down
7 changes: 4 additions & 3 deletions x-pack/filebeat/input/httpjson/internal/v2/config_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ const (
)

type responseConfig struct {
Transforms transformsConfig `config:"transforms"`
Pagination transformsConfig `config:"pagination"`
Split *splitConfig `config:"split"`
RequestBodyOnPagination bool `config:"request_body_on_pagination"`
Transforms transformsConfig `config:"transforms"`
Pagination transformsConfig `config:"pagination"`
Split *splitConfig `config:"split"`
}

type splitConfig struct {
Expand Down
8 changes: 7 additions & 1 deletion x-pack/filebeat/input/httpjson/internal/v2/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func TestInput(t *testing.T) {
},
},
handler: defaultHandler("GET", ""),
expected: []string{`{"hello":[{"world":"moon"},{"space":[{"cake":"pumpkin"}]}]}`},
expected: []string{},
},
{
name: "Test date cursor",
Expand Down Expand Up @@ -309,6 +309,12 @@ func TestInput(t *testing.T) {
timeout := time.NewTimer(5 * time.Second)
t.Cleanup(func() { _ = timeout.Stop() })

if len(tc.expected) == 0 {
cancel()
assert.NoError(t, g.Wait())
return
}

var receivedCount int
wait:
for {
Expand Down
16 changes: 12 additions & 4 deletions x-pack/filebeat/input/httpjson/internal/v2/pagination.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,18 @@ func newPagination(config config, httpClient *httpClient, log *logp.Logger) *pag

rts, _ := newBasicTransformsFromConfig(config.Request.Transforms, requestNamespace, log)
pts, _ := newBasicTransformsFromConfig(config.Response.Pagination, paginationNamespace, log)

body := func() *common.MapStr {
if config.Response.RequestBodyOnPagination {
return config.Request.Body
}
return &common.MapStr{}
}()

requestFactory := newPaginationRequestFactory(
config.Request.Method,
*config.Request.URL.URL,
body,
append(rts, pts...),
config.Auth,
log,
Expand All @@ -48,12 +57,12 @@ func newPagination(config config, httpClient *httpClient, log *logp.Logger) *pag
return pagination
}

func newPaginationRequestFactory(method string, url url.URL, ts []basicTransform, authConfig *authConfig, log *logp.Logger) *requestFactory {
func newPaginationRequestFactory(method string, url url.URL, body *common.MapStr, ts []basicTransform, authConfig *authConfig, log *logp.Logger) *requestFactory {
// config validation already checked for errors here
rf := &requestFactory{
url: url,
method: method,
body: &common.MapStr{},
body: body,
transforms: ts,
log: log,
}
Expand Down Expand Up @@ -143,6 +152,7 @@ func (iter *pageIterator) getPage() (*response, error) {
return nil, err
}
iter.resp.Body.Close()
iter.n += 1

var r response
r.header = iter.resp.Header
Expand All @@ -155,7 +165,5 @@ func (iter *pageIterator) getPage() (*response, error) {
}
}

iter.n += 1

return &r, nil
}
24 changes: 18 additions & 6 deletions x-pack/filebeat/input/httpjson/internal/v2/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

var (
errEmptyField = errors.New("the requested field is empty")
errEmptyRootField = errors.New("the requested root field is empty")
errExpectedSplitArr = errors.New("split was expecting field to be an array")
errExpectedSplitObj = errors.New("split was expecting field to be an object")
)
Expand All @@ -26,6 +27,7 @@ type split struct {
child *split
keepParent bool
keyField string
isRoot bool
}

func newSplitResponse(cfg *splitConfig, log *logp.Logger) (*split, error) {
Expand All @@ -37,11 +39,8 @@ func newSplitResponse(cfg *splitConfig, log *logp.Logger) (*split, error) {
if err != nil {
return nil, err
}

if split.targetInfo.Type != targetBody {
return nil, fmt.Errorf("invalid target type: %s", split.targetInfo.Type)
}

// we want to be able to identify which split is the root of the chain
split.isRoot = true
return split, nil
}

Expand All @@ -51,14 +50,18 @@ func newSplit(c *splitConfig, log *logp.Logger) (*split, error) {
return nil, err
}

if ti.Type != targetBody {
return nil, fmt.Errorf("invalid target type: %s", ti.Type)
}

ts, err := newBasicTransformsFromConfig(c.Transforms, responseNamespace, log)
if err != nil {
return nil, err
}

var s *split
if c.Split != nil {
s, err = newSplitResponse(c.Split, log)
s, err = newSplit(c.Split, log)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -87,6 +90,9 @@ func (s *split) split(ctx *transformContext, root common.MapStr, ch chan<- maybe
}

if v == nil {
if s.isRoot {
return errEmptyRootField
}
ch <- maybeMsg{msg: root}
return errEmptyField
}
Expand All @@ -99,6 +105,9 @@ func (s *split) split(ctx *transformContext, root common.MapStr, ch chan<- maybe
}

if len(varr) == 0 {
if s.isRoot {
return errEmptyRootField
}
ch <- maybeMsg{msg: root}
return errEmptyField
}
Expand All @@ -117,6 +126,9 @@ func (s *split) split(ctx *transformContext, root common.MapStr, ch chan<- maybe
}

if len(vmap) == 0 {
if s.isRoot {
return errEmptyRootField
}
ch <- maybeMsg{msg: root}
return errEmptyField
}
Expand Down
10 changes: 4 additions & 6 deletions x-pack/filebeat/input/httpjson/internal/v2/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,13 +276,13 @@ func TestSplit(t *testing.T) {
},
},
{
name: "First level split skips publish if no events and keep_parent: false",
name: "First level split skips publish if no events",
config: &splitConfig{
Target: "body.response",
Type: "array",
Split: &splitConfig{
Target: "body.Event.Attributes",
KeepParent: false,
KeepParent: true,
},
},
ctx: emptyTransformContext(),
Expand All @@ -291,10 +291,8 @@ func TestSplit(t *testing.T) {
"response": []interface{}{},
},
},
expectedMessages: []common.MapStr{
{"response": []interface{}{}},
},
expectedErr: errEmptyField,
expectedMessages: []common.MapStr{},
expectedErr: errEmptyRootField,
},
{
name: "Changes must be local to parent when nested splits",
Expand Down
20 changes: 10 additions & 10 deletions x-pack/filebeat/input/httpjson/internal/v2/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,13 @@ func (tr transformable) setHeader(v http.Header) {
func (tr transformable) header() http.Header {
val, err := tr.GetValue("header")
if err != nil {
return http.Header{}
// if it does not exist, initialize it
header := http.Header{}
tr.setHeader(header)
return header
}

header, ok := val.(http.Header)
if !ok {
return http.Header{}
}
header, _ := val.(http.Header)

return header
}
Expand All @@ -123,13 +123,13 @@ func (tr transformable) setBody(v common.MapStr) {
func (tr transformable) body() common.MapStr {
val, err := tr.GetValue("body")
if err != nil {
return common.MapStr{}
// if it does not exist, initialize it
body := common.MapStr{}
tr.setBody(body)
return body
}

body, ok := val.(common.MapStr)
if !ok {
return common.MapStr{}
}
body, _ := val.(common.MapStr)

return body
}
Expand Down