-
Notifications
You must be signed in to change notification settings - Fork 2.4k
/
transformer.go
141 lines (115 loc) · 4.34 KB
/
transformer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package helper // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
import (
"context"
"fmt"
"github.com/expr-lang/expr/vm"
"go.opentelemetry.io/collector/component"
"go.uber.org/zap"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/errors"
)
// NewTransformerConfig returns a TransformerConfig populated with defaults:
// the embedded writer config comes from NewWriterConfig for the given operator
// ID and type, and OnError is set to SendOnError. IfExpr is left empty.
func NewTransformerConfig(operatorID, operatorType string) TransformerConfig {
	cfg := TransformerConfig{OnError: SendOnError}
	cfg.WriterConfig = NewWriterConfig(operatorID, operatorType)
	return cfg
}
// TransformerConfig provides a basic implementation of a transformer config.
// It embeds WriterConfig and adds the error-handling mode and an optional
// condition expression that Build turns into a TransformerOperator.
type TransformerConfig struct {
// WriterConfig is embedded and squashed by mapstructure, so its keys are
// decoded from the same level as this config's own keys.
WriterConfig `mapstructure:",squash"`
// OnError is the on_error mode. Build rejects any value other than
// SendOnError, SendOnErrorQuiet, DropOnError or DropOnErrorQuiet.
OnError string `mapstructure:"on_error"`
// IfExpr is the optional `if` expression. When non-empty, Build compiles
// it with ExprCompileBool; when empty, no condition is attached.
IfExpr string `mapstructure:"if"`
}
// Build constructs a TransformerOperator from this config.
//
// It proceeds in three steps, returning a zero-valued TransformerOperator and
// an error as soon as one of them fails:
//  1. build the embedded writer operator (errors are annotated with the
//     operator ID);
//  2. check that OnError is one of the four supported on_error modes;
//  3. if IfExpr is non-empty, compile it with ExprCompileBool.
func (c TransformerConfig) Build(set component.TelemetrySettings) (TransformerOperator, error) {
	writer, err := c.WriterConfig.Build(set)
	if err != nil {
		return TransformerOperator{}, errors.WithDetails(err, "operator_id", c.ID())
	}

	knownMode := c.OnError == SendOnError ||
		c.OnError == SendOnErrorQuiet ||
		c.OnError == DropOnError ||
		c.OnError == DropOnErrorQuiet
	if !knownMode {
		return TransformerOperator{}, errors.NewError(
			"operator config has an invalid `on_error` field.",
			"ensure that the `on_error` field is set to one of `send`, `send_quiet`, `drop`, `drop_quiet`.",
			"on_error", c.OnError,
		)
	}

	op := TransformerOperator{
		WriterOperator: writer,
		OnError:        c.OnError,
	}

	// No condition configured: the operator keeps a nil IfExpr.
	if c.IfExpr == "" {
		return op, nil
	}

	program, err := ExprCompileBool(c.IfExpr)
	if err != nil {
		return TransformerOperator{}, fmt.Errorf("failed to compile expression '%s': %w", c.IfExpr, err)
	}
	op.IfExpr = program

	return op, nil
}
// TransformerOperator provides a basic implementation of a transformer operator.
// It is produced by TransformerConfig.Build.
type TransformerOperator struct {
// WriterOperator is the embedded writer built from the config's WriterConfig.
WriterOperator
// OnError is the on_error mode consulted by HandleEntryError to decide the
// log level and whether the entry is still written after a failure.
OnError string
// IfExpr is the compiled `if` expression, or nil when none was configured.
// Skip returns false without evaluating anything when it is nil.
IfExpr *vm.Program
}
// CanProcess reports whether the operator can process entries. For a
// transformer operator this is unconditionally true.
func (t *TransformerOperator) CanProcess() bool {
return true
}
// ProcessWith runs transform on the entry and then writes the entry out.
//
// The `if` condition is evaluated first via Skip: when it says to skip, the
// entry is written unchanged and transform is never called. A failure from
// either Skip or transform is delegated to HandleEntryError, whose result is
// returned. Otherwise the result of Write is returned.
func (t *TransformerOperator) ProcessWith(ctx context.Context, entry *entry.Entry, transform TransformFunction) error {
	shouldSkip, skipErr := t.Skip(ctx, entry)
	if skipErr != nil {
		return t.HandleEntryError(ctx, entry, skipErr)
	}

	// Only entries matching the "if" condition are transformed.
	if !shouldSkip {
		if transformErr := transform(entry); transformErr != nil {
			return t.HandleEntryError(ctx, entry, transformErr)
		}
	}

	// Skipped and successfully transformed entries share the same exit path.
	return t.Write(ctx, entry)
}
// HandleEntryError applies the operator's on_error strategy to a failed entry.
//
// The failure is logged at debug level for the quiet modes (SendOnErrorQuiet,
// DropOnErrorQuiet) and at error level otherwise. For the send modes
// (SendOnError, SendOnErrorQuiet) the entry is then written anyway.
//
// It returns err unchanged, unless the follow-up write fails, in which case
// an error wrapping the write failure is returned instead.
func (t *TransformerOperator) HandleEntryError(ctx context.Context, entry *entry.Entry, err error) error {
	logger := t.Logger()

	// Pick the log level from the mode; quiet modes log at debug level.
	logAt := logger.Error
	if t.OnError == SendOnErrorQuiet || t.OnError == DropOnErrorQuiet {
		logAt = logger.Debug
	}
	logAt("Failed to process entry", zap.Any("error", err), zap.Any("action", t.OnError))

	switch t.OnError {
	case SendOnError, SendOnErrorQuiet:
		if writeErr := t.Write(ctx, entry); writeErr != nil {
			return fmt.Errorf("failed to send entry after error: %w", writeErr)
		}
	}

	return err
}
// Skip reports whether the transform should be bypassed for the given entry.
//
// With no `if` expression configured (IfExpr is nil) it returns false, so the
// entry is always transformed. Otherwise the compiled expression is run
// against the environment obtained from GetExprEnv, and the entry is skipped
// exactly when the expression evaluates to false.
//
// An error is returned when the expression fails at run time or produces a
// non-boolean value; in both cases the boolean result is false.
func (t *TransformerOperator) Skip(_ context.Context, entry *entry.Entry) (bool, error) {
	if t.IfExpr == nil {
		return false, nil
	}

	env := GetExprEnv(entry)
	// Hand the environment back once evaluation has finished.
	defer PutExprEnv(env)

	result, err := vm.Run(t.IfExpr, env)
	if err != nil {
		return false, fmt.Errorf("running if expr: %w", err)
	}

	// IfExpr is an exported field, so a program that was not compiled for a
	// boolean result can reach this point. Report that as an error instead of
	// panicking on an unchecked type assertion.
	matches, ok := result.(bool)
	if !ok {
		return false, fmt.Errorf("running if expr: expected a boolean result, got %T", result)
	}

	return !matches, nil
}
// TransformFunction is a function that transforms an entry. ProcessWith calls
// it with the entry being processed and treats a non-nil return value as a
// processing failure, which it passes to HandleEntryError.
type TransformFunction = func(*entry.Entry) error
// Supported values for the `on_error` setting of a transformer operator.
const (
	// SendOnError specifies an on_error mode for sending entries after an error.
	SendOnError = "send"

	// SendOnErrorQuiet specifies an on_error mode for sending entries after an
	// error, but without logging at error level.
	SendOnErrorQuiet = "send_quiet"

	// DropOnError specifies an on_error mode for dropping entries after an error.
	DropOnError = "drop"

	// DropOnErrorQuiet specifies an on_error mode for dropping entries after an
	// error, but without logging at error level.
	DropOnErrorQuiet = "drop_quiet"
)