diff --git a/docs/design.md b/docs/design.md index 0d8c39c32a57..fb1f2038d9a0 100644 --- a/docs/design.md +++ b/docs/design.md @@ -19,7 +19,7 @@ Pipelines can operate on 2 telemetry data types: traces and metrics. The data ty ![Pipelines](images/design-pipelines.png) -There can be one or more receivers in a pipeline. Data from all receivers is pushed to the first processor, which performs a processing on it and then pushes it to the next processor (or it may drop the data, e.g. if it is a “sampling” processor) and so on until the last processor in the pipeline pushes the data to the exporters. Each exporter gets a copy of each data element. The last processor uses a `fanoutprocessor` to fanout the data to multiple exporters. +There can be one or more receivers in a pipeline. Data from all receivers is pushed to the first processor, which performs a processing on it and then pushes it to the next processor (or it may drop the data, e.g. if it is a “sampling” processor) and so on until the last processor in the pipeline pushes the data to the exporters. Each exporter gets a copy of each data element. The last processor uses a `FanOutConnector` to fan out the data to multiple exporters. The pipeline is constructed during Service startup based on pipeline definition in the config file. @@ -63,7 +63,7 @@ When the Service loads this config the result will look like this (part of proce ![Receivers](images/design-receivers.png) -Important: when the same receiver is referenced in more than one pipeline the Service will create only one receiver instance at runtime that will send the data to `fanoutprocessor` which in turn will send the data to the first processor of each pipeline. The data propagation from receiver to `fanoutprocessor` and then to processors is via synchronous function call. This means that if one processor blocks the call the other pipelines that are attached to this receiver will be blocked from receiving the same data and the receiver itself will stop processing and forwarding newly received data. +Important: when the same receiver is referenced in more than one pipeline the Service will create only one receiver instance at runtime that will send the data to `FanOutConnector` which in turn will send the data to the first processor of each pipeline. The data propagation from receiver to `FanOutConnector` and then to processors is via synchronous function call. This means that if one processor blocks the call the other pipelines that are attached to this receiver will be blocked from receiving the same data and the receiver itself will stop processing and forwarding newly received data. ### Exporters diff --git a/exporter/zipkinexporter/zipkin_test.go b/exporter/zipkinexporter/zipkin_test.go index 4152295d8aa7..2e1c60b26d48 100644 --- a/exporter/zipkinexporter/zipkin_test.go +++ b/exporter/zipkinexporter/zipkin_test.go @@ -33,7 +33,7 @@ import ( "github.com/open-telemetry/opentelemetry-service/internal/config/viperutils" "github.com/open-telemetry/opentelemetry-service/internal/testutils" - "github.com/open-telemetry/opentelemetry-service/processor/fanoutprocessor" + "github.com/open-telemetry/opentelemetry-service/processor" "github.com/open-telemetry/opentelemetry-service/receiver/receivertest" "github.com/open-telemetry/opentelemetry-service/receiver/zipkinreceiver" ) @@ -157,7 +157,7 @@ zipkin: tes[0].(*zipkinExporter).reporter = mzr // Run the Zipkin receiver to "receive spans upload from a client application" - zexp := fanoutprocessor.NewTraceProcessor(tes) + zexp := processor.NewTraceFanOutConnector(tes) zi, err := zipkinreceiver.New(":0", zexp) if err != nil { t.Fatalf("Failed to create a new Zipkin receiver: %v", err) diff --git a/processor/fanoutprocessor/fanoutprocessor.go b/processor/fanoutconnector.go similarity index 50% rename from processor/fanoutprocessor/fanoutprocessor.go rename to processor/fanoutconnector.go index 791dea44f924..184683ee0ddc 100644 --- a/processor/fanoutprocessor/fanoutprocessor.go +++ b/processor/fanoutconnector.go @@ -12,9 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Package fanoutprocessor contains implementations of Trace/Metrics processors -// that fan out to multiple other processors. -package fanoutprocessor +package processor import ( "context" @@ -22,43 +20,45 @@ import ( "github.com/open-telemetry/opentelemetry-service/consumer" "github.com/open-telemetry/opentelemetry-service/consumer/consumerdata" "github.com/open-telemetry/opentelemetry-service/oterr" - "github.com/open-telemetry/opentelemetry-service/processor" ) -// NewMetricsProcessor wraps multiple metrics consumers in a single one. -func NewMetricsProcessor(mcs []consumer.MetricsConsumer) processor.MetricsProcessor { - return metricsConsumers(mcs) +// This file contains implementations of Trace/Metrics connectors +// that fan out the data to multiple other consumers. + +// NewMetricsFanOutConnector wraps multiple metrics consumers in a single one. +func NewMetricsFanOutConnector(mcs []consumer.MetricsConsumer) MetricsProcessor { + return metricsFanOutConnector(mcs) } -type metricsConsumers []consumer.MetricsConsumer +type metricsFanOutConnector []consumer.MetricsConsumer -var _ processor.MetricsProcessor = (*metricsConsumers)(nil) +var _ MetricsProcessor = (*metricsFanOutConnector)(nil) // ConsumeMetricsData exports the MetricsData to all consumers wrapped by the current one. -func (mcs metricsConsumers) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error { +func (mfc metricsFanOutConnector) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error { var errs []error - for _, mdp := range mcs { - if err := mdp.ConsumeMetricsData(ctx, md); err != nil { + for _, mc := range mfc { + if err := mc.ConsumeMetricsData(ctx, md); err != nil { errs = append(errs, err) } } return oterr.CombineErrors(errs) } -// NewTraceProcessor wraps multiple trace consumers in a single one. -func NewTraceProcessor(tcs []consumer.TraceConsumer) processor.TraceProcessor { - return traceConsumers(tcs) +// NewTraceFanOutConnector wraps multiple trace consumers in a single one. +func NewTraceFanOutConnector(tcs []consumer.TraceConsumer) TraceProcessor { + return traceFanOutConnector(tcs) } -type traceConsumers []consumer.TraceConsumer +type traceFanOutConnector []consumer.TraceConsumer -var _ processor.TraceProcessor = (*traceConsumers)(nil) +var _ TraceProcessor = (*traceFanOutConnector)(nil) // ConsumeTraceData exports the span data to all trace consumers wrapped by the current one. -func (tcs traceConsumers) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error { +func (tfc traceFanOutConnector) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error { var errs []error - for _, tdp := range tcs { - if err := tdp.ConsumeTraceData(ctx, td); err != nil { + for _, tc := range tfc { + if err := tc.ConsumeTraceData(ctx, td); err != nil { errs = append(errs, err) } } diff --git a/processor/fanoutprocessor/fanoutprocessor_test.go b/processor/fanoutconnector_test.go similarity index 91% rename from processor/fanoutprocessor/fanoutprocessor_test.go rename to processor/fanoutconnector_test.go index e89281559a16..7da85c34f64a 100644 --- a/processor/fanoutprocessor/fanoutprocessor_test.go +++ b/processor/fanoutconnector_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package fanoutprocessor +package processor import ( "context" @@ -31,7 +31,7 @@ func TestTraceProcessorMultiplexing(t *testing.T) { processors[i] = &mockTraceConsumer{} } - tdp := NewTraceProcessor(processors) + tfc := NewTraceFanOutConnector(processors) td := consumerdata.TraceData{ Spans: make([]*tracepb.Span, 7), } @@ -39,7 +39,7 @@ func TestTraceProcessorMultiplexing(t *testing.T) { var wantSpansCount = 0 for i := 0; i < 2; i++ { wantSpansCount += len(td.Spans) - err := tdp.ConsumeTraceData(context.Background(), td) + err := tfc.ConsumeTraceData(context.Background(), td) if err != nil { t.Errorf("Wanted nil got error") return @@ -64,7 +64,7 @@ func TestTraceProcessorWhenOneErrors(t *testing.T) { // Make one processor return error processors[1].(*mockTraceConsumer).MustFail = true - tdp := NewTraceProcessor(processors) + tfc := NewTraceFanOutConnector(processors) td := consumerdata.TraceData{ Spans: make([]*tracepb.Span, 5), } @@ -72,7 +72,7 @@ func TestTraceProcessorWhenOneErrors(t *testing.T) { var wantSpansCount = 0 for i := 0; i < 2; i++ { wantSpansCount += len(td.Spans) - err := tdp.ConsumeTraceData(context.Background(), td) + err := tfc.ConsumeTraceData(context.Background(), td) if err == nil { t.Errorf("Wanted error got nil") return @@ -94,7 +94,7 @@ func TestMetricsProcessorMultiplexing(t *testing.T) { processors[i] = &mockMetricsConsumer{} } - mdp := NewMetricsProcessor(processors) + mfc := NewMetricsFanOutConnector(processors) md := consumerdata.MetricsData{ Metrics: make([]*metricspb.Metric, 7), } @@ -102,7 +102,7 @@ func TestMetricsProcessorMultiplexing(t *testing.T) { var wantMetricsCount = 0 for i := 0; i < 2; i++ { wantMetricsCount += len(md.Metrics) - err := mdp.ConsumeMetricsData(context.Background(), md) + err := mfc.ConsumeMetricsData(context.Background(), md) if err != nil { t.Errorf("Wanted nil got error") return @@ -127,7 +127,7 @@ func TestMetricsProcessorWhenOneErrors(t *testing.T) { // Make one processor return error processors[1].(*mockMetricsConsumer).MustFail = true - mdp := NewMetricsProcessor(processors) + mfc := NewMetricsFanOutConnector(processors) md := consumerdata.MetricsData{ Metrics: make([]*metricspb.Metric, 5), } @@ -135,7 +135,7 @@ func TestMetricsProcessorWhenOneErrors(t *testing.T) { var wantMetricsCount = 0 for i := 0; i < 2; i++ { wantMetricsCount += len(md.Metrics) - err := mdp.ConsumeMetricsData(context.Background(), md) + err := mfc.ConsumeMetricsData(context.Background(), md) if err == nil { t.Errorf("Wanted error got nil") return diff --git a/service/builder/pipelines_builder.go b/service/builder/pipelines_builder.go index 324cb922daf2..bed41ad37570 100644 --- a/service/builder/pipelines_builder.go +++ b/service/builder/pipelines_builder.go @@ -22,7 +22,6 @@ import ( "github.com/open-telemetry/opentelemetry-service/config/configmodels" "github.com/open-telemetry/opentelemetry-service/consumer" "github.com/open-telemetry/opentelemetry-service/processor" - "github.com/open-telemetry/opentelemetry-service/processor/fanoutprocessor" ) // builtProcessor is a processor that is built based on a config. @@ -147,7 +146,7 @@ func (pb *PipelinesBuilder) buildFanoutExportersTraceConsumer(exporterNames []st } // Create a junction point that fans out to all exporters. - return fanoutprocessor.NewTraceProcessor(exporters) + return processor.NewTraceFanOutConnector(exporters) } func (pb *PipelinesBuilder) buildFanoutExportersMetricsConsumer(exporterNames []string) consumer.MetricsConsumer { @@ -164,5 +163,5 @@ func (pb *PipelinesBuilder) buildFanoutExportersMetricsConsumer(exporterNames [] } // Create a junction point that fans out to all exporters. - return fanoutprocessor.NewMetricsProcessor(exporters) + return processor.NewMetricsFanOutConnector(exporters) } diff --git a/service/builder/receivers_builder.go b/service/builder/receivers_builder.go index 1160f7f2835b..00b7dd5e10a4 100644 --- a/service/builder/receivers_builder.go +++ b/service/builder/receivers_builder.go @@ -24,7 +24,7 @@ import ( "github.com/open-telemetry/opentelemetry-service/config/configmodels" "github.com/open-telemetry/opentelemetry-service/consumer" "github.com/open-telemetry/opentelemetry-service/oterr" - "github.com/open-telemetry/opentelemetry-service/processor/fanoutprocessor" + "github.com/open-telemetry/opentelemetry-service/processor" "github.com/open-telemetry/opentelemetry-service/receiver" ) @@ -260,7 +260,7 @@ func buildFanoutTraceConsumer(pipelineFrontProcessors []*builtProcessor) consume } // Create a junction point that fans out to all pipelines. - return fanoutprocessor.NewTraceProcessor(pipelineConsumers) + return processor.NewTraceFanOutConnector(pipelineConsumers) } func buildFanoutMetricConsumer(pipelineFrontProcessors []*builtProcessor) consumer.MetricsConsumer { @@ -275,5 +275,5 @@ func buildFanoutMetricConsumer(pipelineFrontProcessors []*builtProcessor) consum } // Create a junction point that fans out to all pipelines. - return fanoutprocessor.NewMetricsProcessor(pipelineConsumers) + return processor.NewMetricsFanOutConnector(pipelineConsumers) }