Skip to content

Commit

Permalink
[filebeat][httpjson] - Added bug-fix for duplicate data issue #33213 (#…
Browse files Browse the repository at this point in the history
…33664)

* added bugfix for duplicate data issue #33213

* updated with PR suggetions

* updated comments

* re-engineered bugfix to update cursors properly

* spelling fix
  • Loading branch information
ShourieG authored Nov 17, 2022
1 parent 7be50c0 commit afb2beb
Show file tree
Hide file tree
Showing 5 changed files with 214 additions and 34 deletions.
163 changes: 163 additions & 0 deletions x-pack/filebeat/input/httpjson/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -904,6 +904,169 @@ func TestInput(t *testing.T) {
`{"space":{"world":"moon"}}`,
},
},
{
name: "Test if cursor value is updated for root response with chaining & pagination",
setupServer: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) {
var serverURL string
registerPaginationTransforms()
registerRequestTransforms()
r := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/":
fmt.Fprintf(w, `{"files":[{"id":"1"},{"id":"2"}],"exportId":"2212", "createdAt":"22/02/2022",
"nextLink":"%s/link1"}`, serverURL)
case "/link1":
fmt.Fprintln(w, `{"files":[{"id":"3"},{"id":"4"}], "exportId":"2213", "createdAt":"24/04/2022"}`)
case "/2212/1":
matchBody(w, r, `{"createdAt":"22/02/2022","exportId":"2212"}`, `{"hello":{"world":"moon"}}`)
case "/2212/2":
matchBody(w, r, `{"createdAt":"22/02/2022","exportId":"2212"}`, `{"space":{"cake":"pumpkin"}}`)
case "/2213/3":
matchBody(w, r, `{"createdAt":"24/04/2022","exportId":"2213"}`, `{"hello":{"cake":"pumpkin"}}`)
case "/2213/4":
matchBody(w, r, `{"createdAt":"24/04/2022","exportId":"2213"}`, `{"space":{"world":"moon"}}`)
}
})
server := httptest.NewServer(r)
t.Cleanup(func() { registeredTransforms = newRegistry() })
config["request.url"] = server.URL
serverURL = server.URL
config["chain.0.step.request.url"] = server.URL + "/$.exportId/$.files[:].id"
t.Cleanup(server.Close)
},
baseConfig: map[string]interface{}{
"interval": 1,
"request.method": http.MethodPost,
"response.request_body_on_pagination": true,
"response.pagination": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"target": "url.value",
"value": "[[.last_response.body.nextLink]]",
"fail_on_template_error": true,
},
},
},
"chain": []interface{}{
map[string]interface{}{
"step": map[string]interface{}{
"request.method": http.MethodPost,
"replace": "$.files[:].id",
"replace_with": "$.exportId,.parent_last_response.body.exportId",
"request.transforms": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"target": "body.exportId",
"value": "[[ .parent_last_response.body.exportId ]]",
},
},
map[string]interface{}{
"set": map[string]interface{}{
"target": "body.createdAt",
"value": "[[ .cursor.last_published_login ]]",
},
},
},
},
},
},
"cursor": map[string]interface{}{
"last_published_login": map[string]interface{}{
"value": "[[ .last_event.createdAt ]]",
},
},
},
expected: []string{
`{"hello":{"world":"moon"}}`,
`{"space":{"cake":"pumpkin"}}`,
`{"hello":{"cake":"pumpkin"}}`,
`{"space":{"world":"moon"}}`,
},
},
{
name: "Test if cursor value is updated for root response with chaining & pagination along with split operator",
setupServer: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) {
var serverURL string
registerPaginationTransforms()
registerRequestTransforms()
r := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/":
fmt.Fprintf(w, `{"files":[{"id":"1"},{"id":"2"}],"exportId":"2212","time":[{"timeStamp":"22/02/2022"}],
"nextLink":"%s/link1"}`, serverURL)
case "/link1":
fmt.Fprintln(w, `{"files":[{"id":"3"},{"id":"4"}], "exportId":"2213","time":[{"timeStamp":"24/04/2022"}]}`)
case "/2212/1":
matchBody(w, r, `{"createdAt":"22/02/2022","exportId":"2212"}`, `{"hello":{"world":"moon"}}`)
case "/2212/2":
matchBody(w, r, `{"createdAt":"22/02/2022","exportId":"2212"}`, `{"space":{"cake":"pumpkin"}}`)
case "/2213/3":
matchBody(w, r, `{"createdAt":"24/04/2022","exportId":"2213"}`, `{"hello":{"cake":"pumpkin"}}`)
case "/2213/4":
matchBody(w, r, `{"createdAt":"24/04/2022","exportId":"2213"}`, `{"space":{"world":"moon"}}`)
}
})
server := httptest.NewServer(r)
t.Cleanup(func() { registeredTransforms = newRegistry() })
config["request.url"] = server.URL
serverURL = server.URL
config["chain.0.step.request.url"] = server.URL + "/$.exportId/$.files[:].id"
t.Cleanup(server.Close)
},
baseConfig: map[string]interface{}{
"interval": 1,
"request.method": http.MethodPost,
"response.request_body_on_pagination": true,
"response.pagination": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"target": "url.value",
"value": "[[.last_response.body.nextLink]]",
"fail_on_template_error": true,
},
},
},
"response.split": map[string]interface{}{
"target": "body.time",
"type": "array",
"keep_parent": true,
},
"chain": []interface{}{
map[string]interface{}{
"step": map[string]interface{}{
"request.method": http.MethodPost,
"replace": "$.files[:].id",
"replace_with": "$.exportId,.parent_last_response.body.exportId",
"request.transforms": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"target": "body.exportId",
"value": "[[ .parent_last_response.body.exportId ]]",
},
},
map[string]interface{}{
"set": map[string]interface{}{
"target": "body.createdAt",
"value": "[[ .cursor.last_published_login ]]",
},
},
},
},
},
},
"cursor": map[string]interface{}{
"last_published_login": map[string]interface{}{
"value": "[[ .last_event.time.timeStamp ]]",
},
},
},
expected: []string{
`{"hello":{"world":"moon"}}`,
`{"space":{"cake":"pumpkin"}}`,
`{"hello":{"cake":"pumpkin"}}`,
`{"space":{"world":"moon"}}`,
},
},
}

