Skip to content

Commit

Permalink
Httpjson v2 changes: (elastic#23069)
Browse files Browse the repository at this point in the history
- Initialize header and body on transform if not set
- Avoid first page being '0'
- Do not send empty events if root split has no values
- Add option to reuse request body for pagination requests
  • Loading branch information
marc-gr authored Dec 14, 2020
1 parent 523d119 commit 9db963e
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 30 deletions.
5 changes: 5 additions & 0 deletions x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,11 @@ Valid when used with `type: map`. When not empty, defines a new field where the

Nested split operation. Split operations can be nested at will. An event won't be created until the deepest split operation is applied.

[float]
==== `response.request_body_on_pagination`

If set to true, the values in `request.body` are sent for pagination requests. Default: `false`.

[[response-pagination]]
[float]
==== `response.pagination`
Expand Down
7 changes: 4 additions & 3 deletions x-pack/filebeat/input/httpjson/internal/v2/config_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ const (
)

type responseConfig struct {
Transforms transformsConfig `config:"transforms"`
Pagination transformsConfig `config:"pagination"`
Split *splitConfig `config:"split"`
RequestBodyOnPagination bool `config:"request_body_on_pagination"`
Transforms transformsConfig `config:"transforms"`
Pagination transformsConfig `config:"pagination"`
Split *splitConfig `config:"split"`
}

type splitConfig struct {
Expand Down
8 changes: 7 additions & 1 deletion x-pack/filebeat/input/httpjson/internal/v2/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func TestInput(t *testing.T) {
},
},
handler: defaultHandler("GET", ""),
expected: []string{`{"hello":[{"world":"moon"},{"space":[{"cake":"pumpkin"}]}]}`},
expected: []string{},
},
{
name: "Test date cursor",
Expand Down Expand Up @@ -309,6 +309,12 @@ func TestInput(t *testing.T) {
timeout := time.NewTimer(5 * time.Second)
t.Cleanup(func() { _ = timeout.Stop() })

if len(tc.expected) == 0 {
cancel()
assert.NoError(t, g.Wait())
return
}

var receivedCount int
wait:
for {
Expand Down
16 changes: 12 additions & 4 deletions x-pack/filebeat/input/httpjson/internal/v2/pagination.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,18 @@ func newPagination(config config, httpClient *httpClient, log *logp.Logger) *pag

rts, _ := newBasicTransformsFromConfig(config.Request.Transforms, requestNamespace, log)
pts, _ := newBasicTransformsFromConfig(config.Response.Pagination, paginationNamespace, log)

body := func() *common.MapStr {
if config.Response.RequestBodyOnPagination {
return config.Request.Body
}
return &common.MapStr{}
}()

requestFactory := newPaginationRequestFactory(
config.Request.Method,
*config.Request.URL.URL,
body,
append(rts, pts...),
config.Auth,
log,
Expand All @@ -48,12 +57,12 @@ func newPagination(config config, httpClient *httpClient, log *logp.Logger) *pag
return pagination
}

func newPaginationRequestFactory(method string, url url.URL, ts []basicTransform, authConfig *authConfig, log *logp.Logger) *requestFactory {
func newPaginationRequestFactory(method string, url url.URL, body *common.MapStr, ts []basicTransform, authConfig *authConfig, log *logp.Logger) *requestFactory {
// config validation already checked for errors here
rf := &requestFactory{
url: url,
method: method,
body: &common.MapStr{},
body: body,
transforms: ts,
log: log,
}
Expand Down Expand Up @@ -143,6 +152,7 @@ func (iter *pageIterator) getPage() (*response, error) {
return nil, err
}
iter.resp.Body.Close()
iter.n += 1

var r response
r.header = iter.resp.Header
Expand All @@ -155,7 +165,5 @@ func (iter *pageIterator) getPage() (*response, error) {
}
}

iter.n += 1

return &r, nil
}
24 changes: 18 additions & 6 deletions x-pack/filebeat/input/httpjson/internal/v2/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

var (
errEmptyField = errors.New("the requested field is empty")
errEmptyRootField = errors.New("the requested root field is empty")
errExpectedSplitArr = errors.New("split was expecting field to be an array")
errExpectedSplitObj = errors.New("split was expecting field to be an object")
)
Expand All @@ -26,6 +27,7 @@ type split struct {
child *split
keepParent bool
keyField string
isRoot bool
}

func newSplitResponse(cfg *splitConfig, log *logp.Logger) (*split, error) {
Expand All @@ -37,11 +39,8 @@ func newSplitResponse(cfg *splitConfig, log *logp.Logger) (*split, error) {
if err != nil {
return nil, err
}

if split.targetInfo.Type != targetBody {
return nil, fmt.Errorf("invalid target type: %s", split.targetInfo.Type)
}

// we want to be able to identify which split is the root of the chain
split.isRoot = true
return split, nil
}

Expand All @@ -51,14 +50,18 @@ func newSplit(c *splitConfig, log *logp.Logger) (*split, error) {
return nil, err
}

if ti.Type != targetBody {
return nil, fmt.Errorf("invalid target type: %s", ti.Type)
}

ts, err := newBasicTransformsFromConfig(c.Transforms, responseNamespace, log)
if err != nil {
return nil, err
}

var s *split
if c.Split != nil {
s, err = newSplitResponse(c.Split, log)
s, err = newSplit(c.Split, log)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -87,6 +90,9 @@ func (s *split) split(ctx *transformContext, root common.MapStr, ch chan<- maybe
}

if v == nil {
if s.isRoot {
return errEmptyRootField
}
ch <- maybeMsg{msg: root}
return errEmptyField
}
Expand All @@ -99,6 +105,9 @@ func (s *split) split(ctx *transformContext, root common.MapStr, ch chan<- maybe
}

if len(varr) == 0 {
if s.isRoot {
return errEmptyRootField
}
ch <- maybeMsg{msg: root}
return errEmptyField
}
Expand All @@ -117,6 +126,9 @@ func (s *split) split(ctx *transformContext, root common.MapStr, ch chan<- maybe
}

if len(vmap) == 0 {
if s.isRoot {
return errEmptyRootField
}
ch <- maybeMsg{msg: root}
return errEmptyField
}
Expand Down
10 changes: 4 additions & 6 deletions x-pack/filebeat/input/httpjson/internal/v2/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,13 +276,13 @@ func TestSplit(t *testing.T) {
},
},
{
name: "First level split skips publish if no events and keep_parent: false",
name: "First level split skips publish if no events",
config: &splitConfig{
Target: "body.response",
Type: "array",
Split: &splitConfig{
Target: "body.Event.Attributes",
KeepParent: false,
KeepParent: true,
},
},
ctx: emptyTransformContext(),
Expand All @@ -291,10 +291,8 @@ func TestSplit(t *testing.T) {
"response": []interface{}{},
},
},
expectedMessages: []common.MapStr{
{"response": []interface{}{}},
},
expectedErr: errEmptyField,
expectedMessages: []common.MapStr{},
expectedErr: errEmptyRootField,
},
{
name: "Changes must be local to parent when nested splits",
Expand Down
20 changes: 10 additions & 10 deletions x-pack/filebeat/input/httpjson/internal/v2/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,13 @@ func (tr transformable) setHeader(v http.Header) {
func (tr transformable) header() http.Header {
val, err := tr.GetValue("header")
if err != nil {
return http.Header{}
// if it does not exist, initialize it
header := http.Header{}
tr.setHeader(header)
return header
}

header, ok := val.(http.Header)
if !ok {
return http.Header{}
}
header, _ := val.(http.Header)

return header
}
Expand All @@ -123,13 +123,13 @@ func (tr transformable) setBody(v common.MapStr) {
func (tr transformable) body() common.MapStr {
val, err := tr.GetValue("body")
if err != nil {
return common.MapStr{}
// if it does not exist, initialize it
body := common.MapStr{}
tr.setBody(body)
return body
}

body, ok := val.(common.MapStr)
if !ok {
return common.MapStr{}
}
body, _ := val.(common.MapStr)

return body
}
Expand Down

0 comments on commit 9db963e

Please sign in to comment.