Skip to content

Commit

Permalink
Send timeseries synchronously (#358)
Browse files Browse the repository at this point in the history
* send timeseries synchronously

* update docs, and fix existing broken test
  • Loading branch information
dashpole authored Apr 15, 2022
1 parent da95bb3 commit e3aea42
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 90 deletions.
2 changes: 1 addition & 1 deletion exporter/collector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ These instructions are to get you up and running quickly with the GCP exporter i
exporters: [googlecloud, logging]
metrics:
receivers: [otlp]
processors: [memory_limiter]
processors: [memory_limiter, batch]
exporters: [googlecloud, logging]
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2620,7 +2620,7 @@
}
},
{
"name":"telemetry_scrape_duration_seconds",
"name":"other_telemetry_scrape_duration_seconds",
"description":"Scrape duration",
"unit":"s",
"summary":{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3553,7 +3553,7 @@
},
{
"metric": {
"type": "workload.googleapis.com/telemetry_scrape_duration_seconds_sum",
"type": "workload.googleapis.com/other_telemetry_scrape_duration_seconds_sum",
"labels": {
"content_type": "text/plain; version=0.0.4",
"job": "default/rabbitmq/0",
Expand Down Expand Up @@ -3587,7 +3587,7 @@
},
{
"metric": {
"type": "workload.googleapis.com/telemetry_scrape_duration_seconds_count",
"type": "workload.googleapis.com/other_telemetry_scrape_duration_seconds_count",
"labels": {
"content_type": "text/plain; version=0.0.4",
"job": "default/rabbitmq/0",
Expand Down
112 changes: 26 additions & 86 deletions exporter/collector/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,6 @@ type MetricsExporter struct {
metricDescriptorC chan *metricpb.MetricDescriptor
// Tracks the metric descriptors that have already been sent to GCM
mdCache map[string]*metricpb.MetricDescriptor

// A channel that receives timeserieses and exports them to GCM in batches
timeSeriesC chan *monitoringpb.TimeSeries
// stores the currently pending batch of timeserieses
pendingTimeSerieses []*monitoringpb.TimeSeries
batchTimeoutTimer *time.Timer
}

// metricMapper is the part that transforms metrics. Separate from MetricsExporter since it has
Expand All @@ -91,11 +85,6 @@ const (
)

const (
// batchTimeout is how long to wait to build a full batch before sending
// off what we already have. We set it to 10 seconds because GCM
// throttles us to this anyway.
batchTimeout = 10 * time.Second

// The number of timeserieses to send to GCM in a single request. This
// is a hard limit in the GCM API, so we never want to exceed 200.
sendBatchSize = 200
Expand Down Expand Up @@ -158,23 +147,19 @@ func NewGoogleCloudMetricsExporter(
// to drop / conserve resources for sending timeseries.
metricDescriptorC: make(chan *metricpb.MetricDescriptor, cfg.MetricConfig.CreateMetricDescriptorBufferSize),
mdCache: make(map[string]*metricpb.MetricDescriptor),
timeSeriesC: make(chan *monitoringpb.TimeSeries),
shutdownC: shutdown,
}

// Fire up the metric descriptor exporter.
mExp.goroutines.Add(1)
go mExp.exportMetricDescriptorRunner()

// Fire up the time series exporter.
mExp.goroutines.Add(1)
go mExp.exportTimeSeriesRunner()

return mExp, nil
}

// PushMetrics pushes pdata metrics to GCM, creating metric descriptors if necessary
func (me *MetricsExporter) PushMetrics(ctx context.Context, m pdata.Metrics) error {
pendingTimeSeries := []*monitoringpb.TimeSeries{}
rms := m.ResourceMetrics()
for i := 0; i < rms.Len(); i++ {
rm := rms.At(i)
Expand All @@ -189,9 +174,7 @@ func (me *MetricsExporter) PushMetrics(ctx context.Context, m pdata.Metrics) err
mes := sm.Metrics()
for k := 0; k < mes.Len(); k++ {
metric := mes.At(k)
for _, ts := range me.mapper.metricToTimeSeries(monitoredResource, metricLabels, metric) {
me.timeSeriesC <- ts
}
pendingTimeSeries = append(pendingTimeSeries, me.mapper.metricToTimeSeries(monitoredResource, metricLabels, metric)...)

// We only send metric descriptors if we're configured *and* we're not sending service timeseries.
if me.cfg.MetricConfig.SkipCreateMetricDescriptor || me.cfg.MetricConfig.CreateServiceTimeSeries {
Expand All @@ -211,37 +194,35 @@ func (me *MetricsExporter) PushMetrics(ctx context.Context, m pdata.Metrics) err
}
}
}
return nil
}

func (me *MetricsExporter) exportPendingTimeSerieses() {
ctx := context.Background()

var sendSize int
if len(me.pendingTimeSerieses) < sendBatchSize {
sendSize = len(me.pendingTimeSerieses)
} else {
sendSize = sendBatchSize
}
// Batch and export
for len(pendingTimeSeries) > 0 {
var sendSize int
if len(pendingTimeSeries) < sendBatchSize {
sendSize = len(pendingTimeSeries)
} else {
sendSize = sendBatchSize
}

var ts []*monitoringpb.TimeSeries
ts, me.pendingTimeSerieses = me.pendingTimeSerieses, me.pendingTimeSerieses[sendSize:]
var ts []*monitoringpb.TimeSeries
ts, pendingTimeSeries = pendingTimeSeries[:sendSize], pendingTimeSeries[sendSize:]

var err error
if me.cfg.MetricConfig.CreateServiceTimeSeries {
err = me.createServiceTimeSeries(ctx, ts)
} else {
err = me.createTimeSeries(ctx, ts)
}
var err error
if me.cfg.MetricConfig.CreateServiceTimeSeries {
err = me.createServiceTimeSeries(ctx, ts)
} else {
err = me.createTimeSeries(ctx, ts)
}

var st string
s, _ := status.FromError(err)
st = statusCodeToString(s)
var st string
s, _ := status.FromError(err)
st = statusCodeToString(s)

recordPointCountDataPoint(ctx, len(ts), st)
if err != nil {
me.obs.log.Error("could not export time series to GCM", zap.Error(err))
recordPointCountDataPoint(ctx, len(ts), st)
if err != nil {
return fmt.Errorf("failed to export time series to GCM: %v", err)
}
}
return nil
}

// Reads metric descriptors from the md channel, and reports them (once) to GCM.
Expand Down Expand Up @@ -1085,46 +1066,5 @@ func mapMetricPointKind(m pdata.Metric) (metricpb.MetricDescriptor_MetricKind, m
return kind, typ
}

// exportTimeSeriesRunner is the long-lived goroutine that drains timeSeriesC,
// accumulating points into pendingTimeSerieses and flushing them to GCM either
// when a batch fills (see processItem) or when batchTimeout elapses.
// It returns only after shutdownC is signalled and all pending items have
// been published.
func (me *MetricsExporter) exportTimeSeriesRunner() {
defer me.goroutines.Done()
me.batchTimeoutTimer = time.NewTimer(batchTimeout)
for {
select {
case <-me.shutdownC:
for {
// We are shutting down. Publish all the pending
// items on the channel before we stop.
select {
case ts := <-me.timeSeriesC:
me.processItem(ts)
default:
// Channel is empty; nothing more to drain.
goto DONE
}
}
DONE:
// Flush whatever accumulated during the drain; exportPendingTimeSerieses
// sends at most sendBatchSize per call, hence the loop.
for len(me.pendingTimeSerieses) > 0 {
me.exportPendingTimeSerieses()
}
// Return and continue graceful shutdown.
return
case ts := <-me.timeSeriesC:
me.processItem(ts)
case <-me.batchTimeoutTimer.C:
// Periodic flush: restart the timer first, then push out every
// pending batch so points never sit longer than batchTimeout.
me.batchTimeoutTimer.Reset(batchTimeout)
for len(me.pendingTimeSerieses) > 0 {
me.exportPendingTimeSerieses()
}
}
}
}

// processItem appends ts to the pending batch and, once the batch reaches
// sendBatchSize, flushes it immediately rather than waiting for the
// periodic timeout.
func (me *MetricsExporter) processItem(ts *monitoringpb.TimeSeries) {
me.pendingTimeSerieses = append(me.pendingTimeSerieses, ts)
if len(me.pendingTimeSerieses) >= sendBatchSize {
// Stop returns false if the timer already fired; drain the channel in
// that case so the subsequent Reset starts from a clean timer.
if !me.batchTimeoutTimer.Stop() {
<-me.batchTimeoutTimer.C
}
me.batchTimeoutTimer.Reset(batchTimeout)
me.exportPendingTimeSerieses()
}
}

0 comments on commit e3aea42

Please sign in to comment.