forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
LineReader: Reuse temporary buffer to reduce per-line allocation (elastic#27782)

## What does this PR do?

Previously, the `LineReader` would allocate a `[]byte` of size `config.BufferSize` before decoding each line. The size of the underlying array is fixed at allocation, so `outBuffer.Append` retains all of it even when the appended bytes are much shorter.

With this change, we store a single `tempBuffer []byte` which is reused across lines anywhere we need temporary storage. Converting to `outBuffer.Write` forces the buffer to copy data out of `tempBuffer`, but it only has to allocate space for the bytes actually written.

## Why is it important?

In our production environment, we run beats with k8s-enforced memory limits and are trying to resolve OOMs. The LineReader code path contributes a significant amount of memory allocation. The benchmarks added in bench_test.go show this reduces the memory profile with various line lengths:

```
goos: darwin
goarch: amd64
pkg: github.com/elastic/beats/v7/libbeat/reader/readfile
cpu: Intel(R) Core(TM) i9-9880H CPU @ 2.30GHz

name                                 old time/op    new time/op    delta
EncoderReader/buffer-sized_lines-16  125µs ± 3%     94µs ± 9%      -24.55%  (p=0.008 n=5+5)
EncoderReader/short_lines-16         52.6µs ± 4%    36.3µs ±10%    -30.88%  (p=0.008 n=5+5)
EncoderReader/long_lines-16          1.82ms ± 2%    1.70ms ±10%    ~        (p=0.151 n=5+5)
EncoderReader/skip_lines-16          133µs ± 3%     140µs ± 8%     ~        (p=0.151 n=5+5)

name                                 old alloc/op   new alloc/op   delta
EncoderReader/buffer-sized_lines-16  442kB ± 0%     239kB ± 0%     -46.07%  (p=0.000 n=4+5)
EncoderReader/short_lines-16         118kB ± 0%     15kB ± 0%      -87.27%  (p=0.008 n=5+5)
EncoderReader/long_lines-16          8.73MB ± 0%    7.63MB ± 0%    -12.62%  (p=0.000 n=4+5)
EncoderReader/skip_lines-16          270kB ± 0%     220kB ± 0%     -18.58%  (p=0.008 n=5+5)

name                                 old allocs/op  new allocs/op  delta
EncoderReader/buffer-sized_lines-16  718 ± 0%       519 ± 0%       -27.72%  (p=0.008 n=5+5)
EncoderReader/short_lines-16         522 ± 0%       421 ± 0%       -19.35%  (p=0.008 n=5+5)
EncoderReader/long_lines-16          2.65k ± 0%     1.58k ± 0%     -40.54%  (p=0.008 n=5+5)
EncoderReader/skip_lines-16          420 ± 0%       419 ± 0%       -0.24%   (p=0.008 n=5+5)
```
- Loading branch information
Showing
3 changed files
with
99 additions
and
17 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
// Licensed to Elasticsearch B.V. under one or more contributor | ||
// license agreements. See the NOTICE file distributed with | ||
// this work for additional information regarding copyright | ||
// ownership. Elasticsearch B.V. licenses this file to you under | ||
// the Apache License, Version 2.0 (the "License"); you may | ||
// not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
package readfile | ||
|
||
import ( | ||
"bytes" | ||
"encoding/hex" | ||
"fmt" | ||
"io" | ||
"io/ioutil" | ||
"math/rand" | ||
"testing" | ||
|
||
"golang.org/x/text/encoding" | ||
) | ||
|
||
func BenchmarkEncoderReader(b *testing.B) { | ||
const ( | ||
bufferSize = 1024 | ||
lineMaxLimit = 1000000 // never hit by the input data | ||
) | ||
|
||
runBench := func(name string, lineMaxLimit int, lines []byte) { | ||
b.Run(name, func(b *testing.B) { | ||
b.ReportAllocs() | ||
for bN := 0; bN < b.N; bN++ { | ||
reader, err := NewEncodeReader(ioutil.NopCloser(bytes.NewReader(lines)), Config{encoding.Nop, bufferSize, LineFeed, lineMaxLimit}) | ||
if err != nil { | ||
b.Fatal("failed to initialize reader:", err) | ||
} | ||
// Read decodec lines and test | ||
size := 0 | ||
for i := 0; ; i++ { | ||
msg, err := reader.Next() | ||
if err != nil { | ||
if err == io.EOF { | ||
b.ReportMetric(float64(i), "processed_lines") | ||
break | ||
} else { | ||
b.Fatal("unexpected error:", err) | ||
} | ||
} | ||
size += msg.Bytes | ||
} | ||
b.ReportMetric(float64(size), "processed_bytes") | ||
} | ||
}) | ||
} | ||
|
||
runBench("buffer-sized lines", lineMaxLimit, createBenchmarkLines(100, 1020)) | ||
runBench("short lines", lineMaxLimit, createBenchmarkLines(100, 10)) | ||
runBench("long lines", lineMaxLimit, createBenchmarkLines(100, 10_000)) | ||
// short lineMaxLimit to exercise skipUntilNewLine | ||
runBench("skip lines", 1024, createBenchmarkLines(100, 10_000)) | ||
} | ||
|
||
// createBenchmarkLines returns numLines lines of benchmark input. Every line
// consists of exactly lineLength random lowercase hex characters followed by a
// single '\n'.
//
// A negative lineLength is treated as 0 (empty lines). A numLines of zero or
// less yields no output. The function panics if random data cannot be
// generated.
func createBenchmarkLines(numLines int, lineLength int) []byte {
	if lineLength < 0 {
		lineLength = 0
	}

	// Round the number of raw bytes up so that odd line lengths are fully
	// covered; the surplus hex digit is cut off when the line is written.
	raw := make([]byte, hex.DecodedLen(lineLength+1))
	encoded := make([]byte, hex.EncodedLen(len(raw)))

	buf := bytes.NewBuffer(nil)
	if numLines > 0 {
		// Final size is known up front: lineLength characters plus a newline
		// per line.
		buf.Grow(numLines * (lineLength + 1))
	}

	for i := 0; i < numLines; i++ {
		if _, err := rand.Read(raw); err != nil {
			panic(fmt.Sprintf("failed to generate random input: %v", err))
		}
		hex.Encode(encoded, raw)
		buf.Write(encoded[:lineLength])
		buf.WriteByte('\n')
	}
	return buf.Bytes()
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters