Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pipelines] Change test to not reuse same processor twice in one pipeline #6540

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .chloggen/processor-duplication-per-pipeline.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: service

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Disallow duplicate references to processors within a single pipeline

# One or more tracking issues or pull requests related to the change
issues: [6540]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
2 changes: 2 additions & 0 deletions docs/design.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ When the Collector loads this config the result will look like this:

Note that each `batch` processor is an independent instance, although both are configured the same way, i.e. each have a `send_batch_size` of 10000.

The same name of the processor MUST NOT be referenced multiple times in the `processors` key of a single pipeline.

## <a name="opentelemetry-agent"></a>Running as an Agent

On a typical VM/container, there are user applications running in some
Expand Down
7 changes: 7 additions & 0 deletions service/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,18 @@ func (cfg *Config) validateService() error {
}

// Validate pipeline processor name references.
procSet := make(map[component.ID]bool, len(cfg.Processors))
for _, ref := range pipeline.Processors {
// Check that the name referenced in the pipeline's processors exists in the top-level processors.
if cfg.Processors[ref] == nil {
return fmt.Errorf("pipeline %q references processor %q which does not exist", pipelineID, ref)
}
// Ensure no processors are duplicated within the pipeline
if _, exists := procSet[ref]; exists {

return fmt.Errorf("pipeline %q references processor %q multiple times", pipelineID, ref)
}
procSet[ref] = true
}

// Validate pipeline has at least one exporter.
Expand Down
10 changes: 10 additions & 0 deletions service/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,16 @@ func TestConfigValidate(t *testing.T) {
},
expected: errors.New(`pipeline "traces" references processor "nop/2" which does not exist`),
},
{
name: "duplicate-processor-reference",
cfgFn: func() *Config {
cfg := generateConfig()
pipe := cfg.Service.Pipelines[component.NewID("traces")]
pipe.Processors = append(pipe.Processors, pipe.Processors...)
return cfg
},
expected: errors.New(`pipeline "traces" references processor "nop" multiple times`),
},
{
name: "invalid-exporter-reference",
cfgFn: func() *Config {
Expand Down
2 changes: 1 addition & 1 deletion service/internal/pipelines/pipelines_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestBuild(t *testing.T) {
{
name: "pipelines_simple_multi_proc.yaml",
receiverIDs: []component.ID{component.NewID("examplereceiver")},
processorIDs: []component.ID{component.NewID("exampleprocessor"), component.NewID("exampleprocessor")},
processorIDs: []component.ID{component.NewID("exampleprocessor"), component.NewIDWithName("exampleprocessor", "1")},
exporterIDs: []component.ID{component.NewID("exampleexporter")},
expectedRequests: 1,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ receivers:

processors:
exampleprocessor:
exampleprocessor/1:

exporters:
exampleexporter:
Expand All @@ -11,15 +12,15 @@ service:
pipelines:
traces:
receivers: [examplereceiver]
processors: [exampleprocessor, exampleprocessor]
processors: [exampleprocessor, exampleprocessor/1]
exporters: [exampleexporter]

metrics:
receivers: [examplereceiver]
processors: [exampleprocessor, exampleprocessor]
processors: [exampleprocessor, exampleprocessor/1]
exporters: [exampleexporter]

logs:
receivers: [examplereceiver]
processors: [exampleprocessor, exampleprocessor]
processors: [exampleprocessor, exampleprocessor/1]
exporters: [exampleexporter]