Skip to content

Commit

Permalink
Periodic: Add basic metrics in the library
Browse files Browse the repository at this point in the history
Adds for every periodic task
```
bs_beacon_cleaner_periodic_period_duration_seconds 30
bs_beacon_cleaner_periodic_runtime_duration_seconds_total 0.028704568000000007
bs_beacon_cleaner_periodic_runtime_timestamp_seconds 1.570616739e+09
```

Other:
* modifies the periodic.StartTask to accept interval not ticker.
* periodic metrics get the .Name() from the task
* unit-tests for the metric file

Fixes #3121
  • Loading branch information
karampok committed Oct 11, 2019
1 parent c865360 commit 8b963a7
Show file tree
Hide file tree
Showing 30 changed files with 540 additions and 165 deletions.
2 changes: 1 addition & 1 deletion go/beacon_srv/internal/beaconing/originator.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (cfg OriginatorConf) New() (*Originator, error) {

// Name returns the tasks name.
func (o *Originator) Name() string {
return "beaconing.Originator"
return "bs_beaconing_originator"
}

// Run originates core and downstream beacons.
Expand Down
2 changes: 1 addition & 1 deletion go/beacon_srv/internal/beaconing/propagator.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (cfg PropagatorConf) New() (*Propagator, error) {

// Name returns the tasks name.
func (p *Propagator) Name() string {
return "beaconing.Propagator"
return "bs_beaconing_propagator"
}

// Run propagates beacons provided by the beacon provider on all active target
Expand Down
2 changes: 1 addition & 1 deletion go/beacon_srv/internal/beaconing/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (cfg RegistrarConf) New() (*Registrar, error) {

// Name returns the tasks name.
func (r *Registrar) Name() string {
return "beaconing.Registrar"
return "bs_beaconing_registrar"
}

// Run registers path segments for the specified type to path servers.
Expand Down
4 changes: 2 additions & 2 deletions go/beacon_srv/internal/beaconstorage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@ type Store interface {
func NewBeaconCleaner(s Store) *cleaner.Cleaner {
return cleaner.New(func(ctx context.Context) (int, error) {
return s.DeleteExpiredBeacons(ctx)
}, "beacons")
}, "bs_beacon")
}

// NewRevocationCleaner creates a cleaner task, which deletes expired revocations.
func NewRevocationCleaner(s Store) *cleaner.Cleaner {
return cleaner.New(func(ctx context.Context) (int, error) {
return s.DeleteExpiredRevocations(ctx)
}, "revocations")
}, "bs_revocation")
}
2 changes: 1 addition & 1 deletion go/beacon_srv/internal/ifstate/revoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (cfg RevokerConf) New() *Revoker {

// Name returns the tasks name.
func (r *Revoker) Name() string {
return "ifstate.Revoker"
return "bs_ifstate_revoker"
}

// Run issues revocations for interfaces that have timed out
Expand Down
2 changes: 1 addition & 1 deletion go/beacon_srv/internal/keepalive/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type Sender struct {

// Name returns the tasks name.
func (s *Sender) Name() string {
return "keepalive.Sender"
return "bs_keepalive_sender"
}

// Run sends ifid keepalive messages on all border routers.
Expand Down
21 changes: 10 additions & 11 deletions go/beacon_srv/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,14 +311,13 @@ func (t *periodicTasks) Start() error {
if t.propagator, err = t.startPropagator(topoAddress); err != nil {
return err
}
t.beaconCleaner = periodic.StartPeriodicTask(

t.beaconCleaner = periodic.Start(
beaconstorage.NewBeaconCleaner(t.store),
periodic.NewTicker(30*time.Second), 30*time.Second,
)
t.revCleaner = periodic.StartPeriodicTask(
30*time.Second, 30*time.Second)
t.revCleaner = periodic.Start(
beaconstorage.NewRevocationCleaner(t.store),
periodic.NewTicker(5*time.Second), 5*time.Second,
)
5*time.Second, 5*time.Second)
return nil
}

Expand All @@ -339,7 +338,7 @@ func (t *periodicTasks) startRevoker() (*periodic.Runner, error) {
RevOverlap: cfg.BS.RevOverlap.Duration,
},
}.New()
return periodic.StartPeriodicTask(r, periodic.NewTicker(cfg.BS.ExpiredCheckInterval.Duration),
return periodic.Start(r, cfg.BS.ExpiredCheckInterval.Duration,
cfg.BS.ExpiredCheckInterval.Duration), nil
}

Expand All @@ -354,7 +353,7 @@ func (t *periodicTasks) startKeepaliveSender(a *topology.TopoAddr) (*periodic.Ru
Signer: infra.NullSigner,
TopoProvider: t.topoProvider,
}
return periodic.StartPeriodicTask(s, periodic.NewTicker(cfg.BS.KeepaliveInterval.Duration),
return periodic.Start(s, cfg.BS.KeepaliveInterval.Duration,
cfg.BS.KeepaliveInterval.Duration), nil
}

Expand Down Expand Up @@ -390,7 +389,7 @@ func (t *periodicTasks) startOriginator(a *topology.TopoAddr) (*periodic.Runner,
if err != nil {
return nil, common.NewBasicError("Unable to start originator", err)
}
return periodic.StartPeriodicTask(s, periodic.NewTicker(500*time.Millisecond),
return periodic.Start(s, 500*time.Millisecond,
cfg.BS.OriginationInterval.Duration), nil
}

Expand Down Expand Up @@ -426,7 +425,7 @@ func (t *periodicTasks) startPropagator(a *topology.TopoAddr) (*periodic.Runner,
if err != nil {
return nil, common.NewBasicError("Unable to start propagator", err)
}
return periodic.StartPeriodicTask(p, periodic.NewTicker(500*time.Millisecond),
return periodic.Start(p, 500*time.Millisecond,
cfg.BS.PropagationInterval.Duration), nil
}

Expand Down Expand Up @@ -476,7 +475,7 @@ func (t *periodicTasks) startRegistrar(topo *topology.Topo, segType proto.PathSe
if err != nil {
return nil, common.NewBasicError("Unable to start registrar", err, "type", segType)
}
return periodic.StartPeriodicTask(r, periodic.NewTicker(500*time.Millisecond),
return periodic.Start(r, 500*time.Millisecond,
cfg.BS.RegistrationInterval.Duration), nil
}

Expand Down
2 changes: 1 addition & 1 deletion go/cert_srv/internal/reiss/corepush.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type CorePusher struct {

// Name returns the tasks name.
func (p *CorePusher) Name() string {
return "reiss.CorePusher"
return "cs_reiss_corepusher"
}

// Run makes sure all core CS have the chain of the local AS.
Expand Down
12 changes: 6 additions & 6 deletions go/cert_srv/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,13 @@ func realMain() int {
// Non-core starts a requester.
func startReissRunner() {
if !cfg.CS.DisableCorePush {
corePusher = periodic.StartPeriodicTask(
corePusher = periodic.Start(
&reiss.CorePusher{
LocalIA: itopo.Get().ISD_AS,
TrustDB: state.TrustDB,
Msger: msgr,
},
periodic.NewTicker(time.Hour),
time.Hour,
time.Minute,
)
corePusher.TriggerRun()
Expand All @@ -125,7 +125,7 @@ func startReissRunner() {
}
if itopo.Get().Core {
log.Info("Starting periodic reiss.Self task")
reissRunner = periodic.StartPeriodicTask(
reissRunner = periodic.Start(
&reiss.Self{
Msgr: msgr,
State: state,
Expand All @@ -134,21 +134,21 @@ func startReissRunner() {
LeafTime: cfg.CS.LeafReissueLeadTime.Duration,
CorePusher: corePusher,
},
periodic.NewTicker(cfg.CS.ReissueRate.Duration),
cfg.CS.ReissueRate.Duration,
cfg.CS.ReissueTimeout.Duration,
)
return
}
log.Info("Starting periodic reiss.Requester task")
reissRunner = periodic.StartPeriodicTask(
reissRunner = periodic.Start(
&reiss.Requester{
Msgr: msgr,
State: state,
IA: itopo.Get().ISD_AS,
LeafTime: cfg.CS.LeafReissueLeadTime.Duration,
CorePusher: corePusher,
},
periodic.NewTicker(cfg.CS.ReissueRate.Duration),
cfg.CS.ReissueRate.Duration,
cfg.CS.ReissueTimeout.Duration,
)
}
Expand Down
3 changes: 1 addition & 2 deletions go/examples/discovery_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,7 @@ func realMain() int {
return 1
}
log.Info("Starting periodic fetching", "period", *period)
ticker := periodic.NewTicker(*period)
runner := periodic.StartPeriodicTask(fetcher, ticker, *timeout)
runner := periodic.Start(fetcher, *period, *timeout)
defer runner.Stop()
select {}
}
Expand Down
6 changes: 3 additions & 3 deletions go/lib/healthpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ func NewPool(infos map[Info]struct{}, opts PoolOptions) (*Pool, error) {
if err := p.Update(infos); err != nil {
return nil, err
}
p.expirer = periodic.StartPeriodicTask((*expirer)(p), periodic.NewTicker(time.Second),
time.Second)
//TODO(karampok). Before we start the task, we need caller identifier (e.g. bs_healthpool_x)
p.expirer = periodic.Start((*expirer)(p), time.Second, time.Second)
return p, nil
}

Expand Down Expand Up @@ -130,7 +130,7 @@ func (p *Pool) chooseMinFails() (Info, error) {
type expirer Pool

func (e *expirer) Name() string {
return "healthpool.expirer"
return "healthpool_expirer"
}

func (e *expirer) Run(_ context.Context) {
Expand Down
19 changes: 9 additions & 10 deletions go/lib/infra/modules/cleaner/cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@ import (
)

const (
// MetricsNamespace is the namespace under which metrics are published for
// the cleaner.
MetricsNamespace = "cleaner"
// metricSubsystem is the subsystem under which metrics are published for the cleaner.
metricSubsystem = "cleaner"
)

var registry = metricsRegistry{registered: make(map[string]*metric)}
Expand Down Expand Up @@ -57,7 +56,7 @@ func New(deleter ExpiredDeleter, subsystem string) *Cleaner {

// Name returns the tasks name.
func (c *Cleaner) Name() string {
return fmt.Sprintf("Cleaner for %s", c.subsystem)
return fmt.Sprintf("%s_cleaner", c.subsystem)
}

// Run deletes expired entries using the deleter func.
Expand All @@ -81,19 +80,19 @@ type metricsRegistry struct {
registered map[string]*metric
}

func (m *metricsRegistry) register(subsystem string) *metric {
func (m *metricsRegistry) register(namespace string) *metric {
m.mu.Lock()
defer m.mu.Unlock()
if metric, ok := m.registered[subsystem]; ok {
if metric, ok := m.registered[namespace]; ok {
return metric
}
m.registered[subsystem] = &metric{
resultsTotal: *prom.NewCounterVec(MetricsNamespace, subsystem, "results_total",
m.registered[namespace] = &metric{
resultsTotal: *prom.NewCounterVec(namespace, metricSubsystem, "results_total",
"Results of running the cleaner, either ok or err", []string{"result"}),
deletedTotal: prom.NewCounter(MetricsNamespace, subsystem, "deleted_total",
deletedTotal: prom.NewCounter(namespace, metricSubsystem, "deleted_total",
"Number of deleted entries total."),
}
return m.registered[subsystem]
return m.registered[namespace]
}

type metric struct {
Expand Down
2 changes: 1 addition & 1 deletion go/lib/infra/modules/idiscovery/idiscovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func (r *Runner) startRegularFetcher(fetcher *task, cfg FetchConfig) {
if r.stopping {
return
}
r.fetcher = periodic.StartPeriodicTask(fetcher, periodic.NewTicker(cfg.Interval.Duration),
r.fetcher = periodic.Start(fetcher, cfg.Interval.Duration,
cfg.Timeout.Duration)
}

Expand Down
6 changes: 3 additions & 3 deletions go/lib/infra/modules/itopo/cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ import (
var _ periodic.Task = cleaner{}

// StartCleaner starts a periodic task that removes expired dynamic topologies.
func StartCleaner(tick, timeout time.Duration) *periodic.Runner {
return periodic.StartPeriodicTask(cleaner{}, periodic.NewTicker(tick), timeout)
func StartCleaner(period, timeout time.Duration) *periodic.Runner {
return periodic.Start(cleaner{}, period, timeout)
}

type cleaner struct{}

// Name returns the tasks name.
func (c cleaner) Name() string {
return "itopo.cleaner"
return "itopo_cleaner"
}

// Run deletes expired dynamic topologies and calls the dropFunc passed to Init.
Expand Down
4 changes: 2 additions & 2 deletions go/lib/pathdb/pathdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ type PathDB interface {
}

// NewCleaner creates a cleaner task that deletes expired segments.
func NewCleaner(db PathDB) *cleaner.Cleaner {
func NewCleaner(db PathDB, namespace string) *cleaner.Cleaner {
return cleaner.New(func(ctx context.Context) (int, error) {
return db.DeleteExpired(ctx, time.Now())
}, "segments")
}, namespace)
}
6 changes: 3 additions & 3 deletions go/lib/pathmgr/polling_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,9 @@ type PollingParameters struct {
}

func StartPeriodic(params PollingParameters, ch chan sciond.PathReqFlags) *periodic.Runner {
return periodic.StartPeriodicTask(
return periodic.Start(
&taskPeriodicChannelWriter{ch: ch, flags: params.flags},
periodic.NewTicker(params.interval),
params.interval,
time.Hour, // Effectively forever, as the task is short and can never block
)
}
Expand All @@ -122,7 +122,7 @@ type taskPeriodicChannelWriter struct {
}

func (task *taskPeriodicChannelWriter) Name() string {
return "pathmgr.taskPeriodicChannelWriter"
return "pathmgr_channelwriter"
}

func (task *taskPeriodicChannelWriter) Run(_ context.Context) {
Expand Down
3 changes: 2 additions & 1 deletion go/lib/periodic/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//go/lib/log:go_default_library",
"//go/lib/periodic/metrics:go_default_library",
"//go/lib/util:go_default_library",
],
)
Expand All @@ -16,7 +17,7 @@ go_test(
srcs = ["periodic_test.go"],
embed = [":go_default_library"],
deps = [
"//go/lib/xtest:go_default_library",
"//go/lib/log:go_default_library",
"@com_github_stretchr_testify//assert:go_default_library",
],
)
25 changes: 25 additions & 0 deletions go/lib/periodic/metrics/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "go_default_library",
srcs = ["metrics.go"],
importpath = "github.com/scionproto/scion/go/lib/periodic/metrics",
visibility = ["//visibility:public"],
deps = [
"//go/lib/prom:go_default_library",
"@com_github_iancoleman_strcase//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
],
)

go_test(
name = "go_default_test",
srcs = ["metrics_test.go"],
embed = [":go_default_library"],
deps = [
"//go/lib/prom/promtest:go_default_library",
"@com_github_prometheus_client_golang//prometheus/testutil:go_default_library",
"@com_github_stretchr_testify//assert:go_default_library",
"@com_github_stretchr_testify//require:go_default_library",
],
)
Loading

0 comments on commit 8b963a7

Please sign in to comment.