for _, testCase := range testCases {
Expand Down
4 changes: 2 additions & 2 deletions x-pack/filebeat/input/httpjson/pagination.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package httpjson
import (
"context"
"errors"
"io/ioutil"
"io"
"net/http"
"net/url"

Expand Down Expand Up @@ -159,7 +159,7 @@ func (iter *pageIterator) next() (*response, bool, error) {
}

func (iter *pageIterator) getPage() (*response, error) {
bodyBytes, err := ioutil.ReadAll(iter.resp.Body)
bodyBytes, err := io.ReadAll(iter.resp.Body)
if err != nil {
return nil, err
}
Expand Down
73 changes: 43 additions & 30 deletions x-pack/filebeat/input/httpjson/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,17 +279,17 @@ func generateNewUrl(replacement, oldUrl, id string) (url.URL, error) {

func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, publisher inputcursor.Publisher) error {
var (
n int
ids []string
err error
urlCopy url.URL
urlString string
httpResp *http.Response
initialResponse []*http.Response
intermediateResps []*http.Response
finalResps []*http.Response
isChainExpected bool
chainIndex int
n int
ids []string
err error
urlCopy url.URL
urlString string
httpResp *http.Response
initialResponse []*http.Response
intermediateResps []*http.Response
finalResps []*http.Response
isChainWithPageExpected bool
chainIndex int
)

for i, rf := range r.requestFactories {
Expand Down Expand Up @@ -319,38 +319,47 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
body: bodyMap,
}
trCtx.updateFirstResponse(firstResponse)
// since, initially the first response and last response are the same
trCtx.updateLastResponse(firstResponse)

if len(r.requestFactories) == 1 {
finalResps = append(finalResps, httpResp)
events := r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps)
events := r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, true)
n = processAndPublishEvents(trCtx, events, publisher, true, r.log)
continue
}

// if flow of control reaches here, that means there are more than 1 request factories
// if a pagination request factory at the root level and a chain step exists, only then we will initialize flags & variables
// which are required for chaining with pagination
if r.requestFactories[i+1].isChain && r.responseProcessors[i].pagination.requestFactory != nil {
isChainExpected = true
// if a chain step exists, only then we will initialize flags & variables here which are required for chaining
if r.requestFactories[i+1].isChain {
chainIndex = i + 1
resp, err := cloneResponse(httpResp)
if err != nil {
return err
}
initialResponse = append(initialResponse, resp)
// the response is cloned and added to finalResps here, since the response of the 1st page (whether pagination exists or not), will
// be sent for further processing to check if any response processors can be applied or not and at the same time update the last_response,
// first_event & last_event cursor values.
finalResps = append(finalResps, resp)

// if a pagination request factory exists at the root level along with a chain step, only then we will initialize flags & variables here
// which are required for chaining with root level pagination
if r.responseProcessors[i].pagination.requestFactory != nil {
isChainWithPageExpected = true
resp, err := cloneResponse(httpResp)
if err != nil {
return err
}
initialResponse = append(initialResponse, resp)
}
}

intermediateResps = append(intermediateResps, httpResp)
ids, err = r.getIdsFromResponses(intermediateResps, r.requestFactories[i+1].replace)
if err != nil {
return err
}
// we will only processAndPublishEvents here if chains & root level pagination do not exist, inorder to avoid unnecessary pagination
if !isChainExpected {
events := r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps)
n = processAndPublishEvents(trCtx, events, publisher, false, r.log)
}
// we avoid unnecessary pagination here since chaining is present, thus avoiding any unexpected updates to cursor values
events := r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, false)
n = processAndPublishEvents(trCtx, events, publisher, false, r.log)
} else {
if len(ids) == 0 {
n = 0
Expand Down Expand Up @@ -420,17 +429,17 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p

var events <-chan maybeMsg
if rf.isChain {
events = rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps)
events = rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true)
} else {
events = r.responseProcessors[i].startProcessing(stdCtx, trCtx, resps)
events = r.responseProcessors[i].startProcessing(stdCtx, trCtx, resps, true)
}
n += processAndPublishEvents(chainTrCtx, events, publisher, i < len(r.requestFactories), r.log)
}
}

