Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/7.x' into mergify/bp/7.x/pr-27880
Browse files Browse the repository at this point in the history
  • Loading branch information
efd6 committed Sep 14, 2021
2 parents 1cae31d + 856a675 commit 1897af8
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 17 deletions.
3 changes: 3 additions & 0 deletions deploy/kubernetes/metricbeat-kubernetes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ data:
- type: kubernetes
scope: cluster
node: ${NODE_NAME}
# In large Kubernetes clusters consider setting unique to false
# to avoid using the leader election strategy and
# instead run a dedicated Metricbeat instance using a Deployment in addition to the DaemonSet
unique: true
templates:
- config:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ data:
- type: kubernetes
scope: cluster
node: ${NODE_NAME}
# In large Kubernetes clusters consider setting unique to false
# to avoid using the leader election strategy and
# instead run a dedicated Metricbeat instance using a Deployment in addition to the DaemonSet
unique: true
templates:
- config:
Expand Down
83 changes: 83 additions & 0 deletions libbeat/reader/readfile/bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package readfile

import (
"bytes"
"encoding/hex"
"fmt"
"io"
"io/ioutil"
"math/rand"
"testing"

"golang.org/x/text/encoding"
)

// BenchmarkEncoderReader measures how fast an encode reader built with the
// Nop codec and a 1024-byte buffer drains newline-terminated input. Every
// sub-benchmark is fed 100 generated lines of a fixed length; the final one
// passes a small line limit in order to exercise skipUntilNewLine.
func BenchmarkEncoderReader(b *testing.B) {
	const (
		bufferSize   = 1024
		lineMaxLimit = 1000000 // never hit by the input data
	)

	// runBench registers a sub-benchmark that, on each iteration, builds a
	// fresh reader over input and consumes it until io.EOF.
	runBench := func(name string, maxBytes int, input []byte) {
		b.Run(name, func(b *testing.B) {
			b.ReportAllocs()
			for iter := 0; iter < b.N; iter++ {
				src := ioutil.NopCloser(bytes.NewReader(input))
				reader, err := NewEncodeReader(src, Config{encoding.Nop, bufferSize, LineFeed, maxBytes})
				if err != nil {
					b.Fatal("failed to initialize reader:", err)
				}

				// Read decoded lines, counting the messages and their bytes.
				lineCount, byteCount := 0, 0
				for {
					msg, err := reader.Next()
					if err == io.EOF {
						// End of input: publish how many lines were returned.
						b.ReportMetric(float64(lineCount), "processed_lines")
						break
					}
					if err != nil {
						b.Fatal("unexpected error:", err)
					}
					byteCount += msg.Bytes
					lineCount++
				}
				b.ReportMetric(float64(byteCount), "processed_bytes")
			}
		})
	}

	runBench("buffer-sized lines", lineMaxLimit, createBenchmarkLines(100, 1020))
	runBench("short lines", lineMaxLimit, createBenchmarkLines(100, 10))
	runBench("long lines", lineMaxLimit, createBenchmarkLines(100, 10_000))
	// short lineMaxLimit to exercise skipUntilNewLine
	runBench("skip lines", 1024, createBenchmarkLines(100, 10_000))
}

