Skip to content

Commit

Permalink
Resource Detection processor (#309)
Browse files Browse the repository at this point in the history
* Add resourcedetection processor

* Add env and gce resource detectors

* Resource Detection Processor: increase test coverage & fix README

* Cache detected resource against processor name to avoid running resource detection code more than once

* Refactor resources detection processor to use resource provider
  • Loading branch information
james-bebbington authored Jun 30, 2020
1 parent 9905ab0 commit dfcd49a
Show file tree
Hide file tree
Showing 23 changed files with 3,277 additions and 0 deletions.
2 changes: 2 additions & 0 deletions cmd/otelcontribcol/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/stackdriverexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/k8sobserver"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/collectdreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jaegerlegacyreceiver"
Expand Down Expand Up @@ -118,6 +119,7 @@ func components() (config.Factories, error) {

processors := []component.ProcessorFactoryBase{
&k8sprocessor.Factory{},
resourcedetectionprocessor.NewFactory(),
}
for _, pr := range factories.Processors {
processors = append(processors, pr)
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/stackdriverexporter v0.0.0
github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/k8sobserver v0.0.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sprocessor v0.0.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor v0.0.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver v0.0.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/collectdreceiver v0.0.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jaegerlegacyreceiver v0.0.0
Expand Down Expand Up @@ -109,4 +110,6 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zipki

replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sprocessor => ./processor/k8sprocessor/

replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor => ./processor/resourcedetectionprocessor/

replace k8s.io/client-go => k8s.io/client-go v0.0.0-20190620085101-78d2af792bab
1 change: 1 addition & 0 deletions processor/resourcedetectionprocessor/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
29 changes: 29 additions & 0 deletions processor/resourcedetectionprocessor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Resource Detection Processor

Supported pipeline types: metrics, traces

The resource detection processor can be used to detect resource information from the host,
in a format that conforms to the OpenTelemetry resource semantic conventions, and append or
override the resource value in traces and metrics with this information.

Currently supported detectors include:

* Environment Variable: Reads resource information from the `OTEL_RESOURCE` environment
variable. This is expected to be in the format `<key1>=<value1>,<key2>=<value2>,...`, the
details of which are currently pending confirmation in the OpenTelemetry specification.

* GCE Metadata: Uses the [Google Cloud Client Libraries for Go](https://github.com/googleapis/google-cloud-go)
to read resource information from the GCE metadata server as documented by
https://cloud.google.com/compute/docs/storing-retrieving-metadata

## Configuration

```yaml
# a list of resource detectors to run, valid options are: "env", "gce"
detectors: [ <string> ]
# determines if existing resource attributes should be overridden or preserved, defaults to true
override: <bool>
```
The full list of settings exposed for this extension are documented [here](./config.go)
with detailed sample configurations [here](./testdata/config.yaml).
35 changes: 35 additions & 0 deletions processor/resourcedetectionprocessor/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package resourcedetectionprocessor

import (
"time"

"go.opentelemetry.io/collector/config/configmodels"
)

// Config defines configuration for Resource processor.
type Config struct {
configmodels.ProcessorSettings `mapstructure:",squash"`
// Detectors is an ordered list of named detectors that should be
// run to attempt to detect resource information.
Detectors []string `mapstructure:"detectors"`
// Timeout specifies the maximum amount of time that we will wait
// before assuming a detector has failed. Defaults to 5s.
Timeout time.Duration `mapstructure:"timeout"`
// Override indicates whether any existing resource attributes
// should be overridden or preserved. Defaults to true.
Override bool `mapstructure:"override"`
}
51 changes: 51 additions & 0 deletions processor/resourcedetectionprocessor/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package resourcedetectionprocessor

import (
"path"
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configmodels"
)

func TestLoadConfig(t *testing.T) {
factories, err := config.ExampleComponents()
assert.NoError(t, err)

factory := &Factory{}
factories.Processors[typeStr] = &Factory{}

cfg, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
assert.NoError(t, err)
assert.NotNil(t, cfg)

p1 := cfg.Processors["resourcedetection"]
assert.Equal(t, p1, factory.CreateDefaultConfig())

p2 := cfg.Processors["resourcedetection/2"]
assert.Equal(t, p2, &Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "resourcedetection",
NameVal: "resourcedetection/2",
},
Detectors: []string{"env", "gce"},
Timeout: 2 * time.Second,
Override: false,
})
}
17 changes: 17 additions & 0 deletions processor/resourcedetectionprocessor/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// package resourcedetectionprocessor implements a processor for specifying resource
// labels to be added to OpenCensus trace data and metrics data.
package resourcedetectionprocessor
139 changes: 139 additions & 0 deletions processor/resourcedetectionprocessor/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package resourcedetectionprocessor

import (
"context"
"strings"
"sync"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/env"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/gcp/gce"
)

const (
// The value of "type" key in configuration.
typeStr = "resourcedetection"
)

// Factory is the factory for resourcedetection processor.
type Factory struct {
resourceProviderFactory *internal.ResourceProviderFactory

// providers stores a provider for each named processor that
// may a different set of detectors configured.
providers map[string]*internal.ResourceProvider
lock sync.Mutex
}

// NewFactory creates a new factory for resourcedetection processor.
func NewFactory() *Factory {
resourceProviderFactory := internal.NewProviderFactory(map[internal.DetectorType]internal.Detector{
env.TypeStr: &env.Detector{},
gce.TypeStr: gce.NewDetector(),
})

return &Factory{
resourceProviderFactory: resourceProviderFactory,
providers: map[string]*internal.ResourceProvider{},
}
}

// Type gets the type of the Option config created by this factory.
func (*Factory) Type() configmodels.Type {
return typeStr
}

// CreateDefaultConfig creates the default configuration for processor.
func (*Factory) CreateDefaultConfig() configmodels.Processor {
return &Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: typeStr,
NameVal: typeStr,
},
Detectors: []string{env.TypeStr},
Timeout: 5 * time.Second,
Override: true,
}
}

