Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send timeseries synchronously #358

Merged
merged 2 commits into from
Apr 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion exporter/collector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ These instructions are to get you up and running quickly with the GCP exporter i
exporters: [googlecloud, logging]
metrics:
receivers: [otlp]
processors: [memory_limiter]
processors: [memory_limiter, batch]
tbarker25 marked this conversation as resolved.
Show resolved Hide resolved
exporters: [googlecloud, logging]
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2620,7 +2620,7 @@
}
},
{
"name":"telemetry_scrape_duration_seconds",
"name":"other_telemetry_scrape_duration_seconds",
"description":"Scrape duration",
"unit":"s",
"summary":{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3553,7 +3553,7 @@
},
{
"metric": {
"type": "workload.googleapis.com/telemetry_scrape_duration_seconds_sum",
"type": "workload.googleapis.com/other_telemetry_scrape_duration_seconds_sum",
"labels": {
"content_type": "text/plain; version=0.0.4",
"job": "default/rabbitmq/0",
Expand Down Expand Up @@ -3587,7 +3587,7 @@
},
{
"metric": {
"type": "workload.googleapis.com/telemetry_scrape_duration_seconds_count",
"type": "workload.googleapis.com/other_telemetry_scrape_duration_seconds_count",
"labels": {
"content_type": "text/plain; version=0.0.4",
"job": "default/rabbitmq/0",
Expand Down
112 changes: 26 additions & 86 deletions exporter/collector/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,6 @@ type MetricsExporter struct {
metricDescriptorC chan *metricpb.MetricDescriptor
// Tracks the metric descriptors that have already been sent to GCM
mdCache map[string]*metricpb.MetricDescriptor

// A channel that receives timeserieses and exports them to GCM in batches
timeSeriesC chan *monitoringpb.TimeSeries
// stores the currently pending batch of timeserieses
pendingTimeSerieses []*monitoringpb.TimeSeries
batchTimeoutTimer *time.Timer
}

// metricMapper is the part that transforms metrics. Separate from MetricsExporter since it has
Expand All @@ -91,11 +85,6 @@ const (
)

const (
// batchTimeout is how long to wait to build a full batch before sending
// off what we already have. We set it to 10 seconds because GCM
// throttles us to this anyway.
batchTimeout = 10 * time.Second

// The number of timeserieses to send to GCM in a single request. This
// is a hard limit in the GCM API, so we never want to exceed 200.
sendBatchSize = 200
Expand Down Expand Up @@ -158,23 +147,19 @@ func NewGoogleCloudMetricsExporter(
// to drop / conserve resources for sending timeseries.
metricDescriptorC: make(chan *metricpb.MetricDescriptor, cfg.MetricConfig.CreateMetricDescriptorBufferSize),
mdCache: make(map[string]*metricpb.MetricDescriptor),
timeSeriesC: make(chan *monitoringpb.TimeSeries),
shutdownC: shutdown,
}

// Fire up the metric descriptor exporter.
mExp.goroutines.Add(1)
go mExp.exportMetricDescriptorRunner()

// Fire up the time series exporter.
mExp.goroutines.Add(1)
go mExp.exportTimeSeriesRunner()

return mExp, nil
}

// PushMetrics pushes pdata metrics to GCM, creating metric descriptors if necessary
func (me *MetricsExporter) PushMetrics(ctx context.Context, m pdata.Metrics) error {
pendingTimeSeries := []*monitoringpb.TimeSeries{}
rms := m.ResourceMetrics()
for i := 0; i < rms.Len(); i++ {
rm := rms.At(i)
Expand All @@ -189,9 +174,7 @@ func (me *MetricsExporter) PushMetrics(ctx context.Context, m pdata.Metrics) err
mes := sm.Metrics()
for k := 0; k < mes.Len(); k++ {
metric := mes.At(k)
for _, ts := range me.mapper.metricToTimeSeries(monitoredResource, metricLabels, metric) {
me.timeSeriesC <- ts
}
pendingTimeSeries = append(pendingTimeSeries, me.mapper.metricToTimeSeries(monitoredResource, metricLabels, metric)...)

// We only send metric descriptors if we're configured *and* we're not sending service timeseries.
if me.cfg.MetricConfig.SkipCreateMetricDescriptor || me.cfg.MetricConfig.CreateServiceTimeSeries {
Expand All @@ -211,37 +194,35 @@ func (me *MetricsExporter) PushMetrics(ctx context.Context, m pdata.Metrics) err
}
}
}
return nil
}

func (me *MetricsExporter) exportPendingTimeSerieses() {
ctx := context.Background()

var sendSize int
if len(me.pendingTimeSerieses) < sendBatchSize {
sendSize = len(me.pendingTimeSerieses)
} else {
sendSize = sendBatchSize
}
// Batch and export
for len(pendingTimeSeries) > 0 {
var sendSize int
if len(pendingTimeSeries) < sendBatchSize {
sendSize = len(pendingTimeSeries)
} else {
sendSize = sendBatchSize
}

var ts []*monitoringpb.TimeSeries
ts, me.pendingTimeSerieses = me.pendingTimeSerieses, me.pendingTimeSerieses[sendSize:]
var ts []*monitoringpb.TimeSeries
ts, pendingTimeSeries = pendingTimeSeries[:sendSize], pendingTimeSeries[sendSize:]

var err error
if me.cfg.MetricConfig.CreateServiceTimeSeries {
err = me.createServiceTimeSeries(ctx, ts)
} else {
err = me.createTimeSeries(ctx, ts)
}
var err error
if me.cfg.MetricConfig.CreateServiceTimeSeries {
err = me.createServiceTimeSeries(ctx, ts)
} else {
err = me.createTimeSeries(ctx, ts)
}

var st string
s, _ := status.FromError(err)
st = statusCodeToString(s)
var st string
s, _ := status.FromError(err)
st = statusCodeToString(s)

recordPointCountDataPoint(ctx, len(ts), st)
if err != nil {
me.obs.log.Error("could not export time series to GCM", zap.Error(err))
recordPointCountDataPoint(ctx, len(ts), st)
if err != nil {
return fmt.Errorf("failed to export time series to GCM: %v", err)
}
}
return nil
}

// Reads metric descriptors from the md channel, and reports them (once) to GCM.
Expand Down Expand Up @@ -1085,46 +1066,5 @@ func mapMetricPointKind(m pdata.Metric) (metricpb.MetricDescriptor_MetricKind, m
return kind, typ
}

// exportTimeSeriesRunner is the long-running goroutine that drains
// me.timeSeriesC, buffering points via processItem and flushing the
// pending batch whenever batchTimeout elapses. On shutdown it drains
// the channel, flushes everything still pending, then returns,
// signalling completion through me.goroutines.
func (me *MetricsExporter) exportTimeSeriesRunner() {
	defer me.goroutines.Done()
	me.batchTimeoutTimer = time.NewTimer(batchTimeout)
	for {
		select {
		case <-me.shutdownC:
			for {
				// We are shutting down. Publish all the pending
				// items on the channel before we stop.
				select {
				case ts := <-me.timeSeriesC:
					me.processItem(ts)
				default:
					// Channel is empty; stop draining.
					goto DONE
				}
			}
		DONE:
			// Flush everything that was buffered; each call sends at
			// most sendBatchSize points, hence the loop.
			for len(me.pendingTimeSerieses) > 0 {
				me.exportPendingTimeSerieses()
			}
			// Return and continue graceful shutdown.
			return
		case ts := <-me.timeSeriesC:
			me.processItem(ts)
		case <-me.batchTimeoutTimer.C:
			// Timeout-triggered flush: restart the timer, then send all
			// pending batches even though none may be full yet.
			me.batchTimeoutTimer.Reset(batchTimeout)
			for len(me.pendingTimeSerieses) > 0 {
				me.exportPendingTimeSerieses()
			}
		}
	}
}

// processItem buffers a single time series and, once a full batch
// (sendBatchSize points) has accumulated, flushes it immediately.
// Before flushing it restarts the batch timer so the periodic flush
// interval is measured from this size-triggered send.
func (me *MetricsExporter) processItem(ts *monitoringpb.TimeSeries) {
	me.pendingTimeSerieses = append(me.pendingTimeSerieses, ts)
	if len(me.pendingTimeSerieses) >= sendBatchSize {
		// Standard Stop/drain/Reset idiom: if Stop reports the timer
		// already fired, drain its channel so Reset does not race with
		// a stale tick.
		if !me.batchTimeoutTimer.Stop() {
			<-me.batchTimeoutTimer.C
		}
		me.batchTimeoutTimer.Reset(batchTimeout)
		me.exportPendingTimeSerieses()
	}
}