Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[filebeat][httpjson][bugfix] - root request pagination now working with chaining #32722

Merged
merged 10 commits into from
Aug 23, 2022
2 changes: 1 addition & 1 deletion CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]


*Filebeat*

- [Httpjson]- Fix for pagination at root level not working when used with chaining.

*Heartbeat*
- Browser monitors (beta) no write to the `synthetics-*` index prefix. {pull}32064[32064]
Expand Down
26 changes: 13 additions & 13 deletions x-pack/filebeat/input/httpjson/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ type chainConfig struct {
// will contain replace string with original URL to make a skeleton for the
// call request.
type stepConfig struct {
Auth *authConfig `config:"auth"`
Request requestConfig `config:"request" validate:"required"`
Response responseChainConfig `config:"response,omitempty"`
Replace string `config:"replace,omitempty"`
Auth *authConfig `config:"auth"`
Request *requestConfig `config:"request" validate:"required"`
Response *responseChainConfig `config:"response,omitempty"`
Replace string `config:"replace,omitempty"`
}

// whileConfig will contain basic properties like auth parameters, request parameters,
Expand All @@ -82,11 +82,11 @@ type stepConfig struct {
// expression is evaluated to "true" or request.retry.max_attempts is exhausted. If
// request.retry.max_attempts is not specified , the max_attempts is always 1.
type whileConfig struct {
Auth *authConfig `config:"auth"`
Request requestConfig `config:"request" validate:"required"`
Response responseChainConfig `config:"response,omitempty"`
Replace string `config:"replace,omitempty"`
Until *valueTpl `config:"until" validate:"required"`
Auth *authConfig `config:"auth"`
Request *requestConfig `config:"request" validate:"required"`
Response *responseChainConfig `config:"response,omitempty"`
Replace string `config:"replace,omitempty"`
Until *valueTpl `config:"until" validate:"required"`
}

type responseChainConfig struct {
Expand All @@ -100,13 +100,13 @@ func defaultChainConfig() config {
{
While: &whileConfig{
Auth: chaincfg.Auth,
Request: *chaincfg.Request,
Response: responseChainConfig{},
Request: chaincfg.Request,
Response: &responseChainConfig{},
},
Step: &stepConfig{
Auth: chaincfg.Auth,
Request: *chaincfg.Request,
Response: responseChainConfig{},
Request: chaincfg.Request,
Response: &responseChainConfig{},
},
},
}
Expand Down
58 changes: 58 additions & 0 deletions x-pack/filebeat/input/httpjson/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,36 @@ func TestInput(t *testing.T) {
`{"space":{"cake":"pumpkin"}}`,
},
},
{
name: "Test pagination when used with chaining",
setupServer: newChainPaginationTestServer(httptest.NewServer),
baseConfig: map[string]interface{}{
"interval": 1,
"request.method": http.MethodGet,
"response.pagination": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"target": "url.value",
"value": "[[.last_response.body.nextLink]]",
"fail_on_template_error": true,
},
},
},
"chain": []interface{}{
map[string]interface{}{
"step": map[string]interface{}{
"request.method": http.MethodGet,
"replace": "$.records[:].id",
},
},
},
},
handler: defaultHandler(http.MethodGet, ""),
expected: []string{
`{"hello":{"world":"moon"}}`,
`{"space":{"cake":"pumpkin"}}`,
},
},
}

for _, testCase := range testCases {
Expand Down Expand Up @@ -668,6 +698,34 @@ func newChainTestServer(
}
}

func newChainPaginationTestServer(
newServer func(http.Handler) *httptest.Server,
) func(*testing.T, http.HandlerFunc, map[string]interface{}) {
return func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) {
registerPaginationTransforms()
var serverURL string
r := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/":
link := serverURL + "/link2"
value := fmt.Sprintf(`{"records":[{"id":1}], "nextLink":"%s"}`, link)
fmt.Fprintln(w, value)
case "/1":
fmt.Fprintln(w, `{"hello":{"world":"moon"}}`)
case "/link2":
fmt.Fprintln(w, `{"records":[{"id":2}]}`)
case "/2":
fmt.Fprintln(w, `{"space":{"cake":"pumpkin"}}`)
}
})
server := httptest.NewServer(r)
config["request.url"] = server.URL
serverURL = server.URL
config["chain.0.step.request.url"] = server.URL + "/$.records[:].id"
t.Cleanup(func() { registeredTransforms = newRegistry() })
}
}

func newV2Context() (v2.Context, func()) {
ctx, cancel := context.WithCancel(context.Background())
return v2.Context{
Expand Down
Loading