Skip to content

Commit

Permalink
Built pipeline ensure correct Capabilities (#4361)
Browse files Browse the repository at this point in the history
Users of the Pipeline should not know about internal details.

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored Nov 4, 2021
1 parent 81a6531 commit 7e8cc96
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 63 deletions.
40 changes: 40 additions & 0 deletions service/internal/builder/pipelines_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,19 @@ func (pb *pipelinesBuilder) buildPipeline(ctx context.Context, pipelineID config
zap.String(components.ZapNameKey, pipelineID.String()))
pipelineLogger.Info("Pipeline was built.")

// Some consumers may not correctly implement the Capabilities,
// and ignore the next consumer when calculated the Capabilities.
// Because of this wrap the first consumer if any consumers in the pipeline
// mutate the data and the first says that it doesn't.
if tc != nil {
tc = capabilitiesTraces{Traces: tc, capabilities: consumer.Capabilities{MutatesData: mutatesConsumedData}}
}
if mc != nil {
mc = capabilitiesMetrics{Metrics: mc, capabilities: consumer.Capabilities{MutatesData: mutatesConsumedData}}
}
if lc != nil {
lc = capabilitiesLogs{Logs: lc, capabilities: consumer.Capabilities{MutatesData: mutatesConsumedData}}
}
bp := &builtPipeline{
logger: pipelineLogger,
firstTC: tc,
Expand Down Expand Up @@ -280,3 +293,30 @@ func (pb *pipelinesBuilder) buildFanoutExportersLogsConsumer(exporterIDs []confi
// Create a junction point that fans out to all exporters.
return fanoutconsumer.NewLogs(exporters)
}

type capabilitiesLogs struct {
consumer.Logs
capabilities consumer.Capabilities
}

func (mts capabilitiesLogs) Capabilities() consumer.Capabilities {
return mts.capabilities
}

type capabilitiesMetrics struct {
consumer.Metrics
capabilities consumer.Capabilities
}

func (mts capabilitiesMetrics) Capabilities() consumer.Capabilities {
return mts.capabilities
}

type capabilitiesTraces struct {
consumer.Traces
capabilities consumer.Capabilities
}

func (mts capabilitiesTraces) Capabilities() consumer.Capabilities {
return mts.capabilities
}
63 changes: 0 additions & 63 deletions service/internal/builder/receivers_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,91 +265,28 @@ func (rb *receiversBuilder) buildReceiver(ctx context.Context, set component.Rec
}

func buildFanoutTraceConsumer(pipelines []*builtPipeline) consumer.Traces {
// Optimize for the case when there is only one processor, no need to create junction point.
if len(pipelines) == 1 {
return pipelines[0].firstTC
}

var pipelineConsumers []consumer.Traces
for _, pipeline := range pipelines {
// Some consumers may not correctly implement the Capabilities,
// and ignore the next consumer when calculated the Capabilities.
// Because of this wrap the first consumer if any consumers in the pipeline
// mutate the data and the first says that it doesn't.
if pipeline.MutatesData && !pipeline.firstTC.Capabilities().MutatesData {
pipeline.firstTC = mutatingTraces{Traces: pipeline.firstTC}
}
pipelineConsumers = append(pipelineConsumers, pipeline.firstTC)
}

// Create a junction point that fans out to all pipelines.
return fanoutconsumer.NewTraces(pipelineConsumers)
}

func buildFanoutMetricConsumer(pipelines []*builtPipeline) consumer.Metrics {
// Optimize for the case when there is only one processor, no need to create junction point.
if len(pipelines) == 1 {
return pipelines[0].firstMC
}

var pipelineConsumers []consumer.Metrics
for _, pipeline := range pipelines {
// Some consumers may not correctly implement the Capabilities,
// and ignore the next consumer when calculated the Capabilities.
// Because of this wrap the first consumer if any consumers in the pipeline
// mutate the data and the first says that it doesn't.
if pipeline.MutatesData && !pipeline.firstMC.Capabilities().MutatesData {
pipeline.firstMC = mutatingMetrics{Metrics: pipeline.firstMC}
}
pipelineConsumers = append(pipelineConsumers, pipeline.firstMC)
}

// Create a junction point that fans out to all pipelines.
return fanoutconsumer.NewMetrics(pipelineConsumers)
}

func buildFanoutLogConsumer(pipelines []*builtPipeline) consumer.Logs {
// Optimize for the case when there is only one processor, no need to create junction point.
if len(pipelines) == 1 {
return pipelines[0].firstLC
}

var pipelineConsumers []consumer.Logs
for _, pipeline := range pipelines {
// Some consumers may not correctly implement the Capabilities,
// and ignore the next consumer when calculated the Capabilities.
// Because of this wrap the first consumer if any consumers in the pipeline
// mutate the data and the first says that it doesn't.
if pipeline.MutatesData && !pipeline.firstLC.Capabilities().MutatesData {
pipeline.firstLC = mutatingLogs{Logs: pipeline.firstLC}
}
pipelineConsumers = append(pipelineConsumers, pipeline.firstLC)
}

// Create a junction point that fans out to all pipelines.
return fanoutconsumer.NewLogs(pipelineConsumers)
}

type mutatingLogs struct {
consumer.Logs
}

func (mts mutatingLogs) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: true}
}

type mutatingMetrics struct {
consumer.Metrics
}

func (mts mutatingMetrics) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: true}
}

type mutatingTraces struct {
consumer.Traces
}

func (mts mutatingTraces) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: true}
}

0 comments on commit 7e8cc96

Please sign in to comment.