Skip to content

Commit

Permalink
[chore] Move capabilities consumers in a separate package
Browse files Browse the repository at this point in the history
This is to prepare to move "pipelines" as public, see open-telemetry#5564

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Nov 29, 2022
1 parent ae077be commit f3fde50
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package pipelines // import "go.opentelemetry.io/collector/service/internal/pipelines"
package capabilityconsumer // import "go.opentelemetry.io/collector/service/internal/capabilityconsumer"

import (
"go.opentelemetry.io/collector/consumer"
)

func wrapLogs(logs consumer.Logs, cap consumer.Capabilities) consumer.Logs {
func NewLogs(logs consumer.Logs, cap consumer.Capabilities) consumer.Logs {
return capLogs{Logs: logs, cap: cap}
}

Expand All @@ -31,7 +31,7 @@ func (mts capLogs) Capabilities() consumer.Capabilities {
return mts.cap
}

func wrapMetrics(metrics consumer.Metrics, cap consumer.Capabilities) consumer.Metrics {
func NewMetrics(metrics consumer.Metrics, cap consumer.Capabilities) consumer.Metrics {
return capMetrics{Metrics: metrics, cap: cap}
}

Expand All @@ -44,7 +44,7 @@ func (mts capMetrics) Capabilities() consumer.Capabilities {
return mts.cap
}

func wrapTraces(traces consumer.Traces, cap consumer.Capabilities) consumer.Traces {
func NewTraces(traces consumer.Traces, cap consumer.Capabilities) consumer.Traces {
return capTraces{Traces: traces, cap: cap}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package pipelines
package capabilityconsumer

import (
"context"
Expand All @@ -26,35 +26,35 @@ import (
"go.opentelemetry.io/collector/internal/testdata"
)

func TestWrapLogs(t *testing.T) {
func TestLogs(t *testing.T) {
sink := &consumertest.LogsSink{}
require.Equal(t, consumer.Capabilities{MutatesData: false}, sink.Capabilities())

wrap := wrapLogs(sink, consumer.Capabilities{MutatesData: true})
wrap := NewLogs(sink, consumer.Capabilities{MutatesData: true})
assert.Equal(t, consumer.Capabilities{MutatesData: true}, wrap.Capabilities())

assert.NoError(t, wrap.ConsumeLogs(context.Background(), testdata.GenerateLogs(1)))
assert.Len(t, sink.AllLogs(), 1)
assert.Equal(t, testdata.GenerateLogs(1), sink.AllLogs()[0])
}

func TestWrapMetrics(t *testing.T) {
func TestMetrics(t *testing.T) {
sink := &consumertest.MetricsSink{}
require.Equal(t, consumer.Capabilities{MutatesData: false}, sink.Capabilities())

wrap := wrapMetrics(sink, consumer.Capabilities{MutatesData: true})
wrap := NewMetrics(sink, consumer.Capabilities{MutatesData: true})
assert.Equal(t, consumer.Capabilities{MutatesData: true}, wrap.Capabilities())

assert.NoError(t, wrap.ConsumeMetrics(context.Background(), testdata.GenerateMetrics(1)))
assert.Len(t, sink.AllMetrics(), 1)
assert.Equal(t, testdata.GenerateMetrics(1), sink.AllMetrics()[0])
}

func TestWrapTraces(t *testing.T) {
func TestTraces(t *testing.T) {
sink := &consumertest.TracesSink{}
require.Equal(t, consumer.Capabilities{MutatesData: false}, sink.Capabilities())

wrap := wrapTraces(sink, consumer.Capabilities{MutatesData: true})
wrap := NewTraces(sink, consumer.Capabilities{MutatesData: true})
assert.Equal(t, consumer.Capabilities{MutatesData: true}, wrap.Capabilities())

assert.NoError(t, wrap.ConsumeTraces(context.Background(), testdata.GenerateTraces(1)))
Expand Down
7 changes: 4 additions & 3 deletions service/internal/pipelines/pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/service/internal/capabilityconsumer"
"go.opentelemetry.io/collector/service/internal/components"
"go.opentelemetry.io/collector/service/internal/fanoutconsumer"
"go.opentelemetry.io/collector/service/internal/zpages"
Expand Down Expand Up @@ -282,11 +283,11 @@ func Build(ctx context.Context, set Settings) (*Pipelines, error) {
// Because of this wrap the first consumer if any consumers in the pipeline mutate the data and the first says that it doesn't.
switch pipelineID.Type() {
case component.DataTypeTraces:
bp.lastConsumer = capTraces{Traces: bp.lastConsumer.(consumer.Traces), cap: consumer.Capabilities{MutatesData: mutatesConsumedData}}
bp.lastConsumer = capabilityconsumer.NewTraces(bp.lastConsumer.(consumer.Traces), consumer.Capabilities{MutatesData: mutatesConsumedData})
case component.DataTypeMetrics:
bp.lastConsumer = capMetrics{Metrics: bp.lastConsumer.(consumer.Metrics), cap: consumer.Capabilities{MutatesData: mutatesConsumedData}}
bp.lastConsumer = capabilityconsumer.NewMetrics(bp.lastConsumer.(consumer.Metrics), consumer.Capabilities{MutatesData: mutatesConsumedData})
case component.DataTypeLogs:
bp.lastConsumer = capLogs{Logs: bp.lastConsumer.(consumer.Logs), cap: consumer.Capabilities{MutatesData: mutatesConsumedData}}
bp.lastConsumer = capabilityconsumer.NewLogs(bp.lastConsumer.(consumer.Logs), consumer.Capabilities{MutatesData: mutatesConsumedData})
default:
return nil, fmt.Errorf("create cap consumer in pipeline %q, data type %q is not supported", pipelineID, pipelineID.Type())
}
Expand Down

0 comments on commit f3fde50

Please sign in to comment.