Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metrics exporter cache requests that fail with an unrecoverable error #768

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions exporter/collector/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ type MetricsExporter struct {
mdCache map[string]*monitoringpb.CreateMetricDescriptorRequest
// A channel that receives metric descriptor and sends them to GCM once
metricDescriptorC chan *monitoringpb.CreateMetricDescriptorRequest
client *monitoring.MetricClient
client monitoringClient
// Only used for testing purposes in lieu of initializing a fake client
exportFunc func(context.Context, *monitoringpb.CreateTimeSeriesRequest) error
// requestOpts applies options to the context for requests, such as additional headers.
Expand Down Expand Up @@ -137,6 +137,15 @@ const (

type labels map[string]string

// monitoringClient is the subset of monitoring.MetricClient this exporter uses,
// and allows us to mock the implementation for testing.
type monitoringClient interface {
avilevy18 marked this conversation as resolved.
Show resolved Hide resolved
CreateTimeSeries(ctx context.Context, req *monitoringpb.CreateTimeSeriesRequest, opts ...gax.CallOption) error
CreateServiceTimeSeries(ctx context.Context, req *monitoringpb.CreateTimeSeriesRequest, opts ...gax.CallOption) error
Close() error
CreateMetricDescriptor(ctx context.Context, req *monitoringpb.CreateMetricDescriptorRequest, opts ...gax.CallOption) (*metricpb.MetricDescriptor, error)
}

func (me *MetricsExporter) Shutdown(ctx context.Context) error {
// TODO: pass ctx to goroutines so that we can use its deadline
close(me.shutdownC)
Expand Down Expand Up @@ -471,8 +480,7 @@ func (me *MetricsExporter) readWALAndExport(ctx context.Context) error {
me.obs.log.Warn(fmt.Sprintf("error exporting to GCM: %+v", err))
}
// retry at same read index if retryable (network) error
s := status.Convert(err)
if !(s.Code() == codes.DeadlineExceeded || s.Code() == codes.Unavailable) {
if isNotRecoverable(err) {
break
}
me.obs.log.Error("retryable error, retrying request")
Expand Down Expand Up @@ -655,6 +663,12 @@ func projectName(projectID string) string {
return fmt.Sprintf("projects/%s", projectID)
}

// isNotRecoverable returns true if the error is permanent.
func isNotRecoverable(err error) bool {
avilevy18 marked this conversation as resolved.
Show resolved Hide resolved
s := status.Convert(err)
return !(s.Code() == codes.DeadlineExceeded || s.Code() == codes.Unavailable)
}

// Helper method to send metric descriptors to GCM.
func (me *MetricsExporter) exportMetricDescriptor(req *monitoringpb.CreateMetricDescriptorRequest) {
cacheKey := fmt.Sprintf("%s/%s", req.Name, req.MetricDescriptor.Type)
Expand All @@ -669,12 +683,16 @@ func (me *MetricsExporter) exportMetricDescriptor(req *monitoringpb.CreateMetric
}
_, err := me.client.CreateMetricDescriptor(ctx, req)
if err != nil {
if isNotRecoverable(err) {
// cache if the error is non-recoverable
me.mdCache[cacheKey] = req
}
// TODO: Log-once on error, per metric descriptor?
me.obs.log.Error("Unable to send metric descriptor.", zap.Error(err), zap.Any("metric_descriptor", req.MetricDescriptor))
return
}

// only cache if we are successful. We want to retry if there is an error
// cache if we are successful
me.mdCache[cacheKey] = req
}

Expand Down
116 changes: 116 additions & 0 deletions exporter/collector/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ import (

"cloud.google.com/go/monitoring/apiv3/v2/monitoringpb"
"github.com/google/go-cmp/cmp"
"github.com/googleapis/gax-go/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tidwall/wal"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"
"google.golang.org/genproto/googleapis/api/label"
metricpb "google.golang.org/genproto/googleapis/api/metric"
monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
Expand Down Expand Up @@ -58,6 +60,120 @@ func newTestMetricMapper() (metricMapper, func()) {
}, func() { close(s) }
}

type mock struct {
monitoringClient
createMetricDescriptor func(ctx context.Context, req *monitoringpb.CreateMetricDescriptorRequest, opts ...gax.CallOption) (*metricpb.MetricDescriptor, error)
}

func (m *mock) CreateMetricDescriptor(ctx context.Context, req *monitoringpb.CreateMetricDescriptorRequest, opts ...gax.CallOption) (*metricpb.MetricDescriptor, error) {
return m.createMetricDescriptor(ctx, req)
}

func TestExportCreateMetricDescriptorCache(t *testing.T) {
for _, tc := range []struct {
reqs []*monitoringpb.CreateMetricDescriptorRequest
desc string
createMetricDescriptorResponses []error
expectedTimesRequestCalled int
expectedTimesZapCalled int
}{
{
expectedTimesRequestCalled: 2,
expectedTimesZapCalled: 0,
desc: "valid metric descriptor gets created",
createMetricDescriptorResponses: []error{nil, nil},
reqs: []*monitoringpb.CreateMetricDescriptorRequest{
{
Name: "foo",
MetricDescriptor: &metricpb.MetricDescriptor{
Type: "goo",
},
},
{
Name: "bar",
MetricDescriptor: &metricpb.MetricDescriptor{
Type: "baz",
},
},
},
},
{
expectedTimesRequestCalled: 1,
expectedTimesZapCalled: 1,
desc: "non-recoverable error",
createMetricDescriptorResponses: []error{
status.Error(codes.PermissionDenied, "permission denied"),
status.Error(codes.PermissionDenied, "permission denied"),
},
reqs: []*monitoringpb.CreateMetricDescriptorRequest{
{
Name: "foo",
MetricDescriptor: &metricpb.MetricDescriptor{
Type: "goo",
},
},
{
Name: "foo",
MetricDescriptor: &metricpb.MetricDescriptor{
Type: "goo",
},
},
},
},
{
expectedTimesRequestCalled: 2,
expectedTimesZapCalled: 1,
desc: "recoverable error",
createMetricDescriptorResponses: []error{
status.Error(codes.DeadlineExceeded, "deadline exceeded"),
nil,
},
reqs: []*monitoringpb.CreateMetricDescriptorRequest{
{
Name: "foo",
MetricDescriptor: &metricpb.MetricDescriptor{
Type: "goo",
},
},
{
Name: "foo",
MetricDescriptor: &metricpb.MetricDescriptor{
Type: "goo",
},
},
},
},
} {
logger, observed := observer.New(zap.DebugLevel)

actualTimesCalled := 0
i := 0
m := &mock{
createMetricDescriptor: func(ctx context.Context, req *monitoringpb.CreateMetricDescriptorRequest, opts ...gax.CallOption) (*metricpb.MetricDescriptor, error) {
actualTimesCalled++
err := tc.createMetricDescriptorResponses[i]
i++
return req.MetricDescriptor, err
},
}

me := MetricsExporter{
mdCache: make(map[string]*monitoringpb.CreateMetricDescriptorRequest),
obs: selfObservability{
log: zap.New(logger),
},
client: m,
}

for _, r := range tc.reqs {
me.exportMetricDescriptor(r)
}

require.Len(t, observed.FilterLevelExact(zap.ErrorLevel).All(), tc.expectedTimesZapCalled)
require.Equal(t, tc.expectedTimesRequestCalled, actualTimesCalled)
}
}

func TestMetricToTimeSeries(t *testing.T) {
mr := &monitoredrespb.MonitoredResource{}

Expand Down