From c270bef12f89c7ef7cf4b8efa657b141945403c7 Mon Sep 17 00:00:00 2001 From: Damien Mathieu <42@dmathieu.com> Date: Mon, 15 Jul 2024 17:52:21 +0200 Subject: [PATCH] Move connector into internal package, in preparation for profiles (#10524) #### Description This splits the connector package, so the APIs are in an internal package, and redefined publicly for logs/metrics/traces. In preparation for adding profiles to the package. #### Link to tracking issue See https://github.com/open-telemetry/opentelemetry-collector/pull/10375 cc @mx-psi --------- Co-authored-by: Alex Boten <223565+codeboten@users.noreply.github.com> --- connector/builder.go | 216 +++++++++++++ connector/connector.go | 556 ++------------------------------ connector/connector_test.go | 37 +-- connector/internal/connector.go | 19 ++ connector/internal/factory.go | 374 +++++++++++++++++++++ connector/internal/logs.go | 24 ++ connector/internal/metrics.go | 25 ++ connector/internal/traces.go | 26 ++ 8 files changed, 732 insertions(+), 545 deletions(-) create mode 100644 connector/builder.go create mode 100644 connector/internal/connector.go create mode 100644 connector/internal/factory.go create mode 100644 connector/internal/logs.go create mode 100644 connector/internal/metrics.go create mode 100644 connector/internal/traces.go diff --git a/connector/builder.go b/connector/builder.go new file mode 100644 index 00000000000..e0e3c3b198c --- /dev/null +++ b/connector/builder.go @@ -0,0 +1,216 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package connector // import "go.opentelemetry.io/collector/connector" + +import ( + "context" + "fmt" + + "go.uber.org/zap" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" +) + +// Builder is a helper struct that given a set of Configs and Factories helps with creating connectors. +type Builder struct { + cfgs map[component.ID]component.Config + factories map[component.Type]Factory +} + +// NewBuilder creates a new connector.Builder to help with creating components form a set of configs and factories. +func NewBuilder(cfgs map[component.ID]component.Config, factories map[component.Type]Factory) *Builder { + return &Builder{cfgs: cfgs, factories: factories} +} + +// CreateTracesToTraces creates a Traces connector based on the settings and config. +func (b *Builder) CreateTracesToTraces(ctx context.Context, set Settings, next consumer.Traces) (Traces, error) { + if next == nil { + return nil, errNilNextConsumer + } + cfg, existsCfg := b.cfgs[set.ID] + if !existsCfg { + return nil, fmt.Errorf("connector %q is not configured", set.ID) + } + + f, existsFactory := b.factories[set.ID.Type()] + if !existsFactory { + return nil, fmt.Errorf("connector factory not available for: %q", set.ID) + } + + logStabilityLevel(set.Logger, f.TracesToTracesStability()) + return f.CreateTracesToTraces(ctx, set, cfg, next) +} + +// CreateTracesToMetrics creates a Traces connector based on the settings and config. +func (b *Builder) CreateTracesToMetrics(ctx context.Context, set Settings, next consumer.Metrics) (Traces, error) { + if next == nil { + return nil, errNilNextConsumer + } + cfg, existsCfg := b.cfgs[set.ID] + if !existsCfg { + return nil, fmt.Errorf("connector %q is not configured", set.ID) + } + + f, existsFactory := b.factories[set.ID.Type()] + if !existsFactory { + return nil, fmt.Errorf("connector factory not available for: %q", set.ID) + } + + logStabilityLevel(set.Logger, f.TracesToMetricsStability()) + return f.CreateTracesToMetrics(ctx, set, cfg, next) +} + +// CreateTracesToLogs creates a Traces connector based on the settings and config. +func (b *Builder) CreateTracesToLogs(ctx context.Context, set Settings, next consumer.Logs) (Traces, error) { + if next == nil { + return nil, errNilNextConsumer + } + cfg, existsCfg := b.cfgs[set.ID] + if !existsCfg { + return nil, fmt.Errorf("connector %q is not configured", set.ID) + } + + f, existsFactory := b.factories[set.ID.Type()] + if !existsFactory { + return nil, fmt.Errorf("connector factory not available for: %q", set.ID) + } + + logStabilityLevel(set.Logger, f.TracesToLogsStability()) + return f.CreateTracesToLogs(ctx, set, cfg, next) +} + +// CreateMetricsToTraces creates a Metrics connector based on the settings and config. +func (b *Builder) CreateMetricsToTraces(ctx context.Context, set Settings, next consumer.Traces) (Metrics, error) { + if next == nil { + return nil, errNilNextConsumer + } + cfg, existsCfg := b.cfgs[set.ID] + if !existsCfg { + return nil, fmt.Errorf("connector %q is not configured", set.ID) + } + + f, existsFactory := b.factories[set.ID.Type()] + if !existsFactory { + return nil, fmt.Errorf("connector factory not available for: %q", set.ID) + } + + logStabilityLevel(set.Logger, f.MetricsToTracesStability()) + return f.CreateMetricsToTraces(ctx, set, cfg, next) +} + +// CreateMetricsToMetrics creates a Metrics connector based on the settings and config. +func (b *Builder) CreateMetricsToMetrics(ctx context.Context, set Settings, next consumer.Metrics) (Metrics, error) { + if next == nil { + return nil, errNilNextConsumer + } + cfg, existsCfg := b.cfgs[set.ID] + if !existsCfg { + return nil, fmt.Errorf("connector %q is not configured", set.ID) + } + + f, existsFactory := b.factories[set.ID.Type()] + if !existsFactory { + return nil, fmt.Errorf("connector factory not available for: %q", set.ID) + } + + logStabilityLevel(set.Logger, f.MetricsToMetricsStability()) + return f.CreateMetricsToMetrics(ctx, set, cfg, next) +} + +// CreateMetricsToLogs creates a Metrics connector based on the settings and config. +func (b *Builder) CreateMetricsToLogs(ctx context.Context, set Settings, next consumer.Logs) (Metrics, error) { + if next == nil { + return nil, errNilNextConsumer + } + cfg, existsCfg := b.cfgs[set.ID] + if !existsCfg { + return nil, fmt.Errorf("connector %q is not configured", set.ID) + } + + f, existsFactory := b.factories[set.ID.Type()] + if !existsFactory { + return nil, fmt.Errorf("connector factory not available for: %q", set.ID) + } + + logStabilityLevel(set.Logger, f.MetricsToLogsStability()) + return f.CreateMetricsToLogs(ctx, set, cfg, next) +} + +// CreateLogsToTraces creates a Logs connector based on the settings and config. +func (b *Builder) CreateLogsToTraces(ctx context.Context, set Settings, next consumer.Traces) (Logs, error) { + if next == nil { + return nil, errNilNextConsumer + } + cfg, existsCfg := b.cfgs[set.ID] + if !existsCfg { + return nil, fmt.Errorf("connector %q is not configured", set.ID) + } + + f, existsFactory := b.factories[set.ID.Type()] + if !existsFactory { + return nil, fmt.Errorf("connector factory not available for: %q", set.ID) + } + + logStabilityLevel(set.Logger, f.LogsToTracesStability()) + return f.CreateLogsToTraces(ctx, set, cfg, next) +} + +// CreateLogsToMetrics creates a Logs connector based on the settings and config. +func (b *Builder) CreateLogsToMetrics(ctx context.Context, set Settings, next consumer.Metrics) (Logs, error) { + if next == nil { + return nil, errNilNextConsumer + } + cfg, existsCfg := b.cfgs[set.ID] + if !existsCfg { + return nil, fmt.Errorf("connector %q is not configured", set.ID) + } + + f, existsFactory := b.factories[set.ID.Type()] + if !existsFactory { + return nil, fmt.Errorf("connector factory not available for: %q", set.ID) + } + + logStabilityLevel(set.Logger, f.LogsToMetricsStability()) + return f.CreateLogsToMetrics(ctx, set, cfg, next) +} + +// CreateLogsToLogs creates a Logs connector based on the settings and config. +func (b *Builder) CreateLogsToLogs(ctx context.Context, set Settings, next consumer.Logs) (Logs, error) { + if next == nil { + return nil, errNilNextConsumer + } + cfg, existsCfg := b.cfgs[set.ID] + if !existsCfg { + return nil, fmt.Errorf("connector %q is not configured", set.ID) + } + + f, existsFactory := b.factories[set.ID.Type()] + if !existsFactory { + return nil, fmt.Errorf("connector factory not available for: %q", set.ID) + } + + logStabilityLevel(set.Logger, f.LogsToLogsStability()) + return f.CreateLogsToLogs(ctx, set, cfg, next) +} + +func (b *Builder) IsConfigured(componentID component.ID) bool { + _, ok := b.cfgs[componentID] + return ok +} + +func (b *Builder) Factory(componentType component.Type) component.Factory { + return b.factories[componentType] +} + +// logStabilityLevel logs the stability level of a component. The log level is set to info for +// undefined, unmaintained, deprecated and development. The log level is set to debug +// for alpha, beta and stable. +func logStabilityLevel(logger *zap.Logger, sl component.StabilityLevel) { + if sl >= component.StabilityLevelAlpha { + logger.Debug(sl.LogMessage()) + } else { + logger.Info(sl.LogMessage()) + } +} diff --git a/connector/connector.go b/connector/connector.go index 48f2e6b089d..22aace7fc66 100644 --- a/connector/connector.go +++ b/connector/connector.go @@ -4,14 +4,11 @@ package connector // import "go.opentelemetry.io/collector/connector" import ( - "context" "errors" "fmt" - "go.uber.org/zap" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/connector/internal" ) var ( @@ -30,10 +27,7 @@ var ( // the number of traces observed. // - Traces could be analyzed by a logs connector that emits events when particular // criteria are met. -type Traces interface { - component.Component - consumer.Traces -} +type Traces = internal.Traces // A Metrics connector acts as an exporter from a metrics pipeline and a receiver // to one or more traces, metrics, or logs pipelines. @@ -46,10 +40,7 @@ type Traces interface { // pipeline can then process and export the metric to the appropriate backend. // - Metrics could be analyzed by a logs connector that emits events when particular // criteria are met. -type Metrics interface { - component.Component - consumer.Metrics -} +type Metrics = internal.Metrics // A Logs connector acts as an exporter from a logs pipeline and a receiver // to one or more traces, metrics, or logs pipelines. @@ -61,383 +52,100 @@ type Metrics interface { // - Logs could be collected in one pipeline and routed to another logs pipeline // based on criteria such as attributes or other content of the log. The second // pipeline can then process and export the log to the appropriate backend. -type Logs interface { - component.Component - consumer.Logs -} +type Logs = internal.Logs // CreateSettings configures Connector creators. // // Deprecated: [v0.103.0] Use connector.Settings instead. -type CreateSettings = Settings +type CreateSettings = internal.Settings // Settings configures Connector creators. -type Settings struct { - // ID returns the ID of the component that will be created. - ID component.ID - - component.TelemetrySettings - - // BuildInfo can be used by components for informational purposes - BuildInfo component.BuildInfo -} +type Settings = internal.Settings // Factory is factory interface for connectors. // // This interface cannot be directly implemented. Implementations must // use the NewFactory to implement it. -type Factory interface { - component.Factory - - // CreateDefaultConfig creates the default configuration for the Connector. - // This method can be called multiple times depending on the pipeline - // configuration and should not cause side-effects that prevent the creation - // of multiple instances of the Connector. - // The object returned by this method needs to pass the checks implemented by - // 'configtest.CheckConfigStruct'. It is recommended to have these checks in the - // tests of any implementation of the Factory interface. - CreateDefaultConfig() component.Config - - CreateTracesToTraces(ctx context.Context, set Settings, cfg component.Config, nextConsumer consumer.Traces) (Traces, error) - CreateTracesToMetrics(ctx context.Context, set Settings, cfg component.Config, nextConsumer consumer.Metrics) (Traces, error) - CreateTracesToLogs(ctx context.Context, set Settings, cfg component.Config, nextConsumer consumer.Logs) (Traces, error) - - CreateMetricsToTraces(ctx context.Context, set Settings, cfg component.Config, nextConsumer consumer.Traces) (Metrics, error) - CreateMetricsToMetrics(ctx context.Context, set Settings, cfg component.Config, nextConsumer consumer.Metrics) (Metrics, error) - CreateMetricsToLogs(ctx context.Context, set Settings, cfg component.Config, nextConsumer consumer.Logs) (Metrics, error) - - CreateLogsToTraces(ctx context.Context, set Settings, cfg component.Config, nextConsumer consumer.Traces) (Logs, error) - CreateLogsToMetrics(ctx context.Context, set Settings, cfg component.Config, nextConsumer consumer.Metrics) (Logs, error) - CreateLogsToLogs(ctx context.Context, set Settings, cfg component.Config, nextConsumer consumer.Logs) (Logs, error) - - TracesToTracesStability() component.StabilityLevel - TracesToMetricsStability() component.StabilityLevel - TracesToLogsStability() component.StabilityLevel - - MetricsToTracesStability() component.StabilityLevel - MetricsToMetricsStability() component.StabilityLevel - MetricsToLogsStability() component.StabilityLevel - - LogsToTracesStability() component.StabilityLevel - LogsToMetricsStability() component.StabilityLevel - LogsToLogsStability() component.StabilityLevel - - unexportedFactoryFunc() -} +type Factory = internal.Factory // FactoryOption applies changes to Factory. -type FactoryOption interface { - // apply applies the option. - apply(o *factory) -} - -var _ FactoryOption = (*factoryOptionFunc)(nil) +type FactoryOption = internal.FactoryOption -// factoryOptionFunc is an FactoryOption created through a function. -type factoryOptionFunc func(*factory) - -func (f factoryOptionFunc) apply(o *factory) { - f(o) +// NewFactory returns a Factory. +func NewFactory(cfgType component.Type, createDefaultConfig component.CreateDefaultConfigFunc, options ...FactoryOption) Factory { + return internal.NewFactory(cfgType, createDefaultConfig, options...) } // CreateTracesToTracesFunc is the equivalent of Factory.CreateTracesToTraces(). -type CreateTracesToTracesFunc func(context.Context, Settings, component.Config, consumer.Traces) (Traces, error) - -// CreateTracesToTraces implements Factory.CreateTracesToTraces(). -func (f CreateTracesToTracesFunc) CreateTracesToTraces( - ctx context.Context, - set Settings, - cfg component.Config, - nextConsumer consumer.Traces) (Traces, error) { - if f == nil { - return nil, errDataTypes(set.ID, component.DataTypeTraces, component.DataTypeTraces) - } - return f(ctx, set, cfg, nextConsumer) -} +type CreateTracesToTracesFunc = internal.CreateTracesToTracesFunc // CreateTracesToMetricsFunc is the equivalent of Factory.CreateTracesToMetrics(). -type CreateTracesToMetricsFunc func(context.Context, Settings, component.Config, consumer.Metrics) (Traces, error) - -// CreateTracesToMetrics implements Factory.CreateTracesToMetrics(). -func (f CreateTracesToMetricsFunc) CreateTracesToMetrics( - ctx context.Context, - set Settings, - cfg component.Config, - nextConsumer consumer.Metrics, -) (Traces, error) { - if f == nil { - return nil, errDataTypes(set.ID, component.DataTypeTraces, component.DataTypeMetrics) - } - return f(ctx, set, cfg, nextConsumer) -} +type CreateTracesToMetricsFunc = internal.CreateTracesToMetricsFunc // CreateTracesToLogsFunc is the equivalent of Factory.CreateTracesToLogs(). -type CreateTracesToLogsFunc func(context.Context, Settings, component.Config, consumer.Logs) (Traces, error) - -// CreateTracesToLogs implements Factory.CreateTracesToLogs(). -func (f CreateTracesToLogsFunc) CreateTracesToLogs( - ctx context.Context, - set Settings, - cfg component.Config, - nextConsumer consumer.Logs, -) (Traces, error) { - if f == nil { - return nil, errDataTypes(set.ID, component.DataTypeTraces, component.DataTypeLogs) - } - return f(ctx, set, cfg, nextConsumer) -} +type CreateTracesToLogsFunc = internal.CreateTracesToLogsFunc // CreateMetricsToTracesFunc is the equivalent of Factory.CreateMetricsToTraces(). -type CreateMetricsToTracesFunc func(context.Context, Settings, component.Config, consumer.Traces) (Metrics, error) - -// CreateMetricsToTraces implements Factory.CreateMetricsToTraces(). -func (f CreateMetricsToTracesFunc) CreateMetricsToTraces( - ctx context.Context, - set Settings, - cfg component.Config, - nextConsumer consumer.Traces, -) (Metrics, error) { - if f == nil { - return nil, errDataTypes(set.ID, component.DataTypeMetrics, component.DataTypeTraces) - } - return f(ctx, set, cfg, nextConsumer) -} +type CreateMetricsToTracesFunc = internal.CreateMetricsToTracesFunc // CreateMetricsToMetricsFunc is the equivalent of Factory.CreateMetricsToTraces(). -type CreateMetricsToMetricsFunc func(context.Context, Settings, component.Config, consumer.Metrics) (Metrics, error) - -// CreateMetricsToMetrics implements Factory.CreateMetricsToTraces(). -func (f CreateMetricsToMetricsFunc) CreateMetricsToMetrics( - ctx context.Context, - set Settings, - cfg component.Config, - nextConsumer consumer.Metrics, -) (Metrics, error) { - if f == nil { - return nil, errDataTypes(set.ID, component.DataTypeMetrics, component.DataTypeMetrics) - } - return f(ctx, set, cfg, nextConsumer) -} +type CreateMetricsToMetricsFunc = internal.CreateMetricsToMetricsFunc // CreateMetricsToLogsFunc is the equivalent of Factory.CreateMetricsToLogs(). -type CreateMetricsToLogsFunc func(context.Context, Settings, component.Config, consumer.Logs) (Metrics, error) - -// CreateMetricsToLogs implements Factory.CreateMetricsToLogs(). -func (f CreateMetricsToLogsFunc) CreateMetricsToLogs( - ctx context.Context, - set Settings, - cfg component.Config, - nextConsumer consumer.Logs, -) (Metrics, error) { - if f == nil { - return nil, errDataTypes(set.ID, component.DataTypeMetrics, component.DataTypeLogs) - } - return f(ctx, set, cfg, nextConsumer) -} +type CreateMetricsToLogsFunc = internal.CreateMetricsToLogsFunc // CreateLogsToTracesFunc is the equivalent of Factory.CreateLogsToTraces(). -type CreateLogsToTracesFunc func(context.Context, Settings, component.Config, consumer.Traces) (Logs, error) - -// CreateLogsToTraces implements Factory.CreateLogsToTraces(). -func (f CreateLogsToTracesFunc) CreateLogsToTraces( - ctx context.Context, - set Settings, - cfg component.Config, - nextConsumer consumer.Traces, -) (Logs, error) { - if f == nil { - return nil, errDataTypes(set.ID, component.DataTypeLogs, component.DataTypeTraces) - } - return f(ctx, set, cfg, nextConsumer) -} +type CreateLogsToTracesFunc = internal.CreateLogsToTracesFunc // CreateLogsToMetricsFunc is the equivalent of Factory.CreateLogsToMetrics(). -type CreateLogsToMetricsFunc func(context.Context, Settings, component.Config, consumer.Metrics) (Logs, error) - -// CreateLogsToMetrics implements Factory.CreateLogsToMetrics(). -func (f CreateLogsToMetricsFunc) CreateLogsToMetrics( - ctx context.Context, - set Settings, - cfg component.Config, - nextConsumer consumer.Metrics, -) (Logs, error) { - if f == nil { - return nil, errDataTypes(set.ID, component.DataTypeLogs, component.DataTypeMetrics) - } - return f(ctx, set, cfg, nextConsumer) -} +type CreateLogsToMetricsFunc = internal.CreateLogsToMetricsFunc // CreateLogsToLogsFunc is the equivalent of Factory.CreateLogsToLogs(). -type CreateLogsToLogsFunc func(context.Context, Settings, component.Config, consumer.Logs) (Logs, error) - -// CreateLogsToLogs implements Factory.CreateLogsToLogs(). -func (f CreateLogsToLogsFunc) CreateLogsToLogs( - ctx context.Context, - set Settings, - cfg component.Config, - nextConsumer consumer.Logs, -) (Logs, error) { - if f == nil { - return nil, errDataTypes(set.ID, component.DataTypeLogs, component.DataTypeLogs) - } - return f(ctx, set, cfg, nextConsumer) -} - -// factory implements Factory. -type factory struct { - cfgType component.Type - component.CreateDefaultConfigFunc - - CreateTracesToTracesFunc - CreateTracesToMetricsFunc - CreateTracesToLogsFunc - - CreateMetricsToTracesFunc - CreateMetricsToMetricsFunc - CreateMetricsToLogsFunc - - CreateLogsToTracesFunc - CreateLogsToMetricsFunc - CreateLogsToLogsFunc - - tracesToTracesStabilityLevel component.StabilityLevel - tracesToMetricsStabilityLevel component.StabilityLevel - tracesToLogsStabilityLevel component.StabilityLevel - - metricsToTracesStabilityLevel component.StabilityLevel - metricsToMetricsStabilityLevel component.StabilityLevel - metricsToLogsStabilityLevel component.StabilityLevel - - logsToTracesStabilityLevel component.StabilityLevel - logsToMetricsStabilityLevel component.StabilityLevel - logsToLogsStabilityLevel component.StabilityLevel -} - -// Type returns the type of component. -func (f *factory) Type() component.Type { - return f.cfgType -} - -func (f *factory) unexportedFactoryFunc() {} +type CreateLogsToLogsFunc = internal.CreateLogsToLogsFunc // WithTracesToTraces overrides the default "error not supported" implementation for WithTracesToTraces and the default "undefined" stability level. func WithTracesToTraces(createTracesToTraces CreateTracesToTracesFunc, sl component.StabilityLevel) FactoryOption { - return factoryOptionFunc(func(o *factory) { - o.tracesToTracesStabilityLevel = sl - o.CreateTracesToTracesFunc = createTracesToTraces - }) + return internal.WithTracesToTraces(createTracesToTraces, sl) } // WithTracesToMetrics overrides the default "error not supported" implementation for WithTracesToMetrics and the default "undefined" stability level. func WithTracesToMetrics(createTracesToMetrics CreateTracesToMetricsFunc, sl component.StabilityLevel) FactoryOption { - return factoryOptionFunc(func(o *factory) { - o.tracesToMetricsStabilityLevel = sl - o.CreateTracesToMetricsFunc = createTracesToMetrics - }) + return internal.WithTracesToMetrics(createTracesToMetrics, sl) } // WithTracesToLogs overrides the default "error not supported" implementation for WithTracesToLogs and the default "undefined" stability level. func WithTracesToLogs(createTracesToLogs CreateTracesToLogsFunc, sl component.StabilityLevel) FactoryOption { - return factoryOptionFunc(func(o *factory) { - o.tracesToLogsStabilityLevel = sl - o.CreateTracesToLogsFunc = createTracesToLogs - }) + return internal.WithTracesToLogs(createTracesToLogs, sl) } // WithMetricsToTraces overrides the default "error not supported" implementation for WithMetricsToTraces and the default "undefined" stability level. func WithMetricsToTraces(createMetricsToTraces CreateMetricsToTracesFunc, sl component.StabilityLevel) FactoryOption { - return factoryOptionFunc(func(o *factory) { - o.metricsToTracesStabilityLevel = sl - o.CreateMetricsToTracesFunc = createMetricsToTraces - }) + return internal.WithMetricsToTraces(createMetricsToTraces, sl) } // WithMetricsToMetrics overrides the default "error not supported" implementation for WithMetricsToMetrics and the default "undefined" stability level. func WithMetricsToMetrics(createMetricsToMetrics CreateMetricsToMetricsFunc, sl component.StabilityLevel) FactoryOption { - return factoryOptionFunc(func(o *factory) { - o.metricsToMetricsStabilityLevel = sl - o.CreateMetricsToMetricsFunc = createMetricsToMetrics - }) + return internal.WithMetricsToMetrics(createMetricsToMetrics, sl) } // WithMetricsToLogs overrides the default "error not supported" implementation for WithMetricsToLogs and the default "undefined" stability level. func WithMetricsToLogs(createMetricsToLogs CreateMetricsToLogsFunc, sl component.StabilityLevel) FactoryOption { - return factoryOptionFunc(func(o *factory) { - o.metricsToLogsStabilityLevel = sl - o.CreateMetricsToLogsFunc = createMetricsToLogs - }) + return internal.WithMetricsToLogs(createMetricsToLogs, sl) } // WithLogsToTraces overrides the default "error not supported" implementation for WithLogsToTraces and the default "undefined" stability level. func WithLogsToTraces(createLogsToTraces CreateLogsToTracesFunc, sl component.StabilityLevel) FactoryOption { - return factoryOptionFunc(func(o *factory) { - o.logsToTracesStabilityLevel = sl - o.CreateLogsToTracesFunc = createLogsToTraces - }) + return internal.WithLogsToTraces(createLogsToTraces, sl) } // WithLogsToMetrics overrides the default "error not supported" implementation for WithLogsToMetrics and the default "undefined" stability level. func WithLogsToMetrics(createLogsToMetrics CreateLogsToMetricsFunc, sl component.StabilityLevel) FactoryOption { - return factoryOptionFunc(func(o *factory) { - o.logsToMetricsStabilityLevel = sl - o.CreateLogsToMetricsFunc = createLogsToMetrics - }) + return internal.WithLogsToMetrics(createLogsToMetrics, sl) } // WithLogsToLogs overrides the default "error not supported" implementation for WithLogsToLogs and the default "undefined" stability level. func WithLogsToLogs(createLogsToLogs CreateLogsToLogsFunc, sl component.StabilityLevel) FactoryOption { - return factoryOptionFunc(func(o *factory) { - o.logsToLogsStabilityLevel = sl - o.CreateLogsToLogsFunc = createLogsToLogs - }) -} - -func (f factory) TracesToTracesStability() component.StabilityLevel { - return f.tracesToTracesStabilityLevel -} - -func (f factory) TracesToMetricsStability() component.StabilityLevel { - return f.tracesToMetricsStabilityLevel -} - -func (f factory) TracesToLogsStability() component.StabilityLevel { - return f.tracesToLogsStabilityLevel -} - -func (f factory) MetricsToTracesStability() component.StabilityLevel { - return f.metricsToTracesStabilityLevel -} - -func (f factory) MetricsToMetricsStability() component.StabilityLevel { - return f.metricsToMetricsStabilityLevel -} - -func (f factory) MetricsToLogsStability() component.StabilityLevel { - return f.metricsToLogsStabilityLevel -} - -func (f factory) LogsToTracesStability() component.StabilityLevel { - return f.logsToTracesStabilityLevel -} - -func (f factory) LogsToMetricsStability() component.StabilityLevel { - return f.logsToMetricsStabilityLevel -} - -func (f factory) LogsToLogsStability() component.StabilityLevel { - return f.logsToLogsStabilityLevel -} - -// NewFactory returns a Factory. -func NewFactory(cfgType component.Type, createDefaultConfig component.CreateDefaultConfigFunc, options ...FactoryOption) Factory { - f := &factory{ - cfgType: cfgType, - CreateDefaultConfigFunc: createDefaultConfig, - } - for _, opt := range options { - opt.apply(f) - } - return f + return internal.WithLogsToLogs(createLogsToLogs, sl) } // MakeFactoryMap takes a list of connector factories and returns a map with factory type as keys. @@ -452,209 +160,3 @@ func MakeFactoryMap(factories ...Factory) (map[component.Type]Factory, error) { } return fMap, nil } - -// Builder processor is a helper struct that given a set of Configs and Factories helps with creating processors. -type Builder struct { - cfgs map[component.ID]component.Config - factories map[component.Type]Factory -} - -// NewBuilder creates a new connector.Builder to help with creating components form a set of configs and factories. -func NewBuilder(cfgs map[component.ID]component.Config, factories map[component.Type]Factory) *Builder { - return &Builder{cfgs: cfgs, factories: factories} -} - -// CreateTracesToTraces creates a Traces connector based on the settings and config. -func (b *Builder) CreateTracesToTraces(ctx context.Context, set Settings, next consumer.Traces) (Traces, error) { - if next == nil { - return nil, errNilNextConsumer - } - cfg, existsCfg := b.cfgs[set.ID] - if !existsCfg { - return nil, fmt.Errorf("connector %q is not configured", set.ID) - } - - f, existsFactory := b.factories[set.ID.Type()] - if !existsFactory { - return nil, fmt.Errorf("connector factory not available for: %q", set.ID) - } - - logStabilityLevel(set.Logger, f.TracesToTracesStability()) - return f.CreateTracesToTraces(ctx, set, cfg, next) -} - -// CreateTracesToMetrics creates a Traces connector based on the settings and config. -func (b *Builder) CreateTracesToMetrics(ctx context.Context, set Settings, next consumer.Metrics) (Traces, error) { - if next == nil { - return nil, errNilNextConsumer - } - cfg, existsCfg := b.cfgs[set.ID] - if !existsCfg { - return nil, fmt.Errorf("connector %q is not configured", set.ID) - } - - f, existsFactory := b.factories[set.ID.Type()] - if !existsFactory { - return nil, fmt.Errorf("connector factory not available for: %q", set.ID) - } - - logStabilityLevel(set.Logger, f.TracesToMetricsStability()) - return f.CreateTracesToMetrics(ctx, set, cfg, next) -} - -// CreateTracesToLogs creates a Traces connector based on the settings and config. -func (b *Builder) CreateTracesToLogs(ctx context.Context, set Settings, next consumer.Logs) (Traces, error) { - if next == nil { - return nil, errNilNextConsumer - } - cfg, existsCfg := b.cfgs[set.ID] - if !existsCfg { - return nil, fmt.Errorf("connector %q is not configured", set.ID) - } - - f, existsFactory := b.factories[set.ID.Type()] - if !existsFactory { - return nil, fmt.Errorf("connector factory not available for: %q", set.ID) - } - - logStabilityLevel(set.Logger, f.TracesToLogsStability()) - return f.CreateTracesToLogs(ctx, set, cfg, next) -} - -// CreateMetricsToTraces creates a Metrics connector based on the settings and config. -func (b *Builder) CreateMetricsToTraces(ctx context.Context, set Settings, next consumer.Traces) (Metrics, error) { - if next == nil { - return nil, errNilNextConsumer - } - cfg, existsCfg := b.cfgs[set.ID] - if !existsCfg { - return nil, fmt.Errorf("connector %q is not configured", set.ID) - } - - f, existsFactory := b.factories[set.ID.Type()] - if !existsFactory { - return nil, fmt.Errorf("connector factory not available for: %q", set.ID) - } - - logStabilityLevel(set.Logger, f.MetricsToTracesStability()) - return f.CreateMetricsToTraces(ctx, set, cfg, next) -} - -// CreateMetricsToMetrics creates a Metrics connector based on the settings and config. -func (b *Builder) CreateMetricsToMetrics(ctx context.Context, set Settings, next consumer.Metrics) (Metrics, error) { - if next == nil { - return nil, errNilNextConsumer - } - cfg, existsCfg := b.cfgs[set.ID] - if !existsCfg { - return nil, fmt.Errorf("connector %q is not configured", set.ID) - } - - f, existsFactory := b.factories[set.ID.Type()] - if !existsFactory { - return nil, fmt.Errorf("connector factory not available for: %q", set.ID) - } - - logStabilityLevel(set.Logger, f.MetricsToMetricsStability()) - return f.CreateMetricsToMetrics(ctx, set, cfg, next) -} - -// CreateMetricsToLogs creates a Metrics connector based on the settings and config. -func (b *Builder) CreateMetricsToLogs(ctx context.Context, set Settings, next consumer.Logs) (Metrics, error) { - if next == nil { - return nil, errNilNextConsumer - } - cfg, existsCfg := b.cfgs[set.ID] - if !existsCfg { - return nil, fmt.Errorf("connector %q is not configured", set.ID) - } - - f, existsFactory := b.factories[set.ID.Type()] - if !existsFactory { - return nil, fmt.Errorf("connector factory not available for: %q", set.ID) - } - - logStabilityLevel(set.Logger, f.MetricsToLogsStability()) - return f.CreateMetricsToLogs(ctx, set, cfg, next) -} - -// CreateLogsToTraces creates a Logs connector based on the settings and config. -func (b *Builder) CreateLogsToTraces(ctx context.Context, set Settings, next consumer.Traces) (Logs, error) { - if next == nil { - return nil, errNilNextConsumer - } - cfg, existsCfg := b.cfgs[set.ID] - if !existsCfg { - return nil, fmt.Errorf("connector %q is not configured", set.ID) - } - - f, existsFactory := b.factories[set.ID.Type()] - if !existsFactory { - return nil, fmt.Errorf("connector factory not available for: %q", set.ID) - } - - logStabilityLevel(set.Logger, f.LogsToTracesStability()) - return f.CreateLogsToTraces(ctx, set, cfg, next) -} - -// CreateLogsToMetrics creates a Logs connector based on the settings and config. -func (b *Builder) CreateLogsToMetrics(ctx context.Context, set Settings, next consumer.Metrics) (Logs, error) { - if next == nil { - return nil, errNilNextConsumer - } - cfg, existsCfg := b.cfgs[set.ID] - if !existsCfg { - return nil, fmt.Errorf("connector %q is not configured", set.ID) - } - - f, existsFactory := b.factories[set.ID.Type()] - if !existsFactory { - return nil, fmt.Errorf("connector factory not available for: %q", set.ID) - } - - logStabilityLevel(set.Logger, f.LogsToMetricsStability()) - return f.CreateLogsToMetrics(ctx, set, cfg, next) -} - -// CreateLogsToLogs creates a Logs connector based on the settings and config. -func (b *Builder) CreateLogsToLogs(ctx context.Context, set Settings, next consumer.Logs) (Logs, error) { - if next == nil { - return nil, errNilNextConsumer - } - cfg, existsCfg := b.cfgs[set.ID] - if !existsCfg { - return nil, fmt.Errorf("connector %q is not configured", set.ID) - } - - f, existsFactory := b.factories[set.ID.Type()] - if !existsFactory { - return nil, fmt.Errorf("connector factory not available for: %q", set.ID) - } - - logStabilityLevel(set.Logger, f.LogsToLogsStability()) - return f.CreateLogsToLogs(ctx, set, cfg, next) -} - -func (b *Builder) IsConfigured(componentID component.ID) bool { - _, ok := b.cfgs[componentID] - return ok -} - -func (b *Builder) Factory(componentType component.Type) component.Factory { - return b.factories[componentType] -} - -// logStabilityLevel logs the stability level of a component. The log level is set to info for -// undefined, unmaintained, deprecated and development. The log level is set to debug -// for alpha, beta and stable. -func logStabilityLevel(logger *zap.Logger, sl component.StabilityLevel) { - if sl >= component.StabilityLevelAlpha { - logger.Debug(sl.LogMessage()) - } else { - logger.Info(sl.LogMessage()) - } -} - -func errDataTypes(id component.ID, from, to component.DataType) error { - return fmt.Errorf("connector %q cannot connect from %s to %s: %w", id, from, to, component.ErrDataTypeIsNotSupported) -} diff --git a/connector/connector_test.go b/connector/connector_test.go index 8ead909f2bd..8f5f46137eb 100644 --- a/connector/connector_test.go +++ b/connector/connector_test.go @@ -13,6 +13,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/connector/internal" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" ) @@ -29,25 +30,25 @@ func TestNewFactoryNoOptions(t *testing.T) { assert.EqualValues(t, &defaultCfg, factory.CreateDefaultConfig()) _, err := factory.CreateTracesToTraces(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) - assert.Equal(t, err, errDataTypes(testID, component.DataTypeTraces, component.DataTypeTraces)) + assert.Equal(t, err, internal.ErrDataTypes(testID, component.DataTypeTraces, component.DataTypeTraces)) _, err = factory.CreateTracesToMetrics(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) - assert.Equal(t, err, errDataTypes(testID, component.DataTypeTraces, component.DataTypeMetrics)) + assert.Equal(t, err, internal.ErrDataTypes(testID, component.DataTypeTraces, component.DataTypeMetrics)) _, err = factory.CreateTracesToLogs(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) - assert.Equal(t, err, errDataTypes(testID, component.DataTypeTraces, component.DataTypeLogs)) + assert.Equal(t, err, internal.ErrDataTypes(testID, component.DataTypeTraces, component.DataTypeLogs)) _, err = factory.CreateMetricsToTraces(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) - assert.Equal(t, err, errDataTypes(testID, component.DataTypeMetrics, component.DataTypeTraces)) + assert.Equal(t, err, internal.ErrDataTypes(testID, component.DataTypeMetrics, component.DataTypeTraces)) _, err = factory.CreateMetricsToMetrics(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) - assert.Equal(t, err, errDataTypes(testID, component.DataTypeMetrics, component.DataTypeMetrics)) + assert.Equal(t, err, internal.ErrDataTypes(testID, component.DataTypeMetrics, component.DataTypeMetrics)) _, err = factory.CreateMetricsToLogs(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) - assert.Equal(t, err, errDataTypes(testID, component.DataTypeMetrics, component.DataTypeLogs)) + assert.Equal(t, err, internal.ErrDataTypes(testID, component.DataTypeMetrics, component.DataTypeLogs)) _, err = factory.CreateLogsToTraces(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) - assert.Equal(t, err, errDataTypes(testID, component.DataTypeLogs, component.DataTypeTraces)) + assert.Equal(t, err, internal.ErrDataTypes(testID, component.DataTypeLogs, component.DataTypeTraces)) _, err = factory.CreateLogsToMetrics(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) - assert.Equal(t, err, errDataTypes(testID, component.DataTypeLogs, component.DataTypeMetrics)) + assert.Equal(t, err, internal.ErrDataTypes(testID, component.DataTypeLogs, component.DataTypeMetrics)) _, err = factory.CreateLogsToLogs(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) - assert.Equal(t, err, errDataTypes(testID, component.DataTypeLogs, component.DataTypeLogs)) + assert.Equal(t, err, internal.ErrDataTypes(testID, component.DataTypeLogs, component.DataTypeLogs)) } func TestNewFactoryWithSameTypes(t *testing.T) { @@ -72,19 +73,19 @@ func TestNewFactoryWithSameTypes(t *testing.T) { assert.NoError(t, err) _, err = factory.CreateTracesToMetrics(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) - assert.Equal(t, err, errDataTypes(testID, component.DataTypeTraces, component.DataTypeMetrics)) + assert.Equal(t, err, internal.ErrDataTypes(testID, component.DataTypeTraces, component.DataTypeMetrics)) _, err = factory.CreateTracesToLogs(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) - assert.Equal(t, err, errDataTypes(testID, component.DataTypeTraces, component.DataTypeLogs)) + assert.Equal(t, err, internal.ErrDataTypes(testID, component.DataTypeTraces, component.DataTypeLogs)) _, err = factory.CreateMetricsToTraces(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) - assert.Equal(t, err, errDataTypes(testID, component.DataTypeMetrics, component.DataTypeTraces)) + assert.Equal(t, err, internal.ErrDataTypes(testID, component.DataTypeMetrics, component.DataTypeTraces)) _, err = factory.CreateMetricsToLogs(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) - assert.Equal(t, err, errDataTypes(testID, component.DataTypeMetrics, component.DataTypeLogs)) + assert.Equal(t, err, internal.ErrDataTypes(testID, component.DataTypeMetrics, component.DataTypeLogs)) _, err = factory.CreateLogsToTraces(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) - assert.Equal(t, err, errDataTypes(testID, component.DataTypeLogs, component.DataTypeTraces)) + assert.Equal(t, err, internal.ErrDataTypes(testID, component.DataTypeLogs, component.DataTypeTraces)) _, err = factory.CreateLogsToMetrics(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) - assert.Equal(t, err, errDataTypes(testID, component.DataTypeLogs, component.DataTypeMetrics)) + assert.Equal(t, err, internal.ErrDataTypes(testID, component.DataTypeLogs, component.DataTypeMetrics)) } func TestNewFactoryWithTranslateTypes(t *testing.T) { @@ -100,11 +101,11 @@ func TestNewFactoryWithTranslateTypes(t *testing.T) { assert.EqualValues(t, &defaultCfg, factory.CreateDefaultConfig()) _, err := factory.CreateTracesToTraces(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) - assert.Equal(t, err, errDataTypes(testID, component.DataTypeTraces, component.DataTypeTraces)) + assert.Equal(t, err, internal.ErrDataTypes(testID, component.DataTypeTraces, component.DataTypeTraces)) _, err = factory.CreateMetricsToMetrics(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) - assert.Equal(t, err, errDataTypes(testID, component.DataTypeMetrics, component.DataTypeMetrics)) + assert.Equal(t, err, internal.ErrDataTypes(testID, component.DataTypeMetrics, component.DataTypeMetrics)) _, err = factory.CreateLogsToLogs(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) - assert.Equal(t, err, errDataTypes(testID, component.DataTypeLogs, component.DataTypeLogs)) + assert.Equal(t, err, internal.ErrDataTypes(testID, component.DataTypeLogs, component.DataTypeLogs)) assert.Equal(t, component.StabilityLevelDevelopment, factory.TracesToMetricsStability()) _, err = factory.CreateTracesToMetrics(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) diff --git a/connector/internal/connector.go b/connector/internal/connector.go new file mode 100644 index 00000000000..997e8891531 --- /dev/null +++ b/connector/internal/connector.go @@ -0,0 +1,19 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "go.opentelemetry.io/collector/connector/internal" + +import ( + "go.opentelemetry.io/collector/component" +) + +// Settings configures Connector creators. +type Settings struct { + // ID returns the ID of the component that will be created. + ID component.ID + + component.TelemetrySettings + + // BuildInfo can be used by components for informational purposes + BuildInfo component.BuildInfo +} diff --git a/connector/internal/factory.go b/connector/internal/factory.go new file mode 100644 index 00000000000..4fb92a8a47b --- /dev/null +++ b/connector/internal/factory.go @@ -0,0 +1,374 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "go.opentelemetry.io/collector/connector/internal" + +import ( + "context" + "fmt" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" +) + +// Factory is a factory interface for connectors. +// +// This interface cannot be directly implemented. Implementations must +// use the NewFactory to implement it. +type Factory interface { + component.Factory + + // CreateDefaultConfig creates the default configuration for the Connector. + // This method can be called multiple times depending on the pipeline + // configuration and should not cause side-effects that prevent the creation + // of multiple instances of the Connector. + // The object returned by this method needs to pass the checks implemented by + // 'configtest.CheckConfigStruct'. It is recommended to have these checks in the + // tests of any implementation of the Factory interface. + CreateDefaultConfig() component.Config + + CreateTracesToTraces(ctx context.Context, set Settings, cfg component.Config, nextConsumer consumer.Traces) (Traces, error) + CreateTracesToMetrics(ctx context.Context, set Settings, cfg component.Config, nextConsumer consumer.Metrics) (Traces, error) + CreateTracesToLogs(ctx context.Context, set Settings, cfg component.Config, nextConsumer consumer.Logs) (Traces, error) + + CreateMetricsToTraces(ctx context.Context, set Settings, cfg component.Config, nextConsumer consumer.Traces) (Metrics, error) + CreateMetricsToMetrics(ctx context.Context, set Settings, cfg component.Config, nextConsumer consumer.Metrics) (Metrics, error) + CreateMetricsToLogs(ctx context.Context, set Settings, cfg component.Config, nextConsumer consumer.Logs) (Metrics, error) + + CreateLogsToTraces(ctx context.Context, set Settings, cfg component.Config, nextConsumer consumer.Traces) (Logs, error) + CreateLogsToMetrics(ctx context.Context, set Settings, cfg component.Config, nextConsumer consumer.Metrics) (Logs, error) + CreateLogsToLogs(ctx context.Context, set Settings, cfg component.Config, nextConsumer consumer.Logs) (Logs, error) + + TracesToTracesStability() component.StabilityLevel + TracesToMetricsStability() component.StabilityLevel + TracesToLogsStability() component.StabilityLevel + + MetricsToTracesStability() component.StabilityLevel + MetricsToMetricsStability() component.StabilityLevel + MetricsToLogsStability() component.StabilityLevel + + LogsToTracesStability() component.StabilityLevel + LogsToMetricsStability() component.StabilityLevel + LogsToLogsStability() component.StabilityLevel + + unexportedFactoryFunc() +} + +// FactoryOption applies changes to Factory. +type FactoryOption interface { + // apply applies the option. + apply(o *factory) +} + +var _ FactoryOption = (*factoryOptionFunc)(nil) + +// factoryOptionFunc is an FactoryOption created through a function. +type factoryOptionFunc func(*factory) + +func (f factoryOptionFunc) apply(o *factory) { + f(o) +} + +// factory implements the Factory interface. +type factory struct { + cfgType component.Type + component.CreateDefaultConfigFunc + + CreateTracesToTracesFunc + CreateTracesToMetricsFunc + CreateTracesToLogsFunc + + CreateMetricsToTracesFunc + CreateMetricsToMetricsFunc + CreateMetricsToLogsFunc + + CreateLogsToTracesFunc + CreateLogsToMetricsFunc + CreateLogsToLogsFunc + + tracesToTracesStabilityLevel component.StabilityLevel + tracesToMetricsStabilityLevel component.StabilityLevel + tracesToLogsStabilityLevel component.StabilityLevel + + metricsToTracesStabilityLevel component.StabilityLevel + metricsToMetricsStabilityLevel component.StabilityLevel + metricsToLogsStabilityLevel component.StabilityLevel + + logsToTracesStabilityLevel component.StabilityLevel + logsToMetricsStabilityLevel component.StabilityLevel + logsToLogsStabilityLevel component.StabilityLevel +} + +// CreateTracesToTracesFunc is the equivalent of Factory.CreateTracesToTraces(). +type CreateTracesToTracesFunc func(context.Context, Settings, component.Config, consumer.Traces) (Traces, error) + +// CreateTracesToTraces implements Factory.CreateTracesToTraces(). +func (f CreateTracesToTracesFunc) CreateTracesToTraces( + ctx context.Context, + set Settings, + cfg component.Config, + nextConsumer consumer.Traces) (Traces, error) { + if f == nil { + return nil, ErrDataTypes(set.ID, component.DataTypeTraces, component.DataTypeTraces) + } + return f(ctx, set, cfg, nextConsumer) +} + +// CreateTracesToMetricsFunc is the equivalent of Factory.CreateTracesToMetrics(). +type CreateTracesToMetricsFunc func(context.Context, Settings, component.Config, consumer.Metrics) (Traces, error) + +// CreateTracesToMetrics implements Factory.CreateTracesToMetrics(). +func (f CreateTracesToMetricsFunc) CreateTracesToMetrics( + ctx context.Context, + set Settings, + cfg component.Config, + nextConsumer consumer.Metrics, +) (Traces, error) { + if f == nil { + return nil, ErrDataTypes(set.ID, component.DataTypeTraces, component.DataTypeMetrics) + } + return f(ctx, set, cfg, nextConsumer) +} + +// CreateTracesToLogsFunc is the equivalent of Factory.CreateTracesToLogs(). +type CreateTracesToLogsFunc func(context.Context, Settings, component.Config, consumer.Logs) (Traces, error) + +// CreateTracesToLogs implements Factory.CreateTracesToLogs(). +func (f CreateTracesToLogsFunc) CreateTracesToLogs( + ctx context.Context, + set Settings, + cfg component.Config, + nextConsumer consumer.Logs, +) (Traces, error) { + if f == nil { + return nil, ErrDataTypes(set.ID, component.DataTypeTraces, component.DataTypeLogs) + } + return f(ctx, set, cfg, nextConsumer) +} + +// CreateMetricsToTracesFunc is the equivalent of Factory.CreateMetricsToTraces(). +type CreateMetricsToTracesFunc func(context.Context, Settings, component.Config, consumer.Traces) (Metrics, error) + +// CreateMetricsToTraces implements Factory.CreateMetricsToTraces(). +func (f CreateMetricsToTracesFunc) CreateMetricsToTraces( + ctx context.Context, + set Settings, + cfg component.Config, + nextConsumer consumer.Traces, +) (Metrics, error) { + if f == nil { + return nil, ErrDataTypes(set.ID, component.DataTypeMetrics, component.DataTypeTraces) + } + return f(ctx, set, cfg, nextConsumer) +} + +// CreateMetricsToMetricsFunc is the equivalent of Factory.CreateMetricsToTraces(). +type CreateMetricsToMetricsFunc func(context.Context, Settings, component.Config, consumer.Metrics) (Metrics, error) + +// CreateMetricsToMetrics implements Factory.CreateMetricsToTraces(). +func (f CreateMetricsToMetricsFunc) CreateMetricsToMetrics( + ctx context.Context, + set Settings, + cfg component.Config, + nextConsumer consumer.Metrics, +) (Metrics, error) { + if f == nil { + return nil, ErrDataTypes(set.ID, component.DataTypeMetrics, component.DataTypeMetrics) + } + return f(ctx, set, cfg, nextConsumer) +} + +// CreateMetricsToLogsFunc is the equivalent of Factory.CreateMetricsToLogs(). +type CreateMetricsToLogsFunc func(context.Context, Settings, component.Config, consumer.Logs) (Metrics, error) + +// CreateMetricsToLogs implements Factory.CreateMetricsToLogs(). +func (f CreateMetricsToLogsFunc) CreateMetricsToLogs( + ctx context.Context, + set Settings, + cfg component.Config, + nextConsumer consumer.Logs, +) (Metrics, error) { + if f == nil { + return nil, ErrDataTypes(set.ID, component.DataTypeMetrics, component.DataTypeLogs) + } + return f(ctx, set, cfg, nextConsumer) +} + +// CreateLogsToTracesFunc is the equivalent of Factory.CreateLogsToTraces(). +type CreateLogsToTracesFunc func(context.Context, Settings, component.Config, consumer.Traces) (Logs, error) + +// CreateLogsToTraces implements Factory.CreateLogsToTraces(). +func (f CreateLogsToTracesFunc) CreateLogsToTraces( + ctx context.Context, + set Settings, + cfg component.Config, + nextConsumer consumer.Traces, +) (Logs, error) { + if f == nil { + return nil, ErrDataTypes(set.ID, component.DataTypeLogs, component.DataTypeTraces) + } + return f(ctx, set, cfg, nextConsumer) +} + +// CreateLogsToMetricsFunc is the equivalent of Factory.CreateLogsToMetrics(). +type CreateLogsToMetricsFunc func(context.Context, Settings, component.Config, consumer.Metrics) (Logs, error) + +// CreateLogsToMetrics implements Factory.CreateLogsToMetrics(). +func (f CreateLogsToMetricsFunc) CreateLogsToMetrics( + ctx context.Context, + set Settings, + cfg component.Config, + nextConsumer consumer.Metrics, +) (Logs, error) { + if f == nil { + return nil, ErrDataTypes(set.ID, component.DataTypeLogs, component.DataTypeMetrics) + } + return f(ctx, set, cfg, nextConsumer) +} + +// CreateLogsToLogsFunc is the equivalent of Factory.CreateLogsToLogs(). +type CreateLogsToLogsFunc func(context.Context, Settings, component.Config, consumer.Logs) (Logs, error) + +// CreateLogsToLogs implements Factory.CreateLogsToLogs(). +func (f CreateLogsToLogsFunc) CreateLogsToLogs( + ctx context.Context, + set Settings, + cfg component.Config, + nextConsumer consumer.Logs, +) (Logs, error) { + if f == nil { + return nil, ErrDataTypes(set.ID, component.DataTypeLogs, component.DataTypeLogs) + } + return f(ctx, set, cfg, nextConsumer) +} + +// Type returns the type of component. +func (f *factory) Type() component.Type { + return f.cfgType +} + +func (f *factory) unexportedFactoryFunc() {} + +func (f factory) TracesToTracesStability() component.StabilityLevel { + return f.tracesToTracesStabilityLevel +} + +func (f factory) TracesToMetricsStability() component.StabilityLevel { + return f.tracesToMetricsStabilityLevel +} + +func (f factory) TracesToLogsStability() component.StabilityLevel { + return f.tracesToLogsStabilityLevel +} + +func (f factory) MetricsToTracesStability() component.StabilityLevel { + return f.metricsToTracesStabilityLevel +} + +func (f factory) MetricsToMetricsStability() component.StabilityLevel { + return f.metricsToMetricsStabilityLevel +} + +func (f factory) MetricsToLogsStability() component.StabilityLevel { + return f.metricsToLogsStabilityLevel +} + +func (f factory) LogsToTracesStability() component.StabilityLevel { + return f.logsToTracesStabilityLevel +} + +func (f factory) LogsToMetricsStability() component.StabilityLevel { + return f.logsToMetricsStabilityLevel +} + +func (f factory) LogsToLogsStability() component.StabilityLevel { + return f.logsToLogsStabilityLevel +} + +// NewFactory returns a Factory. +func NewFactory(cfgType component.Type, createDefaultConfig component.CreateDefaultConfigFunc, options ...FactoryOption) Factory { + f := &factory{ + cfgType: cfgType, + CreateDefaultConfigFunc: createDefaultConfig, + } + for _, opt := range options { + opt.apply(f) + } + return f +} + +// WithTracesToTraces overrides the default "error not supported" implementation for WithTracesToTraces and the default "undefined" stability level. +func WithTracesToTraces(createTracesToTraces CreateTracesToTracesFunc, sl component.StabilityLevel) FactoryOption { + return factoryOptionFunc(func(o *factory) { + o.tracesToTracesStabilityLevel = sl + o.CreateTracesToTracesFunc = createTracesToTraces + }) +} + +// WithTracesToMetrics overrides the default "error not supported" implementation for WithTracesToMetrics and the default "undefined" stability level. +func WithTracesToMetrics(createTracesToMetrics CreateTracesToMetricsFunc, sl component.StabilityLevel) FactoryOption { + return factoryOptionFunc(func(o *factory) { + o.tracesToMetricsStabilityLevel = sl + o.CreateTracesToMetricsFunc = createTracesToMetrics + }) +} + +// WithTracesToLogs overrides the default "error not supported" implementation for WithTracesToLogs and the default "undefined" stability level. +func WithTracesToLogs(createTracesToLogs CreateTracesToLogsFunc, sl component.StabilityLevel) FactoryOption { + return factoryOptionFunc(func(o *factory) { + o.tracesToLogsStabilityLevel = sl + o.CreateTracesToLogsFunc = createTracesToLogs + }) +} + +// WithMetricsToTraces overrides the default "error not supported" implementation for WithMetricsToTraces and the default "undefined" stability level. +func WithMetricsToTraces(createMetricsToTraces CreateMetricsToTracesFunc, sl component.StabilityLevel) FactoryOption { + return factoryOptionFunc(func(o *factory) { + o.metricsToTracesStabilityLevel = sl + o.CreateMetricsToTracesFunc = createMetricsToTraces + }) +} + +// WithMetricsToMetrics overrides the default "error not supported" implementation for WithMetricsToMetrics and the default "undefined" stability level. +func WithMetricsToMetrics(createMetricsToMetrics CreateMetricsToMetricsFunc, sl component.StabilityLevel) FactoryOption { + return factoryOptionFunc(func(o *factory) { + o.metricsToMetricsStabilityLevel = sl + o.CreateMetricsToMetricsFunc = createMetricsToMetrics + }) +} + +// WithMetricsToLogs overrides the default "error not supported" implementation for WithMetricsToLogs and the default "undefined" stability level. +func WithMetricsToLogs(createMetricsToLogs CreateMetricsToLogsFunc, sl component.StabilityLevel) FactoryOption { + return factoryOptionFunc(func(o *factory) { + o.metricsToLogsStabilityLevel = sl + o.CreateMetricsToLogsFunc = createMetricsToLogs + }) +} + +// WithLogsToTraces overrides the default "error not supported" implementation for WithLogsToTraces and the default "undefined" stability level. +func WithLogsToTraces(createLogsToTraces CreateLogsToTracesFunc, sl component.StabilityLevel) FactoryOption { + return factoryOptionFunc(func(o *factory) { + o.logsToTracesStabilityLevel = sl + o.CreateLogsToTracesFunc = createLogsToTraces + }) +} + +// WithLogsToMetrics overrides the default "error not supported" implementation for WithLogsToMetrics and the default "undefined" stability level. +func WithLogsToMetrics(createLogsToMetrics CreateLogsToMetricsFunc, sl component.StabilityLevel) FactoryOption { + return factoryOptionFunc(func(o *factory) { + o.logsToMetricsStabilityLevel = sl + o.CreateLogsToMetricsFunc = createLogsToMetrics + }) +} + +// WithLogsToLogs overrides the default "error not supported" implementation for WithLogsToLogs and the default "undefined" stability level. +func WithLogsToLogs(createLogsToLogs CreateLogsToLogsFunc, sl component.StabilityLevel) FactoryOption { + return factoryOptionFunc(func(o *factory) { + o.logsToLogsStabilityLevel = sl + o.CreateLogsToLogsFunc = createLogsToLogs + }) +} + +func ErrDataTypes(id component.ID, from, to component.DataType) error { + return fmt.Errorf("connector %q cannot connect from %s to %s: %w", id, from, to, component.ErrDataTypeIsNotSupported) +} diff --git a/connector/internal/logs.go b/connector/internal/logs.go new file mode 100644 index 00000000000..c3077db555b --- /dev/null +++ b/connector/internal/logs.go @@ -0,0 +1,24 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "go.opentelemetry.io/collector/connector/internal" + +import ( + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" +) + +// A Logs connector acts as an exporter from a logs pipeline and a receiver +// to one or more traces, metrics, or logs pipelines. +// Logs feeds a consumer.Traces, consumer.Metrics, or consumer.Logs with data. +// +// Examples: +// - Structured logs containing span information could be consumed and emitted as traces. +// - Metrics could be extracted from structured logs that contain numeric data. +// - Logs could be collected in one pipeline and routed to another logs pipeline +// based on criteria such as attributes or other content of the log. The second +// pipeline can then process and export the log to the appropriate backend. +type Logs interface { + component.Component + consumer.Logs +} diff --git a/connector/internal/metrics.go b/connector/internal/metrics.go new file mode 100644 index 00000000000..750607d43c1 --- /dev/null +++ b/connector/internal/metrics.go @@ -0,0 +1,25 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "go.opentelemetry.io/collector/connector/internal" + +import ( + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" +) + +// A Metrics connector acts as an exporter from a metrics pipeline and a receiver +// to one or more traces, metrics, or logs pipelines. +// Metrics feeds a consumer.Traces, consumer.Metrics, or consumer.Logs with data. +// +// Examples: +// - Latency between related data points could be modeled and emitted as traces. +// - Metrics could be collected in one pipeline and routed to another metrics pipeline +// based on criteria such as attributes or other content of the metric. The second +// pipeline can then process and export the metric to the appropriate backend. +// - Metrics could be analyzed by a logs connector that emits events when particular +// criteria are met. +type Metrics interface { + component.Component + consumer.Metrics +} diff --git a/connector/internal/traces.go b/connector/internal/traces.go new file mode 100644 index 00000000000..fc1ee2d5500 --- /dev/null +++ b/connector/internal/traces.go @@ -0,0 +1,26 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "go.opentelemetry.io/collector/connector/internal" + +import ( + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" +) + +// A Traces connector acts as an exporter from a traces pipeline and a receiver +// to one or more traces, metrics, or logs pipelines. +// Traces feeds a consumer.Traces, consumer.Metrics, or consumer.Logs with data. +// +// Examples: +// - Traces could be collected in one pipeline and routed to another traces pipeline +// based on criteria such as attributes or other content of the trace. The second +// pipeline can then process and export the trace to the appropriate backend. +// - Traces could be summarized by a metrics connector that emits statistics describing +// the number of traces observed. +// - Traces could be analyzed by a logs connector that emits events when particular +// criteria are met. +type Traces interface { + component.Component + consumer.Traces +}