diff --git a/.chloggen/codeboten_census-feature-gate.yaml b/.chloggen/codeboten_census-feature-gate.yaml new file mode 100644 index 00000000000..a7d16bb14f2 --- /dev/null +++ b/.chloggen/codeboten_census-feature-gate.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: service + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "add `service.disableOpenCensusBridge` feature gate which is enabled by default to remove the dependency on OpenCensus" + +# One or more tracking issues or pull requests related to the change +issues: [10414] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/.chloggen/profile-consumer.yaml b/.chloggen/profile-consumer.yaml new file mode 100644 index 00000000000..a515f5a7949 --- /dev/null +++ b/.chloggen/profile-consumer.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: new_component + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: consumer/consumerprofiles + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Allow handling profiles in consumer. + +# One or more tracking issues or pull requests related to the change +issues: [10464] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/consumer/consumer.go b/consumer/consumer.go index 503750ad7cb..64076655f20 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -5,52 +5,22 @@ package consumer // import "go.opentelemetry.io/collector/consumer" import ( "errors" + + "go.opentelemetry.io/collector/consumer/internal" ) // Capabilities describes the capabilities of a Processor. -type Capabilities struct { - // MutatesData is set to true if Consume* function of the - // processor modifies the input Traces, Logs or Metrics argument. - // Processors which modify the input data MUST set this flag to true. If the processor - // does not modify the data it MUST set this flag to false. If the processor creates - // a copy of the data before modifying then this flag can be safely set to false. - MutatesData bool -} - -type baseConsumer interface { - Capabilities() Capabilities -} +type Capabilities = internal.Capabilities var errNilFunc = errors.New("nil consumer func") -type baseImpl struct { - capabilities Capabilities -} - // Option to construct new consumers. -type Option func(*baseImpl) +type Option = internal.Option // WithCapabilities overrides the default GetCapabilities function for a processor. // The default GetCapabilities function returns mutable capabilities. func WithCapabilities(capabilities Capabilities) Option { - return func(o *baseImpl) { - o.capabilities = capabilities - } -} - -// Capabilities returns the capabilities of the component -func (bs baseImpl) Capabilities() Capabilities { - return bs.capabilities -} - -func newBaseImpl(options ...Option) *baseImpl { - bs := &baseImpl{ - capabilities: Capabilities{MutatesData: false}, + return func(o *internal.BaseImpl) { + o.Cap = capabilities } - - for _, op := range options { - op(bs) - } - - return bs } diff --git a/consumer/consumerprofiles/Makefile b/consumer/consumerprofiles/Makefile new file mode 100644 index 00000000000..ded7a36092d --- /dev/null +++ b/consumer/consumerprofiles/Makefile @@ -0,0 +1 @@ +include ../../Makefile.Common diff --git a/consumer/consumerprofiles/go.mod b/consumer/consumerprofiles/go.mod new file mode 100644 index 00000000000..7b0a06cb098 --- /dev/null +++ b/consumer/consumerprofiles/go.mod @@ -0,0 +1,35 @@ +module go.opentelemetry.io/collector/consumer/consumerprofiles + +go 1.21.0 + +replace go.opentelemetry.io/collector/pdata => ../../pdata + +replace go.opentelemetry.io/collector/pdata/pprofile => ../../pdata/pprofile + +replace go.opentelemetry.io/collector/consumer => ../ + +require ( + github.com/stretchr/testify v1.9.0 + go.opentelemetry.io/collector/consumer v0.104.0 + go.opentelemetry.io/collector/pdata/pprofile v0.104.0 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + go.opentelemetry.io/collector/pdata v1.11.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + golang.org/x/net v0.25.0 // indirect + golang.org/x/sys v0.20.0 // indirect + golang.org/x/text v0.15.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect + google.golang.org/grpc v1.65.0 // indirect + google.golang.org/protobuf v1.34.2 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) + +replace go.opentelemetry.io/collector/pdata/testdata => ../../pdata/testdata diff --git a/consumer/consumerprofiles/go.sum b/consumer/consumerprofiles/go.sum new file mode 100644 index 00000000000..528166b78c0 --- /dev/null +++ b/consumer/consumerprofiles/go.sum @@ -0,0 +1,77 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= +golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= +golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= +golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 h1:Zy9XzmMEflZ/MAaA7vNcoebnRAld7FsPW1EeBB7V0m8= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0= +google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc= +google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/consumer/consumerprofiles/profiles.go b/consumer/consumerprofiles/profiles.go new file mode 100644 index 00000000000..7ab6b864dff --- /dev/null +++ b/consumer/consumerprofiles/profiles.go @@ -0,0 +1,47 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package consumerprofiles // import "go.opentelemetry.io/collector/consumer/consumerprofiles" + +import ( + "context" + "errors" + + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/internal" + "go.opentelemetry.io/collector/pdata/pprofile" +) + +var errNilFunc = errors.New("nil consumer func") + +// Profiles is an interface that receives pprofile.Profiles, processes it +// as needed, and sends it to the next processing node if any or to the destination. +type Profiles interface { + internal.BaseConsumer + // ConsumeProfiles receives pprofile.Profiles for consumption. + ConsumeProfiles(ctx context.Context, td pprofile.Profiles) error +} + +// ConsumeProfilesFunc is a helper function that is similar to ConsumeProfiles. +type ConsumeProfilesFunc func(ctx context.Context, td pprofile.Profiles) error + +// ConsumeProfiles calls f(ctx, td). +func (f ConsumeProfilesFunc) ConsumeProfiles(ctx context.Context, td pprofile.Profiles) error { + return f(ctx, td) +} + +type baseProfiles struct { + *internal.BaseImpl + ConsumeProfilesFunc +} + +// NewProfiles returns a Profiles configured with the provided options. +func NewProfiles(consume ConsumeProfilesFunc, options ...consumer.Option) (Profiles, error) { + if consume == nil { + return nil, errNilFunc + } + return &baseProfiles{ + BaseImpl: internal.NewBaseImpl(options...), + ConsumeProfilesFunc: consume, + }, nil +} diff --git a/consumer/consumerprofiles/profiles_test.go b/consumer/consumerprofiles/profiles_test.go new file mode 100644 index 00000000000..e50866e74da --- /dev/null +++ b/consumer/consumerprofiles/profiles_test.go @@ -0,0 +1,51 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package consumerprofiles + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pprofile" +) + +func TestDefaultProfiles(t *testing.T) { + cp, err := NewProfiles(func(context.Context, pprofile.Profiles) error { return nil }) + assert.NoError(t, err) + assert.NoError(t, cp.ConsumeProfiles(context.Background(), pprofile.NewProfiles())) + assert.Equal(t, consumer.Capabilities{MutatesData: false}, cp.Capabilities()) +} + +func TestNilFuncProfiles(t *testing.T) { + _, err := NewProfiles(nil) + assert.Equal(t, errNilFunc, err) +} + +func TestWithCapabilitiesProfiles(t *testing.T) { + cp, err := NewProfiles( + func(context.Context, pprofile.Profiles) error { return nil }, + consumer.WithCapabilities(consumer.Capabilities{MutatesData: true})) + assert.NoError(t, err) + assert.NoError(t, cp.ConsumeProfiles(context.Background(), pprofile.NewProfiles())) + assert.Equal(t, consumer.Capabilities{MutatesData: true}, cp.Capabilities()) +} + +func TestConsumeProfiles(t *testing.T) { + consumeCalled := false + cp, err := NewProfiles(func(context.Context, pprofile.Profiles) error { consumeCalled = true; return nil }) + assert.NoError(t, err) + assert.NoError(t, cp.ConsumeProfiles(context.Background(), pprofile.NewProfiles())) + assert.True(t, consumeCalled) +} + +func TestConsumeProfiles_ReturnError(t *testing.T) { + want := errors.New("my_error") + cp, err := NewProfiles(func(context.Context, pprofile.Profiles) error { return want }) + assert.NoError(t, err) + assert.Equal(t, want, cp.ConsumeProfiles(context.Background(), pprofile.NewProfiles())) +} diff --git a/consumer/internal/consumer.go b/consumer/internal/consumer.go new file mode 100644 index 00000000000..1f2b5683b22 --- /dev/null +++ b/consumer/internal/consumer.go @@ -0,0 +1,42 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "go.opentelemetry.io/collector/consumer/internal" + +// Capabilities describes the capabilities of a Processor. +type Capabilities struct { + // MutatesData is set to true if Consume* function of the + // processor modifies the input Traces, Logs or Metrics argument. + // Processors which modify the input data MUST set this flag to true. If the processor + // does not modify the data it MUST set this flag to false. If the processor creates + // a copy of the data before modifying then this flag can be safely set to false. + MutatesData bool +} + +type BaseConsumer interface { + Capabilities() Capabilities +} + +type BaseImpl struct { + Cap Capabilities +} + +// Option to construct new consumers. +type Option func(*BaseImpl) + +// Capabilities returns the capabilities of the component +func (bs BaseImpl) Capabilities() Capabilities { + return bs.Cap +} + +func NewBaseImpl(options ...Option) *BaseImpl { + bs := &BaseImpl{ + Cap: Capabilities{MutatesData: false}, + } + + for _, op := range options { + op(bs) + } + + return bs +} diff --git a/consumer/logs.go b/consumer/logs.go index 5bf89a52f7a..15166ef1196 100644 --- a/consumer/logs.go +++ b/consumer/logs.go @@ -6,13 +6,14 @@ package consumer // import "go.opentelemetry.io/collector/consumer" import ( "context" + "go.opentelemetry.io/collector/consumer/internal" "go.opentelemetry.io/collector/pdata/plog" ) // Logs is an interface that receives plog.Logs, processes it // as needed, and sends it to the next processing node if any or to the destination. type Logs interface { - baseConsumer + internal.BaseConsumer // ConsumeLogs receives plog.Logs for consumption. ConsumeLogs(ctx context.Context, ld plog.Logs) error } @@ -26,7 +27,7 @@ func (f ConsumeLogsFunc) ConsumeLogs(ctx context.Context, ld plog.Logs) error { } type baseLogs struct { - *baseImpl + *internal.BaseImpl ConsumeLogsFunc } @@ -36,7 +37,7 @@ func NewLogs(consume ConsumeLogsFunc, options ...Option) (Logs, error) { return nil, errNilFunc } return &baseLogs{ - baseImpl: newBaseImpl(options...), + BaseImpl: internal.NewBaseImpl(options...), ConsumeLogsFunc: consume, }, nil } diff --git a/consumer/metrics.go b/consumer/metrics.go index 50df60f02d0..47897f9363a 100644 --- a/consumer/metrics.go +++ b/consumer/metrics.go @@ -6,13 +6,14 @@ package consumer // import "go.opentelemetry.io/collector/consumer" import ( "context" + "go.opentelemetry.io/collector/consumer/internal" "go.opentelemetry.io/collector/pdata/pmetric" ) // Metrics is an interface that receives pmetric.Metrics, processes it // as needed, and sends it to the next processing node if any or to the destination. type Metrics interface { - baseConsumer + internal.BaseConsumer // ConsumeMetrics receives pmetric.Metrics for consumption. ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error } @@ -26,7 +27,7 @@ func (f ConsumeMetricsFunc) ConsumeMetrics(ctx context.Context, md pmetric.Metri } type baseMetrics struct { - *baseImpl + *internal.BaseImpl ConsumeMetricsFunc } @@ -36,7 +37,7 @@ func NewMetrics(consume ConsumeMetricsFunc, options ...Option) (Metrics, error) return nil, errNilFunc } return &baseMetrics{ - baseImpl: newBaseImpl(options...), + BaseImpl: internal.NewBaseImpl(options...), ConsumeMetricsFunc: consume, }, nil } diff --git a/consumer/traces.go b/consumer/traces.go index 56cebd53b37..60df2d04536 100644 --- a/consumer/traces.go +++ b/consumer/traces.go @@ -6,13 +6,14 @@ package consumer // import "go.opentelemetry.io/collector/consumer" import ( "context" + "go.opentelemetry.io/collector/consumer/internal" "go.opentelemetry.io/collector/pdata/ptrace" ) // Traces is an interface that receives ptrace.Traces, processes it // as needed, and sends it to the next processing node if any or to the destination. type Traces interface { - baseConsumer + internal.BaseConsumer // ConsumeTraces receives ptrace.Traces for consumption. ConsumeTraces(ctx context.Context, td ptrace.Traces) error } @@ -26,7 +27,7 @@ func (f ConsumeTracesFunc) ConsumeTraces(ctx context.Context, td ptrace.Traces) } type baseTraces struct { - *baseImpl + *internal.BaseImpl ConsumeTracesFunc } @@ -36,7 +37,7 @@ func NewTraces(consume ConsumeTracesFunc, options ...Option) (Traces, error) { return nil, errNilFunc } return &baseTraces{ - baseImpl: newBaseImpl(options...), + BaseImpl: internal.NewBaseImpl(options...), ConsumeTracesFunc: consume, }, nil } diff --git a/internal/featuregates/featuregates.go b/internal/featuregates/featuregates.go index 2cf0e081317..66b956de804 100644 --- a/internal/featuregates/featuregates.go +++ b/internal/featuregates/featuregates.go @@ -17,3 +17,8 @@ var StrictlyTypedInputGate = featuregate.GlobalRegistry().MustRegister(StrictlyT featuregate.WithRegisterFromVersion("v0.103.0"), featuregate.WithRegisterDescription("Makes type casting rules during configuration unmarshaling stricter. See https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/rfcs/env-vars.md for more details."), ) + +var DisableOpenCensusBridge = featuregate.GlobalRegistry().MustRegister("service.disableOpenCensusBridge", + featuregate.StageBeta, + featuregate.WithRegisterFromVersion("v0.104.0"), + featuregate.WithRegisterDescription("`Disables the OpenCensus bridge meaning any component still using the OpenCensus SDK will no longer be able to produce telemetry.")) diff --git a/service/go.mod b/service/go.mod index 908185063fd..879f7478a3e 100644 --- a/service/go.mod +++ b/service/go.mod @@ -21,6 +21,7 @@ require ( go.opentelemetry.io/collector/extension v0.104.0 go.opentelemetry.io/collector/extension/zpagesextension v0.104.0 go.opentelemetry.io/collector/featuregate v1.11.0 + go.opentelemetry.io/collector/internal/featuregates v0.0.0-20240705161705-b127da089038 go.opentelemetry.io/collector/pdata v1.11.0 go.opentelemetry.io/collector/pdata/testdata v0.104.0 go.opentelemetry.io/collector/processor v0.104.0 @@ -85,7 +86,6 @@ require ( go.opentelemetry.io/collector/config/configtls v0.104.0 // indirect go.opentelemetry.io/collector/config/internal v0.104.0 // indirect go.opentelemetry.io/collector/extension/auth v0.104.0 // indirect - go.opentelemetry.io/collector/internal/featuregates v0.0.0-20240705161705-b127da089038 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.104.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 // indirect go.opentelemetry.io/contrib/zpages v0.53.0 // indirect diff --git a/service/internal/proctelemetry/config.go b/service/internal/proctelemetry/config.go index 1c9ba0b63be..493515f01db 100644 --- a/service/internal/proctelemetry/config.go +++ b/service/internal/proctelemetry/config.go @@ -28,6 +28,7 @@ import ( sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/resource" + "go.opentelemetry.io/collector/internal/featuregates" "go.opentelemetry.io/collector/processor/processorhelper" semconv "go.opentelemetry.io/collector/semconv/v1.18.0" ) @@ -67,8 +68,10 @@ func InitMetricReader(ctx context.Context, reader config.MetricReader, asyncErro return initPullExporter(reader.Pull.Exporter, asyncErrorChannel) } if reader.Periodic != nil { - opts := []sdkmetric.PeriodicReaderOption{ - sdkmetric.WithProducer(opencensus.NewMetricProducer()), + var opts []sdkmetric.PeriodicReaderOption + + if !featuregates.DisableOpenCensusBridge.IsEnabled() { + opts = append(opts, sdkmetric.WithProducer(opencensus.NewMetricProducer())) } if reader.Periodic.Interval != nil { opts = append(opts, sdkmetric.WithInterval(time.Duration(*reader.Periodic.Interval)*time.Millisecond)) @@ -160,18 +163,22 @@ func initPrometheusExporter(prometheusConfig *config.Prometheus, asyncErrorChann if prometheusConfig.Port == nil { return nil, nil, fmt.Errorf("port must be specified") } - exporter, err := otelprom.New( + + opts := []otelprom.Option{ otelprom.WithRegisterer(promRegistry), // https://github.com/open-telemetry/opentelemetry-collector/issues/8043 otelprom.WithoutUnits(), // Disabled for the moment until this becomes stable, and we are ready to break backwards compatibility. otelprom.WithoutScopeInfo(), - otelprom.WithProducer(opencensus.NewMetricProducer()), // This allows us to produce metrics that are backwards compatible w/ opencensus otelprom.WithoutCounterSuffixes(), otelprom.WithNamespace("otelcol"), otelprom.WithResourceAsConstantLabels(attribute.NewDenyKeysFilter()), - ) + } + if !featuregates.DisableOpenCensusBridge.IsEnabled() { + opts = append(opts, otelprom.WithProducer(opencensus.NewMetricProducer())) + } + exporter, err := otelprom.New(opts...) if err != nil { return nil, nil, fmt.Errorf("error creating otel prometheus exporter: %w", err) } diff --git a/service/service.go b/service/service.go index 775aa7d7238..f160335d83f 100644 --- a/service/service.go +++ b/service/service.go @@ -22,6 +22,7 @@ import ( "go.opentelemetry.io/collector/connector" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/extension" + "go.opentelemetry.io/collector/internal/featuregates" "go.opentelemetry.io/collector/internal/localhostgate" "go.opentelemetry.io/collector/internal/obsreportconfig" "go.opentelemetry.io/collector/pdata/pcommon" @@ -113,6 +114,10 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) { } logger.Info("Setting up own telemetry...") + + if featuregates.DisableOpenCensusBridge.IsEnabled() { + logger.Info("OpenCensus bridge is disabled for Collector telemetry and will be removed in a future version, use --feature-gates=-service.disableOpenCensusBridge to re-enable") + } mp, err := newMeterProvider( meterProviderSettings{ res: res, diff --git a/service/telemetry_test.go b/service/telemetry_test.go index 2377a077052..cb89f244a59 100644 --- a/service/telemetry_test.go +++ b/service/telemetry_test.go @@ -19,6 +19,8 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/featuregate" + "go.opentelemetry.io/collector/internal/featuregates" "go.opentelemetry.io/collector/internal/testutil" semconv "go.opentelemetry.io/collector/semconv/v1.18.0" "go.opentelemetry.io/collector/service/internal/proctelemetry" @@ -42,11 +44,12 @@ func TestTelemetryInit(t *testing.T) { } for _, tc := range []struct { - name string - disableHighCard bool - expectedMetrics map[string]metricValue - extendedConfig bool - cfg *telemetry.Config + name string + disableHighCard bool + disableCensusBridge bool + expectedMetrics map[string]metricValue + extendedConfig bool + cfg *telemetry.Config }{ { name: "UseOpenTelemetryForInternalMetrics", @@ -214,8 +217,55 @@ func TestTelemetryInit(t *testing.T) { }, }, }, + { + name: "DisableOpenCensusBridge", + expectedMetrics: map[string]metricValue{ + metricPrefix + otelPrefix + counterName: { + value: 13, + labels: map[string]string{ + "service_name": "otelcol", + "service_version": "latest", + "service_instance_id": testInstanceID, + }, + }, + metricPrefix + grpcPrefix + counterName: { + value: 11, + labels: map[string]string{ + "net_sock_peer_addr": "", + "net_sock_peer_name": "", + "net_sock_peer_port": "", + "service_name": "otelcol", + "service_version": "latest", + "service_instance_id": testInstanceID, + }, + }, + metricPrefix + httpPrefix + counterName: { + value: 10, + labels: map[string]string{ + "net_host_name": "", + "net_host_port": "", + "service_name": "otelcol", + "service_version": "latest", + "service_instance_id": testInstanceID, + }, + }, + "target_info": { + value: 0, + labels: map[string]string{ + "service_name": "otelcol", + "service_version": "latest", + "service_instance_id": testInstanceID, + }, + }, + }, + disableCensusBridge: true, + }, } { t.Run(tc.name, func(t *testing.T) { + require.NoError(t, featuregate.GlobalRegistry().Set(featuregates.DisableOpenCensusBridge.ID(), tc.disableCensusBridge)) + t.Cleanup(func() { + require.NoError(t, featuregate.GlobalRegistry().Set(featuregates.DisableOpenCensusBridge.ID(), true)) + }) if tc.extendedConfig { tc.cfg.Metrics.Readers = []config.MetricReader{ { diff --git a/versions.yaml b/versions.yaml index e22abc194fd..90ecb1250fd 100644 --- a/versions.yaml +++ b/versions.yaml @@ -35,6 +35,7 @@ module-sets: - go.opentelemetry.io/collector/connector - go.opentelemetry.io/collector/connector/forwardconnector - go.opentelemetry.io/collector/consumer + - go.opentelemetry.io/collector/consumer/consumerprofiles - go.opentelemetry.io/collector/exporter - go.opentelemetry.io/collector/exporter/debugexporter - go.opentelemetry.io/collector/exporter/loggingexporter