Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Transactional Gatherer allowed cached solutions #989

Merged
merged 13 commits into from
Feb 23, 2022
4 changes: 3 additions & 1 deletion prometheus/desc.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"strings"

"github.com/cespare/xxhash/v2"
"github.com/prometheus/client_golang/prometheus/internal"

//nolint:staticcheck // Ignore SA1019. Need to keep deprecated package for compatibility.
"github.com/golang/protobuf/proto"
"github.com/prometheus/common/model"
Expand Down Expand Up @@ -154,7 +156,7 @@ func NewDesc(fqName, help string, variableLabels []string, constLabels Labels) *
Value: proto.String(v),
})
}
sort.Sort(labelPairSorter(d.constLabelPairs))
sort.Sort(internal.LabelPairSorter(d.constLabelPairs))
return d
}

Expand Down
28 changes: 22 additions & 6 deletions prometheus/internal/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,34 @@ import (
dto "github.com/prometheus/client_model/go"
)

// metricSorter is a sortable slice of *dto.Metric.
type metricSorter []*dto.Metric
// LabelPairSorter implements sort.Interface. It is used to sort a slice of
// dto.LabelPair pointers.
type LabelPairSorter []*dto.LabelPair

func (s metricSorter) Len() int {
func (s LabelPairSorter) Len() int {
return len(s)
}

func (s metricSorter) Swap(i, j int) {
func (s LabelPairSorter) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}

func (s metricSorter) Less(i, j int) bool {
func (s LabelPairSorter) Less(i, j int) bool {
return s[i].GetName() < s[j].GetName()
}

// MetricSorter is a sortable slice of *dto.Metric.
type MetricSorter []*dto.Metric

func (s MetricSorter) Len() int {
return len(s)
}

func (s MetricSorter) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}

func (s MetricSorter) Less(i, j int) bool {
if len(s[i].Label) != len(s[j].Label) {
// This should not happen. The metrics are
// inconsistent. However, we have to deal with the fact, as
Expand Down Expand Up @@ -68,7 +84,7 @@ func (s metricSorter) Less(i, j int) bool {
// the slice, with the contained Metrics sorted within each MetricFamily.
func NormalizeMetricFamilies(metricFamiliesByName map[string]*dto.MetricFamily) []*dto.MetricFamily {
for _, mf := range metricFamiliesByName {
sort.Sort(metricSorter(mf.Metric))
sort.Sort(MetricSorter(mf.Metric))
}
names := make([]string, 0, len(metricFamiliesByName))
for name, mf := range metricFamiliesByName {
Expand Down
16 changes: 0 additions & 16 deletions prometheus/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,22 +115,6 @@ func BuildFQName(namespace, subsystem, name string) string {
return name
}

// labelPairSorter implements sort.Interface. It is used to sort a slice of
// dto.LabelPair pointers.
type labelPairSorter []*dto.LabelPair

func (s labelPairSorter) Len() int {
return len(s)
}

func (s labelPairSorter) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}

func (s labelPairSorter) Less(i, j int) bool {
return s[i].GetName() < s[j].GetName()
}

type invalidMetric struct {
desc *Desc
err error
Expand Down
10 changes: 9 additions & 1 deletion prometheus/promhttp/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ func Handler() http.Handler {
// instrumentation. Use the InstrumentMetricHandler function to apply the same
// kind of instrumentation as it is used by the Handler function.
func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler {
return HandlerForTransactional(prometheus.ToTransactionalGatherer(reg), opts)
}

// HandlerForTransactional is like HandlerFor, but it uses transactional gather, which
// can safely change in-place returned *dto.MetricFamily before call to `Gather` and after
// call to `done` of that `Gather`.
func HandlerForTransactional(reg prometheus.TransactionalGatherer, opts HandlerOpts) http.Handler {
var (
inFlightSem chan struct{}
errCnt = prometheus.NewCounterVec(
Expand Down Expand Up @@ -123,7 +130,8 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler {
return
}
}
mfs, err := reg.Gather()
mfs, done, err := reg.Gather()
defer done()
if err != nil {
if opts.ErrorLog != nil {
opts.ErrorLog.Println("error gathering metrics:", err)
Expand Down
107 changes: 83 additions & 24 deletions prometheus/promhttp/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package promhttp
import (
"bytes"
"errors"
"fmt"
"log"
"net/http"
"net/http/httptest"
Expand All @@ -24,6 +25,7 @@ import (
"time"

"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
)

type errorCollector struct{}
Expand Down Expand Up @@ -56,8 +58,19 @@ func (b blockingCollector) Collect(ch chan<- prometheus.Metric) {
<-b.Block
}

func TestHandlerErrorHandling(t *testing.T) {
type mockTransactionGatherer struct {
g prometheus.Gatherer
gatherInvoked int
doneInvoked int
}

func (g *mockTransactionGatherer) Gather() (_ []*dto.MetricFamily, done func(), err error) {
g.gatherInvoked++
mfs, err := g.g.Gather()
return mfs, func() { g.doneInvoked++ }, err
}

func TestHandlerErrorHandling(t *testing.T) {
// Create a registry that collects a MetricFamily with two elements,
// another with one, and reports an error. Further down, we'll use the
// same registry in the HandlerOpts.
Expand Down Expand Up @@ -90,21 +103,30 @@ func TestHandlerErrorHandling(t *testing.T) {
request, _ := http.NewRequest("GET", "/", nil)
request.Header.Add("Accept", "test/plain")

errorHandler := HandlerFor(reg, HandlerOpts{
mReg := &mockTransactionGatherer{g: reg}
errorHandler := HandlerForTransactional(mReg, HandlerOpts{
ErrorLog: logger,
ErrorHandling: HTTPErrorOnError,
Registry: reg,
})
continueHandler := HandlerFor(reg, HandlerOpts{
continueHandler := HandlerForTransactional(mReg, HandlerOpts{
ErrorLog: logger,
ErrorHandling: ContinueOnError,
Registry: reg,
})
panicHandler := HandlerFor(reg, HandlerOpts{
panicHandler := HandlerForTransactional(mReg, HandlerOpts{
ErrorLog: logger,
ErrorHandling: PanicOnError,
Registry: reg,
})
// Expect gatherer not touched.
if got := mReg.gatherInvoked; got != 0 {
t.Fatalf("unexpected number of gather invokes, want 0, got %d", got)
}
if got := mReg.doneInvoked; got != 0 {
t.Fatalf("unexpected number of done invokes, want 0, got %d", got)
}

wantMsg := `error gathering metrics: error collecting metric Desc{fqName: "invalid_metric", help: "not helpful", constLabels: {}, variableLabels: []}: collect error
`
wantErrorBody := `An error has occurred while serving metrics:
Expand Down Expand Up @@ -140,25 +162,39 @@ the_count 0
`

errorHandler.ServeHTTP(writer, request)
if got := mReg.gatherInvoked; got != 1 {
t.Fatalf("unexpected number of gather invokes, want 1, got %d", got)
}
if got := mReg.doneInvoked; got != 1 {
t.Fatalf("unexpected number of done invokes, want 1, got %d", got)
}
if got, want := writer.Code, http.StatusInternalServerError; got != want {
t.Errorf("got HTTP status code %d, want %d", got, want)
}
if got := logBuf.String(); got != wantMsg {
t.Errorf("got log message:\n%s\nwant log message:\n%s\n", got, wantMsg)
if got, want := logBuf.String(), wantMsg; got != want {
t.Errorf("got log buf %q, want %q", got, want)
}
if got := writer.Body.String(); got != wantErrorBody {
t.Errorf("got body:\n%s\nwant body:\n%s\n", got, wantErrorBody)
if got, want := writer.Body.String(), wantErrorBody; got != want {
t.Errorf("got body %q, want %q", got, want)
}

logBuf.Reset()
writer.Body.Reset()
writer.Code = http.StatusOK

continueHandler.ServeHTTP(writer, request)

if got := mReg.gatherInvoked; got != 2 {
t.Fatalf("unexpected number of gather invokes, want 2, got %d", got)
}
if got := mReg.doneInvoked; got != 2 {
t.Fatalf("unexpected number of done invokes, want 2, got %d", got)
}
if got, want := writer.Code, http.StatusOK; got != want {
t.Errorf("got HTTP status code %d, want %d", got, want)
}
if got := logBuf.String(); got != wantMsg {
t.Errorf("got log message %q, want %q", got, wantMsg)
if got, want := logBuf.String(), wantMsg; got != want {
t.Errorf("got log buf %q, want %q", got, want)
}
if got := writer.Body.String(); got != wantOKBody1 && got != wantOKBody2 {
t.Errorf("got body %q, want either %q or %q", got, wantOKBody1, wantOKBody2)
Expand All @@ -168,20 +204,34 @@ the_count 0
if err := recover(); err == nil {
t.Error("expected panic from panicHandler")
}
if got := mReg.gatherInvoked; got != 3 {
t.Fatalf("unexpected number of gather invokes, want 3, got %d", got)
}
if got := mReg.doneInvoked; got != 3 {
t.Fatalf("unexpected number of done invokes, want 3, got %d", got)
}
}()
panicHandler.ServeHTTP(writer, request)
}

func TestInstrumentMetricHandler(t *testing.T) {
reg := prometheus.NewRegistry()
handler := InstrumentMetricHandler(reg, HandlerFor(reg, HandlerOpts{}))
mReg := &mockTransactionGatherer{g: reg}
handler := InstrumentMetricHandler(reg, HandlerForTransactional(mReg, HandlerOpts{}))
// Do it again to test idempotency.
InstrumentMetricHandler(reg, HandlerFor(reg, HandlerOpts{}))
InstrumentMetricHandler(reg, HandlerForTransactional(mReg, HandlerOpts{}))
writer := httptest.NewRecorder()
request, _ := http.NewRequest("GET", "/", nil)
request.Header.Add("Accept", "test/plain")

handler.ServeHTTP(writer, request)
if got := mReg.gatherInvoked; got != 1 {
t.Fatalf("unexpected number of gather invokes, want 1, got %d", got)
}
if got := mReg.doneInvoked; got != 1 {
t.Fatalf("unexpected number of done invokes, want 1, got %d", got)
}

if got, want := writer.Code, http.StatusOK; got != want {
t.Errorf("got HTTP status code %d, want %d", got, want)
}
Expand All @@ -195,19 +245,28 @@ func TestInstrumentMetricHandler(t *testing.T) {
t.Errorf("got body %q, does not contain %q", got, want)
}

writer.Body.Reset()
handler.ServeHTTP(writer, request)
if got, want := writer.Code, http.StatusOK; got != want {
t.Errorf("got HTTP status code %d, want %d", got, want)
}
for i := 0; i < 100; i++ {
writer.Body.Reset()
handler.ServeHTTP(writer, request)

want = "promhttp_metric_handler_requests_in_flight 1\n"
if got := writer.Body.String(); !strings.Contains(got, want) {
t.Errorf("got body %q, does not contain %q", got, want)
}
want = "promhttp_metric_handler_requests_total{code=\"200\"} 1\n"
if got := writer.Body.String(); !strings.Contains(got, want) {
t.Errorf("got body %q, does not contain %q", got, want)
if got, want := mReg.gatherInvoked, i+2; got != want {
t.Fatalf("unexpected number of gather invokes, want %d, got %d", want, got)
}
if got, want := mReg.doneInvoked, i+2; got != want {
t.Fatalf("unexpected number of done invokes, want %d, got %d", want, got)
}
if got, want := writer.Code, http.StatusOK; got != want {
t.Errorf("got HTTP status code %d, want %d", got, want)
}

want := "promhttp_metric_handler_requests_in_flight 1\n"
if got := writer.Body.String(); !strings.Contains(got, want) {
t.Errorf("got body %q, does not contain %q", got, want)
}
want = fmt.Sprintf("promhttp_metric_handler_requests_total{code=\"200\"} %d\n", i+1)
if got := writer.Body.String(); !strings.Contains(got, want) {
t.Errorf("got body %q, does not contain %q", got, want)
}
}
}

Expand Down
Loading