Skip to content

Commit

Permalink
discovery receiver: provide zap fields to regexp and expr (#3004)
Browse files Browse the repository at this point in the history
* discovery receiver: provide zap fields to regexp and expr

* use evaluator constructor and silence tests
  • Loading branch information
rmfitzpatrick authored Apr 26, 2023
1 parent 847d5bf commit 111bbd5
Show file tree
Hide file tree
Showing 11 changed files with 304 additions and 106 deletions.
80 changes: 55 additions & 25 deletions internal/receiver/discoveryreceiver/README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Discovery Receiver

| Status | |
| ------------------------ | ---------------- |
|--------------------------|------------------|
| Stability | [in-development] |
| Supported pipeline types | logs |
| Distributions | [Splunk] |
Expand Down Expand Up @@ -290,45 +290,75 @@ Flags: 0

### Main

| Name | Type | Default | Docs |
| ---- | ---- | ------- | ---- |
| `watch_observers` (required) | []string | <no value> | The array of Observer extensions to receive Endpoint events from |
| `log_endpoints` | bool | false | Whether to emit log records for Observer Endpoint events |
| `embed_receiver_config` | bool | false | Whether to embed a base64-encoded, minimal Receiver Creator config for the generated receiver as a reported metrics `discovery.receiver.rule` resource attribute value for status log record matches |
| `receivers` | map[string]ReceiverConfig | <no value> | The mapping of receiver names to their Receiver sub-config |
| Name | Type | Default | Docs |
|------------------------------|---------------------------|------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `watch_observers` (required) | []string | <no value> | The array of Observer extensions to receive Endpoint events from |
| `log_endpoints` | bool | false | Whether to emit log records for Observer Endpoint events |
| `embed_receiver_config` | bool | false | Whether to embed a base64-encoded, minimal Receiver Creator config for the generated receiver as a reported metrics `discovery.receiver.rule` resource attribute value for status log record matches |
| `receivers` | map[string]ReceiverConfig | <no value> | The mapping of receiver names to their Receiver sub-config |

### ReceiverConfig

| Name | Type | Default | Docs |
| ---- | ---- | ------- | ---- |
| `rule` (required) | string | <no value> | The Receiver Creator compatible discover rule |
| `config` | map[string]any | <no value> | The receiver instance configuration, including any Receiver Creator endpoint env value expr program value expansion |
| Name | Type | Default | Docs |
|-----------------------|-------------------|------------|-------------------------------------------------------------------------------------------------------------------------------------------------|
| `rule` (required) | string | <no value> | The Receiver Creator compatible discover rule |
| `config` | map[string]any | <no value> | The receiver instance configuration, including any Receiver Creator endpoint env value expr program value expansion |
| `resource_attributes` | map[string]string | <no value> | A mapping of string resource attributes and their (expr program compatible) values to include in reported metrics for status log record matches |
| `status` | map[string]Match | <no value> | A mapping of `metrics` and/or `statements` to Match items for status evaluation |
| `status` | map[string]Match | <no value> | A mapping of `metrics` and/or `statements` to Match items for status evaluation |

### Match

**One of `regexp`, `strict`, or `expr` is required.**

| Name | Type | Default | Docs |
| ---- | ---- | ------- | ---- |
| `regexp` | string | <no value> | The regexp pattern to evaluate reported received metric names or component log statements |
| `strict` | string | <no value> | The string literal to compare equivalence against reported received metric names or component log statements |
| `expr` | string | <no value> | The expr program run with the reported received metric names or component log statements (vm env TBD) |
| `first_only` | bool | false | Whether to emit only one log record for the first matching metric or log statement, ignoring all subsequent matches |
| `record` | LogRecord | <no value> | The emitted log record content |
| Name | Type | Default | Docs |
|--------------|-----------|------------|---------------------------------------------------------------------------------------------------------------------|
| `strict` | string | <no value> | The string literal to compare equivalence against reported received metric names or component log statement message |
| `regexp` | string | <no value> | The regexp pattern to evaluate reported received metric names or component log statements |
| `expr` | string | <no value> | The expr program run with the reported received metric names or component log statements |
| `first_only` | bool | false | Whether to emit only one log record for the first matching metric or log statement, ignoring all subsequent matches |
| `record` | LogRecord | <no value> | The emitted log record content |

#### `strict`

For metrics, the metric name must match exactly.
For logged statements, the message (`zapLogger.Info("<this statement message>")`) must match exactly.

#### `regexp`

For metrics, the regexp is evaluated against the metric name.
For logged statements, the regexp is evaluated against the message and fields (`zapLogger.Info("<logged statement message>", zap.Any("field_name", "field_value"))`) rendered as a yaml mapping. The fields for `caller`, `name`, and `stacktrace` are currently withheld from the mapping.

#### `expr`

See [https://expr.medv.io/](https://expr.medv.io/) for env and language documentation.

For metrics, the expr env consists of `{ "name": "<metric name>" }`.
For logs, the expr env consists of `{ "message": "<logged statement message>", "<field_name>": "<field_value>" }`. The fields `caller`, `name`, and `stacktrace` are currently withheld from the env.

Since some fields may not be valid expr identifiers (containing non word characters), the env contains a self-referential `ExprEnv` object:

```go
logger.Warn("some message", zap.String("some.field.with.periods", "some.value"))
```

In this case `some.field.with.periods` can be referenced via:

```yaml
expr: 'ExprEnv["some.field.with.periods"] contains "value"'
```

### LogRecord

| Name | Type | Default | Docs |
| ---- | ---- | ------- | ---- |
| `severity_text` | string | Emitted log statement severity level, if any, or "info" | The emitted log record's severity text |
| `body` | string | Emitted log statement message | The emitted log record's body |
| `attributes` | map[string]string | Emitted log statements fields | The emitted log record's attributes |
| Name | Type | Default | Docs |
|-----------------|-------------------|---------------------------------------------------------|----------------------------------------|
| `severity_text` | string | Emitted log statement severity level, if any, or "info" | The emitted log record's severity text |
| `body` | string | Emitted log statement message | The emitted log record's body |
| `attributes` | map[string]string | Emitted log statements fields | The emitted log record's attributes |

## Status log record content

In addition to the effects of the configured values, each emitted log record will include:

* `event.type` resource attribute with either `metric.match` or `statement.match` based on context.
* `discovery.status` log record attribute with `successful`, `partial`, or `failed` status depending on match.
* `discovery.status` log record attribute with `successful`, `partial`, or `failed` status depending on match.

24 changes: 20 additions & 4 deletions internal/receiver/discoveryreceiver/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ import (
"github.com/signalfx/splunk-otel-collector/internal/common/discovery"
)

// exprEnvFunc is to create an expr.Env function from pattern content.
type exprEnvFunc func(pattern string) map[string]any

// evaluator is the base status matcher that determines if telemetry warrants emitting a matching log record.
// It also provides embedded config correlation that its embedding structs will utilize.
type evaluator struct {
Expand All @@ -40,8 +43,17 @@ type evaluator struct {
// if match.FirstOnly this ~sync.Map(map[string]struct{}) keeps track of
// whether we've already emitted a record for the statement and can skip processing.
alreadyLogged *sync.Map
exprEnv func(pattern string) map[string]any
id component.ID
exprEnv exprEnvFunc
}

func newEvaluator(logger *zap.Logger, config *Config, correlations correlationStore, envFunc exprEnvFunc) *evaluator {
return &evaluator{
logger: logger,
config: config,
correlations: correlations,
alreadyLogged: &sync.Map{},
exprEnv: envFunc,
}
}

// evaluateMatch parses the provided Match and returns whether it warrants a status log record
Expand All @@ -67,12 +79,15 @@ func (e *evaluator) evaluateMatch(match Match, pattern string, status discovery.
case match.Expr != "":
matchPattern = match.Expr
var program *vm.Program
// we need a way to look up fields that aren't valid identifiers https://github.com/antonmedv/expr/issues/106
env := e.exprEnv(pattern)
env["ExprEnv"] = env
// TODO: cache compiled programs for performance benefit
if program, err = expr.Compile(match.Expr, expr.Env(e.exprEnv(pattern))); err != nil {
if program, err = expr.Compile(match.Expr, expr.Env(env)); err != nil {
err = fmt.Errorf("invalid match expr statement: %w", err)
} else {
matchFunc = func(p string) (bool, error) {
ret, runErr := vm.Run(program, e.exprEnv(p))
ret, runErr := vm.Run(program, env)
if runErr != nil {
return false, runErr
}
Expand All @@ -99,6 +114,7 @@ func (e *evaluator) evaluateMatch(match Match, pattern string, status discovery.
}
}

e.logger.Debug(fmt.Sprintf("evaluated match %v against %q (should log: %v)", matchPattern, pattern, shouldLog))
return shouldLog, nil
}

Expand Down
17 changes: 10 additions & 7 deletions internal/receiver/discoveryreceiver/evaluator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,16 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.uber.org/zap/zaptest"
"go.uber.org/zap"

"github.com/signalfx/splunk-otel-collector/internal/common/discovery"
)

func setup(t testing.TB) (*evaluator, component.ID, observer.EndpointID) {
logger := zaptest.NewLogger(t)
func setup() (*evaluator, component.ID, observer.EndpointID) {
// If debugging tests, replace the Nop Logger with a test instance to see
// all statements. Not in regular use to avoid spamming output.
// logger := zaptest.NewLogger(t)
logger := zap.NewNop()
alreadyLogged := &sync.Map{}
eval := &evaluator{
logger: logger,
Expand All @@ -49,7 +52,7 @@ func setup(t testing.TB) (*evaluator, component.ID, observer.EndpointID) {
}

func TestEvaluateMatch(t *testing.T) {
eval, receiverID, endpointID := setup(t)
eval, receiverID, endpointID := setup()
anotherReceiverID := component.NewIDWithName("type", "another.name")

for _, tc := range []struct {
Expand Down Expand Up @@ -87,7 +90,7 @@ func TestEvaluateMatch(t *testing.T) {
}

func TestEvaluateInvalidMatch(t *testing.T) {
eval, receiverID, endpointID := setup(t)
eval, receiverID, endpointID := setup()

for _, tc := range []struct {
typ string
Expand All @@ -109,7 +112,7 @@ func TestEvaluateInvalidMatch(t *testing.T) {
func TestCorrelateResourceAttrs(t *testing.T) {
for _, embed := range []bool{false, true} {
t.Run(fmt.Sprintf("embed-%v", embed), func(t *testing.T) {
eval, _, endpointID := setup(t)
eval, _, endpointID := setup()
eval.config.EmbedReceiverConfig = embed

endpoint := observer.Endpoint{ID: endpointID}
Expand Down Expand Up @@ -158,7 +161,7 @@ func TestCorrelateResourceAttrs(t *testing.T) {
func TestCorrelateResourceAttrsWithExistingConfig(t *testing.T) {
for _, embed := range []bool{false, true} {
t.Run(fmt.Sprintf("embed-%v", embed), func(t *testing.T) {
eval, _, endpointID := setup(t)
eval, _, endpointID := setup()
eval.config.EmbedReceiverConfig = embed

endpoint := observer.Endpoint{ID: endpointID}
Expand Down
15 changes: 4 additions & 11 deletions internal/receiver/discoveryreceiver/metric_evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@ package discoveryreceiver
import (
"context"
"fmt"
"sync"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
Expand Down Expand Up @@ -50,20 +48,15 @@ type metricEvaluator struct {
pLogs chan plog.Logs
}

func newMetricEvaluator(logger *zap.Logger, id component.ID, cfg *Config, pLogs chan plog.Logs, correlations correlationStore) *metricEvaluator {
func newMetricEvaluator(logger *zap.Logger, cfg *Config, pLogs chan plog.Logs, correlations correlationStore) *metricEvaluator {
return &metricEvaluator{
pLogs: pLogs,
evaluator: &evaluator{
logger: logger,
config: cfg,
correlations: correlations,
alreadyLogged: &sync.Map{},
evaluator: newEvaluator(logger, cfg, correlations,
// TODO: provide more capable env w/ resource and metric attributes
exprEnv: func(pattern string) map[string]any {
func(pattern string) map[string]any {
return map[string]any{"name": pattern}
},
id: id,
},
),
}
}

Expand Down
15 changes: 9 additions & 6 deletions internal/receiver/discoveryreceiver/metric_evaluator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,26 +27,29 @@ import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap/zaptest"
"go.uber.org/zap"

"github.com/signalfx/splunk-otel-collector/internal/common/discovery"
)

func TestMetricEvaluatorBaseMetricConsumer(t *testing.T) {
logger := zaptest.NewLogger(t)
logger := zap.NewNop()
cfg := &Config{}
plogs := make(chan plog.Logs)
cStore := newCorrelationStore(logger, time.Hour)

me := newMetricEvaluator(logger, component.NewID("some.type"), cfg, plogs, cStore)
me := newMetricEvaluator(logger, cfg, plogs, cStore)
require.Equal(t, consumer.Capabilities{}, me.Capabilities())

md := pmetric.NewMetrics()
require.NoError(t, me.ConsumeMetrics(context.Background(), md))
}

func TestMetricEvaluation(t *testing.T) {
logger := zaptest.NewLogger(t)
// If debugging tests, replace the Nop Logger with a test instance to see
// all statements. Not in regular use to avoid spamming output.
// logger := zaptest.NewLogger(t)
logger := zap.NewNop()
for _, tc := range []struct {
name string
match Match
Expand Down Expand Up @@ -87,7 +90,7 @@ func TestMetricEvaluation(t *testing.T) {
addedState, observerID,
)

me := newMetricEvaluator(logger, component.NewID("some.type"), cfg, plogs, cStore)
me := newMetricEvaluator(logger, cfg, plogs, cStore)

md := pmetric.NewMetrics()
rm := md.ResourceMetrics().AppendEmpty()
Expand Down Expand Up @@ -204,7 +207,7 @@ func TestTimestampFromMetric(t *testing.T) {
{name: "MetricTypeNone", metricFunc: func(md pmetric.Metric) bool { return true }},
} {
t.Run(test.name, func(t *testing.T) {
me := newMetricEvaluator(zaptest.NewLogger(t), component.NewID("some.type"), &Config{}, make(chan plog.Logs), nil)
me := newMetricEvaluator(zap.NewNop(), &Config{}, make(chan plog.Logs), nil)
md := pmetric.NewMetrics().ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
shouldBeNil := test.metricFunc(md)
actual := me.timestampFromMetric(md)
Expand Down
2 changes: 1 addition & 1 deletion internal/receiver/discoveryreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (d *discoveryReceiver) Start(ctx context.Context, host component.Host) (err
d.endpointTracker = newEndpointTracker(d.observables, d.config, d.logger, d.pLogs, correlations)
d.endpointTracker.start()

d.metricEvaluator = newMetricEvaluator(d.logger, d.settings.ID, d.config, d.pLogs, correlations)
d.metricEvaluator = newMetricEvaluator(d.logger, d.config, d.pLogs, correlations)

if d.statementEvaluator, err = newStatementEvaluator(d.logger, d.settings.ID, d.config, d.pLogs, correlations); err != nil {
return fmt.Errorf("failed creating statement evaluator: %w", err)
Expand Down
Loading

0 comments on commit 111bbd5

Please sign in to comment.