Skip to content

Commit

Permalink
Implement feedback
Browse files Browse the repository at this point in the history
* Change StartTask() to Start()
* Refactor periodic test not to be racy
* Make changes that cleaner will carry the
namespace of the process that called it
  • Loading branch information
karampok committed Oct 10, 2019
1 parent 32b4b37 commit fec6cf6
Show file tree
Hide file tree
Showing 17 changed files with 105 additions and 126 deletions.
4 changes: 2 additions & 2 deletions go/beacon_srv/internal/beaconstorage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@ type Store interface {
func NewBeaconCleaner(s Store) *cleaner.Cleaner {
return cleaner.New(func(ctx context.Context) (int, error) {
return s.DeleteExpiredBeacons(ctx)
}, "beacons")
}, "bs_beacon")
}

// NewRevocationCleaner creates a cleaner task, which deletes expired revocations.
func NewRevocationCleaner(s Store) *cleaner.Cleaner {
return cleaner.New(func(ctx context.Context) (int, error) {
return s.DeleteExpiredRevocations(ctx)
}, "revocations")
}, "bs_revocation")
}
14 changes: 7 additions & 7 deletions go/beacon_srv/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,10 +312,10 @@ func (t *periodicTasks) Start() error {
return err
}

t.beaconCleaner = periodic.StartTask(
t.beaconCleaner = periodic.Start(
beaconstorage.NewBeaconCleaner(t.store),
30*time.Second, 30*time.Second)
t.revCleaner = periodic.StartTask(
t.revCleaner = periodic.Start(
beaconstorage.NewRevocationCleaner(t.store),
5*time.Second, 5*time.Second)
return nil
Expand All @@ -338,7 +338,7 @@ func (t *periodicTasks) startRevoker() (*periodic.Runner, error) {
RevOverlap: cfg.BS.RevOverlap.Duration,
},
}.New()
return periodic.StartTask(r, cfg.BS.ExpiredCheckInterval.Duration,
return periodic.Start(r, cfg.BS.ExpiredCheckInterval.Duration,
cfg.BS.ExpiredCheckInterval.Duration), nil
}

Expand All @@ -353,7 +353,7 @@ func (t *periodicTasks) startKeepaliveSender(a *topology.TopoAddr) (*periodic.Ru
Signer: infra.NullSigner,
TopoProvider: t.topoProvider,
}
return periodic.StartTask(s, cfg.BS.KeepaliveInterval.Duration,
return periodic.Start(s, cfg.BS.KeepaliveInterval.Duration,
cfg.BS.KeepaliveInterval.Duration), nil
}

Expand Down Expand Up @@ -389,7 +389,7 @@ func (t *periodicTasks) startOriginator(a *topology.TopoAddr) (*periodic.Runner,
if err != nil {
return nil, common.NewBasicError("Unable to start originator", err)
}
return periodic.StartTask(s, 500*time.Millisecond,
return periodic.Start(s, 500*time.Millisecond,
cfg.BS.OriginationInterval.Duration), nil
}

Expand Down Expand Up @@ -425,7 +425,7 @@ func (t *periodicTasks) startPropagator(a *topology.TopoAddr) (*periodic.Runner,
if err != nil {
return nil, common.NewBasicError("Unable to start propagator", err)
}
return periodic.StartTask(p, 500*time.Millisecond,
return periodic.Start(p, 500*time.Millisecond,
cfg.BS.PropagationInterval.Duration), nil
}

Expand Down Expand Up @@ -475,7 +475,7 @@ func (t *periodicTasks) startRegistrar(topo *topology.Topo, segType proto.PathSe
if err != nil {
return nil, common.NewBasicError("Unable to start registrar", err, "type", segType)
}
return periodic.StartTask(r, 500*time.Millisecond,
return periodic.Start(r, 500*time.Millisecond,
cfg.BS.RegistrationInterval.Duration), nil
}

Expand Down
6 changes: 3 additions & 3 deletions go/cert_srv/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func realMain() int {
// Non-core starts a requester.
func startReissRunner() {
if !cfg.CS.DisableCorePush {
corePusher = periodic.StartTask(
corePusher = periodic.Start(
&reiss.CorePusher{
LocalIA: itopo.Get().ISD_AS,
TrustDB: state.TrustDB,
Expand All @@ -125,7 +125,7 @@ func startReissRunner() {
}
if itopo.Get().Core {
log.Info("Starting periodic reiss.Self task")
reissRunner = periodic.StartTask(
reissRunner = periodic.Start(
&reiss.Self{
Msgr: msgr,
State: state,
Expand All @@ -140,7 +140,7 @@ func startReissRunner() {
return
}
log.Info("Starting periodic reiss.Requester task")
reissRunner = periodic.StartTask(
reissRunner = periodic.Start(
&reiss.Requester{
Msgr: msgr,
State: state,
Expand Down
2 changes: 1 addition & 1 deletion go/examples/discovery_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func realMain() int {
return 1
}
log.Info("Starting periodic fetching", "period", *period)
runner := periodic.StartTask(fetcher, *period, *timeout)
runner := periodic.Start(fetcher, *period, *timeout)
defer runner.Stop()
select {}
}
Expand Down
3 changes: 2 additions & 1 deletion go/lib/healthpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ func NewPool(infos map[Info]struct{}, opts PoolOptions) (*Pool, error) {
if err := p.Update(infos); err != nil {
return nil, err
}
p.expirer = periodic.StartTask((*expirer)(p), time.Second, time.Second)
//TODO(karampok). Before we start the task, we need caller identifier (e.g. bs_healthpool_x)
p.expirer = periodic.Start((*expirer)(p), time.Second, time.Second)
return p, nil
}

Expand Down
6 changes: 3 additions & 3 deletions go/lib/infra/modules/cleaner/cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func New(deleter ExpiredDeleter, subsystem string) *Cleaner {

// Name returns the tasks name.
func (c *Cleaner) Name() string {
return fmt.Sprintf("cleaner_%s", c.subsystem)
return fmt.Sprintf("%s_cleaner", c.subsystem)
}

// Run deletes expired entries using the deleter func.
Expand Down Expand Up @@ -88,9 +88,9 @@ func (m *metricsRegistry) register(subsystem string) *metric {
return metric
}
m.registered[subsystem] = &metric{
resultsTotal: *prom.NewCounterVec(MetricsNamespace, subsystem, "results_total",
resultsTotal: *prom.NewCounterVec(subsystem, MetricsNamespace, "results_total",
"Results of running the cleaner, either ok or err", []string{"result"}),
deletedTotal: prom.NewCounter(MetricsNamespace, subsystem, "deleted_total",
deletedTotal: prom.NewCounter(subsystem, MetricsNamespace, "deleted_total",
"Number of deleted entries total."),
}
return m.registered[subsystem]
Expand Down
2 changes: 1 addition & 1 deletion go/lib/infra/modules/idiscovery/idiscovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func (r *Runner) startRegularFetcher(fetcher *task, cfg FetchConfig) {
if r.stopping {
return
}
r.fetcher = periodic.StartTask(fetcher, cfg.Interval.Duration,
r.fetcher = periodic.Start(fetcher, cfg.Interval.Duration,
cfg.Timeout.Duration)
}

Expand Down
4 changes: 2 additions & 2 deletions go/lib/infra/modules/itopo/cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ var _ periodic.Task = cleaner{}

// StartCleaner starts a periodic task that removes expired dynamic topologies.
func StartCleaner(period, timeout time.Duration) *periodic.Runner {
return periodic.StartTask(cleaner{}, period, timeout)
return periodic.Start(cleaner{}, period, timeout)
}

type cleaner struct{}

// Name returns the tasks name.
func (c cleaner) Name() string {
return "itopo.cleaner"
return "itopo_cleaner"
}

// Run deletes expired dynamic topologies and calls the dropFunc passed to Init.
Expand Down
2 changes: 1 addition & 1 deletion go/lib/pathmgr/polling_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ type PollingParameters struct {
}

func StartPeriodic(params PollingParameters, ch chan sciond.PathReqFlags) *periodic.Runner {
return periodic.StartTask(
return periodic.Start(
&taskPeriodicChannelWriter{ch: ch, flags: params.flags},
params.interval,
time.Hour, // Effectively forever, as the task is short and can never block
Expand Down
6 changes: 3 additions & 3 deletions go/lib/periodic/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ import (
)

const (
//EventStop indicates a stop event took place.
// EventStop indicates a stop event took place.
EventStop = "stop"
//EventKill indicates a kill event took place.
// EventKill indicates a kill event took place.
EventKill = "kill"
//EventTrigger indicates a trigger event took place.
// EventTrigger indicates a trigger event took place.
EventTrigger = "triggered"
)

Expand Down
32 changes: 16 additions & 16 deletions go/lib/periodic/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package metrics

import (
"fmt"
"strings"
"testing"
"time"
Expand All @@ -26,26 +27,15 @@ import (
)

func TestLabels(t *testing.T) {
tests := []interface{}{
EventLabels{},
}
for _, test := range tests {
promtest.CheckLabelsStruct(t, test)
}
promtest.CheckLabelsStruct(t, EventLabels{})
}

func TestNewMetric(t *testing.T) {
t.Run("Happy path", func(t *testing.T) {
t.Run("Returns valid exporter", func(t *testing.T) {
t.Parallel()
n, sn := "randomSnakeName", "random_snake_name"
assert.NotContains(t, counters, sn)
rnd := fmt.Sprintf("%v", time.Now().Unix())
n, sn := "randomSnakeName"+rnd, "random_snake_name_"+rnd
x := NewMetric(n)
assert.Contains(t, counters, sn)
y := NewMetric(n)
assert.Equal(t, x, y) // same prefix, same singleton exporter
z := NewMetric(n + "z")
assert.NotEqual(t, z, y) // different prefix, not same exporter

_, ok := x.(ExportMetric)
assert.True(t, ok)

Expand All @@ -56,7 +46,17 @@ func TestNewMetric(t *testing.T) {
assert.NotNil(t, v.timestamp)
})

t.Run("Invalid metric name", func(t *testing.T) {
t.Run("Same name does not panic", func(t *testing.T) {
t.Parallel()
n := "randomSnakeNameOne"
NewMetric(n)
w := func() {
NewMetric(n)
}
require.NotPanics(t, w)
})

t.Run("Invalid name does not panic", func(t *testing.T) {
t.Parallel()
n := "random.SnakeName"
w := func() {
Expand Down
78 changes: 29 additions & 49 deletions go/lib/periodic/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,6 @@ import (
"github.com/scionproto/scion/go/lib/util"
)

// Ticker interface to improve testability of this periodic task code.
type Ticker interface {
Chan() <-chan time.Time
Stop()
}

type defaultTicker struct {
*time.Ticker
}

func (t *defaultTicker) Chan() <-chan time.Time {
return t.C
}

func newTicker(d time.Duration) Ticker {
return &defaultTicker{
Ticker: time.NewTicker(d),
}
}

// A Task that has to be periodically executed.
type Task interface {
// Run executes the task once, it should return within the context's timeout.
Expand All @@ -54,39 +34,39 @@ type Task interface {

// Runner runs a task periodically.
type Runner struct {
task Task
ticker Ticker
timeout time.Duration
stop chan struct{}
loopFinished chan struct{}
ctx context.Context
cancelF context.CancelFunc
trigger chan struct{}
export metrics.ExportMetric
task Task
ticker *time.Ticker
timeout time.Duration
stop chan struct{}
loopFinished chan struct{}
ctx context.Context
cancelF context.CancelFunc
trigger chan struct{}
metricExporter metrics.ExportMetric
}

// StartTask creates and starts a new Runner to run the given task peridiocally.
// The ticker regulates the periodicity. The timeout is used for the context timeout of the task.
// The timeout can be larger than the periodicity of the ticker. That means if a tasks takes a long
// Start creates and starts a new Runner to run the given task peridiocally.
// The timeout is used for the context timeout of the task. The timeout can be
// larger than the periodicity of the task. That means if a tasks takes a long
// time it will be immediately retriggered.
func StartTask(task Task, period, timeout time.Duration) *Runner {
func Start(task Task, period, timeout time.Duration) *Runner {
ctx, cancelF := context.WithCancel(context.Background())
logger := log.New("debug_id", util.GetDebugID())
ctx = log.CtxWith(ctx, logger)
r := &Runner{
task: task,
ticker: newTicker(period),
timeout: timeout,
stop: make(chan struct{}),
loopFinished: make(chan struct{}),
ctx: ctx,
cancelF: cancelF,
trigger: make(chan struct{}),
export: metrics.NewMetric(task.Name()),
task: task,
ticker: time.NewTicker(period),
timeout: timeout,
stop: make(chan struct{}),
loopFinished: make(chan struct{}),
ctx: ctx,
cancelF: cancelF,
trigger: make(chan struct{}),
metricExporter: metrics.NewMetric(task.Name()),
}
logger.Info("Starting periodic task", "task", task.Name())
r.export.Period(period)
r.export.StartTimestamp(time.Now())
r.metricExporter.Period(period)
r.metricExporter.StartTimestamp(time.Now())
go func() {
defer log.LogPanicAndExit()
r.runLoop()
Expand All @@ -100,7 +80,7 @@ func (r *Runner) Stop() {
r.ticker.Stop()
close(r.stop)
<-r.loopFinished
r.export.Event(metrics.EventStop)
r.metricExporter.Event(metrics.EventStop)
}

// Kill is like stop but it also cancels the context of the current running method.
Expand All @@ -109,7 +89,7 @@ func (r *Runner) Kill() {
close(r.stop)
r.cancelF()
<-r.loopFinished
r.export.Event(metrics.EventKill)
r.metricExporter.Event(metrics.EventKill)
}

// TriggerRun triggers the periodic task to run now.
Expand All @@ -125,7 +105,7 @@ func (r *Runner) TriggerRun() {
case <-r.stop:
case r.trigger <- struct{}{}:
}
r.export.Event(metrics.EventTrigger)
r.metricExporter.Event(metrics.EventTrigger)
}

func (r *Runner) runLoop() {
Expand All @@ -135,7 +115,7 @@ func (r *Runner) runLoop() {
select {
case <-r.stop:
return
case <-r.ticker.Chan():
case <-r.ticker.C:
r.onTick()
case <-r.trigger:
r.onTick()
Expand All @@ -153,7 +133,7 @@ func (r *Runner) onTick() {
ctx, cancelF := context.WithTimeout(r.ctx, r.timeout)
start := time.Now()
r.task.Run(ctx)
r.export.Runtime(time.Since(start))
r.metricExporter.Runtime(time.Since(start))
cancelF()
}
}
Loading

0 comments on commit fec6cf6

Please sign in to comment.