Skip to content

Commit

Permalink
Get things working after rebase
Browse files Browse the repository at this point in the history
This picks up where open-telemetry#6560
left off. The first step is to get the code introduced in that PR working with the collector
as it is today. There were significant changes to how pipelines are built and the
component package was split into separate packages based on type (extension, processor, etc).
This commit makes the necessary changes to get everything working, likely not in the most
ideal way, but it's a start that we can iterate on.
  • Loading branch information
mwear committed Sep 26, 2023
1 parent 4bea2fb commit 11c42d4
Show file tree
Hide file tree
Showing 10 changed files with 169 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,47 +12,43 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package componenttest // import "go.opentelemetry.io/collector/component/componenttest"
package extensiontest // import "go.opentelemetry.io/collector/extension/extensiontest"

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/extension"
)

// NewStatusWatcherExtensionCreateSettings returns a new nop settings for Create*Extension functions.
func NewStatusWatcherExtensionCreateSettings() component.ExtensionCreateSettings {
return component.ExtensionCreateSettings{
TelemetrySettings: NewNopTelemetrySettings(),
func NewStatusWatcherExtensionCreateSettings() extension.CreateSettings {
return extension.CreateSettings{
TelemetrySettings: componenttest.NewNopTelemetrySettings(),
BuildInfo: component.NewDefaultBuildInfo(),
}
}

type statusWatcherExtensionConfig struct {
config.ExtensionSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct
}

// NewStatusWatcherExtensionFactory returns a component.ExtensionFactory that constructs nop extensions.
func NewStatusWatcherExtensionFactory(
onStatusChanged func(source component.StatusSource, event *component.StatusEvent),
) component.ExtensionFactory {
return component.NewExtensionFactory(
) extension.Factory {
return extension.NewFactory(
"statuswatcher",
func() component.ExtensionConfig {
return &statusWatcherExtensionConfig{
ExtensionSettings: config.NewExtensionSettings(component.NewID("statuswatcher")),
}
func() component.Config {
return &struct{}{}
},
func(context.Context, component.ExtensionCreateSettings, component.ExtensionConfig) (component.Extension, error) {
func(context.Context, extension.CreateSettings, component.Config) (component.Component, error) {
return &statusWatcherExtension{onStatusChanged: onStatusChanged}, nil
},
component.StabilityLevelStable)
}

// statusWatcherExtension stores consumed traces and metrics for testing purposes.
type statusWatcherExtension struct {
nopComponent
component.StartFunc
component.ShutdownFunc
onStatusChanged func(source component.StatusSource, event *component.StatusEvent)
}

Expand Down
65 changes: 65 additions & 0 deletions otelcol/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/confmap/converter/expandconverter"
"go.opentelemetry.io/collector/extension/extensiontest"
"go.opentelemetry.io/collector/processor/processortest"
)

func TestStateString(t *testing.T) {
Expand Down Expand Up @@ -151,6 +153,69 @@ func TestCollectorReportError(t *testing.T) {
assert.Equal(t, StateClosed, col.GetState())
}

func TestComponentStatusWatcher(t *testing.T) {
factories, err := nopFactories()
assert.NoError(t, err)

// Use a processor factory that creates "unhealthy" processor: one that
// always reports StatusError after successful Start.
unhealthyProcessorFactory := processortest.NewUnhealthyProcessorFactory()
factories.Processors[unhealthyProcessorFactory.Type()] = unhealthyProcessorFactory

// Keep track of all status changes in a map.
changedComponents := map[component.StatusSource]component.Status{}
var mux sync.Mutex
onStatusChanged := func(source component.StatusSource, event *component.StatusEvent) {
mux.Lock()
defer mux.Unlock()
changedComponents[source] = event.Status()
}

// Add a "statuswatcher" extension that will receive notifications when processor
// status changes.
factory := extensiontest.NewStatusWatcherExtensionFactory(onStatusChanged)
factories.Extensions[factory.Type()] = factory

// Read config from file. This config uses 3 "unhealthy" processors.
validProvider, err := NewConfigProvider(newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-statuswatcher.yaml")}))
require.NoError(t, err)

// Create a collector
col, err := NewCollector(CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: factories,
ConfigProvider: validProvider,
})
require.NoError(t, err)

// Start the newly created collector.
wg := startCollector(context.Background(), t, col)

// The "unhealthy" processors will now begin to asynchronously report StatusError.
// We expect to see these reports.
assert.Eventually(t, func() bool {
mux.Lock()
defer mux.Unlock()

for k, v := range changedComponents {
// All processors must report a status change with the same ID
assert.EqualValues(t, component.NewID(unhealthyProcessorFactory.Type()), k.ID())
// And all must be in StatusError
assert.EqualValues(t, component.StatusError, v)
}
// We have 3 processors with exactly the same ID in otelcol-statuswatcher.yaml
// We must have exactly 3 items in our map. This ensures that the "source" argument
// passed to status change func is unique per instance of source component despite
// components having the same IDs (having same ID for different component instances
// is a normal situation for processors).
return len(changedComponents) == 3
}, time.Second, time.Millisecond*10)

col.Shutdown()
wg.Wait()
assert.Equal(t, StateClosed, col.GetState())
}

func TestCollectorSendSignal(t *testing.T) {
factories, err := nopFactories()
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion otelcol/testdata/otelcol-statuswatcher.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ service:
pipelines:
traces:
receivers: [nop]
processors: [nop,unhealthy,unhealthy]
processors: [nop,unhealthy]
exporters: [nop]
metrics:
receivers: [nop]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,53 +12,48 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package componenttest // import "go.opentelemetry.io/collector/component/componenttest"
package processortest // import "go.opentelemetry.io/collector/component/componenttest"

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/processor"
)

// NewUnhealthyProcessorCreateSettings returns a new nop settings for Create*Processor functions.
func NewUnhealthyProcessorCreateSettings() component.ProcessorCreateSettings {
return component.ProcessorCreateSettings{
TelemetrySettings: NewNopTelemetrySettings(),
func NewUnhealthyProcessorCreateSettings() processor.CreateSettings {
return processor.CreateSettings{
TelemetrySettings: componenttest.NewNopTelemetrySettings(),
BuildInfo: component.NewDefaultBuildInfo(),
}
}

type unhealthyProcessorConfig struct {
config.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct
}

// NewUnhealthyProcessorFactory returns a component.ProcessorFactory that constructs nop processors.
func NewUnhealthyProcessorFactory() component.ProcessorFactory {
return component.NewProcessorFactory(
func NewUnhealthyProcessorFactory() processor.Factory {
return processor.NewFactory(
"unhealthy",
func() component.ProcessorConfig {
return &unhealthyProcessorConfig{
ProcessorSettings: config.NewProcessorSettings(component.NewID("nop")),
}
func() component.Config {
return &struct{}{}
},
component.WithTracesProcessor(createUnhealthyTracesProcessor, component.StabilityLevelStable),
component.WithMetricsProcessor(createUnhealthyMetricsProcessor, component.StabilityLevelStable),
component.WithLogsProcessor(createUnhealthyLogsProcessor, component.StabilityLevelStable),
processor.WithTraces(createUnhealthyTracesProcessor, component.StabilityLevelStable),
processor.WithMetrics(createUnhealthyMetricsProcessor, component.StabilityLevelStable),
processor.WithLogs(createUnhealthyLogsProcessor, component.StabilityLevelStable),
)
}

func createUnhealthyTracesProcessor(context.Context, component.ProcessorCreateSettings, component.ProcessorConfig, consumer.Traces) (component.TracesProcessor, error) {
func createUnhealthyTracesProcessor(context.Context, processor.CreateSettings, component.Config, consumer.Traces) (processor.Traces, error) {
return unhealthyProcessorInstance, nil
}

func createUnhealthyMetricsProcessor(context.Context, component.ProcessorCreateSettings, component.ProcessorConfig, consumer.Metrics) (component.MetricsProcessor, error) {
func createUnhealthyMetricsProcessor(context.Context, processor.CreateSettings, component.Config, consumer.Metrics) (processor.Metrics, error) {
return unhealthyProcessorInstance, nil
}

func createUnhealthyLogsProcessor(context.Context, component.ProcessorCreateSettings, component.ProcessorConfig, consumer.Logs) (component.LogsProcessor, error) {
func createUnhealthyLogsProcessor(context.Context, processor.CreateSettings, component.Config, consumer.Logs) (processor.Logs, error) {
return unhealthyProcessorInstance, nil
}

Expand All @@ -68,7 +63,8 @@ var unhealthyProcessorInstance = &unhealthyProcessor{

// unhealthyProcessor stores consumed traces and metrics for testing purposes.
type unhealthyProcessor struct {
nopComponent
component.StartFunc
component.ShutdownFunc
consumertest.Consumer
}

Expand Down
2 changes: 1 addition & 1 deletion service/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (host *serviceHost) ReportFatalError(err error) {
}

func (host *serviceHost) ReportComponentStatus(source component.StatusSource, event *component.StatusEvent) {
host.extensions.NotifyComponentStatusChange(source, event)
host.serviceExtensions.NotifyComponentStatusChange(source, event)
}

func (host *serviceHost) GetFactory(kind component.Kind, componentType component.Type) component.Factory {
Expand Down
60 changes: 56 additions & 4 deletions service/internal/graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strings"

"go.uber.org/multierr"
"go.uber.org/zap"
"gonum.org/v1/gonum/graph"
"gonum.org/v1/gonum/graph/simple"
"gonum.org/v1/gonum/graph/topo"
Expand All @@ -22,6 +23,8 @@ import (
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/service/internal/capabilityconsumer"
"go.opentelemetry.io/collector/service/internal/components"
"go.opentelemetry.io/collector/service/internal/servicehost"
"go.opentelemetry.io/collector/service/pipelines"
)

Expand All @@ -45,12 +48,16 @@ type Graph struct {

// Keep track of how nodes relate to pipelines, so we can declare edges in the graph.
pipelines map[component.ID]*pipelineNodes

// Keep track of status source per node
statusSources map[int64]*statusReportingComponent
}

func Build(ctx context.Context, set Settings) (*Graph, error) {
pipelines := &Graph{
componentGraph: simple.NewDirectedGraph(),
pipelines: make(map[component.ID]*pipelineNodes, len(set.PipelineConfigs)),
statusSources: make(map[int64]*statusReportingComponent),
}
for pipelineID := range set.PipelineConfigs {
pipelines.pipelines[pipelineID] = &pipelineNodes{
Expand Down Expand Up @@ -84,12 +91,21 @@ func (g *Graph) createNodes(set Settings) error {
}
rcvrNode := g.createReceiver(pipelineID.Type(), recvID)
pipe.receivers[rcvrNode.ID()] = rcvrNode
g.statusSources[rcvrNode.ID()] = &statusReportingComponent{
id: recvID,
kind: component.KindReceiver,
}
}

pipe.capabilitiesNode = newCapabilitiesNode(pipelineID)

for _, procID := range pipelineCfg.Processors {
pipe.processors = append(pipe.processors, g.createProcessor(pipelineID, procID))
procNode := g.createProcessor(pipelineID, procID)
pipe.processors = append(pipe.processors, procNode)
g.statusSources[procNode.ID()] = &statusReportingComponent{
id: procID,
kind: component.KindProcessor,
}
}

pipe.fanOutNode = newFanOutNode(pipelineID)
Expand All @@ -102,6 +118,10 @@ func (g *Graph) createNodes(set Settings) error {
}
expNode := g.createExporter(pipelineID.Type(), exprID)
pipe.exporters[expNode.ID()] = expNode
g.statusSources[expNode.ID()] = &statusReportingComponent{
id: expNode.componentID,
kind: component.KindExporter,
}
}
}

Expand Down Expand Up @@ -158,6 +178,10 @@ func (g *Graph) createNodes(set Settings) error {
connNode := g.createConnector(eID, rID, connID)
g.pipelines[eID].exporters[connNode.ID()] = connNode
g.pipelines[rID].receivers[connNode.ID()] = connNode
g.statusSources[connNode.ID()] = &statusReportingComponent{
id: connNode.componentID,
kind: component.KindConnector,
}
}
}
}
Expand Down Expand Up @@ -316,7 +340,20 @@ type pipelineNodes struct {
exporters map[int64]graph.Node
}

func (g *Graph) StartAll(ctx context.Context, host component.Host) error {
type statusReportingComponent struct {
kind component.Kind
id component.ID
}

func (s *statusReportingComponent) GetKind() component.Kind {
return s.kind
}

func (s *statusReportingComponent) ID() component.ID {
return s.id
}

func (g *Graph) StartAll(ctx context.Context, host servicehost.Host) error {
nodes, err := topo.Sort(g.componentGraph)
if err != nil {
return err
Expand All @@ -326,12 +363,27 @@ func (g *Graph) StartAll(ctx context.Context, host component.Host) error {
// are started before upstream components. This ensures that each
// component's consumer is ready to consume.
for i := len(nodes) - 1; i >= 0; i-- {
comp, ok := nodes[i].(component.Component)
node := nodes[i]
comp, ok := node.(component.Component)

if !ok {
// Skip capabilities/fanout nodes
continue
}
if compErr := comp.Start(ctx, host); compErr != nil {

statusSource, ok := g.statusSources[node.ID()]

if !ok {
// TODO: this should not happen. I'm not sure this code path will remain, but if it does
// we should ensure that we have a valid nop value for statusSource.
}

// note: there is no longer a per-component logger, hence the zap.NewNop()
// we should be able to remove the logger from components.NewHostWrapper as we deprecate
// and remove host.ReportFatalError
hostWrapper := components.NewHostWrapper(host, statusSource, zap.NewNop())

if compErr := comp.Start(ctx, hostWrapper); compErr != nil {
return compErr
}
}
Expand Down
Loading

0 comments on commit 11c42d4

Please sign in to comment.