Skip to content

Commit

Permalink
Update Loki dependency (#6905)
Browse files Browse the repository at this point in the history
* Update Loki dependency.

* Clean up metrics generated from logs after a config reload.

Synced from:
grafana/loki#12938

* Update README
  • Loading branch information
ptodev authored May 15, 2024
1 parent 54b0959 commit f817a47
Show file tree
Hide file tree
Showing 20 changed files with 121 additions and 3 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ Main (unreleased)
- Fix an issue on Windows where uninstalling Alloy did not remove it from the
Add/Remove programs list. (@rfratto)

- Fix a bug with the logs pipeline in static mode which prevented it from shutting down cleanly.

### Other changes

- Clustering for Grafana Agent in Flow mode has graduated from beta to stable.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ require (
github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2
github.com/grafana/dskit v0.0.0-20240104111617-ea101a3b86eb
github.com/grafana/go-gelf/v2 v2.0.1
github.com/grafana/loki v1.6.2-0.20240221085104-f9d188620153 // k190 branch
github.com/grafana/loki v1.6.2-0.20240510183741-cef4c2826b4b // k190 branch
github.com/grafana/pyroscope-go/godeltaprof v0.1.7
github.com/grafana/pyroscope/api v0.4.0
github.com/grafana/pyroscope/ebpf v0.4.3
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1045,6 +1045,8 @@ github.com/grafana/kafka_exporter v0.0.0-20240409084445-5e3488ad9f9a h1:jqM4NNdx
github.com/grafana/kafka_exporter v0.0.0-20240409084445-5e3488ad9f9a/go.mod h1:ZXGGyeTUMenf/H1CDBK9lv3azjswfa0nVzLoQAYmnDc=
github.com/grafana/loki v1.6.2-0.20240221085104-f9d188620153 h1:C191g5Ls8lIf9lkJEoScTQgoVDwUdK4HXKP5XtL+zAM=
github.com/grafana/loki v1.6.2-0.20240221085104-f9d188620153/go.mod h1:j2XCl3SmslPf+3Vs7uyoaJE/QkmUlL9JzTBTShSOSiU=
github.com/grafana/loki v1.6.2-0.20240510183741-cef4c2826b4b h1:x5JsSnExxRl9kTMNqHebMCv0fn+V1+T16z7Tgz6xYf4=
github.com/grafana/loki v1.6.2-0.20240510183741-cef4c2826b4b/go.mod h1:j2XCl3SmslPf+3Vs7uyoaJE/QkmUlL9JzTBTShSOSiU=
github.com/grafana/loki/pkg/push v0.0.0-20231212100434-384e5c2dc872 h1:6kPX7bngjBgUlHqADwZ6249UtzMaoQW5n0H8bOtnYeM=
github.com/grafana/loki/pkg/push v0.0.0-20231212100434-384e5c2dc872/go.mod h1:f3JSoxBTPXX5ec4FxxeC19nTBSxoTz+cBgS3cYLMcr0=
github.com/grafana/mysqld_exporter v0.12.2-0.20231005125903-364b9c41e595 h1:I9sRknI5ajd8whPOX0nBDXy5B6xUfhItClMy+6R4oqE=
Expand Down
6 changes: 6 additions & 0 deletions internal/component/loki/process/metric/metricvec.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ func (c *metricVec) Delete(labels model.LabelSet) bool {
return ok
}

func (c *metricVec) DeleteAll() {
c.mtx.Lock()
defer c.mtx.Unlock()
c.metrics = map[model.Fingerprint]prometheus.Metric{}
}

// prune will remove all metrics which implement the Expirable interface and have expired
// it does not take out a lock on the metrics map so whoever calls this function should do so.
func (c *metricVec) prune() {
Expand Down
5 changes: 5 additions & 0 deletions internal/component/loki/process/stages/decolorize.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,8 @@ func (m *decolorizeStage) Run(in chan Entry) chan Entry {
func (m *decolorizeStage) Name() string {
return StageTypeDecolorize
}

// Cleanup implements Stage.
func (*decolorizeStage) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions internal/component/loki/process/stages/drop.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,3 +222,8 @@ func splitSource(s string) []string {
func (m *dropStage) Name() string {
return StageTypeDrop
}

// Cleanup implements Stage.
func (*dropStage) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions internal/component/loki/process/stages/eventlogmessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ func (m *eventLogMessageStage) Name() string {
return StageTypeEventLogMessage
}

// Cleanup implements Stage.
func (*eventLogMessageStage) Cleanup() {
// no-op
}

// Sanitize a input string to convert it into a valid prometheus label
// TODO: switch to prometheus/prometheus/util/strutil/SanitizeFullLabelName
func SanitizeFullLabelName(input string) string {
Expand Down
5 changes: 5 additions & 0 deletions internal/component/loki/process/stages/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ func (c *cri) Name() string {
return StageTypeCRI
}

// Cleanup implements Stage.
func (*cri) Cleanup() {
// no-op
}

// implements Stage interface
func (c *cri) Run(entry chan Entry) chan Entry {
entry = c.base.Run(entry)
Expand Down
5 changes: 5 additions & 0 deletions internal/component/loki/process/stages/geoip.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ func (g *geoIPStage) Name() string {
return StageTypeGeoIP
}

// Cleanup implements Stage.
func (*geoIPStage) Cleanup() {
// no-op
}

func (g *geoIPStage) process(_ model.LabelSet, extracted map[string]interface{}) {
var ip net.IP
if g.cfgs.Source != nil {
Expand Down
5 changes: 5 additions & 0 deletions internal/component/loki/process/stages/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,3 +174,8 @@ func (j *jsonStage) processEntry(extracted map[string]interface{}, entry *string
func (j *jsonStage) Name() string {
return StageTypeJSON
}

// Cleanup implements Stage.
func (*jsonStage) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions internal/component/loki/process/stages/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ func (m *limitStage) Name() string {
return StageTypeLimit
}

// Cleanup implements Stage.
func (*limitStage) Cleanup() {
// no-op
}

func getDropCountByLabelMetric(registerer prometheus.Registerer) *prometheus.CounterVec {
return registerCounterVec(registerer, "loki_process", "dropped_lines_by_label_total",
"A count of all log lines dropped as a result of a pipeline stage",
Expand Down
5 changes: 5 additions & 0 deletions internal/component/loki/process/stages/match.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,3 +196,8 @@ func (m *matcherStage) processLogQL(e Entry) (Entry, bool) {
func (m *matcherStage) Name() string {
return StageTypeMatch
}

// Cleanup implements Stage.
func (*matcherStage) Cleanup() {
// no-op
}
31 changes: 29 additions & 2 deletions internal/component/loki/process/stages/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,11 @@ func newMetricStage(logger log.Logger, config MetricsConfig, registry prometheus
return nil, fmt.Errorf("undefined stage type in '%v', exiting", cfg)
}
}
return toStage(&metricStage{
return &metricStage{
logger: logger,
cfg: config,
metrics: metrics,
}), nil
}, nil
}

// metricStage creates and updates prometheus metrics based on extracted pipeline data
Expand All @@ -118,6 +118,19 @@ type metricStage struct {
metrics map[string]cfgCollector
}

func (m *metricStage) Run(in chan Entry) chan Entry {
out := make(chan Entry)
go func() {
defer close(out)

for e := range in {
m.Process(e.Labels, e.Extracted, &e.Timestamp, &e.Line)
out <- e
}
}()
return out
}

// Process implements Stage
func (m *metricStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
for name, cc := range m.metrics {
Expand Down Expand Up @@ -162,6 +175,20 @@ func (m *metricStage) Name() string {
return StageTypeMetric
}

// Cleanup implements Stage.
func (m *metricStage) Cleanup() {
for _, cfgCollector := range m.metrics {
switch vec := cfgCollector.collector.(type) {
case *metric.Counters:
vec.DeleteAll()
case *metric.Gauges:
vec.DeleteAll()
case *metric.Histograms:
vec.DeleteAll()
}
}
}

// recordCounter will update a counter metric
func (m *metricStage) recordCounter(name string, counter *metric.Counters, labels model.LabelSet, v interface{}) {
// If value matching is defined, make sure value matches.
Expand Down
7 changes: 7 additions & 0 deletions internal/component/loki/process/stages/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,13 @@ func TestMetricsPipeline(t *testing.T) {
strings.NewReader(expectedMetrics)); err != nil {
t.Fatalf("mismatch metrics: %v", err)
}

pl.Cleanup()

if err := testutil.GatherAndCompare(registry,
strings.NewReader("")); err != nil {
t.Fatalf("mismatch metrics: %v", err)
}
}

func TestNegativeGauge(t *testing.T) {
Expand Down
5 changes: 5 additions & 0 deletions internal/component/loki/process/stages/multiline.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,3 +205,8 @@ func (m *multilineStage) flush(out chan Entry, s *multilineState) {
func (m *multilineStage) Name() string {
return StageTypeMultiline
}

// Cleanup implements Stage.
func (*multilineStage) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions internal/component/loki/process/stages/pack.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,8 @@ func (m *packStage) pack(e Entry) Entry {
func (m *packStage) Name() string {
return StageTypePack
}

// Cleanup implements Stage.
func (*packStage) Cleanup() {
// no-op
}
8 changes: 8 additions & 0 deletions internal/component/loki/process/stages/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,13 @@ func (p *Pipeline) Name() string {
return StageTypePipeline
}

// Cleanup implements Stage.
func (p *Pipeline) Cleanup() {
for _, s := range p.stages {
s.Cleanup()
}
}

// Wrap implements EntryMiddleware
func (p *Pipeline) Wrap(next loki.EntryHandler) loki.EntryHandler {
handlerIn := make(chan loki.Entry)
Expand Down Expand Up @@ -181,6 +188,7 @@ func (p *Pipeline) Wrap(next loki.EntryHandler) loki.EntryHandler {
return loki.NewEntryHandler(handlerIn, func() {
once.Do(func() { close(handlerIn) })
wg.Wait()
p.Cleanup()
})
}

Expand Down
5 changes: 5 additions & 0 deletions internal/component/loki/process/stages/sampling.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,8 @@ func (m *samplingStage) randomNumber() uint64 {
func (m *samplingStage) Name() string {
return StageTypeSampling
}

// Cleanup implements Stage.
func (*samplingStage) Cleanup() {
// no-op
}
6 changes: 6 additions & 0 deletions internal/component/loki/process/stages/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type Entry struct {
type Stage interface {
Name() string
Run(chan Entry) chan Entry
Cleanup()
}

func (entry *Entry) copy() *Entry {
Expand Down Expand Up @@ -243,3 +244,8 @@ func New(logger log.Logger, jobName *string, cfg StageConfig, registerer prometh
}
return s, nil
}

// Cleanup implements Stage.
func (*stageProcessor) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions internal/component/loki/process/stages/structured_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ func (s *structuredMetadataStage) Name() string {
return StageTypeStructuredMetadata
}

// Cleanup implements Stage.
func (*structuredMetadataStage) Cleanup() {
// no-op
}

func (s *structuredMetadataStage) Run(in chan Entry) chan Entry {
return RunWith(in, func(e Entry) Entry {
processLabelsConfigs(s.logger, e.Extracted, s.cfgs, func(labelName model.LabelName, labelValue model.LabelValue) {
Expand Down

0 comments on commit f817a47

Please sign in to comment.