Record Metrics for Reminder #4831

Open · wants to merge 1 commit into main
72 changes: 72 additions & 0 deletions internal/reminder/metrics/metrics.go
@@ -0,0 +1,72 @@
// SPDX-FileCopyrightText: Copyright 2024 The Minder Authors
// SPDX-License-Identifier: Apache-2.0

// Package metrics provides metrics for the reminder service
package metrics

import (
"go.opentelemetry.io/otel/metric"
)

// Default bucket boundaries in seconds for the delay histograms
var delayBuckets = []float64{
60, // 1 minute
300, // 5 minutes
600, // 10 minutes
1800, // 30 minutes
3600, // 1 hour
7200, // 2 hours
10800, // 3 hours
18000, // 5 hours
25200, // 7 hours
36000, // 10 hours
}

// Metrics contains all the metrics for the reminder service
type Metrics struct {
// Time between when a reminder became eligible and when it was sent
SendDelay metric.Float64Histogram

// Time between when a reminder became eligible and when it was sent for the first time
NewSendDelay metric.Float64Histogram

// Current number of reminders in the batch
BatchSize metric.Int64Histogram
}

// NewMetrics creates a new metrics instance
func NewMetrics(meter metric.Meter) (*Metrics, error) {
sendDelay, err := meter.Float64Histogram(
"send_delay",
metric.WithDescription("Time between reminder becoming eligible and actual send (seconds)"),
metric.WithUnit("s"),
metric.WithExplicitBucketBoundaries(delayBuckets...),
)
if err != nil {
return nil, err
}

newSendDelay, err := meter.Float64Histogram(
"new_send_delay",
metric.WithDescription("Time between reminder becoming eligible and actual send (seconds) for first time reminders"),
metric.WithUnit("s"),
metric.WithExplicitBucketBoundaries(delayBuckets...),
)
if err != nil {
return nil, err
}

batchSize, err := meter.Int64Histogram(
"batch_size",
metric.WithDescription("Current number of reminders in the batch"),
)
if err != nil {
return nil, err
}

return &Metrics{
SendDelay: sendDelay,
NewSendDelay: newSendDelay,
BatchSize: batchSize,
}, nil
}
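
For context, a minimal sketch of how a caller might construct and use these instruments, assuming a global MeterProvider has already been registered (startMetricServer below does this). The meter name "reminder" matches the one used later in this PR, the recorded values are made up, and note that internal/... packages are only importable from within the minder module:

package reminderexample

import (
	"context"

	"go.opentelemetry.io/otel"

	"github.com/mindersec/minder/internal/reminder/metrics"
)

func recordSample(ctx context.Context) error {
	// Without a registered MeterProvider, otel.Meter returns a no-op
	// meter, so these records would be silently dropped.
	m, err := metrics.NewMetrics(otel.Meter("reminder"))
	if err != nil {
		return err
	}

	m.BatchSize.Record(ctx, 42)    // hypothetical batch of 42 reminders
	m.SendDelay.Record(ctx, 90)    // sent 90s after becoming eligible
	m.NewSendDelay.Record(ctx, 90) // same, but for a first-time send
	return nil
}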
87 changes: 87 additions & 0 deletions internal/reminder/metrics_server.go
@@ -0,0 +1,87 @@
// SPDX-FileCopyrightText: Copyright 2024 The Minder Authors
// SPDX-License-Identifier: Apache-2.0

package reminder

import (
"context"
"fmt"
"net/http"
"time"

"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/zerolog"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/prometheus"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
)

const (
metricsPath = "/metrics"
readHeaderTimeout = 2 * time.Second
)

func (r *reminder) startMetricServer(ctx context.Context, mpReady chan<- struct{}) error {
logger := zerolog.Ctx(ctx)

prometheusExporter, err := prometheus.New(
prometheus.WithNamespace("reminder"),
)
if err != nil {
return fmt.Errorf("failed to create Prometheus exporter: %w", err)
}

res := resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceName("reminder"),
// TODO: Make this auto-generated
semconv.ServiceVersion("v0.1.0"),
)

mp := sdkmetric.NewMeterProvider(
sdkmetric.WithReader(prometheusExporter),
sdkmetric.WithResource(res),
)

otel.SetMeterProvider(mp)

// Indicates that a global MeterProvider is available
close(mpReady)

mux := http.NewServeMux()
mux.Handle(metricsPath, promhttp.Handler())
Comment on lines +53 to +54

Member: Why do you need an http.ServeMux here, rather than simply passing promhttp.Handler to the http.Server below?
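
For illustration, a sketch of the alternative the reviewer describes: promhttp.Handler() already satisfies http.Handler, so with a single route it could be handed straight to http.Server. The trade-off is that every path, not just /metrics, would then serve metrics.

// Sketch of the reviewer's suggestion: with only one route, the mux
// could be dropped. Note this serves metrics on every request path.
server := &http.Server{
	Addr:              r.cfg.MetricServer.GetAddress(),
	Handler:           promhttp.Handler(),
	ReadHeaderTimeout: readHeaderTimeout,
}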


server := &http.Server{
Addr: r.cfg.MetricServer.GetAddress(),
Handler: mux,
ReadHeaderTimeout: readHeaderTimeout,
}

logger.Info().Msgf("starting metrics server on %s", server.Addr)

errCh := make(chan error)
go func() {
errCh <- server.ListenAndServe()
}()

select {
case err := <-errCh:
return err
case <-ctx.Done():
case <-r.stop:
}

// Shut down the metrics server once either the context is done or reminder is stopped
shutdownCtx, shutdownRelease := context.WithTimeout(context.Background(), 5*time.Second)
defer shutdownRelease()

logger.Info().Msg("shutting down metrics server")

if err = server.Shutdown(shutdownCtx); err != nil {
logger.Err(err).Msg("error shutting down metrics server")
}

return mp.Shutdown(shutdownCtx)
}
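
Once the server is up, the endpoint can be scraped like any Prometheus target. A small hedged smoke test, assuming the default port 9091 from the config change at the bottom of this PR; the exact series names depend on the exporter's unit-suffix handling:

package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Hypothetical smoke test against a locally running reminder service.
	resp, err := http.Get("http://localhost:9091/metrics")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	// With the "reminder" namespace, expect series resembling
	// reminder_send_delay_seconds_bucket{le="60"} ...
	fmt.Println(string(body))
}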
80 changes: 62 additions & 18 deletions internal/reminder/reminder.go
@@ -14,9 +14,11 @@ import (
"github.com/ThreeDotsLabs/watermill/message"
"github.com/google/uuid"
"github.com/rs/zerolog"
"go.opentelemetry.io/otel"

"github.com/mindersec/minder/internal/db"
remindermessages "github.com/mindersec/minder/internal/reminder/messages"
"github.com/mindersec/minder/internal/reminder/metrics"
reminderconfig "github.com/mindersec/minder/pkg/config/reminder"
"github.com/mindersec/minder/pkg/eventer/constants"
)
@@ -42,6 +44,8 @@ type reminder struct {
ticker *time.Ticker

eventPublisher message.Publisher

metrics *metrics.Metrics
}

// NewReminder creates a new reminder instance
@@ -74,21 +78,52 @@ func (r *reminder) Start(ctx context.Context) error {
return errors.New("reminder stopped, cannot start again")
default:
}
defer r.Stop()
Member: This is kind of odd, since Start doesn't return, and would normally be exited by calling Stop(). Add a comment here on why this is needed?

(I'm not saying it's wrong, I'm saying to drop a hint to prevent future authors from deleting the line.)


interval := r.cfg.RecurrenceConfig.Interval
if interval <= 0 {
return fmt.Errorf("invalid interval: %s", r.cfg.RecurrenceConfig.Interval)
}

metricsServerDone := make(chan struct{})

if r.cfg.MetricsConfig.Enabled {
metricsProviderReady := make(chan struct{})

go func() {
if err := r.startMetricServer(ctx, metricsProviderReady); err != nil {
logger.Err(err).Msg("failed to start metrics server")
}
close(metricsServerDone)
}()
Comment on lines +93 to +98

Member: It feels weird to me to end up needing to call startMetricServer in another thread and then deal with that, given that startMetricServer also sets up a background thread (and then waits for it).

We end up with:

main
 \--> startMetricServer
          \--> http.ListenAndServe

When it feels like we should have:

main
 \--> http.ListenAndServe

Possibly with a "closer" callback.

I only mention this because Join-ing goroutines is a bunch of code that distracts from the rest of what's going on, so I tend to prefer to minimize how often I need to block on a goroutine exiting.
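
A rough sketch of the flattened shape being described, using a hypothetical setupMetrics helper (not part of this PR) that builds the exporter, provider, and server, and hands back a closer, so the caller owns the single goroutine:

// Hypothetical sketch: setupMetrics builds the server and returns a
// closer; main owns the one goroutine and no join channel is needed.
srv, closer, err := setupMetrics(ctx)
if err != nil {
	return err
}
defer closer()

go func() {
	if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
		zerolog.Ctx(ctx).Err(err).Msg("metrics server exited")
	}
}()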


select {
case <-metricsProviderReady:
var err error
r.metrics, err = metrics.NewMetrics(otel.Meter("reminder"))
if err != nil {
return err
}
case <-ctx.Done():
logger.Info().Msg("reminder stopped")
return nil
}
Comment on lines +100 to +110

Vyom-Yadav (Member, Author), Jan 17, 2025: I hope this isn't me complicating stuff.

Member: I do think you're being overly conscientious for shutdown from main, but I'll allow it.

Vyom-Yadav: Thank you, kindly!

}

r.ticker = time.NewTicker(interval)
defer r.Stop()

for {
select {
case <-ctx.Done():
if r.cfg.MetricsConfig.Enabled {
<-metricsServerDone
}
Comment on lines +118 to +120

Vyom-Yadav (Member, Author): Same for this one (making sure I'm not complicating things).

Member: Here, you might more easily do the following:

if r.cfg.MetricsConfig.Enabled {
  ...
} else {
  close(metricsServerDone)
}

And then you won't need the conditional here.

Vyom-Yadav: I didn't understand that. The current shutdown sequence waits for the metrics server to be done (waiting so that the deferred call, i.e. the Stop() call for reminder, is not executed before the metrics server finishes).

Member: If you close(metricsServerDone) in the else block on 111, then 118-120 and 124-126 can simply do <-metricsServerDone whether r.cfg.MetricsConfig.Enabled is true or false, because either case will close the channel.

I'd also be okay with leaking the metrics HTTP server if it doesn't cause harm elsewhere.
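
Assembled, that suggestion would look something like this sketch: the channel is closed on both paths, so the later receives need no Enabled check.

metricsServerDone := make(chan struct{})
if r.cfg.MetricsConfig.Enabled {
	go func() {
		if err := r.startMetricServer(ctx, metricsProviderReady); err != nil {
			logger.Err(err).Msg("failed to start metrics server")
		}
		close(metricsServerDone)
	}()
} else {
	close(metricsServerDone)
}

// Later, in both the ctx.Done() and r.stop cases: a receive from a
// closed channel returns immediately, so this never blocks when
// metrics are disabled.
<-metricsServerDone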

logger.Info().Msg("reminder stopped")
return nil
case <-r.stop:
if r.cfg.MetricsConfig.Enabled {
<-metricsServerDone
}
logger.Info().Msg("reminder stopped")
return nil
case <-r.ticker.C:
@@ -126,7 +161,7 @@ func (r *reminder) sendReminders(ctx context.Context) error {
logger := zerolog.Ctx(ctx)

// Fetch a batch of repositories
repos, err := r.getRepositoryBatch(ctx)
repos, repoToLastUpdated, err := r.getRepositoryBatch(ctx)
if err != nil {
return fmt.Errorf("error fetching repository batch: %w", err)
}
@@ -143,6 +178,10 @@ func (r *reminder) sendReminders(ctx context.Context) error {
return fmt.Errorf("error creating reminder messages: %w", err)
}

if r.metrics != nil {
r.metrics.BatchSize.Record(ctx, int64(len(repos)))
}

err = r.eventPublisher.Publish(constants.TopicQueueRepoReminder, messages...)
if err != nil {
return fmt.Errorf("error publishing messages: %w", err)
@@ -151,13 +190,16 @@
repoIds := make([]uuid.UUID, 0, len(repos))
for _, repo := range repos {
repoIds = append(repoIds, repo.ID)
}
if r.metrics != nil {
// A repo becomes eligible MinElapsed after its oldest rule evaluation,
// so the send delay is how far past that eligibility point we sent.
sendDelay := time.Since(repoToLastUpdated[repo.ID]) - r.cfg.RecurrenceConfig.MinElapsed

// TODO: Collect Metrics
// Potential metrics:
// - Gauge: Number of reminders in the current batch
// - UpDownCounter: Average reminders sent per batch
// - Histogram: reminder_last_sent time distribution
recorder := r.metrics.SendDelay
if !repo.ReminderLastSent.Valid {
recorder = r.metrics.NewSendDelay
}
recorder.Record(ctx, sendDelay.Seconds())
}
}
Comment on lines +193 to +202

Vyom-Yadav (Member, Author): Now this all brings back the question of how useful the ReminderLastSent metric that we store in the main DB is. Is there any other application for it? Right now it's just being used as a boolean for checking whether a repo has been reconciled or not (through reminder).

Member: Ah, you mean the column in the database? Given where we're at now, maybe we don't need it?

Vyom-Yadav: Let's keep it around for some time, till we set up reminder with metrics, to see if this can be potentially valuable. We can clean up later if not. (Adding code requires more testing.)


err = r.store.UpdateReminderLastSentForRepositories(ctx, repoIds)
if err != nil {
@@ -167,7 +209,7 @@ func (r *reminder) sendReminders(ctx context.Context) error {
return nil
}

func (r *reminder) getRepositoryBatch(ctx context.Context) ([]db.Repository, error) {
func (r *reminder) getRepositoryBatch(ctx context.Context) ([]db.Repository, map[uuid.UUID]time.Time, error) {
logger := zerolog.Ctx(ctx)

logger.Debug().Msgf("fetching repositories after cursor: %s", r.repositoryCursor)
@@ -176,21 +218,23 @@
Limit: int64(r.cfg.RecurrenceConfig.BatchSize),
})
if err != nil {
return nil, err
return nil, nil, err
}

eligibleRepos, err := r.getEligibleRepositories(ctx, repos)
eligibleRepos, eligibleReposLastUpdated, err := r.getEligibleRepositories(ctx, repos)
if err != nil {
return nil, err
return nil, nil, err
}
logger.Debug().Msgf("%d/%d repositories are eligible for reminders", len(eligibleRepos), len(repos))

r.updateRepositoryCursor(ctx, repos)

return eligibleRepos, nil
return eligibleRepos, eligibleReposLastUpdated, nil
}

func (r *reminder) getEligibleRepositories(ctx context.Context, repos []db.Repository) ([]db.Repository, error) {
func (r *reminder) getEligibleRepositories(ctx context.Context, repos []db.Repository) (
[]db.Repository, map[uuid.UUID]time.Time, error,
) {
eligibleRepos := make([]db.Repository, 0, len(repos))

// We have a slice of repositories, but the sqlc-generated code wants a slice of UUIDs,
@@ -202,11 +246,11 @@ func (r *reminder) getEligibleRepositories(ctx context.Context, repos []db.Repos
}
oldestRuleEvals, err := r.store.ListOldestRuleEvaluationsByRepositoryId(ctx, repoIds)
if err != nil {
return nil, err
return nil, nil, err
}
idToLastUpdate := make(map[uuid.UUID]time.Time, len(oldestRuleEvals))
for _, times := range oldestRuleEvals {
idToLastUpdate[times.RepositoryID] = times.OldestLastUpdated
for _, ruleEval := range oldestRuleEvals {
idToLastUpdate[ruleEval.RepositoryID] = ruleEval.OldestLastUpdated
}

cutoff := time.Now().Add(-1 * r.cfg.RecurrenceConfig.MinElapsed)
@@ -216,7 +260,7 @@ func (r *reminder) getEligibleRepositories(ctx context.Context, repos []db.Repos
}
}

return eligibleRepos, nil
return eligibleRepos, idToLastUpdate, nil
}

func (r *reminder) updateRepositoryCursor(ctx context.Context, repos []db.Repository) {
2 changes: 1 addition & 1 deletion internal/reminder/reminder_test.go
@@ -159,7 +159,7 @@ func Test_getRepositoryBatch(t *testing.T) {

r := createTestReminder(t, store, cfg)

got, err := r.getRepositoryBatch(context.Background())
got, _, err := r.getRepositoryBatch(context.Background())
if test.err != "" {
require.ErrorContains(t, err, test.err)
return
10 changes: 6 additions & 4 deletions pkg/config/reminder/config.go
@@ -18,10 +18,12 @@ import (

// Config contains the configuration for the reminder service
type Config struct {
Database config.DatabaseConfig `mapstructure:"database"`
RecurrenceConfig RecurrenceConfig `mapstructure:"recurrence"`
EventConfig serverconfig.EventConfig `mapstructure:"events"`
LoggingConfig LoggingConfig `mapstructure:"logging"`
Database config.DatabaseConfig `mapstructure:"database"`
RecurrenceConfig RecurrenceConfig `mapstructure:"recurrence"`
EventConfig serverconfig.EventConfig `mapstructure:"events"`
LoggingConfig LoggingConfig `mapstructure:"logging"`
MetricsConfig serverconfig.MetricsConfig `mapstructure:"metrics"`
MetricServer serverconfig.MetricServerConfig `mapstructure:"metric_server" default:"{\"port\":\"9091\"}"`
}
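
For illustration, the mapstructure tags above imply a config file shaped roughly like the following sketch; the key names come from the tags in this diff, while the enabled flag is inferred from the Enabled field that reminder.go reads:

# Hypothetical reminder config sketch; keys inferred from mapstructure tags.
metrics:
  enabled: true    # read as r.cfg.MetricsConfig.Enabled in reminder.go
metric_server:
  port: "9091"     # default from the struct tag above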

// Validate validates the configuration