Skip to content
This repository has been archived by the owner on Jun 25, 2020. It is now read-only.

Commit

Permalink
stackdriver/telemetry_sink: implement telemetry.Sink
Browse files Browse the repository at this point in the history
  • Loading branch information
johnsonj committed Nov 10, 2017
1 parent d314071 commit 12c43b6
Show file tree
Hide file tree
Showing 6 changed files with 318 additions and 8 deletions.
16 changes: 11 additions & 5 deletions src/stackdriver-nozzle/app/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ type App struct {
cfConfig *cfclient.Config
cfClient *cfclient.Client
labelMaker nozzle.LabelMaker
reporter telemetry.Reporter
bufferEmpty func() bool
}

Expand All @@ -49,16 +48,12 @@ func New(c *config.Config, logger lager.Logger) *App {
}
labelMaker := nozzle.NewLabelMaker(appInfoRepository, c.BoshDirectorName)

logSink := telemetry.NewLogSink(logger)
reporter := telemetry.NewReporter(time.Duration(c.HeartbeatRate)*time.Second, logSink)

return &App{
logger: logger,
c: c,
cfConfig: cfConfig,
cfClient: cfClient,
labelMaker: labelMaker,
reporter: reporter,
}
}

Expand Down Expand Up @@ -133,3 +128,14 @@ func (a *App) newMetricSink(ctx context.Context, metricAdapter stackdriver.Metri

return nozzle.NewMetricSink(a.logger, a.c.MetricPathPrefix, a.labelMaker, metricBuffer, nozzle.NewUnitParser())
}

func (a *App) newTelemetryReporter() telemetry.Reporter {
metricClient, err := stackdriver.NewMetricClient()
if err != nil {
a.logger.Fatal("metricClient", err)
}

logSink := telemetry.NewLogSink(a.logger)
metricSink := stackdriver.NewTelemetrySink(a.logger, metricClient, a.c.ProjectID, a.c.MetricPathPrefix, a.c.SubscriptionID, a.c.BoshDirectorName)
return telemetry.NewReporter(time.Duration(a.c.HeartbeatRate)*time.Second, logSink, metricSink)
}
3 changes: 2 additions & 1 deletion src/stackdriver-nozzle/app/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ func Run(ctx context.Context, a *App) {
})
}()
}
a.reporter.Start(ctx)
reporter := a.newTelemetryReporter()
reporter.Start(ctx)

producer := a.newProducer()
consumer, err := a.newConsumer(ctx)
Expand Down
7 changes: 6 additions & 1 deletion src/stackdriver-nozzle/mocks/metric_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ type MockClient struct {
DescriptorReqs []*monitoringpb.CreateMetricDescriptorRequest
ListErr error

CreateMetricDescriptorFn func(request *monitoringpb.CreateMetricDescriptorRequest) error
CreateMetricDescriptorFn func(req *monitoringpb.CreateMetricDescriptorRequest) error
ListMetricDescriptorFn func(request *monitoringpb.ListMetricDescriptorsRequest) ([]*metricpb.MetricDescriptor, error)
PostFn func(req *monitoringpb.CreateTimeSeriesRequest) error
}

Expand Down Expand Up @@ -44,6 +45,10 @@ func (mc *MockClient) CreateMetricDescriptor(request *monitoringpb.CreateMetricD
}

