-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Filebeat][httpjson] Convert httpjson input to a v2 input #20226
Conversation
Pinging @elastic/siem (Team:SIEM) |
4f05073
to
01e904e
Compare
01e904e
to
995608a
Compare
} | ||
} | ||
|
||
func (l *retryLogger) Printf(s string, args ...interface{}) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is more of an enhancement request. There's a separate logger interface that can be implemented that might integrate more nicely with our logger (not 100% sure). With this interface each message contains a level like [INFO]
, I wonder if we switch to the leveled interface if the message would look more "native".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you were referring to implement the LeveledLogger
, I did so, let me know if you meant something different.
|
||
if mm != nil && in.config.DateCursor.IsEnabled() { | ||
in.advanceCursor(common.MapStr(mm)) | ||
_, err = net.DialTimeout("tcp", fmt.Sprintf("%s:%s", url.Hostname(), port), time.Second) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this use net.JoinHostPort
to ensure IPv6 addresses are boxed? I think url.Hostname() strips brackets.
from filebeat import BaseTest | ||
|
||
|
||
class Test(BaseTest): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why replace the go based tests with a python test? This 'complicates', potentially slowing down the CI, plus increases the risk of introducing flaky tests.
If we want to switch to python based testing, maybe we can make this change in a separate PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had some issues trying to spawn the cursor input manager in unit tests, that was the reason why I moved the integration tests to python. Since your following comments made a lot of sense, and I implemented the stateless input instead as a first step, I moved them back to go now, since the issues I faced with the cursor input seemed to not apply with that one.
StateStore: store, | ||
Type: inputName, | ||
Configure: configure, | ||
}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using the InputManager like this will store some state always in the statestore. The httpjson input is quite flexible, as it allows configurations that allow inputs to optionally store state or not.
I'd recommend to create a custom input manager that wraps the stateless and the cursor input managers. When configuring the input the wrapper would choose the right input manager instance, based on the user configuration.
Maybe we can split the effort by first converting the input to the v2 api using the stateless input manager?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sense, did so 👍
workerCtx: workerCtx, | ||
workerCancel: workerCancel, | ||
} | ||
in := &httpJSONInput{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks the requester does too much. It is a 'source', unpacks from the cursor, is a client, and executes the request. Plus the requester gets only partially initialized here. The source is used to allow to generate the key in the key value store, and to pass the input the address where to collect from. No need to confiure an http client yet. Configure should focus on unpacking the config, do not create helper objects that are required for data collection yet.
Following the code your source is the URL only. Maybe you want to consider to add other metadata to the source as well, e.g. parameters, username (in case services return different information based on the login).
Pass the config to the httpJSONInput, not your source. The requester can be created in the "Run" method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, tried to separate pagination, cursor and retry logic out of it, lmk if it still confusing.
config config | ||
client *http.Client | ||
cursorValue string | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this type does a lot. It implements rate limiting, request execution, response parsing+event creation, and publishing. It looks like it is very difficult to test. Can we split some functionality into a dedicated type and introduce unit testing? At least for rate limiting consider to use/introduce a third party library.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the rate limit bit, it was not in the scope of this change, but it makes sense, will open a new issue to make it separately if that makes sense.
995608a
to
7b92ab9
Compare
Following @urso advice I pulled back and implemented stateless input instead. Will let the work of moving to a custom input manager to be able to use also the cursor input for a following pull request. This allowed to go back to go tests since the issues I faced when writing them for the cursor input do not apply for this one. At the same time did some refactoring to simplify input/requester logic. More improvements can be done but I think would be nice to save them for the following refactor of the input that we plan to do next. Updated the description to make it describe what it currently does. |
Maybe let's discuss this offline, but it would be nice to learn where you did struggle with testing. In libbeat we have helpers for mocking beat.Pipeline, and beat.Client. The |
|
||
url.RawQuery = q.Encode() | ||
|
||
return url.String() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why return the string instead of the already initialized URL struct?
teardown: func(i interface{}) { | ||
server := i.(*httptest.Server) | ||
server.Close() | ||
}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we ever return some other type but an http server?
teardown as is can be removed. Better pass testing.T
to setup. the close
can be registered with the test via t.Cleanup
.
For this tests we end up with 2 active servers. how about only providing a setupServer
method? then ssl
, setup
, and teardown
can be replaced by a single replacable function.
t.Fatal(err) | ||
} | ||
}) | ||
type publisher struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The libbeat/publisher/testing might already contain similar helpers.
in.log.Info("Process another repeated request.") | ||
err = in.processHTTPRequest(ctx, client, ri) | ||
log.Info("Process another repeated request.") | ||
err = requester.processHTTPRequest(stdCtx, publisher) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This loop is quite a common pattern. Better use timed.Periodic(stdCtx, in.Config.Interval, func() { ... })
. Using this pattern to check for cancellation in a loop one should add an extra select on the done channel only in order to confirm done has not been closed before doing the actual select statement.
Periodic is defined here: https://github.com/elastic/go-concert/blob/master/timed/timed.go#L57
return nil, nil | ||
func newHTTPJSONInput(config config) (*httpJSONInput, error) { | ||
if err := config.Validate(); err != nil { | ||
return nil, err | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
note: Validate is already called by Unpack. But I like it to be called here again, as the constructor should be independent of configure
.
7b92ab9
to
3ba6bda
Compare
log.Info("Process another repeated request.") | ||
err = requester.processHTTPRequest(stdCtx, publisher) | ||
if err != nil { | ||
periodCancel() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the Periodic function will support error returns to stop the loop once #20590 is merged.
317973d
to
b4a2f08
Compare
- Paginator takes care of requesting next page info - Rate limiter takes care of rate limiting requests - Date cursor takes care of keeping track of cursor state
d23bdc6
to
25234dd
Compare
return in.config.URL | ||
} | ||
// Run starts the input worker then returns. Only the first invocation | ||
// will ever start the worker. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment seems to be wrong.
|
||
url, err := url.Parse(in.config.URL) | ||
httpClient, err := in.newHTTPClient(stdCtx) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how can this fail? Does the client make a request to init oauth support? Are there recoverable error types (e.g. io timeout)? In the later case we might want to put the initialization into a loop with exponential backoff.
in.log.Warn("date_cursor field must be a string, cursor will not advance") | ||
return | ||
if in.config.Interval == 0 { | ||
return nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is Interval==0 a configuration error? Maybe this is exactly what some users want, (long-)poll the server.
} | ||
} | ||
} | ||
switch errors.Cause(err) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cause is used by some legacy error libs (e.g. pkg/errors which is not maintained anymore). Better use Unpack
or errors.Is(context.X)
.
} | ||
} | ||
switch errors.Cause(err) { | ||
case context.Canceled, context.DeadlineExceeded: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is DeadlineExceeded a signal that might be triggered internally by processHTTPRequest, or do we expect this to be triggered via the inputs parent context?
} | ||
msg, err := r.client.Do(req) | ||
if err != nil { | ||
return nil, errors.Wrapf(err, "failed to execute http client.Do") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like this error is passed through to Run
. If so the input shuts down after an IO error. Timeouts/disconnects or other network errors should not shut down the input. These are recoverable errors in a sense of "let's try again".
errors.Wrap and similar make it difficult to "judge" the error (errors.Is
can help to check for a cause). Consider to either log recoverable errors or create special error types that allow you to see what's going on.
About errors.Wrap
: We should stop using the pkg/errors package. It is discontinued. At the same time go 1.14 introduced something similar via fmt.Errorf
with %w
. Better use fmt.Errorf("failed to execute http request: %w", err)
|
||
responseData, err := ioutil.ReadAll(resp.Body) | ||
if err != nil { | ||
return errors.Wrapf(err, "failed to read http response") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IO error here should not force the input to shutdown.
) * Convert httpjson input to a v2 cursor input * Add CHANGELOG entry * Fix format errors * Convert to stateless input and refactor: - Paginator takes care of requesting next page info - Rate limiter takes care of rate limiting requests - Date cursor takes care of keeping track of cursor state * Remove python tests * Do not fail if there is no next page * Refactor go integration tests to work with v2 input * Do suggested changes to input and tests * Update time.Periodic call with error return * Change test duration values * Sepparate sync test case * Create custon url config type * Change input.Run comment * Change input.Run to only return on context cancellation * Remove all usages of pkg/errors (cherry picked from commit 929e838)
…20702) * Convert httpjson input to a v2 cursor input * Add CHANGELOG entry * Fix format errors * Convert to stateless input and refactor: - Paginator takes care of requesting next page info - Rate limiter takes care of rate limiting requests - Date cursor takes care of keeping track of cursor state * Remove python tests * Do not fail if there is no next page * Refactor go integration tests to work with v2 input * Do suggested changes to input and tests * Update time.Periodic call with error return * Change test duration values * Sepparate sync test case * Create custon url config type * Change input.Run comment * Change input.Run to only return on context cancellation * Remove all usages of pkg/errors (cherry picked from commit 929e838)
) * Convert httpjson input to a v2 cursor input * Add CHANGELOG entry * Fix format errors * Convert to stateless input and refactor: - Paginator takes care of requesting next page info - Rate limiter takes care of rate limiting requests - Date cursor takes care of keeping track of cursor state * Remove python tests * Do not fail if there is no next page * Refactor go integration tests to work with v2 input * Do suggested changes to input and tests * Update time.Periodic call with error return * Change test duration values * Sepparate sync test case * Create custon url config type * Change input.Run comment * Change input.Run to only return on context cancellation * Remove all usages of pkg/errors
What does this PR do?
Converts httpjson input to the new v2 stateless input.
Why is it important?
Several modules rely on httpjson input, and would benefit a lot from the new v2 cursor feature that keeps persistence of a cursor between restarts.
This change moves httpjson input to new v2 input, preparing to be able to implement the cursor persistence state in a following change.
Checklist
- [] I have made corresponding changes to the documentation- [] I have made corresponding change to the default configuration filesCHANGELOG.next.asciidoc
orCHANGELOG-developer.next.asciidoc
.Related issues
Relates to #19486