Skip to content

Commit

Permalink
[connector/routing] Add support for OTTL conditions
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Oct 10, 2024
1 parent d7e5154 commit 64b951c
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 30 deletions.
28 changes: 28 additions & 0 deletions .chloggen/routing-connector-conditions.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: routingconnector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Allow routing based on OTTL Conditions

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35731]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
Each route must contain either a statement or a condition.
# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
22 changes: 10 additions & 12 deletions connector/routingconnector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ If you are not already familiar with connectors, you may find it helpful to firs
The following settings are available:

- `table (required)`: the routing table for this connector.
- `table.statement (required)`: the routing condition provided as the [OTTL] statement.
- `table.statement`: the routing condition provided as the [OTTL] statement. Required if `table.condition` is not provided.
- `table.condition`: the routing condition provided as the [OTTL] condition. Required if `table.statement` is not provided.
- `table.pipelines (required)`: the list of pipelines to use when the routing condition is met.
- `default_pipelines (optional)`: contains the list of pipelines to use when a record does not meet any of specified conditions.
- `error_mode (optional)`: determines how errors returned from OTTL statements are handled. Valid values are `propagate`, `ignore` and `silent`. If `ignore` or `silent` is used and a statement's condition has an error then the payload will be routed to the default pipelines. When `silent` is used the error is not logged. If not supplied, `propagate` is used.
Expand Down Expand Up @@ -97,23 +98,20 @@ Respectively, if none of the routing conditions met, then a signal is routed to
- The connector will only route using [OTTL] statements which can only be applied to resource attributes. It does not support matching on context values at this time.
- The connector routes to pipelines, not exporters as the processor does.
### OTTL Limitations
- Currently, it is not possible to specify boolean statements without function invocation as the routing condition. It is required to provide the NOOP `route()` or any other supported function as part of the routing statement, see [#13545](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/13545) for more information.
- Supported [OTTL] functions:
- [IsMatch](../../pkg/ottl/ottlfuncs/README.md#IsMatch)
- [delete_key](../../pkg/ottl/ottlfuncs/README.md#delete_key)
- [delete_matching_keys](../../pkg/ottl/ottlfuncs/README.md#delete_matching_keys)
### Supported [OTTL] functions
- [IsMatch](../../pkg/ottl/ottlfuncs/README.md#IsMatch)
- [delete_key](../../pkg/ottl/ottlfuncs/README.md#delete_key)
- [delete_matching_keys](../../pkg/ottl/ottlfuncs/README.md#delete_matching_keys)
## Additional Settings
The full list of settings exposed for this connector are documented [here](./config.go) with detailed sample configuration files:
- [logs](./testdata/config_logs.yaml)
- [metrics](./testdata/config_metrics.yaml)
- [traces](./testdata/config_traces.yaml)
[in development]:https://github.com/open-telemetry/opentelemetry-collector#in-development
[Connectors README]:https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md
[Exporter Pipeline Type]:https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md#exporter-pipeline-type
[Receiver Pipeline Type]:https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md#receiver-pipeline-type
[contrib]:https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
[OTTL]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/processing.md#telemetry-query-language
[OTTL]: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/README.md
23 changes: 16 additions & 7 deletions connector/routingconnector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ import (
)

var (
errEmptyRoute = errors.New("invalid route: no statement provided")
errNoPipelines = errors.New("invalid route: no pipelines defined")
errUnexpectedConsumer = errors.New("expected consumer to be a connector router")
errNoTableItems = errors.New("invalid routing table: the routing table is empty")
errNoConditionOrStatement = errors.New("invalid route: no condition or statement provided")
errConditionAndStatement = errors.New("invalid route: both condition and statement provided")
errNoPipelines = errors.New("invalid route: no pipelines defined")
errUnexpectedConsumer = errors.New("expected consumer to be a connector router")
errNoTableItems = errors.New("invalid routing table: the routing table is empty")
)

// Config defines configuration for the Routing processor.
Expand Down Expand Up @@ -55,8 +56,12 @@ func (c *Config) Validate() error {
// validate that every route has a value for the routing attribute and has
// at least one pipeline
for _, item := range c.Table {
if len(item.Statement) == 0 {
return errEmptyRoute
if item.Statement == "" && item.Condition == "" {
return errNoConditionOrStatement
}

if item.Statement != "" && item.Condition != "" {
return errConditionAndStatement
}

if len(item.Pipelines) == 0 {
Expand All @@ -70,9 +75,13 @@ func (c *Config) Validate() error {
// RoutingTableItem specifies how data should be routed to the different pipelines
type RoutingTableItem struct {
// Statement is a OTTL statement used for making a routing decision.
// Required when 'Value' isn't provided.
// One of 'Statement' or 'Condition' must be provided.
Statement string `mapstructure:"statement"`

// Condition is an OTTL condition used for making a routing decision.
// One of 'Statement' or 'Condition' must be provided.
Condition string `mapstructure:"condition"`

// Pipelines contains the list of pipelines to use when the value from the FromAttribute field
// matches this table item. When no pipelines are specified, the ones specified under
// DefaultPipelines are used, if any.
Expand Down
49 changes: 47 additions & 2 deletions connector/routingconnector/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func TestValidateConfig(t *testing.T) {
},
},
},
error: "invalid route: no statement provided",
error: "invalid route: no condition or statement provided",
},
{
name: "no pipeline provided",
Expand All @@ -162,11 +162,56 @@ func TestValidateConfig(t *testing.T) {
config: &Config{},
error: "invalid routing table: the routing table is empty",
},
{
name: "condition provided",
config: &Config{
Table: []RoutingTableItem{
{
Condition: `attributes["attr"] == "acme"`,
Pipelines: []pipeline.ID{
pipeline.NewIDWithName(pipeline.SignalTraces, "otlp"),
},
},
},
},
},
{
name: "statement provided",
config: &Config{
Table: []RoutingTableItem{
{
Statement: `route() where attributes["attr"] == "acme"`,
Pipelines: []pipeline.ID{
pipeline.NewIDWithName(pipeline.SignalTraces, "otlp"),
},
},
},
},
},
{
name: "both condition and statement provided",
config: &Config{
Table: []RoutingTableItem{
{
Condition: `attributes["attr"] == "acme"`,
Statement: `route() where attributes["attr"] == "acme"`,
Pipelines: []pipeline.ID{
pipeline.NewIDWithName(pipeline.SignalTraces, "otlp"),
},
},
},
},
error: "invalid route: both condition and statement provided",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.EqualError(t, component.ValidateConfig(tt.config), tt.error)
if tt.error == "" {
assert.NoError(t, component.ValidateConfig(tt.config))
} else {
assert.EqualError(t, component.ValidateConfig(tt.config), tt.error)
}
})
}
}
6 changes: 3 additions & 3 deletions connector/routingconnector/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestLogsRegisterConsumersForValidRoute(t *testing.T) {
Pipelines: []pipeline.ID{logs0},
},
{
Statement: `route() where attributes["X-Tenant"] == "*"`,
Condition: `attributes["X-Tenant"] == "*"`,
Pipelines: []pipeline.ID{logs0, logs1},
},
},
Expand Down Expand Up @@ -84,7 +84,7 @@ func TestLogsAreCorrectlySplitPerResourceAttributeWithOTTL(t *testing.T) {
DefaultPipelines: []pipeline.ID{logsDefault},
Table: []RoutingTableItem{
{
Statement: `route() where IsMatch(attributes["X-Tenant"], ".*acme") == true`,
Condition: `IsMatch(attributes["X-Tenant"], ".*acme") == true`,
Pipelines: []pipeline.ID{logs0},
},
{
Expand Down Expand Up @@ -247,7 +247,7 @@ func TestLogsAreCorrectlyMatchOnceWithOTTL(t *testing.T) {
Pipelines: []pipeline.ID{logs1},
},
{
Statement: `route() where attributes["X-Tenant"] == "ecorp"`,
Condition: `attributes["X-Tenant"] == "ecorp"`,
Pipelines: []pipeline.ID{logsDefault, logs0},
},
},
Expand Down
6 changes: 3 additions & 3 deletions connector/routingconnector/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestMetricsRegisterConsumersForValidRoute(t *testing.T) {
Pipelines: []pipeline.ID{metrics0},
},
{
Statement: `route() where attributes["X-Tenant"] == "*"`,
Condition: `attributes["X-Tenant"] == "*"`,
Pipelines: []pipeline.ID{metrics0, metrics1},
},
},
Expand Down Expand Up @@ -84,7 +84,7 @@ func TestMetricsAreCorrectlySplitPerResourceAttributeWithOTTL(t *testing.T) {
DefaultPipelines: []pipeline.ID{metricsDefault},
Table: []RoutingTableItem{
{
Statement: `route() where attributes["value"] > 2.5`,
Condition: `attributes["value"] > 2.5`,
Pipelines: []pipeline.ID{metrics0},
},
{
Expand Down Expand Up @@ -262,7 +262,7 @@ func TestMetricsAreCorrectlyMatchOnceWithOTTL(t *testing.T) {
Pipelines: []pipeline.ID{metrics1},
},
{
Statement: `route() where attributes["value"] == 1.0`,
Condition: `attributes["value"] == 1.0`,
Pipelines: []pipeline.ID{metricsDefault, metrics0},
},
},
Expand Down
3 changes: 3 additions & 0 deletions connector/routingconnector/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ func (r *router[C]) registerRouteConsumers() error {
// does not contain a valid OTTL statement then nil is returned.
func (r *router[C]) getStatementFrom(item RoutingTableItem) (*ottl.Statement[ottlresource.TransformContext], error) {
var statement *ottl.Statement[ottlresource.TransformContext]
if item.Condition != "" {
item.Statement = fmt.Sprintf("route() where %s", item.Condition)
}
if item.Statement != "" {
var err error
statement, err = r.parser.ParseStatement(item.Statement)
Expand Down
6 changes: 3 additions & 3 deletions connector/routingconnector/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestTracesRegisterConsumersForValidRoute(t *testing.T) {
Pipelines: []pipeline.ID{traces0},
},
{
Statement: `route() where attributes["X-Tenant"] == "*"`,
Condition: `attributes["X-Tenant"] == "*"`,
Pipelines: []pipeline.ID{traces0, traces1},
},
},
Expand Down Expand Up @@ -84,7 +84,7 @@ func TestTracesCorrectlySplitPerResourceAttributeWithOTTL(t *testing.T) {
DefaultPipelines: []pipeline.ID{tracesDefault},
Table: []RoutingTableItem{
{
Statement: `route() where attributes["value"] > 0 and attributes["value"] < 4`,
Condition: `attributes["value"] > 0 and attributes["value"] < 4`,
Pipelines: []pipeline.ID{traces0},
},
{
Expand Down Expand Up @@ -223,7 +223,7 @@ func TestTracesCorrectlyMatchOnceWithOTTL(t *testing.T) {
Pipelines: []pipeline.ID{traces1},
},
{
Statement: `route() where attributes["value"] == 5`,
Condition: `attributes["value"] == 5`,
Pipelines: []pipeline.ID{tracesDefault, traces0},
},
},
Expand Down

0 comments on commit 64b951c

Please sign in to comment.