Skip to content

Commit

Permalink
Introduce component status reporting
Browse files Browse the repository at this point in the history
## Summary of changes

- Add component/status package. This introduces the concepts of component status
  event and pipeline readiness status.
- Introduce Host.RegisterStatusListener() to allow components to listen for
  status changes.
- Deprecate Host.ReportFatalError() in favour of Host.ReportComponentStatus().
- Move component ID and Type to a separate component/id package. This is necessary
  to avoid dependency cycle with the new component/status package. Large part of
  the diff in this commit is because of this particular change. If this is too large
  and difficult to review we can split this commit into 2 parts where this particular
  change is a separate commit.
- Deprecated component.ID and component.Type in favour of id.ID and id.Type.

## TODO after this is merged

- healthcheck extension must register and listen to component statuses.
- Replace all ReportFatalError() calls by ReportComponentStatus() calls
  in core and contrib.

## Open Questions

- Do we want to name component/id package component/componentid instead?
- The pipelines readiness status is also implemented in the component/status
  package. We can split the status, componentstatus and pipelinestatus into
  separate packages but not sure if it warrants creation of a new top-level
  package just to be able to stay pure in the component package.
  We can try this if think it is a better approach.
- Listeners need to be able to tell if all current components are healthy.
  It is assumed that the listeners need to maintain a map of components and
  track the status of each component. This works only if we assume that
  the set of components cannot change during the lifetime of the listener.
  This assumption is true today but can change in the future if we introduce
  partial pipeline restarts where only modified/added/removed components
  are recreated (this will break listener's assumption and the map will become
  invalid). Should we instead keep track of this entire status map in the Host
  and broadcast the entire status to the listeners as a whole instead of
  (or in addition to) individual component events?
  • Loading branch information
tigrannajaryan committed Nov 15, 2022
1 parent e4b2f6a commit d73c804
Show file tree
Hide file tree
Showing 54 changed files with 611 additions and 498 deletions.
8 changes: 5 additions & 3 deletions component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package component // import "go.opentelemetry.io/collector/component"
import (
"context"
"errors"

"go.opentelemetry.io/collector/component/id"
)