defer httpResp.Body.Close()

if isChainExpected {
// if pagination exists for the parent request along with chaining, then for each page response the chain is processed
if isChainWithPageExpected {
n += r.processRemainingChainEvents(stdCtx, trCtx, publisher, initialResponse, chainIndex)
}
r.log.Infof("request finished: %d events published", n)
Expand Down Expand Up @@ -522,7 +531,7 @@ func processAndPublishEvents(trCtx *transformContext, events <-chan maybeMsg, pu
// processRemainingChainEvents, processes the remaining pagination events for chain blocks
func (r *requester) processRemainingChainEvents(stdCtx context.Context, trCtx *transformContext, publisher inputcursor.Publisher, initialResp []*http.Response, chainIndex int) int {
// we start from 0, and skip the 1st event since we have already processed it
events := r.responseProcessors[0].startProcessing(stdCtx, trCtx, initialResp)
events := r.responseProcessors[0].startProcessing(stdCtx, trCtx, initialResp, true)

var n int
var eventCount int
Expand All @@ -544,6 +553,10 @@ func (r *requester) processRemainingChainEvents(stdCtx context.Context, trCtx *t
}
response.Body = io.NopCloser(body)

// updates the cursor for pagination last_event & last_response when chaining is present
trCtx.updateLastEvent(maybeMsg.msg)
trCtx.updateCursor()

// for each pagination response, we repeat all the chain steps / blocks
count, err := r.processChainPaginationEvents(stdCtx, trCtx, publisher, &response, chainIndex, r.log)
if err != nil {
Expand Down Expand Up @@ -650,7 +663,7 @@ func (r *requester) processChainPaginationEvents(stdCtx context.Context, trCtx *
}
resps = intermediateResps
}
events := rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps)
events := rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true)
n += processAndPublishEvents(chainTrCtx, events, publisher, i < len(r.requestFactories), r.log)
}

Expand Down
6 changes: 5 additions & 1 deletion x-pack/filebeat/input/httpjson/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func newChainResponseProcessor(config chainConfig, httpClient *httpClient, log *
return rp
}

func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *transformContext, resps []*http.Response) <-chan maybeMsg {
func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *transformContext, resps []*http.Response, paginate bool) <-chan maybeMsg {
trCtx.clearIntervalData()

ch := make(chan maybeMsg)
Expand Down Expand Up @@ -158,6 +158,7 @@ func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *tran
return
}

// last_response context object is updated here organically
trCtx.updateLastResponse(*page)

rp.log.Debugf("last received page: %#v", trCtx.lastResponse)
Expand Down Expand Up @@ -192,6 +193,9 @@ func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *tran
}
}
}
if !paginate {
break
}
}
}
}()
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/httpjson/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,10 @@ func (ctx *transformContext) updateFirstResponse(r response) {

func (ctx *transformContext) clearIntervalData() {
ctx.lock.Lock()
defer ctx.lock.Unlock()
ctx.lastEvent = &mapstr.M{}
ctx.firstEvent = &mapstr.M{}
ctx.lastResponse = &response{}
ctx.lock.Unlock()
}

type transformable mapstr.M
Expand Down

0 comments on commit afb2beb

Please sign in to comment.