Skip to content

Commit

Permalink
First draft for labels and filters
Browse files Browse the repository at this point in the history
  • Loading branch information
Achooo committed Apr 28, 2023
1 parent 1e874fd commit fd2d554
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 64 deletions.
33 changes: 23 additions & 10 deletions agent/hcp/deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"net/url"
"time"

"github.com/coredns/coredns/plugin/pkg/log"
hcpclient "github.com/hashicorp/consul/agent/hcp/client"
"github.com/hashicorp/consul/agent/hcp/config"
"github.com/hashicorp/consul/agent/hcp/scada"
Expand Down Expand Up @@ -45,7 +44,15 @@ func NewDeps(cfg config.CloudConfig, logger hclog.Logger) (d Deps, err error) {
// This step should not block server initialization, so errors are logged, but not returned.
func sink(hcpClient hcpclient.Client, cfg hcpclient.CloudConfig, logger hclog.Logger) *telemetry.OTELSink {
ctx := context.Background()
url, err := verifyCCMRegistration(ctx, hcpClient)
telemetryCfg, err := fetchTelemetryConfig(ctx, hcpClient, logger)
if err != nil {
return nil
}

url, err := verifyCCMRegistration(telemetryCfg, logger)
if err != nil {
return nil
}

// if endpoint is empty, no metrics endpoint configuration for this Consul server
// (e.g. not registered with CCM or feature flag to control rollout) so do not enable the HCP metrics sink.
Expand All @@ -60,8 +67,10 @@ func sink(hcpClient hcpclient.Client, cfg hcpclient.CloudConfig, logger hclog.Lo
}

sinkOpts := &telemetry.OTELSinkOpts{
Logger: logger,
Reader: telemetry.NewOTELReader(metricsClient, url, 10*time.Second),
Logger: logger,
Labels: telemetryCfg.Labels,
Filters: telemetryCfg.MetricsConfig.Filters,
Reader: telemetry.NewOTELReader(metricsClient, url, 10*time.Second),
}

sink, err := telemetry.NewOTELSink(sinkOpts)
Expand All @@ -73,18 +82,22 @@ func sink(hcpClient hcpclient.Client, cfg hcpclient.CloudConfig, logger hclog.Lo
return sink
}

// verifyCCMRegistration checks that a server is registered with the HCP management plane
// by making a HTTP request to the HCP TelemetryConfig endpoint.
// If registered, it returns the endpoint for the HCP Telemetry Gateway endpoint where metrics should be forwarded.
func verifyCCMRegistration(ctx context.Context, client hcpclient.Client) (string, error) {
func fetchTelemetryConfig(ctx context.Context, client hcpclient.Client, logger hclog.Logger) (*hcpclient.TelemetryConfig, error) {
reqCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

telemetryCfg, err := client.FetchTelemetryConfig(reqCtx)
if err != nil {
return "", fmt.Errorf("failed to fetch telemetry config %w", err)
return nil, fmt.Errorf("failed to fetch telemetry config %w", err)
}

return telemetryCfg, nil
}

// verifyCCMRegistration checks that a server is registered with the HCP management plane
// by inspecting the TelemetryConfig previously fetched from HCP.
// If registered, it returns the URL of the HCP Telemetry Gateway endpoint where metrics should be forwarded.
func verifyCCMRegistration(telemetryCfg *hcpclient.TelemetryConfig, logger hclog.Logger) (string, error) {
endpoint := telemetryCfg.Endpoint
if override := telemetryCfg.MetricsConfig.Endpoint; override != "" {
endpoint = override
Expand All @@ -98,7 +111,7 @@ func verifyCCMRegistration(ctx context.Context, client hcpclient.Client) (string
// The endpoint from the HCP gateway is a domain without scheme, and without the metrics path, so they must be added.
url, err := url.Parse(fmt.Sprintf("https://%s/v1/metrics", endpoint))
if err != nil {
log.Error("failed to parse url: %w", err)
logger.Error("failed to parse url: %w", err)
return "", fmt.Errorf("failed to parse url: %w", err)
}

Expand Down
69 changes: 25 additions & 44 deletions agent/hcp/deps_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package hcp

import (
"context"
"fmt"
"testing"

Expand Down Expand Up @@ -74,69 +73,51 @@ func TestSink(t *testing.T) {

func TestVerifyCCMRegistration(t *testing.T) {
for name, test := range map[string]struct {
expect func(*client.MockClient)
wantErr string
expectedURL string
telemetryCfg *client.TelemetryConfig
wantErr string
expectedURL string
}{
"failsWithFetchTelemetryFailure": {
expect: func(mockClient *client.MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(nil, fmt.Errorf("FetchTelemetryConfig error"))
},
wantErr: "failed to fetch telemetry config",
},
"failsWithURLParseErr": {
expect: func(mockClient *client.MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{
// Minimum 2 chars for a domain to be valid.
Endpoint: "s",
MetricsConfig: &client.MetricsConfig{
// Invalid domain chars
Endpoint: " ",
},
}, nil)
telemetryCfg: &client.TelemetryConfig{
// Minimum 2 chars for a domain to be valid.
Endpoint: "s",
MetricsConfig: &client.MetricsConfig{
// Invalid domain chars
Endpoint: " ",
},
},
wantErr: "failed to parse url:",
},
"noErrWithEmptyEndpoint": {
expect: func(mockClient *client.MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{
telemetryCfg: &client.TelemetryConfig{
Endpoint: "",
MetricsConfig: &client.MetricsConfig{
Endpoint: "",
MetricsConfig: &client.MetricsConfig{
Endpoint: "",
},
}, nil)
},
},
expectedURL: "",
},
"success": {
expect: func(mockClient *client.MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{
Endpoint: "test.com",
MetricsConfig: &client.MetricsConfig{
Endpoint: "",
},
}, nil)
telemetryCfg: &client.TelemetryConfig{
Endpoint: "test.com",
MetricsConfig: &client.MetricsConfig{
Endpoint: "",
},
},
expectedURL: "https://test.com/v1/metrics",
},
"successMetricsEndpointOverride": {
expect: func(mockClient *client.MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{
Endpoint: "test.com",
MetricsConfig: &client.MetricsConfig{
Endpoint: "override.com",
},
}, nil)
telemetryCfg: &client.TelemetryConfig{
Endpoint: "test.com",
MetricsConfig: &client.MetricsConfig{
Endpoint: "override.com",
},
},
expectedURL: "https://override.com/v1/metrics",
},
} {
t.Run(name, func(t *testing.T) {
ctx := context.Background()
mClient := client.NewMockClient(t)
test.expect(mClient)

url, err := verifyCCMRegistration(ctx, mClient)
url, err := verifyCCMRegistration(test.telemetryCfg, hclog.NewNullLogger())
if test.wantErr != "" {
require.Empty(t, url)
require.Error(t, err)
Expand Down
41 changes: 41 additions & 0 deletions agent/hcp/telemetry/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package telemetry

import (
"fmt"
"regexp"

"github.com/hashicorp/go-multierror"
)

// FilterList holds a set of compiled regular expressions, keyed by the
// pattern string each one was compiled from.
// These filters are used to identify which Consul metrics can be transmitted to HCP.
type FilterList struct {
// filters maps each pattern string to its compiled regular expression.
filters map[string]*regexp.Regexp
}

// NewFilterList compiles each entry of filters as a regular expression and
// returns a FilterList holding every pattern that compiled successfully.
//
// Invalid patterns do not abort construction: they are skipped, and one error
// per failed pattern (identifying its index in filters) is accumulated into the
// returned error. The returned FilterList is always non-nil and usable, even
// when the error is non-nil. The error is nil when every pattern compiled.
func NewFilterList(filters []string) (*FilterList, error) {
	var errs *multierror.Error
	compiled := make(map[string]*regexp.Regexp, len(filters))
	for idx, filter := range filters {
		re, err := regexp.Compile(filter)
		if err != nil {
			// Keep going so one bad pattern does not disable the valid ones, and
			// never store the nil *regexp.Regexp: Match would dereference it.
			errs = multierror.Append(errs, fmt.Errorf("compilation of filter at index %d failed: %w", idx, err))
			continue
		}
		compiled[filter] = re
	}
	// ErrorOrNil avoids returning a typed-nil *multierror.Error wrapped in a
	// non-nil error interface when nothing failed.
	return &FilterList{filters: compiled}, errs.ErrorOrNil()
}

// Match reports whether name matches at least one of the regular expressions
// in the allowed metric filters. It returns false when no filter matches,
// including when the list holds no filters.
func (fl *FilterList) Match(name string) bool {
	for _, re := range fl.filters {
		// Skip nil entries: a pattern that failed to compile has no usable
		// regexp, and calling a method on it would panic.
		if re == nil {
			continue
		}
		// MatchString avoids copying name into a []byte on every call.
		if re.MatchString(name) {
			return true
		}
	}
	return false
}
39 changes: 39 additions & 0 deletions agent/hcp/telemetry/filter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package telemetry

import (
"testing"

"github.com/stretchr/testify/require"
)

// TestFilter verifies that NewFilterList reports invalid regular expressions
// and that Match applies the compiled filters to a fixed metric name.
func TestFilter(t *testing.T) {
	for name, tc := range map[string]struct {
		filters   []string
		wantMatch bool
		wantErr   string
	}{
		"badFilterRegex": {
			filters: []string{"(*LF)"},
			wantErr: "compilation of filter at index 0 failed",
		},
		"matchFound": {
			filters:   []string{"raft.*"},
			wantMatch: true,
		},
		"matchNotFound": {
			filters:   []string{"mem.heap_size"},
			wantMatch: false,
		},
	} {
		t.Run(name, func(t *testing.T) {
			f, err := NewFilterList(tc.filters)
			if tc.wantErr != "" {
				// Assert the error exists before dereferencing it, so a missing
				// error fails the test cleanly instead of panicking.
				require.Error(t, err)
				require.Contains(t, err.Error(), tc.wantErr)
				return
			}

			// Valid filters must compile without error before Match is checked.
			require.NoError(t, err)
			require.Equal(t, tc.wantMatch, f.Match("consul.raft.peers"))
		})
	}
}
51 changes: 41 additions & 10 deletions agent/hcp/telemetry/otel_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,18 @@ import (
)

type OTELSinkOpts struct {
Reader otelsdk.Reader
Logger hclog.Logger
Reader otelsdk.Reader
Logger hclog.Logger
Ctx context.Context
Filters []string
Labels map[string]string
}

type OTELSink struct {
spaceReplacer *strings.Replacer
logger hclog.Logger
ctx context.Context
filters *FilterList

meterProvider *otelsdk.MeterProvider
meter *otelmetric.Meter
Expand All @@ -41,21 +46,33 @@ type OTELSink struct {
}

func NewOTELReader(client client.MetricsClient, endpoint string, exportInterval time.Duration) otelsdk.Reader {
exporter := NewOTELExporter(client, endpoint)
exporter := &OTELExporter{
client: client,
endpoint: endpoint,
}
return otelsdk.NewPeriodicReader(exporter, otelsdk.WithInterval(exportInterval))
}

func NewOTELSink(opts *OTELSinkOpts) (*OTELSink, error) {
if opts.Logger == nil {
return nil, fmt.Errorf("failed to init OTEL sink: provide valid OTELSinkOpts Logger")
if opts.Logger == nil || opts.Reader == nil || opts.Ctx == nil {
return nil, fmt.Errorf("failed to init OTEL sink: provide valid OTELSinkOpts")
}

attrs := make([]attribute.KeyValue, len(opts.Labels))
for k, v := range opts.Labels {
attrs = append(attrs, attribute.KeyValue{
Key: attribute.Key(k),
Value: attribute.StringValue(v),
})
}

if opts.Reader == nil {
return nil, fmt.Errorf("failed to init OTEL sink: provide valid OTELSinkOpts Reader")
filterList, err := NewFilterList(opts.Filters)
if err != nil {
opts.Logger.Error("Failed to initialize all filters: %w", err)
}

// Setup OTEL Metrics SDK to aggregate, convert and export metrics periodically.
res := resource.NewSchemaless()
res := resource.NewWithAttributes("", attrs...)
meterProvider := otelsdk.NewMeterProvider(otelsdk.WithResource(res), otelsdk.WithReader(opts.Reader))
meter := meterProvider.Meter("github.com/hashicorp/consul/agent/hcp/telemetry")

Expand All @@ -65,8 +82,10 @@ func NewOTELSink(opts *OTELSinkOpts) (*OTELSink, error) {
}

return &OTELSink{
filters: filterList,
spaceReplacer: strings.NewReplacer(" ", "_"),
logger: opts.Logger.Named("otel_sink"),
ctx: opts.Ctx,
meterProvider: meterProvider,
meter: &meter,
mutex: sync.Mutex{},
Expand Down Expand Up @@ -97,6 +116,10 @@ func (o *OTELSink) IncrCounter(key []string, val float32) {
func (o *OTELSink) SetGaugeWithLabels(key []string, val float32, labels []gometrics.Label) {
k := o.flattenKey(key)

if !o.filters.Match(k) {
return
}

// Set value in global Gauge store.
o.gaugeStore.Store(k, float64(val), toAttributes(labels))

Expand All @@ -118,6 +141,10 @@ func (o *OTELSink) SetGaugeWithLabels(key []string, val float32, labels []gometr
func (o *OTELSink) AddSampleWithLabels(key []string, val float32, labels []gometrics.Label) {
k := o.flattenKey(key)

if !o.filters.Match(k) {
return
}

o.mutex.Lock()
defer o.mutex.Unlock()

Expand All @@ -133,13 +160,17 @@ func (o *OTELSink) AddSampleWithLabels(key []string, val float32, labels []gomet
}

attrs := toAttributes(labels)
(*inst).Record(context.TODO(), float64(val), attrs...)
(*inst).Record(o.ctx, float64(val), attrs...)
}

// IncrCounterWithLabels emits a Consul counter metric that gets registered by an OpenTelemetry Histogram instrument.
func (o *OTELSink) IncrCounterWithLabels(key []string, val float32, labels []gometrics.Label) {
k := o.flattenKey(key)

if !o.filters.Match(k) {
return
}

o.mutex.Lock()
defer o.mutex.Unlock()

Expand All @@ -156,7 +187,7 @@ func (o *OTELSink) IncrCounterWithLabels(key []string, val float32, labels []gom
}

attrs := toAttributes(labels)
(*inst).Add(context.TODO(), float64(val), attrs...)
(*inst).Add(o.ctx, float64(val), attrs...)
}

// EmitKey unsupported.
Expand Down

0 comments on commit fd2d554

Please sign in to comment.