var (
Expand Down Expand Up @@ -160,17 +162,17 @@ func (sl StabilityLevel) LogMessage() string {
// use the factory helpers for the appropriate component type.
type Factory interface {
// Type gets the type of the component created by this factory.
Type() Type
Type() id.Type

unexportedFactoryFunc()
}

type baseFactory struct {
cfgType Type
cfgType id.Type
}

func (baseFactory) unexportedFactoryFunc() {}

func (bf baseFactory) Type() Type {
func (bf baseFactory) Type() id.Type {
return bf.cfgType
}
3 changes: 2 additions & 1 deletion component/componenttest/nop_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/id"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer/consumertest"
)
Expand All @@ -40,7 +41,7 @@ func NewNopExporterFactory() component.ExporterFactory {
"nop",
func() component.ExporterConfig {
return &nopExporterConfig{
ExporterSettings: config.NewExporterSettings(component.NewID("nop")),
ExporterSettings: config.NewExporterSettings(id.NewID("nop")),
}
},
component.WithTracesExporter(createTracesExporter, component.StabilityLevelStable),
Expand Down
6 changes: 3 additions & 3 deletions component/componenttest/nop_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/id"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
Expand All @@ -31,9 +31,9 @@ import (
func TestNewNopExporterFactory(t *testing.T) {
factory := NewNopExporterFactory()
require.NotNil(t, factory)
assert.Equal(t, component.Type("nop"), factory.Type())
assert.Equal(t, id.Type("nop"), factory.Type())
cfg := factory.CreateDefaultConfig()
assert.Equal(t, &nopExporterConfig{ExporterSettings: config.NewExporterSettings(component.NewID("nop"))}, cfg)
assert.Equal(t, &nopExporterConfig{ExporterSettings: config.NewExporterSettings(id.NewID("nop"))}, cfg)

traces, err := factory.CreateTracesExporter(context.Background(), NewNopExporterCreateSettings(), cfg)
require.NoError(t, err)
Expand Down
3 changes: 2 additions & 1 deletion component/componenttest/nop_extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/id"
"go.opentelemetry.io/collector/config"
)

Expand All @@ -39,7 +40,7 @@ func NewNopExtensionFactory() component.ExtensionFactory {
"nop",
func() component.ExtensionConfig {
return &nopExtensionConfig{
ExtensionSettings: config.NewExtensionSettings(component.NewID("nop")),
ExtensionSettings: config.NewExtensionSettings(id.NewID("nop")),
}
},
func(context.Context, component.ExtensionCreateSettings, component.ExtensionConfig) (component.Extension, error) {
Expand Down
6 changes: 3 additions & 3 deletions component/componenttest/nop_extension_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/id"
"go.opentelemetry.io/collector/config"
)

func TestNewNopExtensionFactory(t *testing.T) {
factory := NewNopExtensionFactory()
require.NotNil(t, factory)
assert.Equal(t, component.Type("nop"), factory.Type())
assert.Equal(t, id.Type("nop"), factory.Type())
cfg := factory.CreateDefaultConfig()
assert.Equal(t, &nopExtensionConfig{ExtensionSettings: config.NewExtensionSettings(component.NewID("nop"))}, cfg)
assert.Equal(t, &nopExtensionConfig{ExtensionSettings: config.NewExtensionSettings(id.NewID("nop"))}, cfg)

traces, err := factory.CreateExtension(context.Background(), NewNopExtensionCreateSettings(), cfg)
require.NoError(t, err)
Expand Down
15 changes: 12 additions & 3 deletions component/componenttest/nop_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package componenttest // import "go.opentelemetry.io/collector/component/compone

import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/id"
"go.opentelemetry.io/collector/component/status"
)

// nopHost mocks a receiver.ReceiverHost for test purposes.
Expand All @@ -28,14 +30,21 @@ func NewNopHost() component.Host {

func (nh *nopHost) ReportFatalError(_ error) {}

func (nh *nopHost) GetFactory(_ component.Kind, _ component.Type) component.Factory {
func (nh *nopHost) GetFactory(_ component.Kind, _ id.Type) component.Factory {
return nil
}

func (nh *nopHost) GetExtensions() map[component.ID]component.Extension {
func (nh *nopHost) GetExtensions() map[id.ID]component.Extension {
return nil
}

func (nh *nopHost) GetExporters() map[component.DataType]map[component.ID]component.Exporter {
func (nh *nopHost) GetExporters() map[component.DataType]map[id.ID]component.Exporter {
return nil
}

func (nh *nopHost) ReportComponentStatus(event *status.ComponentEvent) {
}

func (nh *nopHost) RegisterStatusListener(options ...status.ListenerOption) component.StatusListenerUnregisterFunc {
return nil
}
3 changes: 3 additions & 0 deletions component/componenttest/nop_host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/status"
)

func TestNewNopHost(t *testing.T) {
Expand All @@ -30,6 +31,8 @@ func TestNewNopHost(t *testing.T) {
require.IsType(t, &nopHost{}, nh)

nh.ReportFatalError(errors.New("TestError"))
nh.ReportComponentStatus(&status.ComponentEvent{})
assert.Nil(t, nh.RegisterStatusListener())
assert.Nil(t, nh.GetExporters())
assert.Nil(t, nh.GetExtensions())
assert.Nil(t, nh.GetFactory(component.KindReceiver, "test"))
Expand Down
3 changes: 2 additions & 1 deletion component/componenttest/nop_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/id"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
Expand All @@ -41,7 +42,7 @@ func NewNopProcessorFactory() component.ProcessorFactory {
"nop",
func() component.ProcessorConfig {
return &nopProcessorConfig{
ProcessorSettings: config.NewProcessorSettings(component.NewID("nop")),
ProcessorSettings: config.NewProcessorSettings(id.NewID("nop")),
}
},
component.WithTracesProcessor(createTracesProcessor, component.StabilityLevelStable),
Expand Down
6 changes: 3 additions & 3 deletions component/componenttest/nop_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/id"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
Expand All @@ -33,9 +33,9 @@ import (
func TestNewNopProcessorFactory(t *testing.T) {
factory := NewNopProcessorFactory()
require.NotNil(t, factory)
assert.Equal(t, component.Type("nop"), factory.Type())
assert.Equal(t, id.Type("nop"), factory.Type())
cfg := factory.CreateDefaultConfig()
assert.Equal(t, &nopProcessorConfig{ProcessorSettings: config.NewProcessorSettings(component.NewID("nop"))}, cfg)
assert.Equal(t, &nopProcessorConfig{ProcessorSettings: config.NewProcessorSettings(id.NewID("nop"))}, cfg)

traces, err := factory.CreateTracesProcessor(context.Background(), NewNopProcessorCreateSettings(), cfg, consumertest.NewNop())
require.NoError(t, err)
Expand Down
3 changes: 2 additions & 1 deletion component/componenttest/nop_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/id"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
)
Expand All @@ -40,7 +41,7 @@ func NewNopReceiverFactory() component.ReceiverFactory {
"nop",
func() component.ReceiverConfig {
return &nopReceiverConfig{
ReceiverSettings: config.NewReceiverSettings(component.NewID("nop")),
ReceiverSettings: config.NewReceiverSettings(id.NewID("nop")),
}
},
component.WithTracesReceiver(createTracesReceiver, component.StabilityLevelStable),
Expand Down
6 changes: 3 additions & 3 deletions component/componenttest/nop_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/id"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer/consumertest"
)

func TestNewNopReceiverFactory(t *testing.T) {
factory := NewNopReceiverFactory()
require.NotNil(t, factory)
assert.Equal(t, component.Type("nop"), factory.Type())
assert.Equal(t, id.Type("nop"), factory.Type())
cfg := factory.CreateDefaultConfig()
assert.Equal(t, &nopReceiverConfig{ReceiverSettings: config.NewReceiverSettings(component.NewID("nop"))}, cfg)
assert.Equal(t, &nopReceiverConfig{ReceiverSettings: config.NewReceiverSettings(id.NewID("nop"))}, cfg)

traces, err := factory.CreateTracesReceiver(context.Background(), NewNopReceiverCreateSettings(), cfg, consumertest.NewNop())
require.NoError(t, err)
Expand Down
6 changes: 2 additions & 4 deletions component/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@
package component // import "go.opentelemetry.io/collector/component"

import (
"go.opentelemetry.io/collector/component/id"
"go.opentelemetry.io/collector/confmap"
)

// Type is the component type as it is used in the config.
type Type string

// validatable defines the interface for the configuration validation.
type validatable interface {
// Validate validates the configuration and returns an error if invalid.
Expand All @@ -29,7 +27,7 @@ type validatable interface {

// DataType is a special Type that represents the data types supported by the collector. We currently support
// collecting metrics, traces and logs, this can expand in the future.
type DataType = Type
type DataType = id.Type

// Currently supported data types. Add new data types here when new types are supported in the future.
const (
Expand Down
3 changes: 2 additions & 1 deletion component/experimental/component/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/id"
"go.opentelemetry.io/collector/config/experimental/config"
"go.opentelemetry.io/collector/config/experimental/configsource"
)
Expand Down Expand Up @@ -56,4 +57,4 @@ type ConfigSourceFactory interface {
}

// ConfigSourceFactories maps the type of a ConfigSource to the respective factory object.
type ConfigSourceFactories map[component.Type]ConfigSourceFactory
type ConfigSourceFactories map[id.Type]ConfigSourceFactory
3 changes: 2 additions & 1 deletion component/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package component // import "go.opentelemetry.io/collector/component"
import (
"context"

"go.opentelemetry.io/collector/component/id"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/consumer"
)
Expand Down Expand Up @@ -213,7 +214,7 @@ func WithLogsExporter(createLogsExporter CreateLogsExporterFunc, sl StabilityLev
}

// NewExporterFactory returns a ExporterFactory.
func NewExporterFactory(cfgType Type, createDefaultConfig ExporterCreateDefaultConfigFunc, options ...ExporterFactoryOption) ExporterFactory {
func NewExporterFactory(cfgType id.Type, createDefaultConfig ExporterCreateDefaultConfigFunc, options ...ExporterFactoryOption) ExporterFactory {
f := &exporterFactory{
baseFactory: baseFactory{cfgType: cfgType},
ExporterCreateDefaultConfigFunc: createDefaultConfig,
Expand Down
5 changes: 3 additions & 2 deletions component/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ import (
"github.com/stretchr/testify/assert"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/id"
"go.opentelemetry.io/collector/config"
)

func TestNewExporterFactory(t *testing.T) {
const typeStr = "test"
defaultCfg := config.NewExporterSettings(component.NewID(typeStr))
defaultCfg := config.NewExporterSettings(id.NewID(typeStr))
factory := component.NewExporterFactory(
typeStr,
func() component.ExporterConfig { return &defaultCfg })
Expand All @@ -44,7 +45,7 @@ func TestNewExporterFactory(t *testing.T) {

func TestNewExporterFactory_WithOptions(t *testing.T) {
const typeStr = "test"
defaultCfg := config.NewExporterSettings(component.NewID(typeStr))
defaultCfg := config.NewExporterSettings(id.NewID(typeStr))
factory := component.NewExporterFactory(
typeStr,
func() component.ExporterConfig { return &defaultCfg },
Expand Down
4 changes: 3 additions & 1 deletion component/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package component // import "go.opentelemetry.io/collector/component"
import (
"context"

"go.opentelemetry.io/collector/component/id"
"go.opentelemetry.io/collector/confmap"
)

Expand Down Expand Up @@ -47,6 +48,7 @@ type Extension interface {
// Collector that is to be implemented by extensions interested in changes to pipeline
// states. Typically this will be used by extensions that change their behavior if data is
// being ingested or not, e.g.: a k8s readiness probe.
// Deprecated: [0.65.0] Use Host.RegisterStatusListener() instead.
type PipelineWatcher interface {
// Ready notifies the Extension that all pipelines were built and the
// receivers were started, i.e.: the service is ready to receive data
Expand Down Expand Up @@ -117,7 +119,7 @@ func (ef *extensionFactory) ExtensionStability() StabilityLevel {

// NewExtensionFactory returns a new ExtensionFactory based on this configuration.
func NewExtensionFactory(
cfgType Type,
cfgType id.Type,
createDefaultConfig ExtensionCreateDefaultConfigFunc,
createServiceExtension CreateExtensionFunc,
sl StabilityLevel) ExtensionFactory {
Expand Down
3 changes: 2 additions & 1 deletion component/extension_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/assert"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/id"
"go.opentelemetry.io/collector/config"
)

Expand All @@ -33,7 +34,7 @@ type nopExtension struct {

func TestNewExtensionFactory(t *testing.T) {
const typeStr = "test"
defaultCfg := config.NewExtensionSettings(component.NewID(typeStr))
defaultCfg := config.NewExtensionSettings(id.NewID(typeStr))
nopExtensionInstance := new(nopExtension)

factory := component.NewExtensionFactory(
Expand Down
Loading

0 comments on commit d73c804

Please sign in to comment.