// CreateTraceProcessor creates a trace processor based on this config.
func (f *Factory) CreateTraceProcessor(
ctx context.Context,
params component.ProcessorCreateParams,
nextConsumer consumer.TraceConsumer,
cfg configmodels.Processor,
) (component.TraceProcessor, error) {
oCfg := cfg.(*Config)

provider, err := f.getResourceProvider(ctx, params.Logger, cfg.Name(), oCfg.Timeout, oCfg.Detectors)
if err != nil {
return nil, err
}

return newResourceTraceProcessor(ctx, nextConsumer, provider, oCfg.Override), nil
}

// CreateMetricsProcessor creates a metrics processor based on this config.
func (f *Factory) CreateMetricsProcessor(
ctx context.Context,
params component.ProcessorCreateParams,
nextConsumer consumer.MetricsConsumer,
cfg configmodels.Processor,
) (component.MetricsProcessor, error) {
oCfg := cfg.(*Config)

provider, err := f.getResourceProvider(ctx, params.Logger, cfg.Name(), oCfg.Timeout, oCfg.Detectors)
if err != nil {
return nil, err
}

return newResourceMetricProcessor(ctx, nextConsumer, provider, oCfg.Override), nil
}

func (f *Factory) getResourceProvider(
ctx context.Context,
logger *zap.Logger,
processorName string,
timeout time.Duration,
configuredDetectors []string,
) (*internal.ResourceProvider, error) {
f.lock.Lock()
defer f.lock.Unlock()

if provider, ok := f.providers[processorName]; ok {
return provider, nil
}

detectorTypes := make([]internal.DetectorType, 0, len(configuredDetectors))
for _, key := range configuredDetectors {
detectorTypes = append(detectorTypes, internal.DetectorType(strings.TrimSpace(key)))
}

provider, err := f.resourceProviderFactory.CreateResourceProvider(logger, timeout, detectorTypes...)
if err != nil {
return nil, err
}

f.providers[processorName] = provider
return provider, nil
}
44 changes: 44 additions & 0 deletions processor/resourcedetectionprocessor/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package resourcedetectionprocessor

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configcheck"
)

func TestCreateDefaultConfig(t *testing.T) {
var factory Factory
cfg := factory.CreateDefaultConfig()
assert.NoError(t, configcheck.ValidateConfig(cfg))
assert.NotNil(t, cfg)
}

func TestCreateProcessor(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()

tp, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, nil, cfg)
assert.NoError(t, err)
assert.NotNil(t, tp)

mp, err := factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{}, nil, cfg)
assert.NoError(t, err)
assert.NotNil(t, mp)
}
13 changes: 13 additions & 0 deletions processor/resourcedetectionprocessor/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
module github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor

go 1.14

require (
cloud.google.com/go v0.45.1
github.com/census-instrumentation/opencensus-proto v0.2.1
github.com/opentracing/opentracing-go v1.1.1-0.20190913142402-a7454ce5950e // indirect
github.com/stretchr/testify v1.5.1
go.opentelemetry.io/collector v0.4.1-0.20200622191610-a8db6271f90a
go.uber.org/atomic v1.5.1
go.uber.org/zap v1.13.0
)
Loading

0 comments on commit dfcd49a

Please sign in to comment.