Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug with loki.process metrics stage during config reload #1292

Merged
merged 2 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ Main (unreleased)
- Fixed a clustering mode issue where a fatal startup failure of the clustering service
would exit the service silently, without also exiting the Alloy process. (@thampiotr)

- Fix a bug which prevented config reloads to work if a Loki `metrics` stage is in the pipeline.
Previously, the reload would fail for `loki.process` without an error in the logs and the metrics
from the `metrics` stage would get stuck at the same values. (@ptodev)

v1.2.1
-----------------

Expand Down
4 changes: 4 additions & 0 deletions docs/sources/reference/components/loki/loki.process.md
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,10 @@ The following blocks are supported inside the definition of `stage.metrics`:
| metric.gauge | [metric.gauge][] | Defines a `gauge` metric. | no |
| metric.histogram | [metric.histogram][] | Defines a `histogram` metric. | no |

{{< admonition type="note" >}}
The metrics will be reset if you reload the {{< param "PRODUCT_NAME" >}} configuration file.
{{< /admonition >}}

[metric.counter]: #metriccounter-block
[metric.gauge]: #metricgauge-block
[metric.histogram]: #metrichistogram-block
Expand Down
2 changes: 1 addition & 1 deletion internal/component/loki/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (c *Component) Update(args component.Arguments) error {
if err != nil {
return err
}
c.entryHandler = loki.NewEntryHandler(c.processOut, func() {})
c.entryHandler = loki.NewEntryHandler(c.processOut, func() { pipeline.Cleanup() })
c.processIn = pipeline.Wrap(c.entryHandler).Chan()
c.stages = newArgs.Stages
}
Expand Down
193 changes: 191 additions & 2 deletions internal/component/loki/process/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import (
"context"
"fmt"
"os"
"strings"
"testing"
"time"

"github.com/grafana/loki/v3/pkg/logproto"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
Expand All @@ -27,6 +29,8 @@ import (
"github.com/grafana/alloy/syntax"
)

const logline = `{"log":"log message\n","stream":"stderr","time":"2019-04-30T02:12:41.8443515Z","extra":"{\"user\":\"smith\"}"}`

func TestJSONLabelsStage(t *testing.T) {
defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))