// createBenchmarkLines builds benchmark input consisting of numLines lines,
// each holding the hex encoding of hex.DecodedLen(lineLength) random bytes
// followed by a '\n'. For an odd lineLength the encoded line is therefore one
// character shorter than requested. It panics if random data cannot be read.
func createBenchmarkLines(numLines int, lineLength int) []byte {
	var out bytes.Buffer
	for remaining := numLines; remaining > 0; remaining-- {
		raw := make([]byte, hex.DecodedLen(lineLength))
		_, err := rand.Read(raw)
		if err != nil {
			panic(fmt.Sprintf("failed to generate random input: %v", err))
		}

		// Hex-encode into a byte slice and append it without going through a string.
		encoded := make([]byte, hex.EncodedLen(len(raw)))
		hex.Encode(encoded, raw)
		out.Write(encoded)
		out.WriteByte('\n')
	}
	return out.Bytes()
}
32 changes: 15 additions & 17 deletions libbeat/reader/readfile/line.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,11 @@ import (

const unlimited = 0

// lineReader reads lines from underlying reader, decoding the input stream
// LineReader reads lines from underlying reader, decoding the input stream
// using the configured codec. The reader keeps track of bytes consumed
// from raw input stream for every decoded line.
type LineReader struct {
reader io.ReadCloser
bufferSize int
maxBytes int // max bytes per line limit to avoid OOM with malformatted files
nl []byte
decodedNl []byte
Expand All @@ -44,10 +43,11 @@ type LineReader struct {
inOffset int // input buffer read offset
byteCount int // number of bytes decoded from input buffer into output buffer
decoder transform.Transformer
tempBuffer []byte
logger *logp.Logger
}

// New creates a new reader object
// NewLineReader creates a new reader object
func NewLineReader(input io.ReadCloser, config Config) (*LineReader, error) {
encoder := config.Codec.NewEncoder()

Expand All @@ -64,13 +64,13 @@ func NewLineReader(input io.ReadCloser, config Config) (*LineReader, error) {

return &LineReader{
reader: input,
bufferSize: config.BufferSize,
maxBytes: config.MaxBytes,
decoder: config.Codec.NewDecoder(),
nl: nl,
decodedNl: terminator,
inBuffer: streambuf.New(nil),
outBuffer: streambuf.New(nil),
tempBuffer: make([]byte, config.BufferSize),
logger: logp.NewLogger("reader_line"),
}, nil
}
Expand Down Expand Up @@ -133,18 +133,17 @@ func (r *LineReader) advance() error {
r.inOffset = newOffset
}

buf := make([]byte, r.bufferSize)

// Try to read more bytes into buffer
n, err := r.reader.Read(buf)
n, err := r.reader.Read(r.tempBuffer)

if err == io.EOF && n > 0 {
// Continue processing the returned bytes. The next call will yield EOF with 0 bytes.
err = nil
}

// Appends buffer also in case of err
r.inBuffer.Append(buf[:n])
// Write to buffer also in case of err
r.inBuffer.Write(r.tempBuffer[:n])

if err != nil {
return err
}
Expand All @@ -170,7 +169,7 @@ func (r *LineReader) advance() error {

// If newLine is not found and the incoming data buffer exceeded max bytes limit, then skip until the next newLine
if idx == -1 && r.inBuffer.Len() > r.maxBytes {
skipped, err := r.skipUntilNewLine(buf)
skipped, err := r.skipUntilNewLine()
if err != nil {
r.logger.Error("Error skipping until new line, err:", err)
return err
Expand Down Expand Up @@ -204,7 +203,7 @@ func (r *LineReader) advance() error {
return err
}

func (r *LineReader) skipUntilNewLine(buf []byte) (int, error) {
func (r *LineReader) skipUntilNewLine() (int, error) {
// The length of the line skipped
skipped := r.inBuffer.Len()

Expand All @@ -221,14 +220,14 @@ func (r *LineReader) skipUntilNewLine(buf []byte) (int, error) {

// Read until the new line is found
for idx := -1; idx == -1; {
n, err := r.reader.Read(buf)
n, err := r.reader.Read(r.tempBuffer)

// Check bytes read for newLine
if n > 0 {
idx = bytes.Index(buf[:n], r.nl)
idx = bytes.Index(r.tempBuffer[:n], r.nl)

if idx != -1 {
r.inBuffer.Append(buf[idx+len(r.nl) : n])
r.inBuffer.Write(r.tempBuffer[idx+len(r.nl) : n])
skipped += idx
} else {
skipped += n
Expand All @@ -249,14 +248,13 @@ func (r *LineReader) skipUntilNewLine(buf []byte) (int, error) {

func (r *LineReader) decode(end int) (int, error) {
var err error
buffer := make([]byte, 1024)
inBytes := r.inBuffer.Bytes()
start := 0

for start < end {
var nDst, nSrc int

nDst, nSrc, err = r.decoder.Transform(buffer, inBytes[start:end], false)
nDst, nSrc, err = r.decoder.Transform(r.tempBuffer, inBytes[start:end], false)
if err != nil {
// Check if error is different from destination buffer too short
if err != transform.ErrShortDst {
Expand All @@ -270,7 +268,7 @@ func (r *LineReader) decode(end int) (int, error) {
}

start += nSrc
r.outBuffer.Write(buffer[:nDst])
r.outBuffer.Write(r.tempBuffer[:nDst])
}

r.byteCount += start
Expand Down

0 comments on commit 1897af8

Please sign in to comment.