[fileconsumer] Move scanner into internal package (#23999)
Follows #23998 

Moves the `PositionalScanner` into an `internal/scanner` package.
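
For code inside `pkg/stanza/fileconsumer`, migrating is a matter of swapping the deprecated exported constructor for the internal one and passing the now-explicit buffer size. A minimal sketch of a migrated call site (the `scanAll` helper is hypothetical, not part of this commit; only code under `pkg/stanza/fileconsumer` can import the internal package):

package fileconsumer

import (
	"bufio"
	"io"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/scanner"
)

// scanAll tokenizes r with the relocated scanner and returns the tokens
// plus the final byte offset.
// Previously: NewPositionalScanner(r, maxLogSize, offset, splitFunc)
func scanAll(r io.Reader, maxLogSize int, offset int64, splitFunc bufio.SplitFunc) ([][]byte, int64, error) {
	s := scanner.New(r, maxLogSize, scanner.DefaultBufferSize, offset, splitFunc)
	var tokens [][]byte
	for s.Scan() {
		// Copy the token: bufio.Scanner may reuse its buffer on the next Scan.
		tokens = append(tokens, append([]byte(nil), s.Bytes()...))
	}
	return tokens, s.Pos(), s.Error()
}

The `reader.go` diff below makes the equivalent change inline.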
djaglowski authored Jul 6, 2023
1 parent 914fd0b commit 751c2e1
Showing 5 changed files with 197 additions and 133 deletions.
20 changes: 20 additions & 0 deletions .chloggen/fileconsumer-internal-scanner.yaml
@@ -0,0 +1,20 @@
# Use this changelog template to create an entry for release notes.
# If your change doesn't affect end users, such as a test fix or a tooling change,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Make fileconsumer.PositionalScanner internal

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [23999]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

-package fileconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer"
+package scanner // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/scanner"

import (
"bufio"
@@ -11,55 +11,46 @@ import (
stanzaerrors "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/errors"
)

-const defaultBufSize = 16 * 1024
+const DefaultBufferSize = 16 * 1024

-// PositionalScanner is a scanner that maintains position
-//
-// Deprecated: [v0.80.0] This will be made internal in a future release, tentatively v0.82.0.
-type PositionalScanner struct {
+// Scanner is a scanner that maintains position
+type Scanner struct {
pos int64
*bufio.Scanner
}

-// NewPositionalScanner creates a new positional scanner
-//
-// Deprecated: [v0.80.0] This will be made internal in a future release, tentatively v0.82.0.
-func NewPositionalScanner(r io.Reader, maxLogSize int, startOffset int64, splitFunc bufio.SplitFunc) *PositionalScanner {
-	ps := &PositionalScanner{
-		pos:     startOffset,
-		Scanner: bufio.NewScanner(r),
-	}
-
-	buf := make([]byte, 0, defaultBufSize)
-	ps.Scanner.Buffer(buf, maxLogSize*2)
-
+// New creates a new positional scanner
+func New(r io.Reader, maxLogSize int, bufferSize int, startOffset int64, splitFunc bufio.SplitFunc) *Scanner {
+	s := &Scanner{Scanner: bufio.NewScanner(r), pos: startOffset}
+	s.Buffer(make([]byte, 0, bufferSize), maxLogSize)
scanFunc := func(data []byte, atEOF bool) (advance int, token []byte, err error) {
advance, token, err = splitFunc(data, atEOF)
-		if (advance == 0 && token == nil && err == nil) && len(data) >= 2*maxLogSize {
+		if (advance == 0 && token == nil && err == nil) && len(data) >= maxLogSize {
// reference: https://pkg.go.dev/bufio#SplitFunc
// splitFunc returns (0, nil, nil) to signal the Scanner to read more data but the buffer is full.
// Truncate the log entry.
advance, token, err = maxLogSize, data[:maxLogSize], nil
} else if len(token) > maxLogSize {
advance, token = maxLogSize, token[:maxLogSize]
}
-		ps.pos += int64(advance)
+		s.pos += int64(advance)
return
}
-	ps.Scanner.Split(scanFunc)
-	return ps
+	s.Split(scanFunc)
+	return s
}

// Pos returns the current position of the scanner
-func (ps *PositionalScanner) Pos() int64 {
-	return ps.pos
+func (s *Scanner) Pos() int64 {
+	return s.pos
}

-func (ps *PositionalScanner) getError() error {
-	err := ps.Err()
+func (s *Scanner) Error() error {
+	err := s.Err()
if errors.Is(err, bufio.ErrTooLong) {
return stanzaerrors.NewError("log entry too large", "increase max_log_size or ensure that multiline regex patterns terminate")
-	} else if err != nil {
+	}
+	if err != nil {
return stanzaerrors.Wrap(err, "scanner error")
}
return nil
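Worth noting from the hunk above: oversized entries are truncated rather than rejected, with the wrapped split function emitting maxLogSize-sized chunks when a token exceeds the limit. A small same-package sketch of that behavior (the scanner package is internal, so this is written as if it lived alongside scanner.go; the helper is illustrative, not part of the commit):

package scanner

import (
	"bytes"
	"fmt"
)

// truncationSketch feeds a 26-character line through a Scanner with a
// maxLogSize of 10 and prints the resulting tokens:
// "abcdefghij", "klmnopqrst", "uvwxyz".
func truncationSketch() {
	input := bytes.NewReader([]byte("abcdefghijklmnopqrstuvwxyz\n"))
	newline := func(data []byte, atEOF bool) (int, []byte, error) {
		if i := bytes.IndexByte(data, '\n'); i >= 0 {
			return i + 1, data[:i], nil
		}
		return 0, nil, nil // request more data
	}
	s := New(input, 10, DefaultBufferSize, 0, newline)
	for s.Scan() {
		fmt.Printf("%q\n", s.Bytes())
	}
}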
152 changes: 152 additions & 0 deletions pkg/stanza/fileconsumer/internal/scanner/scanner_test.go
@@ -0,0 +1,152 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package scanner

import (
"bufio"
"bytes"
"errors"
"testing"

"github.com/stretchr/testify/assert"
)

func TestScanner(t *testing.T) {
testCases := []struct {
name string
stream []byte
delimiter []byte
startOffset int64
maxSize int
bufferSize int
expected [][]byte
skipFirstDelimiter bool
}{
{
name: "simple",
stream: []byte("testlog1\ntestlog2\n"),
delimiter: []byte("\n"),
maxSize: 100,
bufferSize: DefaultBufferSize,
expected: [][]byte{
[]byte("testlog1"),
[]byte("testlog2"),
},
},
{
name: "empty_tokens",
stream: []byte("\ntestlog1\n\ntestlog2\n\n"),
delimiter: []byte("\n"),
maxSize: 100,
bufferSize: DefaultBufferSize,
expected: [][]byte{
[]byte(""),
[]byte("testlog1"),
[]byte(""),
[]byte("testlog2"),
[]byte(""),
},
},
{
name: "multichar_delimiter",
stream: []byte("testlog1@#$testlog2@#$"),
delimiter: []byte("@#$"),
maxSize: 100,
bufferSize: DefaultBufferSize,
expected: [][]byte{
[]byte("testlog1"),
[]byte("testlog2"),
},
},
{
name: "multichar_delimiter_empty_tokens",
stream: []byte("@#$testlog1@#$@#$testlog2@#$@#$"),
delimiter: []byte("@#$"),
maxSize: 100,
bufferSize: DefaultBufferSize,
expected: [][]byte{
[]byte(""),
[]byte("testlog1"),
[]byte(""),
[]byte("testlog2"),
[]byte(""),
},
},
{
name: "overflow_maxlogsize",
stream: []byte("testlog1islongerthanmaxlogsize\n"),
delimiter: []byte("\n"),
maxSize: 20,
bufferSize: DefaultBufferSize,
expected: [][]byte{
[]byte("testlog1islongerthan"),
[]byte("maxlogsize"),
},
skipFirstDelimiter: true,
},
{
name: "overflow_buffer",
stream: []byte("testlog1islongerthanbuffer\n"),
delimiter: []byte("\n"),
maxSize: 20,
bufferSize: 20,
expected: [][]byte{
[]byte("testlog1islongerthan"),
[]byte("buffer"),
},
skipFirstDelimiter: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
scanner := New(bytes.NewReader(tc.stream), tc.maxSize, tc.bufferSize, tc.startOffset, simpleSplit(tc.delimiter))
for i, p := 0, 0; scanner.Scan(); i++ {
assert.NoError(t, scanner.Error())

token := scanner.Bytes()
assert.Equal(t, tc.expected[i], token)

p += len(tc.expected[i])
if i > 0 || !tc.skipFirstDelimiter {
p += len(tc.delimiter)
}
assert.Equal(t, int64(p), scanner.Pos())
}
assert.NoError(t, scanner.Error())
})
}
}

func simpleSplit(delim []byte) bufio.SplitFunc {
return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
if atEOF && len(data) == 0 {
return 0, nil, nil
}
if i := bytes.Index(data, delim); i >= 0 {
return i + len(delim), data[:i], nil
}
return 0, nil, nil
}
}

type errReader struct {
err error
}

func (r *errReader) Read([]byte) (n int, err error) {
return 0, r.err
}

func TestScannerError(t *testing.T) {
reader := &errReader{err: bufio.ErrTooLong}
scanner := New(reader, 100, 100, 0, simpleSplit([]byte("\n")))
assert.False(t, scanner.Scan())
assert.EqualError(t, scanner.Error(), "log entry too large")

reader = &errReader{err: errors.New("some err")}
scanner = New(reader, 100, 100, 0, simpleSplit([]byte("\n")))
assert.False(t, scanner.Scan())
assert.EqualError(t, scanner.Error(), "scanner error: some err")
}
13 changes: 7 additions & 6 deletions pkg/stanza/fileconsumer/reader.go
@@ -13,6 +13,7 @@ import (

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/scanner"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/pipeline"
)
@@ -66,7 +67,7 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
return
}

-	scanner := NewPositionalScanner(r, r.maxLogSize, r.Offset, r.splitFunc)
+	s := scanner.New(r, r.maxLogSize, scanner.DefaultBufferSize, r.Offset, r.splitFunc)

// Iterate over the tokenized file, emitting entries as we go
for {
@@ -76,18 +77,18 @@
default:
}

-		ok := scanner.Scan()
+		ok := s.Scan()
if !ok {
r.eof = true
-			if err := scanner.getError(); err != nil {
+			if err := s.Error(); err != nil {
// If Scan returned an error then we are not guaranteed to be at the end of the file
r.eof = false
r.Errorw("Failed during scan", zap.Error(err))
}
break
}

-		token, err := r.encoding.Decode(scanner.Bytes())
+		token, err := r.encoding.Decode(s.Bytes())
if err != nil {
r.Errorw("decode: %w", zap.Error(err))
} else {
@@ -105,10 +106,10 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
return
}

-		scanner = NewPositionalScanner(r, r.maxLogSize, r.Offset, r.splitFunc)
+		s = scanner.New(r, r.maxLogSize, scanner.DefaultBufferSize, r.Offset, r.splitFunc)
}

-	r.Offset = scanner.Pos()
+	r.Offset = s.Pos()
}
}

