From 7e8cc96d882667e424be08c33733a0d969a2a4ee Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Thu, 4 Nov 2021 11:48:54 -0700 Subject: [PATCH] Built pipeline ensure correct Capabilities (#4361) Users of the Pipeline should not know about internal details. Signed-off-by: Bogdan Drutu --- service/internal/builder/pipelines_builder.go | 40 ++++++++++++ service/internal/builder/receivers_builder.go | 63 ------------------- 2 files changed, 40 insertions(+), 63 deletions(-) diff --git a/service/internal/builder/pipelines_builder.go b/service/internal/builder/pipelines_builder.go index 4867417a57c..6a380e2682a 100644 --- a/service/internal/builder/pipelines_builder.go +++ b/service/internal/builder/pipelines_builder.go @@ -221,6 +221,19 @@ func (pb *pipelinesBuilder) buildPipeline(ctx context.Context, pipelineID config zap.String(components.ZapNameKey, pipelineID.String())) pipelineLogger.Info("Pipeline was built.") + // Some consumers may not correctly implement the Capabilities, + // and ignore the next consumer when calculated the Capabilities. + // Because of this wrap the first consumer if any consumers in the pipeline + // mutate the data and the first says that it doesn't. + if tc != nil { + tc = capabilitiesTraces{Traces: tc, capabilities: consumer.Capabilities{MutatesData: mutatesConsumedData}} + } + if mc != nil { + mc = capabilitiesMetrics{Metrics: mc, capabilities: consumer.Capabilities{MutatesData: mutatesConsumedData}} + } + if lc != nil { + lc = capabilitiesLogs{Logs: lc, capabilities: consumer.Capabilities{MutatesData: mutatesConsumedData}} + } bp := &builtPipeline{ logger: pipelineLogger, firstTC: tc, @@ -280,3 +293,30 @@ func (pb *pipelinesBuilder) buildFanoutExportersLogsConsumer(exporterIDs []confi // Create a junction point that fans out to all exporters. return fanoutconsumer.NewLogs(exporters) } + +type capabilitiesLogs struct { + consumer.Logs + capabilities consumer.Capabilities +} + +func (mts capabilitiesLogs) Capabilities() consumer.Capabilities { + return mts.capabilities +} + +type capabilitiesMetrics struct { + consumer.Metrics + capabilities consumer.Capabilities +} + +func (mts capabilitiesMetrics) Capabilities() consumer.Capabilities { + return mts.capabilities +} + +type capabilitiesTraces struct { + consumer.Traces + capabilities consumer.Capabilities +} + +func (mts capabilitiesTraces) Capabilities() consumer.Capabilities { + return mts.capabilities +} diff --git a/service/internal/builder/receivers_builder.go b/service/internal/builder/receivers_builder.go index 171f5aa914f..0ff9cdc1736 100644 --- a/service/internal/builder/receivers_builder.go +++ b/service/internal/builder/receivers_builder.go @@ -265,91 +265,28 @@ func (rb *receiversBuilder) buildReceiver(ctx context.Context, set component.Rec } func buildFanoutTraceConsumer(pipelines []*builtPipeline) consumer.Traces { - // Optimize for the case when there is only one processor, no need to create junction point. - if len(pipelines) == 1 { - return pipelines[0].firstTC - } - var pipelineConsumers []consumer.Traces for _, pipeline := range pipelines { - // Some consumers may not correctly implement the Capabilities, - // and ignore the next consumer when calculated the Capabilities. - // Because of this wrap the first consumer if any consumers in the pipeline - // mutate the data and the first says that it doesn't. - if pipeline.MutatesData && !pipeline.firstTC.Capabilities().MutatesData { - pipeline.firstTC = mutatingTraces{Traces: pipeline.firstTC} - } pipelineConsumers = append(pipelineConsumers, pipeline.firstTC) } - // Create a junction point that fans out to all pipelines. return fanoutconsumer.NewTraces(pipelineConsumers) } func buildFanoutMetricConsumer(pipelines []*builtPipeline) consumer.Metrics { - // Optimize for the case when there is only one processor, no need to create junction point. - if len(pipelines) == 1 { - return pipelines[0].firstMC - } - var pipelineConsumers []consumer.Metrics for _, pipeline := range pipelines { - // Some consumers may not correctly implement the Capabilities, - // and ignore the next consumer when calculated the Capabilities. - // Because of this wrap the first consumer if any consumers in the pipeline - // mutate the data and the first says that it doesn't. - if pipeline.MutatesData && !pipeline.firstMC.Capabilities().MutatesData { - pipeline.firstMC = mutatingMetrics{Metrics: pipeline.firstMC} - } pipelineConsumers = append(pipelineConsumers, pipeline.firstMC) } - // Create a junction point that fans out to all pipelines. return fanoutconsumer.NewMetrics(pipelineConsumers) } func buildFanoutLogConsumer(pipelines []*builtPipeline) consumer.Logs { - // Optimize for the case when there is only one processor, no need to create junction point. - if len(pipelines) == 1 { - return pipelines[0].firstLC - } - var pipelineConsumers []consumer.Logs for _, pipeline := range pipelines { - // Some consumers may not correctly implement the Capabilities, - // and ignore the next consumer when calculated the Capabilities. - // Because of this wrap the first consumer if any consumers in the pipeline - // mutate the data and the first says that it doesn't. - if pipeline.MutatesData && !pipeline.firstLC.Capabilities().MutatesData { - pipeline.firstLC = mutatingLogs{Logs: pipeline.firstLC} - } pipelineConsumers = append(pipelineConsumers, pipeline.firstLC) } - // Create a junction point that fans out to all pipelines. return fanoutconsumer.NewLogs(pipelineConsumers) } - -type mutatingLogs struct { - consumer.Logs -} - -func (mts mutatingLogs) Capabilities() consumer.Capabilities { - return consumer.Capabilities{MutatesData: true} -} - -type mutatingMetrics struct { - consumer.Metrics -} - -func (mts mutatingMetrics) Capabilities() consumer.Capabilities { - return consumer.Capabilities{MutatesData: true} -} - -type mutatingTraces struct { - consumer.Traces -} - -func (mts mutatingTraces) Capabilities() consumer.Capabilities { - return consumer.Capabilities{MutatesData: true} -}