Expand Down Expand Up @@ -95,7 +99,6 @@ func TestJSONLabelsStage(t *testing.T) {

// Send a log entry to the component's receiver.
ts := time.Now()
logline := `{"log":"log message\n","stream":"stderr","time":"2019-04-30T02:12:41.8443515Z","extra":"{\"user\":\"smith\"}"}`
logEntry := loki.Entry{
Labels: model.LabelSet{"filename": "/var/log/pods/agent/agent/1.log", "foo": "bar"},
Entry: logproto.Entry{
Expand Down Expand Up @@ -461,7 +464,6 @@ func TestDeadlockWithFrequentUpdates(t *testing.T) {
go func() {
for {
ts := time.Now()
logline := `{"log":"log message\n","stream":"stderr","time":"2019-04-30T02:12:41.8443515Z","extra":"{\"user\":\"smith\"}"}`
logEntry := loki.Entry{
Labels: model.LabelSet{"filename": "/var/log/pods/agent/agent/1.log", "foo": "bar"},
Entry: logproto.Entry{
Expand Down Expand Up @@ -502,3 +504,190 @@ func getServiceData(name string) (interface{}, error) {
return nil, fmt.Errorf("service not found %s", name)
}
}

func TestMetricsStageRefresh(t *testing.T) {
tester := newTester(t)
defer tester.stop()

forwardArgs := `
// This will be filled later
forward_to = []`

numLogsToSend := 3

cfgWithMetric := `
stage.metrics {
metric.counter {
name = "paulin_test"
action = "inc"
match_all = true
}
}` + forwardArgs

cfgWithMetric_Metrics := `
# HELP loki_process_custom_paulin_test
# TYPE loki_process_custom_paulin_test counter
loki_process_custom_paulin_test{filename="/var/log/pods/agent/agent/1.log",foo="bar"} %d
`

// The component will be reconfigured so that it has a metric.
t.Run("config with a metric", func(t *testing.T) {
tester.updateAndTest(numLogsToSend, cfgWithMetric,
"",
fmt.Sprintf(cfgWithMetric_Metrics, numLogsToSend))
})

// The component will be "updated" with the same config.
// We expect the metric to stay the same before logs are sent - the component should be smart enough to
// know that the new config is the same as the old one and it should just keep running as it is.
// If it resets the metric, this could cause issues with some users who have a sidecar "autoreloader"
// which reloads the collector config every X seconds.
// Those users wouldn't expect their metrics to be reset every time the config is reloaded.
t.Run("config with the same metric", func(t *testing.T) {
tester.updateAndTest(numLogsToSend, cfgWithMetric,
fmt.Sprintf(cfgWithMetric_Metrics, numLogsToSend),
fmt.Sprintf(cfgWithMetric_Metrics, 2*numLogsToSend))
})

// Use a config which has no metrics stage.
// This should cause the metric to disappear.
cfgWithNoStages := forwardArgs
t.Run("config with no metrics stage", func(t *testing.T) {
tester.updateAndTest(numLogsToSend, cfgWithNoStages, "", "")
})

// Use a config which has a metric with a different name,
// as well as a metric with the same name as the one in the previous config.
// We try having a metric with the same name as before so that we can see if there
// is some sort of double registration error for that metric.
cfgWithTwoMetrics := `
stage.metrics {
metric.counter {
name = "paulin_test_3"
action = "inc"
match_all = true
}
metric.counter {
name = "paulin_test"
action = "inc"
match_all = true
}
}` + forwardArgs

expectedMetrics3 := `
# HELP loki_process_custom_paulin_test_3
# TYPE loki_process_custom_paulin_test_3 counter
loki_process_custom_paulin_test_3{filename="/var/log/pods/agent/agent/1.log",foo="bar"} %d
# HELP loki_process_custom_paulin_test
# TYPE loki_process_custom_paulin_test counter
loki_process_custom_paulin_test{filename="/var/log/pods/agent/agent/1.log",foo="bar"} %d
`

t.Run("config with a new and old metric", func(t *testing.T) {
tester.updateAndTest(numLogsToSend, cfgWithTwoMetrics,
"",
fmt.Sprintf(expectedMetrics3, numLogsToSend, numLogsToSend))
})
}

type tester struct {
t *testing.T
component *Component
registry *prometheus.Registry
cancelFunc context.CancelFunc
logReceiver loki.LogsReceiver
logTimestamp time.Time
logEntry loki.Entry
wantLabelSet model.LabelSet
}

// Create the component, so that it can process and forward logs.
func newTester(t *testing.T) *tester {
reg := prometheus.NewRegistry()

opts := component.Options{
Logger: util.TestAlloyLogger(t),
Registerer: reg,
OnStateChange: func(e component.Exports) {},
GetServiceData: getServiceData,
}

initialCfg := `forward_to = []`
var args Arguments
err := syntax.Unmarshal([]byte(initialCfg), &args)
require.NoError(t, err)

logReceiver := loki.NewLogsReceiver()
args.ForwardTo = []loki.LogsReceiver{logReceiver}

c, err := New(opts, args)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
go c.Run(ctx)

logTimestamp := time.Now()

return &tester{
t: t,
component: c,
registry: reg,
cancelFunc: cancel,
logReceiver: logReceiver,
logTimestamp: logTimestamp,
logEntry: loki.Entry{
Labels: model.LabelSet{"filename": "/var/log/pods/agent/agent/1.log", "foo": "bar"},
Entry: logproto.Entry{
Timestamp: logTimestamp,
Line: logline,
},
},
wantLabelSet: model.LabelSet{
"filename": "/var/log/pods/agent/agent/1.log",
"foo": "bar",
},
}
}

func (t *tester) stop() {
t.cancelFunc()
}

func (t *tester) updateAndTest(numLogsToSend int, cfg, expectedMetricsBeforeSendingLogs, expectedMetricsAfterSendingLogs string) {
var args Arguments
err := syntax.Unmarshal([]byte(cfg), &args)
require.NoError(t.t, err)

args.ForwardTo = []loki.LogsReceiver{t.logReceiver}

t.component.Update(args)

// Check the component metrics.
if err := testutil.GatherAndCompare(t.registry,
strings.NewReader(expectedMetricsBeforeSendingLogs)); err != nil {
require.NoError(t.t, err)
}

// Send logs.
for i := 0; i < numLogsToSend; i++ {
t.component.receiver.Chan() <- t.logEntry
}

// Receive logs.
for i := 0; i < numLogsToSend; i++ {
select {
case logEntry := <-t.logReceiver.Chan():
require.True(t.t, t.logTimestamp.Equal(logEntry.Timestamp))
require.Equal(t.t, logline, logEntry.Line)
require.Equal(t.t, t.wantLabelSet, logEntry.Labels)
case <-time.After(5 * time.Second):
require.FailNow(t.t, "failed waiting for log line")
}
}

// Check the component metrics.
if err := testutil.GatherAndCompare(t.registry,
strings.NewReader(expectedMetricsAfterSendingLogs)); err != nil {
require.NoError(t.t, err)
}
}
41 changes: 41 additions & 0 deletions internal/util/unregisterer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,38 @@ type unregisterer struct {
cs map[prometheus.Collector]struct{}
}

// An "unchecked collector" is a collector which returns an empty description.
// It is described in the Prometheus documentation, here:
// https://pkg.go.dev/github.com/prometheus/client_golang/prometheus#hdr-Custom_Collectors_and_constant_Metrics
//
// > Alternatively, you could return no Desc at all, which will mark the Collector “unchecked”.
// > No checks are performed at registration time, but metric consistency will still be ensured at scrape time,
// > i.e. any inconsistencies will lead to scrape errors. Thus, with unchecked Collectors,
// > the responsibility to not collect metrics that lead to inconsistencies in the total scrape result
// > lies with the implementer of the Collector. While this is not a desirable state, it is sometimes necessary.
// > The typical use case is a situation where the exact metrics to be returned by a Collector cannot be predicted
// > at registration time, but the implementer has sufficient knowledge of the whole system to guarantee metric consistency.
//
// Unchecked collectors are used in the Loki "metrics" stage of the Loki "process" component.
//
// The isUncheckedCollector function is similar to how Prometheus' Go client extracts the metric description:
// https://github.com/prometheus/client_golang/blob/45f1e72421d9d11af6be784ad60b7389f7543e70/prometheus/registry.go#L372-L381
func isUncheckedCollector(c prometheus.Collector) bool {
descChan := make(chan *prometheus.Desc, 10)

go func() {
c.Describe(descChan)
close(descChan)
}()

i := 0
for range descChan {
i += 1
}

return i == 0
}

// Register implements prometheus.Registerer.
func (u *unregisterer) Register(c prometheus.Collector) error {
if u.wrap == nil {
Expand All @@ -33,6 +65,11 @@ func (u *unregisterer) Register(c prometheus.Collector) error {
if err != nil {
return err
}

if isUncheckedCollector(c) {
return nil
}

u.cs[c] = struct{}{}
return nil
}
Expand All @@ -48,6 +85,10 @@ func (u *unregisterer) MustRegister(cs ...prometheus.Collector) {

// Unregister implements prometheus.Registerer.
func (u *unregisterer) Unregister(c prometheus.Collector) bool {
if isUncheckedCollector(c) {
return true
}

if u.wrap != nil && u.wrap.Unregister(c) {
delete(u.cs, c)
return true
Expand Down
35 changes: 35 additions & 0 deletions internal/util/unregisterer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package util

import (
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)

func Test_UnregisterTwice_NormalCollector(t *testing.T) {
u := WrapWithUnregisterer(prometheus.NewRegistry())
c := prometheus.NewCounter(prometheus.CounterOpts{
Name: "test_metric",
Help: "Test metric.",
})
u.Register(c)
require.True(t, u.Unregister(c))
require.False(t, u.Unregister(c))
}

type uncheckedCollector struct{}

func (uncheckedCollector) Describe(chan<- *prometheus.Desc) {}

func (uncheckedCollector) Collect(chan<- prometheus.Metric) {}

var _ prometheus.Collector = uncheckedCollector{}

func Test_UnregisterTwice_UncheckedCollector(t *testing.T) {
u := WrapWithUnregisterer(prometheus.NewRegistry())
c := uncheckedCollector{}
u.Register(c)
require.True(t, u.Unregister(c))
require.True(t, u.Unregister(c))
}
Loading