-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Heartbeat] Fix excessive memory usage when parsing bodies #15639
Changes from 6 commits
f5b2816
50e4218
7f044df
cae4335
03c5110
16af47d
2ee596b
eb65da3
a820f1d
2de86c8
0b6a5af
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,7 +22,9 @@ import ( | |
"encoding/hex" | ||
"io" | ||
"net/http" | ||
"unicode/utf8" | ||
"strings" | ||
|
||
"github.com/docker/go-units" | ||
|
||
"github.com/elastic/beats/heartbeat/reason" | ||
"github.com/elastic/beats/libbeat/common" | ||
|
@@ -31,7 +33,7 @@ import ( | |
// maxBufferBodyBytes sets a hard limit on how much we're willing to buffer for any reason internally. | ||
// since we must buffer the whole body for body validators this is effectively a cap on that. | ||
// 100MiB out to be enough for everybody. | ||
const maxBufferBodyBytes = 100 * 1024 * 1024 | ||
const maxBufferBodyBytes = 100 * units.MiB | ||
|
||
func processBody(resp *http.Response, config responseConfig, validator multiValidator) (common.MapStr, reason.Reason) { | ||
// Determine how much of the body to actually buffer in memory | ||
|
@@ -94,43 +96,20 @@ func readBody(resp *http.Response, maxSampleBytes int) (bodySample string, bodyS | |
|
||
func readPrefixAndHash(body io.ReadCloser, maxPrefixSize int) (respSize int, prefix string, hashStr string, err error) { | ||
hash := sha256.New() | ||
// Function to lazily get the body of the response | ||
rawBuf := make([]byte, 1024) | ||
|
||
// Buffer to hold the prefix output along with tracking info | ||
prefixBuf := make([]byte, maxPrefixSize) | ||
prefixRemainingBytes := maxPrefixSize | ||
prefixWriteOffset := 0 | ||
for { | ||
readSize, readErr := body.Read(rawBuf) | ||
|
||
respSize += readSize | ||
hash.Write(rawBuf[:readSize]) | ||
|
||
if prefixRemainingBytes > 0 { | ||
if readSize >= prefixRemainingBytes { | ||
copy(prefixBuf[prefixWriteOffset:maxPrefixSize], rawBuf[:prefixRemainingBytes]) | ||
prefixWriteOffset += prefixRemainingBytes | ||
prefixRemainingBytes = 0 | ||
} else { | ||
copy(prefixBuf[prefixWriteOffset:prefixWriteOffset+readSize], rawBuf[:readSize]) | ||
prefixWriteOffset += readSize | ||
prefixRemainingBytes -= readSize | ||
} | ||
} | ||
|
||
if readErr == io.EOF { | ||
break | ||
} | ||
var prefixBuf strings.Builder | ||
|
||
if readErr != nil { | ||
return 0, "", "", readErr | ||
} | ||
n, err := io.Copy(&prefixBuf, io.TeeReader(io.LimitReader(body, int64(maxPrefixSize)), hash)) | ||
if err == nil { | ||
// finish streaming into hash if the body has not been fully consumed yet | ||
var m int64 | ||
m, err = io.Copy(hash, body) | ||
n += m | ||
} | ||
|
||
// We discard the body if it is not valid UTF-8 | ||
if utf8.Valid(prefixBuf[:prefixWriteOffset]) { | ||
prefix = string(prefixBuf[:prefixWriteOffset]) | ||
if err != nil { | ||
return 0, "", "", err | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if the whole loop can be replaced using the
This reads up to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ooooooh, that's nice, will give it a shot. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm actually wondering now, the only reason we do the prefix reading in the first place is to give something for the response checkers to read. The problem before was that if you used a JSON checker and a grep checker, they'd share the same IO, so one would use up the stream before the other could get to work. Maybe we should just use the tee reader in the first place to split the stream between all the users, then we can always read the full body and not worry about reading some fixed quantity. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So, I think that's a bit out of scope here. I'd like to keep the code as-is. I tried to get things cleaner with the TeeReader, but that really requires us to have more of a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For the hash it should be enough to add another
Alternatively we could introduce a LimitedWriter and a TeeWriter (as you've already noticed). They don't exist in our code base, so we would need to implement these ourselves :(. Then the operation would become:
The idea is similar to my first sample code. Let's try to reduce code handling buffers, indices, offsets and size limits in loops by replacing them using more declarative (reusable?) reader/writer constructors. Unfortunately parsers like At least for JSON I have an alternative. In Beats we ues go-structform for spooling to disk and in our outputs. Interestingly the JSON parser has a push based interface: https://github.com/elastic/go-structform/blob/master/json/parse.go#L154 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've fixed the code to use Thinking about it, I think I'm fine with the buffer approach for now. Chances are that users dealing with large files don't want to use either JSON or regexp matching. I think I'm fine waiting for that feature request to come in. |
||
return respSize, prefix, hex.EncodeToString(hash.Sum(nil)), nil | ||
|
||
return int(n), prefixBuf.String(), hex.EncodeToString(hash.Sum(nil)), nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change in semantics. The original code used to
break
iferr == io.EOF
.This should fix it:
if err != nil && err != io.EOF {
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch, fixed and pushed. I think there are no other outstanding issues.