Skip to content

Commit

Permalink
[chore][processor/routingprocessor] Simplify exporters registration. (#…
Browse files Browse the repository at this point in the history
…13867)

* [chore][processor/routingprocessor] Simplify exporters registration.

* [feedback] Use dedicated methods for default and route exporters.
  • Loading branch information
kovrus authored Sep 6, 2022
1 parent d2490af commit 2631414
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 85 deletions.
16 changes: 8 additions & 8 deletions processor/routingprocessor/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ func TestLogs_RoutingWorks_Context(t *testing.T) {
GetExportersFunc: func() map[config.DataType]map[config.ComponentID]component.Exporter {
return map[config.DataType]map[config.ComponentID]component.Exporter{
config.LogsDataType: {
config.NewComponentID("otlp"): defaultExp,
config.NewComponentID("otlp/2"): lExp,
config.NewComponentID("otlp"): defaultExp,
config.NewComponentIDWithName("otlp", "2"): lExp,
},
}
},
Expand Down Expand Up @@ -120,8 +120,8 @@ func TestLogs_RoutingWorks_ResourceAttribute(t *testing.T) {
GetExportersFunc: func() map[config.DataType]map[config.ComponentID]component.Exporter {
return map[config.DataType]map[config.ComponentID]component.Exporter{
config.LogsDataType: {
config.NewComponentID("otlp"): defaultExp,
config.NewComponentID("otlp/2"): lExp,
config.NewComponentID("otlp"): defaultExp,
config.NewComponentIDWithName("otlp", "2"): lExp,
},
}
},
Expand Down Expand Up @@ -178,8 +178,8 @@ func TestLogs_RoutingWorks_ResourceAttribute_DropsRoutingAttribute(t *testing.T)
GetExportersFunc: func() map[config.DataType]map[config.ComponentID]component.Exporter {
return map[config.DataType]map[config.ComponentID]component.Exporter{
config.LogsDataType: {
config.NewComponentID("otlp"): defaultExp,
config.NewComponentID("otlp/2"): lExp,
config.NewComponentID("otlp"): defaultExp,
config.NewComponentIDWithName("otlp", "2"): lExp,
},
}
},
Expand Down Expand Up @@ -225,8 +225,8 @@ func TestLogs_AreCorrectlySplitPerResourceAttributeRouting(t *testing.T) {
GetExportersFunc: func() map[config.DataType]map[config.ComponentID]component.Exporter {
return map[config.DataType]map[config.ComponentID]component.Exporter{
config.LogsDataType: {
config.NewComponentID("otlp"): defaultExp,
config.NewComponentID("otlp/2"): lExp,
config.NewComponentID("otlp"): defaultExp,
config.NewComponentIDWithName("otlp", "2"): lExp,
},
}
},
Expand Down
20 changes: 10 additions & 10 deletions processor/routingprocessor/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ func TestMetrics_AreCorrectlySplitPerResourceAttributeRouting(t *testing.T) {
GetExportersFunc: func() map[config.DataType]map[config.ComponentID]component.Exporter {
return map[config.DataType]map[config.ComponentID]component.Exporter{
config.MetricsDataType: {
config.NewComponentID("otlp"): defaultExp,
config.NewComponentID("otlp/2"): mExp,
config.NewComponentID("otlp"): defaultExp,
config.NewComponentIDWithName("otlp", "2"): mExp,
},
}
},
Expand Down Expand Up @@ -119,8 +119,8 @@ func TestMetrics_RoutingWorks_Context(t *testing.T) {
GetExportersFunc: func() map[config.DataType]map[config.ComponentID]component.Exporter {
return map[config.DataType]map[config.ComponentID]component.Exporter{
config.MetricsDataType: {
config.NewComponentID("otlp"): defaultExp,
config.NewComponentID("otlp/2"): mExp,
config.NewComponentID("otlp"): defaultExp,
config.NewComponentIDWithName("otlp", "2"): mExp,
},
}
},
Expand Down Expand Up @@ -183,8 +183,8 @@ func TestMetrics_RoutingWorks_ResourceAttribute(t *testing.T) {
GetExportersFunc: func() map[config.DataType]map[config.ComponentID]component.Exporter {
return map[config.DataType]map[config.ComponentID]component.Exporter{
config.MetricsDataType: {
config.NewComponentID("otlp"): defaultExp,
config.NewComponentID("otlp/2"): mExp,
config.NewComponentID("otlp"): defaultExp,
config.NewComponentIDWithName("otlp", "2"): mExp,
},
}
},
Expand Down Expand Up @@ -241,8 +241,8 @@ func TestMetrics_RoutingWorks_ResourceAttribute_DropsRoutingAttribute(t *testing
GetExportersFunc: func() map[config.DataType]map[config.ComponentID]component.Exporter {
return map[config.DataType]map[config.ComponentID]component.Exporter{
config.MetricsDataType: {
config.NewComponentID("otlp"): defaultExp,
config.NewComponentID("otlp/2"): mExp,
config.NewComponentID("otlp"): defaultExp,
config.NewComponentIDWithName("otlp", "2"): mExp,
},
}
},
Expand Down Expand Up @@ -306,8 +306,8 @@ func Benchmark_MetricsRouting_ResourceAttribute(b *testing.B) {
GetExportersFunc: func() map[config.DataType]map[config.ComponentID]component.Exporter {
return map[config.DataType]map[config.ComponentID]component.Exporter{
config.MetricsDataType: {
config.NewComponentID("otlp"): defaultExp,
config.NewComponentID("otlp/2"): mExp,
config.NewComponentID("otlp"): defaultExp,
config.NewComponentIDWithName("otlp", "2"): mExp,
},
}
},
Expand Down
122 changes: 63 additions & 59 deletions processor/routingprocessor/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@ import (
"go.uber.org/zap"
)

var (
errDefaultExporterNotFound = errors.New("default exporter not found")
errExporterNotFound = errors.New("exporter not found")
)
var errExporterNotFound = errors.New("exporter not found")

// router registers exporters and default exporters for an exporter. router can
// be instantiated with component.TracesExporter, component.MetricsExporter, and
Expand All @@ -50,81 +47,88 @@ func newRouter[E component.Exporter](config Config, logger *zap.Logger) router[E
}
}

func (r *router[E]) registerExporters(exporters map[config.ComponentID]component.Exporter) error {
available := make(map[string]component.Exporter)
for id, exp := range exporters {
exporter, ok := exp.(E)
if !ok {
return fmt.Errorf("the exporter %q isn't a %T exporter", id.String(), new(E))
}
available[id.String()] = exporter
func (r *router[E]) registerExporters(available map[config.ComponentID]component.Exporter) error {
// register default exporters
err := r.registerDefaultExporters(available)
if err != nil {
return err
}

// default exporters
r.registerDefaultExporters(available)

// exporters for each route
// register exporters for each route
for _, entry := range r.config.Table {
r.registerRouteExporters(entry.Value, available, entry.Exporters)
err := r.registerRouteExporters(entry.Value, entry.Exporters, available)
if err != nil {
return err
}
}

return nil
}

// registerDefaultExporters registers the configured default exporters
// using the provided available exporters map.
func (r *router[E]) registerDefaultExporters(availableExporters map[string]component.Exporter) {
for _, e := range r.config.DefaultExporters {
v, ok := availableExporters[e]
if !ok {
r.logger.Warn(
"Can't find the exporter for the routing processor for this pipeline type."+
" This is OK if you did not specify this processor for that pipeline type",
zap.Any("pipeline_type", new(E)),
zap.Error(
fmt.Errorf(
"error registering default exporter %q: %w",
e,
errDefaultExporterNotFound,
),
),
)
func (r *router[E]) registerDefaultExporters(available map[config.ComponentID]component.Exporter) error {
for _, name := range r.config.DefaultExporters {
e, err := r.extractExporter(name, available)
if errors.Is(err, errExporterNotFound) {
continue
}
r.defaultExporters = append(r.defaultExporters, v.(E))
if err != nil {
return err
}
r.defaultExporters = append(r.defaultExporters, e)
}

return nil
}

// registerRouteExporters registers the requested exporters using the provided
// registerRouteExporters registers route exporters using the provided
// available exporters map to check if they were available.
func (r *router[E]) registerRouteExporters(
route string,
availableExporters map[string]component.Exporter,
exporters []string,
) {
r.logger.Debug("Registering exporter for route",
zap.String("route", route),
zap.Any("requested", exporters),
)

for _, e := range exporters {
v, ok := availableExporters[e]
if !ok {
r.logger.Warn(
"Can't find the exporter for the routing processor for this pipeline type."+
" This is OK if you did not specify this processor for that pipeline type",
zap.Any("pipeline_type", new(E)),
zap.Error(
fmt.Errorf(
"error registering route %q for exporter %q: %w",
route,
e,
errExporterNotFound,
),
),
)
available map[config.ComponentID]component.Exporter,
) error {
for _, name := range exporters {
e, err := r.extractExporter(name, available)
if errors.Is(err, errExporterNotFound) {
continue
}
r.exporters[route] = append(r.exporters[route], v.(E))
if err != nil {
return err
}
r.exporters[route] = append(r.exporters[route], e)
}

return nil
}

func (r *router[E]) extractExporter(name string, available map[config.ComponentID]component.Exporter) (E, error) {
var exporter E

id, err := config.NewComponentIDFromString(name)
if err != nil {
return exporter, err
}
v, ok := available[id]
if !ok {
r.logger.Warn(
"Can't find the exporter for the routing processor for this pipeline type."+
" This is OK if you did not specify this processor for that pipeline type",
zap.Any("pipeline_type", new(E)),
zap.Error(
fmt.Errorf(
"error registering exporter %q",
name,
),
),
)
return exporter, errExporterNotFound
}
exporter, ok = v.(E)
if !ok {
return exporter,
fmt.Errorf("the exporter %q isn't a %T exporter", id.String(), new(E))
}
return exporter, nil
}
16 changes: 8 additions & 8 deletions processor/routingprocessor/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ func TestTraces_AreCorrectlySplitPerResourceAttributeRouting(t *testing.T) {
GetExportersFunc: func() map[config.DataType]map[config.ComponentID]component.Exporter {
return map[config.DataType]map[config.ComponentID]component.Exporter{
config.TracesDataType: {
config.NewComponentID("otlp"): defaultExp,
config.NewComponentID("otlp/2"): tExp,
config.NewComponentID("otlp"): defaultExp,
config.NewComponentIDWithName("otlp", "2"): tExp,
},
}
},
Expand Down Expand Up @@ -172,8 +172,8 @@ func TestTraces_RoutingWorks_Context(t *testing.T) {
GetExportersFunc: func() map[config.DataType]map[config.ComponentID]component.Exporter {
return map[config.DataType]map[config.ComponentID]component.Exporter{
config.TracesDataType: {
config.NewComponentID("otlp"): defaultExp,
config.NewComponentID("otlp/2"): tExp,
config.NewComponentID("otlp"): defaultExp,
config.NewComponentIDWithName("otlp", "2"): tExp,
},
}
},
Expand Down Expand Up @@ -236,8 +236,8 @@ func TestTraces_RoutingWorks_ResourceAttribute(t *testing.T) {
GetExportersFunc: func() map[config.DataType]map[config.ComponentID]component.Exporter {
return map[config.DataType]map[config.ComponentID]component.Exporter{
config.TracesDataType: {
config.NewComponentID("otlp"): defaultExp,
config.NewComponentID("otlp/2"): tExp,
config.NewComponentID("otlp"): defaultExp,
config.NewComponentIDWithName("otlp", "2"): tExp,
},
}
},
Expand Down Expand Up @@ -294,8 +294,8 @@ func TestTraces_RoutingWorks_ResourceAttribute_DropsRoutingAttribute(t *testing.
GetExportersFunc: func() map[config.DataType]map[config.ComponentID]component.Exporter {
return map[config.DataType]map[config.ComponentID]component.Exporter{
config.TracesDataType: {
config.NewComponentID("otlp"): defaultExp,
config.NewComponentID("otlp/2"): tExp,
config.NewComponentID("otlp"): defaultExp,
config.NewComponentIDWithName("otlp", "2"): tExp,
},
}
},
Expand Down

0 comments on commit 2631414

Please sign in to comment.