-
Notifications
You must be signed in to change notification settings - Fork 525
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Move non-web related processing into processor, add "publish" package #1324
Conversation
jenkins, test this please |
1 similar comment
jenkins, test this please |
33f14da
to
4f2d4ff
Compare
jenkins, test this please |
4f2d4ff
to
3042571
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Appreciate these changes, as they split the v2 handler into several concerns and are more aligned with the current code structure.
I added mainly small comments, except for moving http related information to the processor
package, see below.
publish/pub.go
Outdated
type PendingReq struct { | ||
Transformables []transform.Transformable | ||
Tcontext *transform.Context | ||
Trace bool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd keep trace
private.
beater/v2_handler.go
Outdated
if err != nil { | ||
sr := stream.Result{} | ||
sr.AddWithMessage(stream.ServerError, 1, err.Error()) | ||
v.sendResponse(logger, w, &sr) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a test for this, and ensure to return here.
beater/v2_handler_test.go
Outdated
} | ||
expectedBuf, err := expected.marshal() | ||
func TestInvalidContentType(t *testing.T) { | ||
req, err := http.NewRequest("POST", "/v2/intake", nil) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With the refactoring it becomes more obvious that server Integration tests for v2 are missing. You could add them e.g. to server_test.go
, to run current available tests for v1 and v2.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
coming up next 👍
beater/v2_integration_test.go
Outdated
@@ -79,7 +82,7 @@ func TestV2IntakeIntegration(t *testing.T) { | |||
name := fmt.Sprintf("approved-es-documents/testV2IntakeIntegration%s", test.name) | |||
r = r.WithContext(context.WithValue(r.Context(), "name", name)) | |||
reqTimestamp, err := time.Parse(time.RFC3339, "2018-08-01T10:00:00Z") | |||
r = r.WithContext(context.WithValue(r.Context(), requestTimeContextKey, reqTimestamp)) | |||
r = r.WithContext(utility.ContextWithRequestTime(r.Context(), reqTimestamp)) | |||
handler.ServeHTTP(w, r) | |||
|
|||
assert.Equal(t, test.status, w.Code) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you move this to the processor package. It is a start for what we had in the processor/package
tests for v1.
decoder/req_decoder.go
Outdated
// CompressedRequestReader makes a function that uses information from an http request to construct a Limited ReadCloser | ||
// from the body of the request, handling any decompression necessary | ||
// CompressedRequestReader returns a reader that will decompress the body according | ||
// the supplied Content-Encoding header in the request |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you are missing a to here.
processor/stream/result.go
Outdated
@@ -131,7 +131,7 @@ func (s *streamResponse) String() string { | |||
return strings.Join(errorList, ", ") | |||
} | |||
|
|||
func (s *streamResponse) statusCode() int { | |||
func (s *Result) StatusCode() int { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The beater
package has been the abstraction layer for http
so far. I'd avoid moving http
related information into the processor
package, and would rather define errors here, that are translated to http errors in the beater
package (similar to having a ErrFull
in the publisher
package that gets translated to an http error then in the beater
package for v1).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good point. Do you agree that it makes sense to have Result
be part of stream
but translate it to a http type error in beater
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, that's exactly what I meant!
4a26547
to
c41bf75
Compare
I moved the v2 integration tests into At the moment, the stream result is just json encoded in I didn't feel it would makes sense to create a new structure in |
c41bf75
to
a3c40f1
Compare
If it's OK with you, I’m planning to create the v2 server integration tests you asked for in a separate PR. This one is already doing too much. |
a3c40f1
to
83bfc53
Compare
83bfc53
to
8748d73
Compare
processor/stream/result.go
Outdated
ProcessingTimeoutErrType ErrorType = 2 | ||
InvalidInputErrType ErrorType = 3 | ||
ShuttingDownErrType ErrorType = 4 | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you used a string here e.g. const ( QueuFullErrType = "QueueFullErr" .. )
it would be pretty easy to return an error type to the agents.
In case you want to stick to having int
I suggest following:
type StreamError int
const(
QueueFullErrType StreamError = iota
ProcessingTimeoutErrType
InvalidInputErrType
ShuttingDownErrType
)
} | ||
|
||
func (r *Result) LimitedAdd(err error) { | ||
if len(r.Errors) < errorsLimit { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This changes the logic of keeping up to n
errors for every error type, and which http status is returned more or less gets random, based on which errors come first.
I would internally keep a map[ErrorType][]error
or a map[ErrorType]error
struct. This would also allow to not looping over all values when figuring out the highest http.status_code
to return but check for available keys.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
true that it's random now. I considered it, and went with this because it was what we agreed, its simple and likely good enough for a start. Your suggestion would require us to decide on which errors should take precedence.
Looping over 5 items is not really a problem
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to be clear, I'm happy to discuss this and i agree it would be better to make sure some specific errors are included, but i think we should move forward with this behavior at this time
"message": "Problem validating JSON document against schema: I[#] S[#] doesn't validate with \"transaction#\"\n I[#] S[#/allOf/1] allOf failed\n I[#/id] S[#/allOf/1/properties/id/type] expected string, but got number" | ||
} | ||
] | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You already mentioned you prefer adding tests later. Please keep on your list to add more exhaustive error returning testing, as here only one invalid json error is tested.
I can imagine it will be hard to have a good overview over what has been tested and what not as the changes are not in the same PR.
processor/stream/stream_processor.go
Outdated
return nil, &Error{ | ||
Type: InvalidInputErrType, | ||
Message: e.Error(), | ||
Document: string(reader.LastLine()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not a native - but shouldn't this be LatestLine
?
processor/stream/stream_processor.go
Outdated
response.LimitedAdd(&Error{ | ||
Type: InvalidInputErrType, | ||
Message: err.Error(), | ||
Document: string(reader.LastLine()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not add the rawModel
here as this lead to the error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rawModel
is a map[string]interface{}
here, not a string or []byte.
processor/stream/stream_processor.go
Outdated
} | ||
|
||
// readBatch will read up to `batchSize` objects from the ndjson stream | ||
// it returns a slice of eventables, a serverResponse and a bool that indicates if we're at EOF. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's actually not true, the serverResponse
is not returned. I think it would be better design though if the response were created in readBatch
and returned and the caller then adds it to the overall response.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah yes, that comment didn't get updated :)
Returning the response instead of taking a reference would requires us to merge the two responses every time readBatch returns. There's no merging code atm. Do you think it's worth it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Errors should be edge cases. Thus, I don't expect a big performance impact one way or the other. I personally find it cleaner, but no strong opinion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've updated the comment now. I prefer to leave the arguments for now, but happy to discuss again at a later point.
processor/stream/stream_processor.go
Outdated
Type: ShuttingDownErrType, | ||
Message: "server is shutting down", | ||
}) | ||
case publish.ErrFull: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As roncohen#3 has not been merged, I guess this will need to be implemented again after merging this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I needed to refactor this area, so your PR wouldn't apply cleanly anymore. During the refactor i changed the behavior in this PR so that it returns when the queue is full, like in your PR.
thanks for the thorough review @simitt ! |
…elastic#1324) also simplify stream error handling.
…elastic#1324) also simplify stream error handling.
…elastic#1324) also simplify stream error handling.
…ckage (elastic#1324) also simplify stream error handling.
…ckage (elastic#1324) also simplify stream error handling.
…elastic#1324) also simplify stream error handling.
…ckage (elastic#1324) also simplify stream error handling.
…ckage (elastic#1324) also simplify stream error handling.
…ckage (#1324) also simplify stream error handling.
This introduces the
processor/stream
andpublish
packages and moves relevant code frombeater
to the new packages.