Skip to content

Commit

Permalink
Create new stanza encoding instance for each thread (#72)
Browse files Browse the repository at this point in the history
  • Loading branch information
jefchien authored Aug 7, 2023
1 parent d72fd5d commit b8f0f42
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 42 deletions.
6 changes: 3 additions & 3 deletions pkg/stanza/fileconsumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,12 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit EmitFunc, factory s

var hs *headerSettings
if c.Header != nil {
enc, err := c.Splitter.EncodingConfig.Build()
enc, err := helper.LookupEncoding(c.Splitter.EncodingConfig.Encoding)
if err != nil {
return nil, fmt.Errorf("failed to create encoding: %w", err)
}

hs, err = c.Header.buildHeaderSettings(enc.Encoding)
hs, err = c.Header.buildHeaderSettings(enc)
if err != nil {
return nil, fmt.Errorf("failed to build header config: %w", err)
}
Expand Down Expand Up @@ -222,7 +222,7 @@ func (c Config) validate() error {
return errors.New("`max_batches` must not be negative")
}

_, err := c.Splitter.EncodingConfig.Build()
_, err := helper.LookupEncoding(c.Splitter.EncodingConfig.Encoding)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/stanza/fileconsumer/reader_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,11 @@ func (b *readerBuilder) build() (r *Reader, err error) {
r.splitFunc = r.lineSplitFunc
}

enc, err := b.encodingConfig.Build()
enc, err := helper.LookupEncoding(b.encodingConfig.Encoding)
if err != nil {
return
}
r.encoding = enc
r.encoding = helper.NewEncoding(enc)

if b.file != nil {
r.file = b.file
Expand Down
7 changes: 4 additions & 3 deletions pkg/stanza/fileconsumer/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,13 @@ func TestHeaderFingerprintIncluded(t *testing.T) {
},
}

enc, err := helper.EncodingConfig{
cfg := helper.EncodingConfig{
Encoding: "utf-8",
}.Build()
}
enc, err := helper.LookupEncoding(cfg.Encoding)
require.NoError(t, err)

h, err := headerConf.buildHeaderSettings(enc.Encoding)
h, err := headerConf.buildHeaderSettings(enc)
require.NoError(t, err)
f.headerSettings = h

Expand Down
4 changes: 2 additions & 2 deletions pkg/stanza/fileconsumer/splitter_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ func newMultilineSplitterFactory(splitter helper.SplitterConfig) *multilineSplit

// Build builds Multiline Splitter struct
func (factory *multilineSplitterFactory) Build(maxLogSize int) (bufio.SplitFunc, error) {
enc, err := factory.EncodingConfig.Build()
enc, err := helper.LookupEncoding(factory.EncodingConfig.Encoding)
if err != nil {
return nil, err
}
flusher := factory.Flusher.Build()
splitter, err := factory.Multiline.Build(enc.Encoding, false, factory.PreserveLeadingWhitespaces, factory.PreserveTrailingWhitespaces, flusher, maxLogSize)
splitter, err := factory.Multiline.Build(enc, false, factory.PreserveLeadingWhitespaces, factory.PreserveTrailingWhitespaces, flusher, maxLogSize)
if err != nil {
return nil, err
}
Expand Down
28 changes: 11 additions & 17 deletions pkg/stanza/operator/helper/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"golang.org/x/text/transform"
)

// NewBasicConfig creates a new Encoding config
// NewEncodingConfig creates a new Encoding config
func NewEncodingConfig() EncodingConfig {
return EncodingConfig{
Encoding: "utf-8",
Expand All @@ -37,26 +37,20 @@ type EncodingConfig struct {
Encoding string `mapstructure:"encoding,omitempty"`
}

// Build will build an Encoding operator.
func (c EncodingConfig) Build() (Encoding, error) {
enc, err := lookupEncoding(c.Encoding)
if err != nil {
return Encoding{}, err
}

return Encoding{
Encoding: enc,
decodeBuffer: make([]byte, 1<<12),
decoder: enc.NewDecoder(),
}, nil
}

type Encoding struct {
Encoding encoding.Encoding
decoder *encoding.Decoder
decodeBuffer []byte
}

func NewEncoding(enc encoding.Encoding) Encoding {
return Encoding{
Encoding: enc,
decoder: enc.NewDecoder(),
decodeBuffer: make([]byte, 1<<12),
}
}

// Decode converts the bytes in msgBuf to utf-8 from the configured encoding
func (e *Encoding) Decode(msgBuf []byte) ([]byte, error) {
for {
Expand Down Expand Up @@ -84,7 +78,7 @@ var encodingOverrides = map[string]encoding.Encoding{
"": unicode.UTF8,
}

func lookupEncoding(enc string) (encoding.Encoding, error) {
func LookupEncoding(enc string) (encoding.Encoding, error) {
if e, ok := encodingOverrides[strings.ToLower(enc)]; ok {
return e, nil
}
Expand All @@ -99,7 +93,7 @@ func lookupEncoding(enc string) (encoding.Encoding, error) {
}

func IsNop(enc string) bool {
e, err := lookupEncoding(enc)
e, err := LookupEncoding(enc)
if err != nil {
return false
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/stanza/operator/helper/splitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,19 @@ func NewSplitterConfig() SplitterConfig {

// Build builds Splitter struct
func (c *SplitterConfig) Build(flushAtEOF bool, maxLogSize int) (*Splitter, error) {
enc, err := c.EncodingConfig.Build()
enc, err := LookupEncoding(c.EncodingConfig.Encoding)
if err != nil {
return nil, err
}

flusher := c.Flusher.Build()
splitFunc, err := c.Multiline.Build(enc.Encoding, flushAtEOF, c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces, flusher, maxLogSize)
splitFunc, err := c.Multiline.Build(enc, flushAtEOF, c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces, flusher, maxLogSize)
if err != nil {
return nil, err
}

return &Splitter{
Encoding: enc,
Encoding: NewEncoding(enc),
Flusher: flusher,
SplitFunc: splitFunc,
}, nil
Expand Down
13 changes: 7 additions & 6 deletions pkg/stanza/operator/input/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/jpillora/backoff"
"go.opentelemetry.io/collector/config/configtls"
"go.uber.org/zap"
"golang.org/x/text/encoding"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
Expand Down Expand Up @@ -108,13 +109,13 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
return nil, fmt.Errorf("failed to resolve listen_address: %w", err)
}

encoding, err := c.Encoding.Build()
enc, err := helper.LookupEncoding(c.Encoding.Encoding)
if err != nil {
return nil, err
}

// Build multiline
splitFunc, err := c.Multiline.Build(encoding.Encoding, true, c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces, nil, int(c.MaxLogSize))
splitFunc, err := c.Multiline.Build(enc, true, c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces, nil, int(c.MaxLogSize))
if err != nil {
return nil, err
}
Expand All @@ -129,7 +130,7 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
address: c.ListenAddress,
MaxLogSize: int(c.MaxLogSize),
addAttributes: c.AddAttributes,
encoding: encoding,
encoding: enc,
splitFunc: splitFunc,
backoff: backoff.Backoff{
Max: 3 * time.Second,
Expand Down Expand Up @@ -160,7 +161,7 @@ type Input struct {
tls *tls.Config
backoff backoff.Backoff

encoding helper.Encoding
encoding encoding.Encoding
splitFunc bufio.SplitFunc
resolver *helper.IPResolver
}
Expand Down Expand Up @@ -253,11 +254,11 @@ func (t *Input) goHandleMessages(ctx context.Context, conn net.Conn, cancel cont
buf := make([]byte, 0, t.MaxLogSize)
scanner := bufio.NewScanner(conn)
scanner.Buffer(buf, t.MaxLogSize)

scanner.Split(t.splitFunc)
decoder := helper.NewEncoding(t.encoding)

for scanner.Scan() {
decoded, err := t.encoding.Decode(scanner.Bytes())
decoded, err := decoder.Decode(scanner.Bytes())
if err != nil {
t.Errorw("Failed to decode data", zap.Error(err))
continue
Expand Down
13 changes: 7 additions & 6 deletions pkg/stanza/operator/input/udp/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"sync"

"go.uber.org/zap"
"golang.org/x/text/encoding"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
Expand Down Expand Up @@ -91,13 +92,13 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
return nil, fmt.Errorf("failed to resolve listen_address: %w", err)
}

encoding, err := c.Encoding.Build()
enc, err := helper.LookupEncoding(c.Encoding.Encoding)
if err != nil {
return nil, err
}

// Build multiline
splitFunc, err := c.Multiline.Build(encoding.Encoding, true, c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces, nil, MaxUDPSize)
splitFunc, err := c.Multiline.Build(enc, true, c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces, nil, MaxUDPSize)
if err != nil {
return nil, err
}
Expand All @@ -112,7 +113,7 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
address: address,
buffer: make([]byte, MaxUDPSize),
addAttributes: c.AddAttributes,
encoding: encoding,
encoding: enc,
splitFunc: splitFunc,
resolver: resolver,
}
Expand All @@ -130,7 +131,7 @@ type Input struct {
cancel context.CancelFunc
wg sync.WaitGroup

encoding helper.Encoding
encoding encoding.Encoding
splitFunc bufio.SplitFunc
resolver *helper.IPResolver
}
Expand Down Expand Up @@ -158,6 +159,7 @@ func (u *Input) goHandleMessages(ctx context.Context) {
defer u.wg.Done()

buf := make([]byte, 0, MaxUDPSize)
decoder := helper.NewEncoding(u.encoding)
for {
message, remoteAddr, err := u.readMessage()
if err != nil {
Expand All @@ -172,11 +174,10 @@ func (u *Input) goHandleMessages(ctx context.Context) {

scanner := bufio.NewScanner(bytes.NewReader(message))
scanner.Buffer(buf, MaxUDPSize)

scanner.Split(u.splitFunc)

for scanner.Scan() {
decoded, err := u.encoding.Decode(scanner.Bytes())
decoded, err := decoder.Decode(scanner.Bytes())
if err != nil {
u.Errorw("Failed to decode data", zap.Error(err))
continue
Expand Down

0 comments on commit b8f0f42

Please sign in to comment.