diff --git a/deploy/kubernetes/metricbeat-kubernetes.yaml b/deploy/kubernetes/metricbeat-kubernetes.yaml index 774aa2a3748a..70d73aef019d 100644 --- a/deploy/kubernetes/metricbeat-kubernetes.yaml +++ b/deploy/kubernetes/metricbeat-kubernetes.yaml @@ -19,6 +19,9 @@ data: - type: kubernetes scope: cluster node: ${NODE_NAME} + # In large Kubernetes clusters consider setting unique to false + # to avoid using the leader election strategy and + # instead run a dedicated Metricbeat instance using a Deployment in addition to the DaemonSet unique: true templates: - config: diff --git a/deploy/kubernetes/metricbeat/metricbeat-daemonset-configmap.yaml b/deploy/kubernetes/metricbeat/metricbeat-daemonset-configmap.yaml index a60395f4490e..a51845f4f9a4 100644 --- a/deploy/kubernetes/metricbeat/metricbeat-daemonset-configmap.yaml +++ b/deploy/kubernetes/metricbeat/metricbeat-daemonset-configmap.yaml @@ -19,6 +19,9 @@ data: - type: kubernetes scope: cluster node: ${NODE_NAME} + # In large Kubernetes clusters consider setting unique to false + # to avoid using the leader election strategy and + # instead run a dedicated Metricbeat instance using a Deployment in addition to the DaemonSet unique: true templates: - config: diff --git a/libbeat/reader/readfile/bench_test.go b/libbeat/reader/readfile/bench_test.go new file mode 100644 index 000000000000..b1f6e7667f60 --- /dev/null +++ b/libbeat/reader/readfile/bench_test.go @@ -0,0 +1,83 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package readfile + +import ( + "bytes" + "encoding/hex" + "fmt" + "io" + "io/ioutil" + "math/rand" + "testing" + + "golang.org/x/text/encoding" +) + +func BenchmarkEncoderReader(b *testing.B) { + const ( + bufferSize = 1024 + lineMaxLimit = 1000000 // never hit by the input data + ) + + runBench := func(name string, lineMaxLimit int, lines []byte) { + b.Run(name, func(b *testing.B) { + b.ReportAllocs() + for bN := 0; bN < b.N; bN++ { + reader, err := NewEncodeReader(ioutil.NopCloser(bytes.NewReader(lines)), Config{encoding.Nop, bufferSize, LineFeed, lineMaxLimit}) + if err != nil { + b.Fatal("failed to initialize reader:", err) + } + // Read decoded lines and test + size := 0 + for i := 0; ; i++ { + msg, err := reader.Next() + if err != nil { + if err == io.EOF { + b.ReportMetric(float64(i), "processed_lines") + break + } else { + b.Fatal("unexpected error:", err) + } + } + size += msg.Bytes + } + b.ReportMetric(float64(size), "processed_bytes") + } + }) + } + + runBench("buffer-sized lines", lineMaxLimit, createBenchmarkLines(100, 1020)) + runBench("short lines", lineMaxLimit, createBenchmarkLines(100, 10)) + runBench("long lines", lineMaxLimit, createBenchmarkLines(100, 10_000)) + // short lineMaxLimit to exercise skipUntilNewLine + runBench("skip lines", 1024, createBenchmarkLines(100, 10_000)) +} + +func createBenchmarkLines(numLines int, lineLength int) []byte { + buf := bytes.NewBuffer(nil) + for i := 0; i < numLines; i++ { + line := make([]byte, hex.DecodedLen(lineLength)) + if _, err := rand.Read(line); err != nil { + 
panic(fmt.Sprintf("failed to generate random input: %v", err)) + } + buf.WriteString(hex.EncodeToString(line)) + buf.WriteRune('\n') + } + return buf.Bytes() +} diff --git a/libbeat/reader/readfile/line.go b/libbeat/reader/readfile/line.go index c36b524dde24..78331a7d246f 100644 --- a/libbeat/reader/readfile/line.go +++ b/libbeat/reader/readfile/line.go @@ -30,12 +30,11 @@ import ( const unlimited = 0 -// lineReader reads lines from underlying reader, decoding the input stream +// LineReader reads lines from underlying reader, decoding the input stream // using the configured codec. The reader keeps track of bytes consumed // from raw input stream for every decoded line. type LineReader struct { reader io.ReadCloser - bufferSize int maxBytes int // max bytes per line limit to avoid OOM with malformatted files nl []byte decodedNl []byte @@ -44,10 +43,11 @@ type LineReader struct { inOffset int // input buffer read offset byteCount int // number of bytes decoded from input buffer into output buffer decoder transform.Transformer + tempBuffer []byte logger *logp.Logger } -// New creates a new reader object +// NewLineReader creates a new reader object func NewLineReader(input io.ReadCloser, config Config) (*LineReader, error) { encoder := config.Codec.NewEncoder() @@ -64,13 +64,13 @@ func NewLineReader(input io.ReadCloser, config Config) (*LineReader, error) { return &LineReader{ reader: input, - bufferSize: config.BufferSize, maxBytes: config.MaxBytes, decoder: config.Codec.NewDecoder(), nl: nl, decodedNl: terminator, inBuffer: streambuf.New(nil), outBuffer: streambuf.New(nil), + tempBuffer: make([]byte, config.BufferSize), logger: logp.NewLogger("reader_line"), }, nil } @@ -133,18 +133,17 @@ func (r *LineReader) advance() error { r.inOffset = newOffset } - buf := make([]byte, r.bufferSize) - // Try to read more bytes into buffer - n, err := r.reader.Read(buf) + n, err := r.reader.Read(r.tempBuffer) if err == io.EOF && n > 0 { // Continue processing the returned 
bytes. The next call will yield EOF with 0 bytes. err = nil } - // Appends buffer also in case of err - r.inBuffer.Append(buf[:n]) + // Write to buffer also in case of err + r.inBuffer.Write(r.tempBuffer[:n]) + if err != nil { return err } @@ -170,7 +169,7 @@ func (r *LineReader) advance() error { // If newLine is not found and the incoming data buffer exceeded max bytes limit, then skip until the next newLine if idx == -1 && r.inBuffer.Len() > r.maxBytes { - skipped, err := r.skipUntilNewLine(buf) + skipped, err := r.skipUntilNewLine() if err != nil { r.logger.Error("Error skipping until new line, err:", err) return err @@ -204,7 +203,7 @@ func (r *LineReader) advance() error { return err } -func (r *LineReader) skipUntilNewLine(buf []byte) (int, error) { +func (r *LineReader) skipUntilNewLine() (int, error) { // The length of the line skipped skipped := r.inBuffer.Len() @@ -221,14 +220,14 @@ func (r *LineReader) skipUntilNewLine(buf []byte) (int, error) { // Read until the new line is found for idx := -1; idx == -1; { - n, err := r.reader.Read(buf) + n, err := r.reader.Read(r.tempBuffer) // Check bytes read for newLine if n > 0 { - idx = bytes.Index(buf[:n], r.nl) + idx = bytes.Index(r.tempBuffer[:n], r.nl) if idx != -1 { - r.inBuffer.Append(buf[idx+len(r.nl) : n]) + r.inBuffer.Write(r.tempBuffer[idx+len(r.nl) : n]) skipped += idx } else { skipped += n @@ -249,14 +248,13 @@ func (r *LineReader) skipUntilNewLine(buf []byte) (int, error) { func (r *LineReader) decode(end int) (int, error) { var err error - buffer := make([]byte, 1024) inBytes := r.inBuffer.Bytes() start := 0 for start < end { var nDst, nSrc int - nDst, nSrc, err = r.decoder.Transform(buffer, inBytes[start:end], false) + nDst, nSrc, err = r.decoder.Transform(r.tempBuffer, inBytes[start:end], false) if err != nil { // Check if error is different from destination buffer too short if err != transform.ErrShortDst { @@ -270,7 +268,7 @@ func (r *LineReader) decode(end int) (int, error) { } start += nSrc - 
r.outBuffer.Write(buffer[:nDst]) + r.outBuffer.Write(r.tempBuffer[:nDst]) } r.byteCount += start