diff --git a/processor/routingprocessor/logs_test.go b/processor/routingprocessor/logs_test.go index 4f216ea810a2..04056ff00806 100644 --- a/processor/routingprocessor/logs_test.go +++ b/processor/routingprocessor/logs_test.go @@ -56,8 +56,8 @@ func TestLogs_RoutingWorks_Context(t *testing.T) { GetExportersFunc: func() map[config.DataType]map[config.ComponentID]component.Exporter { return map[config.DataType]map[config.ComponentID]component.Exporter{ config.LogsDataType: { - config.NewComponentID("otlp"): defaultExp, - config.NewComponentID("otlp/2"): lExp, + config.NewComponentID("otlp"): defaultExp, + config.NewComponentIDWithName("otlp", "2"): lExp, }, } }, @@ -120,8 +120,8 @@ func TestLogs_RoutingWorks_ResourceAttribute(t *testing.T) { GetExportersFunc: func() map[config.DataType]map[config.ComponentID]component.Exporter { return map[config.DataType]map[config.ComponentID]component.Exporter{ config.LogsDataType: { - config.NewComponentID("otlp"): defaultExp, - config.NewComponentID("otlp/2"): lExp, + config.NewComponentID("otlp"): defaultExp, + config.NewComponentIDWithName("otlp", "2"): lExp, }, } }, @@ -178,8 +178,8 @@ func TestLogs_RoutingWorks_ResourceAttribute_DropsRoutingAttribute(t *testing.T) GetExportersFunc: func() map[config.DataType]map[config.ComponentID]component.Exporter { return map[config.DataType]map[config.ComponentID]component.Exporter{ config.LogsDataType: { - config.NewComponentID("otlp"): defaultExp, - config.NewComponentID("otlp/2"): lExp, + config.NewComponentID("otlp"): defaultExp, + config.NewComponentIDWithName("otlp", "2"): lExp, }, } }, @@ -225,8 +225,8 @@ func TestLogs_AreCorrectlySplitPerResourceAttributeRouting(t *testing.T) { GetExportersFunc: func() map[config.DataType]map[config.ComponentID]component.Exporter { return map[config.DataType]map[config.ComponentID]component.Exporter{ config.LogsDataType: { - config.NewComponentID("otlp"): defaultExp, - config.NewComponentID("otlp/2"): lExp, + config.NewComponentID("otlp"): defaultExp, + config.NewComponentIDWithName("otlp", "2"): lExp, }, } }, diff --git a/processor/routingprocessor/metrics_test.go b/processor/routingprocessor/metrics_test.go index 16cfacdc9c99..d662b534f3f1 100644 --- a/processor/routingprocessor/metrics_test.go +++ b/processor/routingprocessor/metrics_test.go @@ -56,8 +56,8 @@ func TestMetrics_AreCorrectlySplitPerResourceAttributeRouting(t *testing.T) { GetExportersFunc: func() map[config.DataType]map[config.ComponentID]component.Exporter { return map[config.DataType]map[config.ComponentID]component.Exporter{ config.MetricsDataType: { - config.NewComponentID("otlp"): defaultExp, - config.NewComponentID("otlp/2"): mExp, + config.NewComponentID("otlp"): defaultExp, + config.NewComponentIDWithName("otlp", "2"): mExp, }, } }, @@ -119,8 +119,8 @@ func TestMetrics_RoutingWorks_Context(t *testing.T) { GetExportersFunc: func() map[config.DataType]map[config.ComponentID]component.Exporter { return map[config.DataType]map[config.ComponentID]component.Exporter{ config.MetricsDataType: { - config.NewComponentID("otlp"): defaultExp, - config.NewComponentID("otlp/2"): mExp, + config.NewComponentID("otlp"): defaultExp, + config.NewComponentIDWithName("otlp", "2"): mExp, }, } }, @@ -183,8 +183,8 @@ func TestMetrics_RoutingWorks_ResourceAttribute(t *testing.T) { GetExportersFunc: func() map[config.DataType]map[config.ComponentID]component.Exporter { return map[config.DataType]map[config.ComponentID]component.Exporter{ config.MetricsDataType: { - config.NewComponentID("otlp"): defaultExp, - config.NewComponentID("otlp/2"): mExp, + config.NewComponentID("otlp"): defaultExp, + config.NewComponentIDWithName("otlp", "2"): mExp, }, } }, @@ -241,8 +241,8 @@ func TestMetrics_RoutingWorks_ResourceAttribute_DropsRoutingAttribute(t *testing GetExportersFunc: func() map[config.DataType]map[config.ComponentID]component.Exporter { return map[config.DataType]map[config.ComponentID]component.Exporter{ config.MetricsDataType: { - config.NewComponentID("otlp"): defaultExp, - config.NewComponentID("otlp/2"): mExp, + config.NewComponentID("otlp"): defaultExp, + config.NewComponentIDWithName("otlp", "2"): mExp, }, } }, @@ -306,8 +306,8 @@ func Benchmark_MetricsRouting_ResourceAttribute(b *testing.B) { GetExportersFunc: func() map[config.DataType]map[config.ComponentID]component.Exporter { return map[config.DataType]map[config.ComponentID]component.Exporter{ config.MetricsDataType: { - config.NewComponentID("otlp"): defaultExp, - config.NewComponentID("otlp/2"): mExp, + config.NewComponentID("otlp"): defaultExp, + config.NewComponentIDWithName("otlp", "2"): mExp, }, } }, diff --git a/processor/routingprocessor/router.go b/processor/routingprocessor/router.go index 17ef16dd3f82..7a7c8f5f8aa3 100644 --- a/processor/routingprocessor/router.go +++ b/processor/routingprocessor/router.go @@ -23,10 +23,7 @@ import ( "go.uber.org/zap" ) -var ( - errDefaultExporterNotFound = errors.New("default exporter not found") - errExporterNotFound = errors.New("exporter not found") -) +var errExporterNotFound = errors.New("exporter not found") // router registers exporters and default exporters for an exporter. router can // be instantiated with component.TracesExporter, component.MetricsExporter, and @@ -50,22 +47,19 @@ func newRouter[E component.Exporter](config Config, logger *zap.Logger) router[E } } -func (r *router[E]) registerExporters(exporters map[config.ComponentID]component.Exporter) error { - available := make(map[string]component.Exporter) - for id, exp := range exporters { - exporter, ok := exp.(E) - if !ok { - return fmt.Errorf("the exporter %q isn't a %T exporter", id.String(), new(E)) - } - available[id.String()] = exporter +func (r *router[E]) registerExporters(available map[config.ComponentID]component.Exporter) error { + // register default exporters + err := r.registerDefaultExporters(available) + if err != nil { + return err } - // default exporters - r.registerDefaultExporters(available) - - // exporters for each route + // register exporters for each route for _, entry := range r.config.Table { - r.registerRouteExporters(entry.Value, available, entry.Exporters) + err := r.registerRouteExporters(entry.Value, entry.Exporters, available) + if err != nil { + return err + } } return nil @@ -73,58 +67,68 @@ func (r *router[E]) registerExporters(exporters map[config.ComponentID]component // registerDefaultExporters registers the configured default exporters // using the provided available exporters map. -func (r *router[E]) registerDefaultExporters(availableExporters map[string]component.Exporter) { - for _, e := range r.config.DefaultExporters { - v, ok := availableExporters[e] - if !ok { - r.logger.Warn( - "Can't find the exporter for the routing processor for this pipeline type."+ - " This is OK if you did not specify this processor for that pipeline type", - zap.Any("pipeline_type", new(E)), - zap.Error( - fmt.Errorf( - "error registering default exporter %q: %w", - e, - errDefaultExporterNotFound, - ), - ), - ) +func (r *router[E]) registerDefaultExporters(available map[config.ComponentID]component.Exporter) error { + for _, name := range r.config.DefaultExporters { + e, err := r.extractExporter(name, available) + if errors.Is(err, errExporterNotFound) { continue } - r.defaultExporters = append(r.defaultExporters, v.(E)) + if err != nil { + return err + } + r.defaultExporters = append(r.defaultExporters, e) } + + return nil } -// registerRouteExporters registers the requested exporters using the provided +// registerRouteExporters registers route exporters using the provided // available exporters map to check if they were available. func (r *router[E]) registerRouteExporters( route string, - availableExporters map[string]component.Exporter, exporters []string, -) { - r.logger.Debug("Registering exporter for route", - zap.String("route", route), - zap.Any("requested", exporters), - ) - - for _, e := range exporters { - v, ok := availableExporters[e] - if !ok { - r.logger.Warn( - "Can't find the exporter for the routing processor for this pipeline type."+ - " This is OK if you did not specify this processor for that pipeline type", - zap.Any("pipeline_type", new(E)), - zap.Error( - fmt.Errorf( - "error registering route %q for exporter %q: %w", - route, - e, - errExporterNotFound, - ), - ), - ) + available map[config.ComponentID]component.Exporter, +) error { + for _, name := range exporters { + e, err := r.extractExporter(name, available) + if errors.Is(err, errExporterNotFound) { continue } - r.exporters[route] = append(r.exporters[route], v.(E)) + if err != nil { + return err + } + r.exporters[route] = append(r.exporters[route], e) + } + + return nil +} + +func (r *router[E]) extractExporter(name string, available map[config.ComponentID]component.Exporter) (E, error) { + var exporter E + + id, err := config.NewComponentIDFromString(name) + if err != nil { + return exporter, err + } + v, ok := available[id] + if !ok { + r.logger.Warn( + "Can't find the exporter for the routing processor for this pipeline type."+ + " This is OK if you did not specify this processor for that pipeline type", + zap.Any("pipeline_type", new(E)), + zap.Error( + fmt.Errorf( + "error registering exporter %q", + name, + ), + ), + ) + return exporter, errExporterNotFound + } + exporter, ok = v.(E) + if !ok { + return exporter, + fmt.Errorf("the exporter %q isn't a %T exporter", id.String(), new(E)) } + return exporter, nil } diff --git a/processor/routingprocessor/traces_test.go b/processor/routingprocessor/traces_test.go index 25760613a48b..b57669b4aa77 100644 --- a/processor/routingprocessor/traces_test.go +++ b/processor/routingprocessor/traces_test.go @@ -112,8 +112,8 @@ func TestTraces_AreCorrectlySplitPerResourceAttributeRouting(t *testing.T) { GetExportersFunc: func() map[config.DataType]map[config.ComponentID]component.Exporter { return map[config.DataType]map[config.ComponentID]component.Exporter{ config.TracesDataType: { - config.NewComponentID("otlp"): defaultExp, - config.NewComponentID("otlp/2"): tExp, + config.NewComponentID("otlp"): defaultExp, + config.NewComponentIDWithName("otlp", "2"): tExp, }, } }, @@ -172,8 +172,8 @@ func TestTraces_RoutingWorks_Context(t *testing.T) { GetExportersFunc: func() map[config.DataType]map[config.ComponentID]component.Exporter { return map[config.DataType]map[config.ComponentID]component.Exporter{ config.TracesDataType: { - config.NewComponentID("otlp"): defaultExp, - config.NewComponentID("otlp/2"): tExp, + config.NewComponentID("otlp"): defaultExp, + config.NewComponentIDWithName("otlp", "2"): tExp, }, } }, @@ -236,8 +236,8 @@ func TestTraces_RoutingWorks_ResourceAttribute(t *testing.T) { GetExportersFunc: func() map[config.DataType]map[config.ComponentID]component.Exporter { return map[config.DataType]map[config.ComponentID]component.Exporter{ config.TracesDataType: { - config.NewComponentID("otlp"): defaultExp, - config.NewComponentID("otlp/2"): tExp, + config.NewComponentID("otlp"): defaultExp, + config.NewComponentIDWithName("otlp", "2"): tExp, }, } }, @@ -294,8 +294,8 @@ func TestTraces_RoutingWorks_ResourceAttribute_DropsRoutingAttribute(t *testing. GetExportersFunc: func() map[config.DataType]map[config.ComponentID]component.Exporter { return map[config.DataType]map[config.ComponentID]component.Exporter{ config.TracesDataType: { - config.NewComponentID("otlp"): defaultExp, - config.NewComponentID("otlp/2"): tExp, + config.NewComponentID("otlp"): defaultExp, + config.NewComponentIDWithName("otlp", "2"): tExp, }, } },