func (mc *MockClient) ListMetricDescriptors(request *monitoringpb.ListMetricDescriptorsRequest) ([]*metricpb.MetricDescriptor, error) {
if mc.ListMetricDescriptorFn != nil {
return mc.ListMetricDescriptorFn(request)
}

if mc.ListErr != nil {
return nil, mc.ListErr
}
Expand Down
2 changes: 1 addition & 1 deletion src/stackdriver-nozzle/stackdriver/metric_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (ma *metricAdapter) CreateMetricDescriptor(metric *messages.Metric, labels
func (ma *metricAdapter) fetchMetricDescriptorNames() error {
req := &monitoringpb.ListMetricDescriptorsRequest{
Name: fmt.Sprintf("projects/%s", ma.projectID),
Filter: "metric.type = starts_with(\"custom.googleapis.com/\")",
Filter: `metric.type = starts_with("custom.googleapis.com/")`,
}

descriptors, err := ma.client.ListMetricDescriptors(req)
Expand Down
171 changes: 171 additions & 0 deletions src/stackdriver-nozzle/stackdriver/telemetry_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,174 @@
*/

package stackdriver

import (
"expvar"
"fmt"
"time"

"cloud.google.com/go/compute/metadata"
"github.com/cloudfoundry-community/stackdriver-tools/src/stackdriver-nozzle/telemetry"
"github.com/cloudfoundry/lager"
"github.com/golang/protobuf/ptypes/timestamp"
labelpb "google.golang.org/genproto/googleapis/api/label"
metricpb "google.golang.org/genproto/googleapis/api/metric"
"google.golang.org/genproto/googleapis/api/monitoredres"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
)

type telemetrySink struct {
projectPath string
metricPrefix string
labels map[string]string
resource *monitoredres.MonitoredResource
logger lager.Logger
client MetricClient
startTime *timestamp.Timestamp
}

func now() *timestamp.Timestamp {
now := time.Now()
return &timestamp.Timestamp{
Seconds: now.Unix(),
Nanos: int32(now.Nanosecond()),
}
}

func detectMonitoredResource() (res *monitoredres.MonitoredResource) {
res = &monitoredres.MonitoredResource{Type: "global"}

if metadata.OnGCE() {
projectId, err := metadata.ProjectID()
if err != nil {
return
}
instanceId, err := metadata.InstanceID()
if err != nil {
return
}
zone, err := metadata.Zone()
if err != nil {
return
}

res.Type = "gce_instance"
res.Labels = map[string]string{"project_id": projectId, "instance_id": instanceId, "zone": zone}
}
return
}

// NewTelemetrySink provides a telemetry.Sink that writes metrics to Stackdriver Monitoring
func NewTelemetrySink(logger lager.Logger, client MetricClient, projectID, metricPrefix, subscriptionId, director string) telemetry.Sink {
return &telemetrySink{
logger: logger,
client: client,
projectPath: fmt.Sprintf("projects/%s", projectID),
metricPrefix: metricPrefix,
labels: map[string]string{"subscription_id": subscriptionId, "director": director},
startTime: now(),
resource: detectMonitoredResource()}
}

func (ts *telemetrySink) Init(registeredSeries []*expvar.KeyValue) {
req := &monitoringpb.ListMetricDescriptorsRequest{
Name: ts.projectPath,
Filter: fmt.Sprintf(`metric.type = starts_with("%s")`, ts.metricPrefix),
}

descriptors, err := ts.client.ListMetricDescriptors(req)
if err != nil {
ts.logger.Error("telemetrySink.ListMetricDescriptors", err, lager.Data{"req": req})
}

registered := map[string]bool{}
for _, descriptor := range descriptors {
registered[descriptor.Name] = true
}

labels := []*labelpb.LabelDescriptor{}
for name := range ts.labels {
labels = append(labels, &labelpb.LabelDescriptor{Key: name, ValueType: labelpb.LabelDescriptor_STRING})
}

for _, series := range registeredSeries {
name := ts.metricDescriptorName(series.Key)
if registered[name] {
continue
}

req := &monitoringpb.CreateMetricDescriptorRequest{
Name: ts.projectPath,
MetricDescriptor: &metricpb.MetricDescriptor{
DisplayName: ts.metricDescriptorDisplayName(series.Key),
Name: name,
Type: ts.metricDescriptorType(series.Key),
Labels: labels,
MetricKind: metricpb.MetricDescriptor_CUMULATIVE,
ValueType: metricpb.MetricDescriptor_INT64,
Description: "stackdriver-nozzle created custom metric.",
},
}
err := ts.client.CreateMetricDescriptor(req)

if err != nil {
ts.logger.Error("telemetrySink.CreateMetricDescriptor", err, lager.Data{"req": req})
}
}
}

func (ts *telemetrySink) metricDescriptorDisplayName(key string) string {
return fmt.Sprintf("%s/stackdriver-nozzle/%s", ts.metricPrefix, key)
}

func (ts *telemetrySink) metricDescriptorName(key string) string {
return fmt.Sprintf("%s/metricDescriptors/%s", ts.projectPath, ts.metricDescriptorType(key))
}

func (ts *telemetrySink) metricDescriptorType(key string) string {
return fmt.Sprintf("custom.googleapis.com/%s/stackdriver-nozzle/%s", ts.metricPrefix, key)
}

const maxTimeSeries = 200

func (ts *telemetrySink) newRequest() *monitoringpb.CreateTimeSeriesRequest {
return &monitoringpb.CreateTimeSeriesRequest{
Name: ts.projectPath,
}
}

func (ts *telemetrySink) Report(report []*expvar.KeyValue) {
req := ts.newRequest()

timeInterval := &monitoringpb.TimeInterval{
EndTime: now(),
StartTime: ts.startTime,
}

for _, data := range report {
if val, ok := data.Value.(*expvar.Int); ok {
req.TimeSeries = append(req.TimeSeries, &monitoringpb.TimeSeries{
Metric: &metricpb.Metric{
Type: ts.metricDescriptorType(data.Key),
Labels: ts.labels,
},
Points: []*monitoringpb.Point{{
Interval: timeInterval,
Value: &monitoringpb.TypedValue{
Value: &monitoringpb.TypedValue_Int64Value{Int64Value: val.Value()},
},
}},
Resource: ts.resource,
})
}

if len(req.TimeSeries) == maxTimeSeries {
ts.client.Post(req)
req = ts.newRequest()
}
}

if len(req.TimeSeries) != 0 {
ts.client.Post(req)
}
}
127 changes: 127 additions & 0 deletions src/stackdriver-nozzle/stackdriver/telemetry_sink_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package stackdriver_test

import (
"expvar"

"github.com/cloudfoundry-community/stackdriver-tools/src/stackdriver-nozzle/mocks"
"github.com/cloudfoundry-community/stackdriver-tools/src/stackdriver-nozzle/stackdriver"
"github.com/cloudfoundry-community/stackdriver-tools/src/stackdriver-nozzle/telemetry"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

"fmt"

labelpb "google.golang.org/genproto/googleapis/api/label"
metricpb "google.golang.org/genproto/googleapis/api/metric"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
)

const (
projectID = "myproject"
projectPath = "projects/" + projectID
metricPrefix = "firehose"
subscriptionID = "sdnozzle"
director = "bosh"
)

var _ = Describe("TelemetrySink", func() {
var (
logger *mocks.MockLogger
sink telemetry.Sink
client *mocks.MockClient
)
BeforeEach(func() {
logger = &mocks.MockLogger{}
client = &mocks.MockClient{}
sink = stackdriver.NewTelemetrySink(logger, client, projectID, metricPrefix, subscriptionID, director)
})

Context("Init with existing MetricDescriptors", func() {
oldData := &expvar.KeyValue{Key: "old", Value: &expvar.Int{}}
newData := &expvar.KeyValue{Key: "new", Value: &expvar.Int{}}

BeforeEach(func() {
client.ListMetricDescriptorFn = func(request *monitoringpb.ListMetricDescriptorsRequest) ([]*metricpb.MetricDescriptor, error) {
return []*metricpb.MetricDescriptor{
{Name: projectPath + "/metricDescriptors/custom.googleapis.com/" + metricPrefix + "/stackdriver-nozzle/" + oldData.Key},
}, nil
}

sink.Init([]*expvar.KeyValue{oldData, newData})
})

It("only creates new metric descriptors", func() {
Expect(client.DescriptorReqs).To(HaveLen(1))

req := client.DescriptorReqs[0]
Expect(req.Name).To(Equal(projectPath))
descriptor := req.MetricDescriptor

displayName := metricPrefix + "/stackdriver-nozzle/" + newData.Key
metricType := "custom.googleapis.com/" + displayName
name := projectPath + "/metricDescriptors/" + metricType

Expect(descriptor.Name).To(Equal(name))
Expect(descriptor.Type).To(Equal(metricType))
Expect(descriptor.DisplayName).To(Equal(displayName))
Expect(descriptor.MetricKind).To(Equal(metricpb.MetricDescriptor_CUMULATIVE))
Expect(descriptor.ValueType).To(Equal(metricpb.MetricDescriptor_INT64))

labels := descriptor.Labels
Expect(labels).To(HaveLen(2))
Expect(labels).To(ContainElement(&labelpb.LabelDescriptor{Key: "director", ValueType: labelpb.LabelDescriptor_STRING}))
Expect(labels).To(ContainElement(&labelpb.LabelDescriptor{Key: "subscription_id", ValueType: labelpb.LabelDescriptor_STRING}))
})
})

Context("Report", func() {
value := &expvar.Int{}
keyValue := &expvar.KeyValue{Key: "foo", Value: value}
BeforeEach(func() {
value.Set(1234)
sink.Report([]*expvar.KeyValue{keyValue})
})

It("posts TimeSeries", func() {
Expect(client.MetricReqs).To(HaveLen(1))

req := client.MetricReqs[0]
Expect(req.Name).To(Equal(projectPath))
Expect(req.TimeSeries).To(HaveLen(1))

series := req.TimeSeries[0]
Expect(series.Resource).NotTo(BeNil())

metric := series.Metric
Expect(metric.Type).To(Equal("custom.googleapis.com/" + metricPrefix + "/stackdriver-nozzle/" + keyValue.Key))

labels := metric.Labels
Expect(labels).To(HaveLen(2))
Expect(labels).To(HaveKeyWithValue("director", director))
Expect(labels).To(HaveKeyWithValue("subscription_id", subscriptionID))

Expect(series.Points).To(HaveLen(1))
point := series.Points[0]
Expect(point.Value.Value.(*monitoringpb.TypedValue_Int64Value).Int64Value).To(Equal(value.Value()))
})
})

Context("with many metrics", func() {
values := []*expvar.KeyValue{}
BeforeEach(func() {
for i := 0; i < 300; i++ {
value := &expvar.Int{}
value.Set(int64(i))
values = append(values, &expvar.KeyValue{Key: fmt.Sprintf("foo%d", i), Value: value})
}

sink.Report(values)
})

It("batches requests to Stackdriver", func() {
Expect(client.MetricReqs).To(HaveLen(2))
Expect(client.MetricReqs[0].TimeSeries).To(HaveLen(200))
Expect(client.MetricReqs[1].TimeSeries).To(HaveLen(100))
})
})
})

0 comments on commit 12c43b6

Please sign in to comment.