[filebeat][httpjson] - Added bug-fix for duplicate data issue #33213 (#33664)

Merged · 7 commits · Nov 17, 2022 · changes shown from all commits
163 changes: 163 additions & 0 deletions x-pack/filebeat/input/httpjson/input_test.go
@@ -904,6 +904,169 @@ func TestInput(t *testing.T) {
`{"space":{"world":"moon"}}`,
},
},
{
name: "Test if cursor value is updated for root response with chaining & pagination",
setupServer: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) {
var serverURL string
registerPaginationTransforms()
registerRequestTransforms()
r := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/":
fmt.Fprintf(w, `{"files":[{"id":"1"},{"id":"2"}],"exportId":"2212", "createdAt":"22/02/2022",
"nextLink":"%s/link1"}`, serverURL)
case "/link1":
fmt.Fprintln(w, `{"files":[{"id":"3"},{"id":"4"}], "exportId":"2213", "createdAt":"24/04/2022"}`)
case "/2212/1":
matchBody(w, r, `{"createdAt":"22/02/2022","exportId":"2212"}`, `{"hello":{"world":"moon"}}`)
case "/2212/2":
matchBody(w, r, `{"createdAt":"22/02/2022","exportId":"2212"}`, `{"space":{"cake":"pumpkin"}}`)
case "/2213/3":
matchBody(w, r, `{"createdAt":"24/04/2022","exportId":"2213"}`, `{"hello":{"cake":"pumpkin"}}`)
case "/2213/4":
matchBody(w, r, `{"createdAt":"24/04/2022","exportId":"2213"}`, `{"space":{"world":"moon"}}`)
}
})
server := httptest.NewServer(r)
t.Cleanup(func() { registeredTransforms = newRegistry() })
config["request.url"] = server.URL
serverURL = server.URL
config["chain.0.step.request.url"] = server.URL + "/$.exportId/$.files[:].id"
t.Cleanup(server.Close)
},
baseConfig: map[string]interface{}{
"interval": 1,
"request.method": http.MethodPost,
"response.request_body_on_pagination": true,
"response.pagination": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"target": "url.value",
"value": "[[.last_response.body.nextLink]]",
"fail_on_template_error": true,
},
},
},
"chain": []interface{}{
map[string]interface{}{
"step": map[string]interface{}{
"request.method": http.MethodPost,
"replace": "$.files[:].id",
"replace_with": "$.exportId,.parent_last_response.body.exportId",
"request.transforms": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"target": "body.exportId",
"value": "[[ .parent_last_response.body.exportId ]]",
},
},
map[string]interface{}{
"set": map[string]interface{}{
"target": "body.createdAt",
"value": "[[ .cursor.last_published_login ]]",
},
},
},
},
},
},
"cursor": map[string]interface{}{
"last_published_login": map[string]interface{}{
"value": "[[ .last_event.createdAt ]]",
},
},
},
expected: []string{
`{"hello":{"world":"moon"}}`,
`{"space":{"cake":"pumpkin"}}`,
`{"hello":{"cake":"pumpkin"}}`,
`{"space":{"world":"moon"}}`,
},
},
{
name: "Test if cursor value is updated for root response with chaining & pagination along with split operator",
setupServer: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) {
var serverURL string
registerPaginationTransforms()
registerRequestTransforms()
r := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/":
fmt.Fprintf(w, `{"files":[{"id":"1"},{"id":"2"}],"exportId":"2212","time":[{"timeStamp":"22/02/2022"}],
"nextLink":"%s/link1"}`, serverURL)
case "/link1":
fmt.Fprintln(w, `{"files":[{"id":"3"},{"id":"4"}], "exportId":"2213","time":[{"timeStamp":"24/04/2022"}]}`)
case "/2212/1":
matchBody(w, r, `{"createdAt":"22/02/2022","exportId":"2212"}`, `{"hello":{"world":"moon"}}`)
case "/2212/2":
matchBody(w, r, `{"createdAt":"22/02/2022","exportId":"2212"}`, `{"space":{"cake":"pumpkin"}}`)
case "/2213/3":
matchBody(w, r, `{"createdAt":"24/04/2022","exportId":"2213"}`, `{"hello":{"cake":"pumpkin"}}`)
case "/2213/4":
matchBody(w, r, `{"createdAt":"24/04/2022","exportId":"2213"}`, `{"space":{"world":"moon"}}`)
}
})
server := httptest.NewServer(r)
t.Cleanup(func() { registeredTransforms = newRegistry() })
config["request.url"] = server.URL
serverURL = server.URL
config["chain.0.step.request.url"] = server.URL + "/$.exportId/$.files[:].id"
t.Cleanup(server.Close)
},
baseConfig: map[string]interface{}{
"interval": 1,
"request.method": http.MethodPost,
"response.request_body_on_pagination": true,
"response.pagination": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"target": "url.value",
"value": "[[.last_response.body.nextLink]]",
"fail_on_template_error": true,
},
},
},
"response.split": map[string]interface{}{
"target": "body.time",
"type": "array",
"keep_parent": true,
},
"chain": []interface{}{
map[string]interface{}{
"step": map[string]interface{}{
"request.method": http.MethodPost,
"replace": "$.files[:].id",
"replace_with": "$.exportId,.parent_last_response.body.exportId",
"request.transforms": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"target": "body.exportId",
"value": "[[ .parent_last_response.body.exportId ]]",
},
},
map[string]interface{}{
"set": map[string]interface{}{
"target": "body.createdAt",
"value": "[[ .cursor.last_published_login ]]",
},
},
},
},
},
},
"cursor": map[string]interface{}{
"last_published_login": map[string]interface{}{
"value": "[[ .last_event.time.timeStamp ]]",
},
},
},
expected: []string{
`{"hello":{"world":"moon"}}`,
`{"space":{"cake":"pumpkin"}}`,
`{"hello":{"cake":"pumpkin"}}`,
`{"space":{"world":"moon"}}`,
},
},
}

for _, testCase := range testCases {
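The two test cases above exercise the bug from #33213: with chaining plus root-level pagination, the chain step's [[ .cursor.last_published_login ]] template must resolve against the page currently being processed, not only against the first page. The following is a minimal standalone sketch of that per-page cursor refresh, not the Beats implementation; the renderCtx shape and the render helper are illustrative assumptions, though the [[ ]] delimiters match the value-template syntax seen in the test config.

package main

import (
	"bytes"
	"fmt"
	"text/template"
)

// renderCtx loosely mimics the template context httpjson exposes;
// the field names here are assumptions for illustration only.
type renderCtx struct {
	LastEvent map[string]string
	Cursor    map[string]string
}

// render evaluates a value template using the [[ ]] delimiters.
func render(expr string, c renderCtx) string {
	t := template.Must(template.New("value").Delims("[[", "]]").Parse(expr))
	var buf bytes.Buffer
	if err := t.Execute(&buf, c); err != nil {
		return ""
	}
	return buf.String()
}

func main() {
	c := renderCtx{Cursor: map[string]string{}}
	pages := []map[string]string{
		{"exportId": "2212", "createdAt": "22/02/2022"},
		{"exportId": "2213", "createdAt": "24/04/2022"},
	}
	for _, page := range pages {
		c.LastEvent = page
		// the fix: refresh the cursor per page, so chain-step request bodies
		// see this page's createdAt instead of reusing the first page's value.
		c.Cursor["last_published_login"] = render("[[ .LastEvent.createdAt ]]", c)
		fmt.Printf("body.createdAt=%s body.exportId=%s\n",
			render("[[ .Cursor.last_published_login ]]", c),
			render("[[ .LastEvent.exportId ]]", c))
	}
}

Run as-is, this prints the two distinct createdAt values (22/02/2022, then 24/04/2022) that the chain-step request bodies in the tests above expect.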
4 changes: 2 additions & 2 deletions x-pack/filebeat/input/httpjson/pagination.go
@@ -7,7 +7,7 @@ package httpjson
import (
"context"
"errors"
"io/ioutil"
"io"
"net/http"
"net/url"

@@ -159,7 +159,7 @@ func (iter *pageIterator) next() (*response, bool, error) {
}

func (iter *pageIterator) getPage() (*response, error) {
bodyBytes, err := ioutil.ReadAll(iter.resp.Body)
bodyBytes, err := io.ReadAll(iter.resp.Body)
if err != nil {
return nil, err
}
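(Side note: the pagination.go change above simply migrates off io/ioutil, which has been deprecated since Go 1.16; io.ReadAll is the drop-in replacement, and ioutil.ReadAll has delegated to it since that release.)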
73 changes: 43 additions & 30 deletions x-pack/filebeat/input/httpjson/request.go
@@ -279,17 +279,17 @@ func generateNewUrl(replacement, oldUrl, id string) (url.URL, error) {

func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, publisher inputcursor.Publisher) error {
var (
n int
ids []string
err error
urlCopy url.URL
urlString string
httpResp *http.Response
initialResponse []*http.Response
intermediateResps []*http.Response
finalResps []*http.Response
isChainExpected bool
chainIndex int
n int
ids []string
err error
urlCopy url.URL
urlString string
httpResp *http.Response
initialResponse []*http.Response
intermediateResps []*http.Response
finalResps []*http.Response
isChainWithPageExpected bool
chainIndex int
)

for i, rf := range r.requestFactories {
@@ -319,38 +319,47 @@
body: bodyMap,
}
trCtx.updateFirstResponse(firstResponse)
// initially, the first response and the last response are the same
trCtx.updateLastResponse(firstResponse)

if len(r.requestFactories) == 1 {
finalResps = append(finalResps, httpResp)
events := r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps)
events := r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, true)
n = processAndPublishEvents(trCtx, events, publisher, true, r.log)
continue
}

// if the flow of control reaches here, there is more than one request factory
// if a pagination request factory at the root level and a chain step exists, only then we will initialize flags & variables
// which are required for chaining with pagination
if r.requestFactories[i+1].isChain && r.responseProcessors[i].pagination.requestFactory != nil {
isChainExpected = true
// if a chain step exists, only then do we initialize the flags & variables required for chaining
if r.requestFactories[i+1].isChain {
chainIndex = i + 1
resp, err := cloneResponse(httpResp)
if err != nil {
return err
}
initialResponse = append(initialResponse, resp)
// the response is cloned and added to finalResps here, since the first-page response (whether pagination exists or not)
// is sent for further processing to apply any applicable response processors and, at the same time, to update the
// last_response, first_event & last_event cursor values.
finalResps = append(finalResps, resp)

// if a pagination request factory exists at the root level along with a chain step, initialize the flags & variables
// required for chaining with root-level pagination
if r.responseProcessors[i].pagination.requestFactory != nil {
isChainWithPageExpected = true
resp, err := cloneResponse(httpResp)
if err != nil {
return err
}
initialResponse = append(initialResponse, resp)
}
}

intermediateResps = append(intermediateResps, httpResp)
ids, err = r.getIdsFromResponses(intermediateResps, r.requestFactories[i+1].replace)
if err != nil {
return err
}
// we will only processAndPublishEvents here if chains & root level pagination do not exist, in order to avoid unnecessary pagination
if !isChainExpected {
events := r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps)
n = processAndPublishEvents(trCtx, events, publisher, false, r.log)
}
// pagination is skipped here since chaining is present, which prevents unexpected updates to the cursor values
events := r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, false)
n = processAndPublishEvents(trCtx, events, publisher, false, r.log)
} else {
if len(ids) == 0 {
n = 0
@@ -420,17 +429,17 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p

var events <-chan maybeMsg
if rf.isChain {
events = rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps)
events = rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true)
} else {
events = r.responseProcessors[i].startProcessing(stdCtx, trCtx, resps)
events = r.responseProcessors[i].startProcessing(stdCtx, trCtx, resps, true)
}
n += processAndPublishEvents(chainTrCtx, events, publisher, i < len(r.requestFactories), r.log)
}
}

defer httpResp.Body.Close()

if isChainExpected {
// if pagination exists for the parent request along with chaining, the chain is processed for each page response
if isChainWithPageExpected {
n += r.processRemainingChainEvents(stdCtx, trCtx, publisher, initialResponse, chainIndex)
}
r.log.Infof("request finished: %d events published", n)
@@ -522,7 +531,7 @@ func processAndPublishEvents(trCtx *transformContext, events <-chan maybeMsg, pu
// processRemainingChainEvents processes the remaining pagination events for chain blocks
func (r *requester) processRemainingChainEvents(stdCtx context.Context, trCtx *transformContext, publisher inputcursor.Publisher, initialResp []*http.Response, chainIndex int) int {
// we start from 0 and skip the 1st event since it has already been processed
events := r.responseProcessors[0].startProcessing(stdCtx, trCtx, initialResp)
events := r.responseProcessors[0].startProcessing(stdCtx, trCtx, initialResp, true)

var n int
var eventCount int
@@ -544,6 +553,10 @@ }
}
response.Body = io.NopCloser(body)

// update the last_event and cursor values for each pagination response when chaining is present
trCtx.updateLastEvent(maybeMsg.msg)
trCtx.updateCursor()

// for each pagination response, we repeat all the chain steps / blocks
count, err := r.processChainPaginationEvents(stdCtx, trCtx, publisher, &response, chainIndex, r.log)
if err != nil {
@@ -650,7 +663,7 @@ }
}
resps = intermediateResps
}
events := rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps)
events := rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true)
n += processAndPublishEvents(chainTrCtx, events, publisher, i < len(r.requestFactories), r.log)
}

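A side note on the request.go changes above: the first-page response is cloned before it is appended to finalResps (and, when root-level pagination exists, to initialResponse), because an http.Response body is a one-shot stream and event processing and chain processing each need their own copy. A minimal sketch of that idea, assuming a simplified helper rather than the real cloneResponse:

package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

// cloneResponse sketches why doRequest clones responses before reusing them:
// resp.Body can only be read once, so buffer it and hand each consumer its
// own reader. This mirrors the intent, not the code, of the real helper.
func cloneResponse(resp *http.Response) (*http.Response, error) {
	b, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	resp.Body = io.NopCloser(bytes.NewReader(b)) // restore the original body
	clone := *resp
	clone.Body = io.NopCloser(bytes.NewReader(b))
	return &clone, nil
}

func main() {
	orig := &http.Response{Body: io.NopCloser(bytes.NewReader([]byte(`{"files":[{"id":"1"}]}`)))}
	copyResp, err := cloneResponse(orig)
	if err != nil {
		panic(err)
	}
	b1, _ := io.ReadAll(orig.Body)        // consumed by event processing
	b2, _ := io.ReadAll(copyResp.Body)    // consumed by the chain
	fmt.Println(string(b1) == string(b2)) // true: both see the full body
}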
6 changes: 5 additions & 1 deletion x-pack/filebeat/input/httpjson/response.go
@@ -129,7 +129,7 @@ func newChainResponseProcessor(config chainConfig, httpClient *httpClient, log *
return rp
}

func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *transformContext, resps []*http.Response) <-chan maybeMsg {
func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *transformContext, resps []*http.Response, paginate bool) <-chan maybeMsg {
trCtx.clearIntervalData()

ch := make(chan maybeMsg)
@@ -158,6 +158,7 @@ func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *tran
return
}

// the last_response context object is updated here as part of the normal page-processing flow
trCtx.updateLastResponse(*page)

rp.log.Debugf("last received page: %#v", trCtx.lastResponse)
@@ -192,6 +193,9 @@ func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *tran
}
}
}
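// when the caller handles pagination itself (chaining with root-level
// pagination), stop after the first page to avoid duplicate processing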
if !paginate {
break
}
}
}
}()
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/httpjson/transform.go
@@ -131,10 +131,10 @@ func (ctx *transformContext) updateFirstResponse(r response) {

func (ctx *transformContext) clearIntervalData() {
ctx.lock.Lock()
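// using defer keeps the unlock paired with the lock and guarantees
// release on every exit path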
defer ctx.lock.Unlock()
ctx.lastEvent = &mapstr.M{}
ctx.firstEvent = &mapstr.M{}
ctx.lastResponse = &response{}
ctx.lock.Unlock()
}

type transformable mapstr.M