Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

flow: move metrics components under prometheus component namespace #2080

Merged
merged 5 commits into from
Aug 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cmd/agent/example-config.river
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,19 @@ local.file "this_file" {
detector = "fsnotify"
}

metrics.scrape "default" {
prometheus.scrape "default" {
targets = [{
"__address__" = "demo.robustperception.io:9090",
"dynamic_label" = local.file.this_file.content,
}]
forward_to = [metrics.remote_write.default.receiver]
forward_to = [prometheus.remote_write.default.receiver]

scrape_config {
job_name = "default"
}
}

metrics.remote_write "default" {
prometheus.remote_write "default" {
remote_write {
url = "http://localhost:9009/api/prom/push"
}
Expand Down
14 changes: 7 additions & 7 deletions component/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
package all

import (
_ "github.com/grafana/agent/component/discovery/kubernetes" // Import discovery.k8s
_ "github.com/grafana/agent/component/discovery/relabel" // Import discovery.relabel
_ "github.com/grafana/agent/component/local/file" // Import local.file
_ "github.com/grafana/agent/component/metrics/mutate" // Import metrics.mutate
_ "github.com/grafana/agent/component/metrics/remotewrite" // Import metrics.remotewrite
_ "github.com/grafana/agent/component/metrics/scrape" // Import metrics.scrape
_ "github.com/grafana/agent/component/remote/s3" // Import s3.file
_ "github.com/grafana/agent/component/discovery/kubernetes" // Import discovery.k8s
_ "github.com/grafana/agent/component/discovery/relabel" // Import discovery.relabel
_ "github.com/grafana/agent/component/local/file" // Import local.file
_ "github.com/grafana/agent/component/prometheus/relabel" // Import prometheus.relabel
_ "github.com/grafana/agent/component/prometheus/remotewrite" // Import prometheus.remote_write
_ "github.com/grafana/agent/component/prometheus/scrape" // Import prometheus.scrape
_ "github.com/grafana/agent/component/remote/s3" // Import s3.file
)
30 changes: 15 additions & 15 deletions component/common/appendable/appendable.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"sync"

"github.com/grafana/agent/component/metrics"
"github.com/grafana/agent/component/prometheus"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/value"
Expand All @@ -20,19 +20,19 @@ type FlowMetric struct {
// FlowAppendable is a flow-specific implementation of an Appender.
type FlowAppendable struct {
mut sync.RWMutex
receivers []*metrics.Receiver
receivers []*prometheus.Receiver
}

// NewFlowAppendable initializes the appendable.
func NewFlowAppendable(receivers ...*metrics.Receiver) *FlowAppendable {
func NewFlowAppendable(receivers ...*prometheus.Receiver) *FlowAppendable {
return &FlowAppendable{
receivers: receivers,
}
}

type flowAppender struct {
buffer map[int64][]*metrics.FlowMetric // Though mostly a map of 1 item, this allows it to work if more than one TS gets added
receivers []*metrics.Receiver
buffer map[int64][]*prometheus.FlowMetric // Though mostly a map of 1 item, this allows it to work if more than one TS gets added
receivers []*prometheus.Receiver
}

// Appender implements the Prometheus Appendable interface.
Expand All @@ -41,20 +41,20 @@ func (app *FlowAppendable) Appender(_ context.Context) storage.Appender {
defer app.mut.RUnlock()

return &flowAppender{
buffer: make(map[int64][]*metrics.FlowMetric),
buffer: make(map[int64][]*prometheus.FlowMetric),
receivers: app.receivers,
}
}

// SetReceivers defines the list of receivers for this appendable.
func (app *FlowAppendable) SetReceivers(receivers []*metrics.Receiver) {
func (app *FlowAppendable) SetReceivers(receivers []*prometheus.Receiver) {
app.mut.Lock()
app.receivers = receivers
app.mut.Unlock()
}

// ListReceivers is a test method for exposing the Appender's receivers.
func (app *FlowAppendable) ListReceivers() []*metrics.Receiver {
func (app *FlowAppendable) ListReceivers() []*prometheus.Receiver {
app.mut.RLock()
defer app.mut.RUnlock()
return app.receivers
Expand All @@ -66,20 +66,20 @@ func (app *flowAppender) Append(ref storage.SeriesRef, l labels.Labels, t int64,
}
_, found := app.buffer[t]
if !found {
set := make([]*metrics.FlowMetric, 0)
set := make([]*prometheus.FlowMetric, 0)
app.buffer[t] = set
}
// If ref is 0 then lets grab a global id
if ref == 0 {
ref = storage.SeriesRef(metrics.GlobalRefMapping.GetOrAddGlobalRefID(l))
ref = storage.SeriesRef(prometheus.GlobalRefMapping.GetOrAddGlobalRefID(l))
}
// If it is stale then we can remove it
if value.IsStaleNaN(v) {
metrics.GlobalRefMapping.AddStaleMarker(uint64(ref), l)
prometheus.GlobalRefMapping.AddStaleMarker(uint64(ref), l)
} else {
metrics.GlobalRefMapping.RemoveStaleMarker(uint64(ref))
prometheus.GlobalRefMapping.RemoveStaleMarker(uint64(ref))
}
app.buffer[t] = append(app.buffer[t], metrics.NewFlowMetric(uint64(ref), l, v))
app.buffer[t] = append(app.buffer[t], prometheus.NewFlowMetric(uint64(ref), l, v))
return ref, nil
}

Expand All @@ -96,11 +96,11 @@ func (app *flowAppender) Commit() error {
r.Receive(ts, metrics)
}
}
app.buffer = make(map[int64][]*metrics.FlowMetric)
app.buffer = make(map[int64][]*prometheus.FlowMetric)
return nil
}

func (app *flowAppender) Rollback() error {
app.buffer = make(map[int64][]*metrics.FlowMetric)
app.buffer = make(map[int64][]*prometheus.FlowMetric)
return nil
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package metrics
package prometheus

import (
"sync"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package metrics
package prometheus

import (
"testing"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package metrics
package prometheus

// remoteWriteMapping maps a remote_write to a set of global ids
type remoteWriteMapping struct {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package metrics
package prometheus

import (
"github.com/prometheus/prometheus/model/labels"
promrelabel "github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/model/relabel"
)

// Receiver is used to pass an array of metrics to another receiver
Expand Down Expand Up @@ -52,8 +52,8 @@ func (fw *FlowMetric) RawLabels() labels.Labels {
}

// Relabel applies normal prometheus relabel rules and returns a flow metric. NOTE this may return itself.
func (fw *FlowMetric) Relabel(cfgs ...*promrelabel.Config) *FlowMetric {
retLbls := promrelabel.Process(fw.labels, cfgs...)
func (fw *FlowMetric) Relabel(cfgs ...*relabel.Config) *FlowMetric {
retLbls := relabel.Process(fw.labels, cfgs...)
if retLbls == nil {
return nil
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
package metrics
package prometheus

import (
"testing"

"github.com/prometheus/prometheus/model/labels"
promrelabel "github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/model/relabel"
"github.com/stretchr/testify/require"
)

func TestRelabel(t *testing.T) {
fm := NewFlowMetric(0, labels.FromStrings("key", "value"), 0)
require.True(t, fm.globalRefID != 0)
rg, _ := promrelabel.NewRegexp("(.*)")
newfm := fm.Relabel(&promrelabel.Config{
rg, _ := relabel.NewRegexp("(.*)")
newfm := fm.Relabel(&relabel.Config{
Replacement: "${1}_new",
Action: "replace",
TargetLabel: "new",
Expand All @@ -28,8 +28,8 @@ func TestRelabel(t *testing.T) {
func TestRelabelTheSame(t *testing.T) {
fm := NewFlowMetric(0, labels.FromStrings("key", "value"), 0)
require.True(t, fm.globalRefID != 0)
rg, _ := promrelabel.NewRegexp("bad")
newfm := fm.Relabel(&promrelabel.Config{
rg, _ := relabel.NewRegexp("bad")
newfm := fm.Relabel(&relabel.Config{
Replacement: "${1}_new",
Action: "replace",
TargetLabel: "new",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
package mutate
package relabel

import (
"context"
"sync"

"github.com/grafana/agent/component"
flow_relabel "github.com/grafana/agent/component/common/relabel"
"github.com/grafana/agent/component/metrics"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/agent/component/prometheus"
prometheus_client "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/relabel"
)

func init() {
component.Register(component.Registration{
Name: "metrics.mutate",
Name: "prometheus.relabel",
Args: Arguments{},
Exports: Exports{},
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
Expand All @@ -22,41 +22,41 @@ func init() {
})
}

// Arguments holds values which are used to configure the metrics.mutate
// Arguments holds values which are used to configure the prometheus.relabel
// component.
type Arguments struct {
// Where the relabelled metrics should be forwarded to.
ForwardTo []*metrics.Receiver `river:"forward_to,attr"`
ForwardTo []*prometheus.Receiver `river:"forward_to,attr"`

// The relabelling steps to apply to each metric before it's forwarded.
MetricRelabelConfigs []*flow_relabel.Config `river:"metric_relabel_config,block,optional"`
}

// Exports holds values which are exported by the metrics.mutate component.
// Exports holds values which are exported by the prometheus.relabel component.
type Exports struct {
Receiver *metrics.Receiver `river:"receiver,attr"`
Receiver *prometheus.Receiver `river:"receiver,attr"`
}

// Component implements the metrics.mutate component.
// Component implements the prometheus.relabel component.
type Component struct {
mut sync.RWMutex
opts component.Options
mrc []*relabel.Config
forwardto []*metrics.Receiver
receiver *metrics.Receiver
metricsProcessed prometheus.Counter
forwardto []*prometheus.Receiver
receiver *prometheus.Receiver
metricsProcessed prometheus_client.Counter
}

var (
_ component.Component = (*Component)(nil)
)

// New creates a new metrics.mutate component.
// New creates a new prometheus.relabel component.
func New(o component.Options, args Arguments) (*Component, error) {
c := &Component{opts: o}
c.receiver = &metrics.Receiver{Receive: c.Receive}
c.metricsProcessed = prometheus.NewCounter(prometheus.CounterOpts{
Name: "agent_metrics_mutate_metrics_processed",
c.receiver = &prometheus.Receiver{Receive: c.Receive}
c.metricsProcessed = prometheus_client.NewCounter(prometheus_client.CounterOpts{
Name: "agent_prometheus_relabel_metrics_processed",
Help: "Total number of metrics processed",
})

Expand Down Expand Up @@ -98,13 +98,13 @@ func (c *Component) Update(args component.Arguments) error {
// TODO (@tpaschalis) The relabelling process will run _every_ time, for all
// metrics, resulting in some serious CPU overhead. We should be caching the
// relabeling results per refID and clearing entries for dropped or stale
// series. This is a blocker for releasing a production-grade of the metrics.mutate
// component.
func (c *Component) Receive(ts int64, metricArr []*metrics.FlowMetric) {
// series. This is a blocker for releasing a production-grade version of the
// prometheus.relabel component.
func (c *Component) Receive(ts int64, metricArr []*prometheus.FlowMetric) {
c.mut.RLock()
defer c.mut.RUnlock()

relabelledMetrics := make([]*metrics.FlowMetric, 0)
relabelledMetrics := make([]*prometheus.FlowMetric, 0)
for _, m := range metricArr {
// Relabel may return the original flowmetric if no changes applied, nil if everything was removed or an entirely new flowmetric.
relabelledFm := m.Relabel(c.mrc...)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@ import (
"sync"
"time"

"github.com/grafana/agent/component/metrics"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/agent/component"
"github.com/grafana/agent/component/prometheus"
"github.com/grafana/agent/pkg/build"
"github.com/grafana/agent/pkg/metrics/wal"
"github.com/prometheus/prometheus/model/timestamp"
Expand All @@ -31,7 +30,7 @@ func init() {
remote.UserAgent = fmt.Sprintf("GrafanaAgent/%s", build.Version)

component.Register(component.Registration{
Name: "metrics.remote_write",
Name: "prometheus.remote_write",
Args: RemoteConfig{},
Exports: Export{},
Build: func(o component.Options, c component.Arguments) (component.Component, error) {
Expand All @@ -40,7 +39,7 @@ func init() {
})
}

// Component is the metrics_forwarder component.
// Component is the prometheus.remote_write component.
type Component struct {
log log.Logger
opts component.Options
Expand All @@ -52,10 +51,10 @@ type Component struct {
mut sync.RWMutex
cfg RemoteConfig

receiver *metrics.Receiver
receiver *prometheus.Receiver
}

// NewComponent creates a new metrics_forwarder component.
// NewComponent creates a new prometheus.remote_write component.
func NewComponent(o component.Options, c RemoteConfig) (*Component, error) {
walLogger := log.With(o.Logger, "subcomponent", "wal")
dataPath := filepath.Join(o.DataPath, "wal", o.ID)
Expand All @@ -74,7 +73,7 @@ func NewComponent(o component.Options, c RemoteConfig) (*Component, error) {
remoteStore: remoteStore,
storage: storage.NewFanout(o.Logger, walStorage, remoteStore),
}
res.receiver = &metrics.Receiver{Receive: res.Receive}
res.receiver = &prometheus.Receiver{Receive: res.Receive}
if err := res.Update(c); err != nil {
return nil, err
}
Expand Down Expand Up @@ -180,16 +179,16 @@ func (c *Component) Update(newConfig component.Arguments) error {
}

// Receive implements the receiver.receive func that allows an array of metrics to be passed
func (c *Component) Receive(ts int64, metricArr []*metrics.FlowMetric) {
func (c *Component) Receive(ts int64, metricArr []*prometheus.FlowMetric) {
app := c.storage.Appender(context.Background())
for _, m := range metricArr {
localID := metrics.GlobalRefMapping.GetLocalRefID(c.opts.ID, m.GlobalRefID())
localID := prometheus.GlobalRefMapping.GetLocalRefID(c.opts.ID, m.GlobalRefID())
// Currently it doesn't look like the storage interfaces mutate the labels, but thats not a strong
// promise. So this should be treated with care.
newLocal, err := app.Append(storage.SeriesRef(localID), m.RawLabels(), ts, m.Value())
// Add link if there wasn't one before, and we received a valid local id
if localID == 0 && newLocal != 0 {
metrics.GlobalRefMapping.GetOrAddLink(c.opts.ID, uint64(newLocal), m)
prometheus.GlobalRefMapping.GetOrAddLink(c.opts.ID, uint64(newLocal), m)
}
if err != nil {
_ = app.Rollback()
Expand Down
Loading