-
Notifications
You must be signed in to change notification settings - Fork 2.4k
/
ocastore.go
141 lines (123 loc) · 4.22 KB
/
ocastore.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/internal"
import (
"context"
"errors"
"sync/atomic"
"time"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/storage"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
)
// Lifecycle states stored in OcaStore.running.
const (
// runningStateInit is the state a store is created in by NewOcaStore.
// Appender panics while the store is still in this state.
runningStateInit = iota
// runningStateReady is entered by SetScrapeManager; it is the only state in
// which Appender returns real transactions.
runningStateReady
// runningStateStop is entered by Close; Appender then returns the shared
// no-op appender.
runningStateStop
)
// idSeq is not referenced anywhere in this file.
// NOTE(review): presumably a sequence counter consumed elsewhere in the
// package — confirm before removing.
var idSeq int64
// noop is the shared appender that Appender returns once the store is neither
// in the init nor in the ready state (i.e. after Close).
var noop = &noopAppender{}
// OcaStore creates storage.Appender instances for scraped data. After a scrape
// manager has been attached with SetScrapeManager, every Appender call returns
// a new transaction configured from the fields below.
//
// NOTE(review): the type name and the previous comment mention OpenCensus,
// while the transactions built here come from newTransactionPdata and deliver
// to a consumer.Metrics sink — presumably a historical name; confirm.
type OcaStore struct {
// ctx is the context given to NewOcaStore; it is passed to every transaction
// created by Appender.
ctx context.Context
// running holds one of the runningState* values.
running int32 // access atomically
// sink is handed to every transaction created by Appender.
sink consumer.Metrics
// mc wraps the scrape manager; it is set by SetScrapeManager and closed by Close.
mc *metadataService
// jobsMap is allocated by NewOcaStore only when useStartTimeMetric is false;
// it is nil otherwise.
jobsMap *JobsMapPdata
// useStartTimeMetric and startTimeMetricRegex are forwarded unchanged to each
// transaction's configuration.
useStartTimeMetric bool
startTimeMetricRegex string
// receiverID, externalLabels and settings are forwarded unchanged to each
// transaction's configuration.
receiverID config.ComponentID
externalLabels labels.Labels
settings component.ReceiverCreateSettings
}
// NewOcaStore builds an OcaStore in its initial state. The returned store
// exposes Appender, but cannot serve it until SetScrapeManager has been called.
//
// A jobs map is created from gcInterval (via NewJobsMapPdata) only when
// useStartTimeMetric is false; otherwise the store's jobs map stays nil.
// Every other argument is stored as given and later forwarded to each
// transaction the store creates.
func NewOcaStore(
	ctx context.Context,
	sink consumer.Metrics,
	set component.ReceiverCreateSettings,
	gcInterval time.Duration,
	useStartTimeMetric bool,
	startTimeMetricRegex string,
	receiverID config.ComponentID,
	externalLabels labels.Labels) *OcaStore {
	store := &OcaStore{
		ctx:                  ctx,
		running:              runningStateInit,
		sink:                 sink,
		settings:             set,
		receiverID:           receiverID,
		externalLabels:       externalLabels,
		useStartTimeMetric:   useStartTimeMetric,
		startTimeMetricRegex: startTimeMetricRegex,
	}

	// The jobs map is only needed when no start-time metric is used.
	if !useStartTimeMetric {
		store.jobsMap = NewJobsMapPdata(gcInterval)
	}

	return store
}
// SetScrapeManager attaches the scrape.Manager the store needs before it can
// answer Appender requests. A nil manager is ignored. Only a store that is
// still in its initial state is affected: it moves to the ready state and
// records a metadata service wrapping the manager. Later calls do nothing.
//
// NOTE(review): the state is switched to ready before mc is assigned, so a
// concurrent Appender call could observe a nil metadata service — confirm
// that callers serialize SetScrapeManager and Appender.
func (o *OcaStore) SetScrapeManager(scrapeManager *scrape.Manager) {
	if scrapeManager == nil {
		return
	}

	// Only the init -> ready transition installs the metadata service.
	if !atomic.CompareAndSwapInt32(&o.running, runningStateInit, runningStateReady) {
		return
	}

	o.mc = &metadataService{sm: scrapeManager}
}
// Appender returns the storage.Appender that matches the store's current
// lifecycle state. The context argument is ignored; transactions are created
// with the context that was given to NewOcaStore.
//
//   - ready: a new transaction built from the store's configuration.
//   - init: panics, because SetScrapeManager has not been called yet.
//   - any other state (stopped): the shared no-op appender.
func (o *OcaStore) Appender(context.Context) storage.Appender {
	switch atomic.LoadInt32(&o.running) {
	case runningStateReady:
		cfg := &txConfig{
			sink:                 o.sink,
			ms:                   o.mc,
			jobsMap:              o.jobsMap,
			receiverID:           o.receiverID,
			externalLabels:       o.externalLabels,
			settings:             o.settings,
			useStartTimeMetric:   o.useStartTimeMetric,
			startTimeMetricRegex: o.startTimeMetricRegex,
		}
		return newTransactionPdata(o.ctx, cfg)
	case runningStateInit:
		panic("ScrapeManager is not set")
	}

	// A stopped store still hands back a dummy appender; per the original
	// author's note, doing otherwise can trigger a panic.
	return noop
}
// Close moves a ready store to the stopped state and closes its metadata
// service. It does nothing unless the store is currently ready, so calling it
// before SetScrapeManager, or more than once, has no effect.
func (o *OcaStore) Close() {
	if !atomic.CompareAndSwapInt32(&o.running, runningStateReady, runningStateStop) {
		return
	}
	o.mc.Close()
}
// noopAppender is the appender returned by OcaStore.Appender once the store
// has been stopped. Append, AppendExemplar and Commit fail with
// errAlreadyStopped; Rollback returns nil.
type noopAppender struct{}
// errAlreadyStopped is the error noopAppender returns from Append,
// AppendExemplar and Commit.
var errAlreadyStopped = errors.New("already stopped")
// Append ignores its arguments and returns a zero series reference together
// with errAlreadyStopped.
func (*noopAppender) Append(storage.SeriesRef, labels.Labels, int64, float64) (storage.SeriesRef, error) {
return 0, errAlreadyStopped
}
// AppendExemplar ignores its arguments and returns a zero series reference
// together with errAlreadyStopped.
func (*noopAppender) AppendExemplar(storage.SeriesRef, labels.Labels, exemplar.Exemplar) (storage.SeriesRef, error) {
	return 0, errAlreadyStopped
}
// Commit always fails with errAlreadyStopped.
func (*noopAppender) Commit() error {
return errAlreadyStopped
}
// Rollback does nothing and returns nil. It is the only noopAppender method
// that does not return errAlreadyStopped.
func (*noopAppender) Rollback() error {
return nil
}