-
Notifications
You must be signed in to change notification settings - Fork 2.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[routingconnector] convert routing processor to a connector (#21498)
**Description:** This PR introduces a connector version of the routing processor. It currently supports routing based on resource attributes OTTL statements only. Context based routing will be added later in a followup PR. There are two issues regarding fanout consumers that are being addressed in the core repo. * The routing connector needs to be a consumer in multiple pipelines (the routing processor can route to a single exporter). * Will be resolved by: open-telemetry/opentelemetry-collector#7840 * We need the ability to create connector routers to facilitate testing. I had to temporarily copy the fanoutconsumer package into the routing connector codebase due to package visibility issues. * Will be resolved by: open-telemetry/opentelemetry-collector#7673 We can address these issues as the relevant PRs land on the core repo. cc: @djaglowski, @jpkrohling, @kovrus **Link to tracking Issue:** #19738 **Testing:** Unit / manual **Documentation:** The readme has been ported from the routing processor --------- Co-authored-by: Daniel Jaglowski <[email protected]> Co-authored-by: Ruslan Kovalov <[email protected]>
- Loading branch information
1 parent
afb5b9b
commit d7615dd
Showing
32 changed files
with
3,916 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' | ||
change_type: new_component | ||
|
||
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) | ||
component: routingconnector | ||
|
||
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). | ||
note: A connector version of the routingprocessor | ||
|
||
# One or more tracking issues related to the change | ||
issues: [19738] | ||
|
||
# (Optional) One or more lines of additional information to render under the primary note. | ||
# These lines will be padded with 2 spaces and then inserted directly into the document. | ||
# Use pipe (|) for multiline entries. | ||
subtext: |
Validating CODEOWNERS rules …
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
include ../../Makefile.Common |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
# Routing Connector | ||
|
||
<!-- status autogenerated section --> | ||
| Status | | | ||
| ------------- |-----------| | ||
| Distributions | [contrib] | | ||
|
||
[development]: https://github.com/open-telemetry/opentelemetry-collector#development | ||
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib | ||
|
||
## Supported Pipeline Types | ||
|
||
| [Exporter Pipeline Type] | [Receiver Pipeline Type] | [Stability Level] | | ||
| ------------------------ | ------------------------ | ----------------- | | ||
| traces | traces | [development] | | ||
| metrics | metrics | [development] | | ||
| logs | logs | [development] | | ||
|
||
[Exporter Pipeline Type]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md#exporter-pipeline-type | ||
[Receiver Pipeline Type]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md#receiver-pipeline-type | ||
[Stability Level]: https://github.com/open-telemetry/opentelemetry-collector#stability-levels | ||
<!-- end autogenerated section --> | ||
|
||
Routes logs, metrics or traces based on resource attributes to specific pipelines using [OpenTelemetry Transformation Language (OTTL)](../../pkg/ottl/README.md) statements as routing conditions. | ||
|
||
## Configuration | ||
|
||
If you are not already familiar with connectors, you may find it helpful to first visit the [Connectors README]. | ||
|
||
The following settings are available: | ||
|
||
- `table (required)`: the routing table for this connector. | ||
- `table.statement (required)`: the routing condition provided as the [OTTL] statement. | ||
- `table.pipelines (required)`: the list of pipelines to use when the routing condition is met. | ||
- `default_pipelines (optional)`: contains the list of pipelines to use when a record does not meet any of specified conditions. | ||
- `error_mode (optional)`: determines how errors returned from OTTL statements are handled. Valid values are `ignore` and `propagate`. If `ignored` is used and a statement's condition has an error then the payload will be routed to the default pipelines. If not supplied, `propagate` is used. | ||
|
||
Example: | ||
|
||
```yaml | ||
receivers: | ||
otlp: | ||
|
||
exporters: | ||
jaeger: | ||
endpoint: localhost:14250 | ||
jaeger/acme: | ||
endpoint: localhost:24250 | ||
jaeger/ecorp: | ||
endpoint: localhost:34250 | ||
|
||
connectors: | ||
routing: | ||
default_pipelines: [traces/jaeger] | ||
error_mode: ignore | ||
table: | ||
- statement: route() where attributes["X-Tenant"] == "acme" | ||
pipelines: [traces/jaeger-acme] | ||
- statement: delete_key(attributes, "X-Tenant") where IsMatch(attributes["X-Tenant"], ".*corp") | ||
pipelines: [traces/jaeger-ecorp] | ||
|
||
service: | ||
pipelines: | ||
traces/in: | ||
receivers: [oltp] | ||
exporters: [routing] | ||
traces/jaeger: | ||
receivers: [routing] | ||
exporters: [jaeger] | ||
traces/jaeger-acme: | ||
receivers: [routing] | ||
exporters: [jaeger/acme] | ||
traces/jaeger-ecorp: | ||
receivers: [routing] | ||
exporters: [jaeger/ecorp] | ||
``` | ||
A signal may get matched by routing conditions of more than one routing table entry. In this case, the signal will be routed to all pipelines of matching routes. | ||
Respectively, if none of the routing conditions met, then a signal is routed to default pipelines. | ||
## Differences between the Routing Connector and Routing Processor | ||
- The connector will only route using [OTTL] statements which can only be applied to resource attributes. It does not support matching on context values at this time. | ||
- The connector routes to pipelines, not exporters as the processor does. | ||
### OTTL Limitations | ||
- Currently, it is not possible to specify boolean statements without function invocation as the routing condition. It is required to provide the NOOP `route()` or any other supported function as part of the routing statement, see [#13545](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/13545) for more information. | ||
- Supported [OTTL] functions: | ||
- [IsMatch](../../pkg/ottl/ottlfuncs/README.md#IsMatch) | ||
- [delete_key](../../pkg/ottl/ottlfuncs/README.md#delete_key) | ||
- [delete_matching_keys](../../pkg/ottl/ottlfuncs/README.md#delete_matching_keys) | ||
|
||
## Additional Settings | ||
The full list of settings exposed for this connector are documented [here](./config.go) with detailed sample configuration files: | ||
|
||
- [logs](./testdata/config_logs.yaml) | ||
- [metrics](./testdata/config_metrics.yaml) | ||
- [traces](./testdata/config_traces.yaml) | ||
|
||
[in development]:https://github.com/open-telemetry/opentelemetry-collector#in-development | ||
[Connectors README]:https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md | ||
[Exporter Pipeline Type]:https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md#exporter-pipeline-type | ||
[Receiver Pipeline Type]:https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md#receiver-pipeline-type | ||
[contrib]:https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib | ||
[OTTL]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/processing.md#telemetry-query-language |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package routingconnector // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/routingconnector" | ||
|
||
import ( | ||
"errors" | ||
|
||
"go.opentelemetry.io/collector/component" | ||
|
||
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" | ||
) | ||
|
||
var ( | ||
errEmptyRoute = errors.New("invalid route: no statement provided") | ||
errNoPipelines = errors.New("invalid route: no pipelines defined") | ||
errUnexpectedConsumer = errors.New("expected consumer to be a connector router") | ||
errNoTableItems = errors.New("invalid routing table: the routing table is empty") | ||
) | ||
|
||
// Config defines configuration for the Routing processor. | ||
type Config struct { | ||
// DefaultPipelines contains the list of pipelines to use when a more specific record can't be | ||
// found in the routing table. | ||
// Optional. | ||
DefaultPipelines []component.ID `mapstructure:"default_pipelines"` | ||
|
||
// ErrorMode determines how the processor reacts to errors that occur while processing an OTTL | ||
// condition. | ||
// Valid values are `ignore` and `propagate`. | ||
// `ignore` means the processor ignores errors returned by conditions and continues on to the | ||
// next condition. This is the recommended mode. If `ignored` is used and a statement's | ||
// condition has an error then the payload will be routed to the default exporter. `propagate` | ||
// means the processor returns the error up the pipeline. This will result in the payload being | ||
// dropped from the collector. | ||
// The default value is `propagate`. | ||
ErrorMode ottl.ErrorMode `mapstructure:"error_mode"` | ||
|
||
// Table contains the routing table for this processor. | ||
// Required. | ||
Table []RoutingTableItem `mapstructure:"table"` | ||
} | ||
|
||
// Validate checks if the processor configuration is valid. | ||
func (c *Config) Validate() error { | ||
// validate that there's at least one item in the table | ||
if len(c.Table) == 0 { | ||
return errNoTableItems | ||
} | ||
|
||
// validate that every route has a value for the routing attribute and has | ||
// at least one pipeline | ||
for _, item := range c.Table { | ||
if len(item.Statement) == 0 { | ||
return errEmptyRoute | ||
} | ||
|
||
if len(item.Pipelines) == 0 { | ||
return errNoPipelines | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// RoutingTableItem specifies how data should be routed to the different pipelines | ||
type RoutingTableItem struct { | ||
// Statement is a OTTL statement used for making a routing decision. | ||
// Required when 'Value' isn't provided. | ||
Statement string `mapstructure:"statement"` | ||
|
||
// Pipelines contains the list of pipelines to use when the value from the FromAttribute field | ||
// matches this table item. When no pipelines are specified, the ones specified under | ||
// DefaultPipelines are used, if any. | ||
// The routing processor will fail upon the first failure from these pipelines. | ||
// Optional. | ||
Pipelines []component.ID `mapstructure:"pipelines"` | ||
} |
Oops, something went wrong.