Skip to content

Commit

Permalink
Implement feedback, make count 2 test work
Browse files Browse the repository at this point in the history
  • Loading branch information
karampok committed Oct 11, 2019
1 parent 922fb18 commit c69f362
Show file tree
Hide file tree
Showing 15 changed files with 204 additions and 126 deletions.
2 changes: 1 addition & 1 deletion go/beacon_srv/internal/beaconing/originator.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (cfg OriginatorConf) New() (*Originator, error) {

// Name returns the tasks name.
func (o *Originator) Name() string {
return "beaconing_originator"
return "bs_beaconing_originator"
}

// Run originates core and downstream beacons.
Expand Down
2 changes: 1 addition & 1 deletion go/beacon_srv/internal/beaconing/propagator.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (cfg PropagatorConf) New() (*Propagator, error) {

// Name returns the tasks name.
func (p *Propagator) Name() string {
return "beaconing_propagator"
return "bs_beaconing_propagator"
}

// Run propagates beacons provided by the beacon provider on all active target
Expand Down
2 changes: 1 addition & 1 deletion go/beacon_srv/internal/ifstate/revoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (cfg RevokerConf) New() *Revoker {

// Name returns the tasks name.
func (r *Revoker) Name() string {
return "ifstate_revoker"
return "bs_ifstate_revoker"
}

// Run issues revocations for interfaces that have timed out
Expand Down
2 changes: 1 addition & 1 deletion go/beacon_srv/internal/keepalive/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type Sender struct {

// Name returns the tasks name.
func (s *Sender) Name() string {
return "keepalive_sender"
return "bs_keepalive_sender"
}

// Run sends ifid keepalive messages on all border routers.
Expand Down
16 changes: 8 additions & 8 deletions go/lib/infra/modules/cleaner/cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ import (
)

const (
// MetricsNamespace is the namespace under which metrics are published for
// metricSubsystem is the namespace under which metrics are published for
// the cleaner.
MetricsNamespace = "cleaner"
metricSubsystem = "cleaner"
)

var registry = metricsRegistry{registered: make(map[string]*metric)}
Expand Down Expand Up @@ -81,19 +81,19 @@ type metricsRegistry struct {
registered map[string]*metric
}

func (m *metricsRegistry) register(subsystem string) *metric {
func (m *metricsRegistry) register(namespace string) *metric {
m.mu.Lock()
defer m.mu.Unlock()
if metric, ok := m.registered[subsystem]; ok {
if metric, ok := m.registered[namespace]; ok {
return metric
}
m.registered[subsystem] = &metric{
resultsTotal: *prom.NewCounterVec(subsystem, MetricsNamespace, "results_total",
m.registered[namespace] = &metric{
resultsTotal: *prom.NewCounterVec(namespace, metricSubsystem, "results_total",
"Results of running the cleaner, either ok or err", []string{"result"}),
deletedTotal: prom.NewCounter(subsystem, MetricsNamespace, "deleted_total",
deletedTotal: prom.NewCounter(namespace, metricSubsystem, "deleted_total",
"Number of deleted entries total."),
}
return m.registered[subsystem]
return m.registered[namespace]
}

type metric struct {
Expand Down
4 changes: 2 additions & 2 deletions go/lib/pathdb/pathdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ type PathDB interface {
}

// NewCleaner creates a cleaner task that deletes expired segments.
func NewCleaner(db PathDB) *cleaner.Cleaner {
func NewCleaner(db PathDB, namespace string) *cleaner.Cleaner {
return cleaner.New(func(ctx context.Context) (int, error) {
return db.DeleteExpired(ctx, time.Now())
}, "segments")
}, namespace)
}
3 changes: 0 additions & 3 deletions go/lib/periodic/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@ go_test(
embed = [":go_default_library"],
deps = [
"//go/lib/log:go_default_library",
"//go/lib/periodic/metrics:go_default_library",
"//go/lib/periodic/metrics/mock_metrics:go_default_library",
"@com_github_golang_mock//gomock:go_default_library",
"@com_github_stretchr_testify//assert:go_default_library",
],
)
2 changes: 1 addition & 1 deletion go/lib/periodic/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (e exporter) StartTimestamp(t time.Time) {
}

func (e exporter) Period(d time.Duration) {
e.period.Set(float64(d) / 1e9)
e.period.Set(d.Seconds())
}

func (e exporter) Runtime(d time.Duration) {
Expand Down
91 changes: 55 additions & 36 deletions go/lib/periodic/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,19 @@ func TestLabels(t *testing.T) {
func TestNewMetric(t *testing.T) {
t.Run("Returns valid exporter", func(t *testing.T) {
rnd := fmt.Sprintf("%v", time.Now().Unix())
n, sn := "randomSnakeName"+rnd, "random_snake_name_"+rnd
x := NewMetric(n)
_, ok := x.(ExportMetric)
assert.True(t, ok)

v, _ := counters[sn]
assert.NotNil(t, v.period)
assert.NotNil(t, v.events)
assert.NotNil(t, v.runtime)
assert.NotNil(t, v.timestamp)
n := "randomSnakeName" + rnd
w := func() {
x := NewMetric(n)
x.Period(time.Second)
x.Runtime(time.Second)
x.Event("dummy")
x.StartTimestamp(time.Now())
}
require.NotPanics(t, w)
})

t.Run("Same name does not panic", func(t *testing.T) {
n := "randomSnakeNameOne"
n := "randomOtherName"
NewMetric(n)
w := func() {
NewMetric(n)
Expand All @@ -56,7 +55,7 @@ func TestNewMetric(t *testing.T) {
})

t.Run("Invalid name does not panic", func(t *testing.T) {
n := "random.SnakeName"
n := "random.NameWithDot"
w := func() {
NewMetric(n)
}
Expand All @@ -65,50 +64,70 @@ func TestNewMetric(t *testing.T) {
}

func TestContent(t *testing.T) {
m := NewMetric("testMe")
v, ok := counters["test_me"]
assert.True(t, ok)

t.Run("Runtime", func(t *testing.T) {
want := `
# HELP test_me_periodic_runtime_duration_seconds_total Total time spend on every periodic run.
# TYPE test_me_periodic_runtime_duration_seconds_total counter
test_me_periodic_runtime_duration_seconds_total 1
`
rnd := fmt.Sprintf("%v", time.Now().Nanosecond())
n, sn := "randomName"+rnd, "random_name_"+rnd
m := NewMetric(n)
v, ok := counters[sn]
assert.True(t, ok)

want := fmt.Sprintf(`
# HELP %s_periodic_runtime_duration_seconds_total Total time spend on every periodic run.
# TYPE %s_periodic_runtime_duration_seconds_total counter
%s_periodic_runtime_duration_seconds_total 1
`, sn, sn, sn)
m.Runtime(1 * time.Second)
err := testutil.CollectAndCompare(v.runtime, strings.NewReader(want))
assert.NoError(t, err)
})

t.Run("StartTimestamp", func(t *testing.T) {
want := `
# HELP test_me_periodic_runtime_timestamp_seconds The unix timestamp when the periodic run started.
# TYPE test_me_periodic_runtime_timestamp_seconds gauge
test_me_periodic_runtime_timestamp_seconds 1.570633374e+09
`
rnd := fmt.Sprintf("%v", time.Now().Nanosecond())
n, sn := "randomName"+rnd, "random_name_"+rnd
m := NewMetric(n)
v, ok := counters[sn]
assert.True(t, ok)

want := fmt.Sprintf(`
# HELP %s_periodic_runtime_timestamp_seconds The unix timestamp when the periodic run started.
# TYPE %s_periodic_runtime_timestamp_seconds gauge
%s_periodic_runtime_timestamp_seconds 1.570633374e+09
`, sn, sn, sn)
ts := time.Unix(1570633374, 0)
m.StartTimestamp(ts)
err := testutil.CollectAndCompare(v.timestamp, strings.NewReader(want))
assert.NoError(t, err)
})

t.Run("Event", func(t *testing.T) {
want := `
# HELP test_me_periodic_event_total Total number of events.
# TYPE test_me_periodic_event_total counter
test_me_periodic_event_total{event_type="kill"} 1
`
rnd := fmt.Sprintf("%v", time.Now().Nanosecond())
n, sn := "randomName"+rnd, "random_name_"+rnd
m := NewMetric(n)
v, ok := counters[sn]
assert.True(t, ok)

want := fmt.Sprintf(`
# HELP %s_periodic_event_total Total number of events.
# TYPE %s_periodic_event_total counter
%s_periodic_event_total{event_type="kill"} 1
`, sn, sn, sn)
m.Event(EventKill)
err := testutil.CollectAndCompare(v.events, strings.NewReader(want))
assert.NoError(t, err)
})

t.Run("Period", func(t *testing.T) {
want := `
# HELP test_me_periodic_period_duration_seconds The period of this job.
# TYPE test_me_periodic_period_duration_seconds gauge
test_me_periodic_period_duration_seconds 0.02
`
rnd := fmt.Sprintf("%v", time.Now().Nanosecond())
n, sn := "randomName"+rnd, "random_name_"+rnd
m := NewMetric(n)
v, ok := counters[sn]
assert.True(t, ok)

want := fmt.Sprintf(`
# HELP %s_periodic_period_duration_seconds The period of this job.
# TYPE %s_periodic_period_duration_seconds gauge
%s_periodic_period_duration_seconds 0.02
`, sn, sn, sn)
m.Period(20 * time.Millisecond)
err := testutil.CollectAndCompare(v.period, strings.NewReader(want))
assert.NoError(t, err)
Expand Down
51 changes: 27 additions & 24 deletions go/lib/periodic/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ type Task interface {

// Runner runs a task periodically.
type Runner struct {
task Task
ticker *time.Ticker
timeout time.Duration
stop chan struct{}
loopFinished chan struct{}
ctx context.Context
cancelF context.CancelFunc
trigger chan struct{}
metricExporter metrics.ExportMetric
task Task
ticker *time.Ticker
timeout time.Duration
stop chan struct{}
loopFinished chan struct{}
ctx context.Context
cancelF context.CancelFunc
trigger chan struct{}
metric metrics.ExportMetric
}

// Start creates and starts a new Runner to run the given task peridiocally.
Expand All @@ -54,19 +54,19 @@ func Start(task Task, period, timeout time.Duration) *Runner {
logger := log.New("debug_id", util.GetDebugID())
ctx = log.CtxWith(ctx, logger)
r := &Runner{
task: task,
ticker: time.NewTicker(period),
timeout: timeout,
stop: make(chan struct{}),
loopFinished: make(chan struct{}),
ctx: ctx,
cancelF: cancelF,
trigger: make(chan struct{}),
metricExporter: metrics.NewMetric(task.Name()),
task: task,
ticker: time.NewTicker(period),
timeout: timeout,
stop: make(chan struct{}),
loopFinished: make(chan struct{}),
ctx: ctx,
cancelF: cancelF,
trigger: make(chan struct{}),
metric: metrics.NewMetric(task.Name()),
}
logger.Info("Starting periodic task", "task", task.Name())
r.metricExporter.Period(period)
r.metricExporter.StartTimestamp(time.Now())
r.metric.Period(period)
r.metric.StartTimestamp(time.Now())
go func() {
defer log.LogPanicAndExit()
r.runLoop()
Expand All @@ -80,16 +80,19 @@ func (r *Runner) Stop() {
r.ticker.Stop()
close(r.stop)
<-r.loopFinished
r.metricExporter.Event(metrics.EventStop)
r.metric.Event(metrics.EventStop)
}

// Kill is like stop but it also cancels the context of the current running method.
func (r *Runner) Kill() {
if r == nil {
return
}
r.ticker.Stop()
close(r.stop)
r.cancelF()
<-r.loopFinished
r.metricExporter.Event(metrics.EventKill)
r.metric.Event(metrics.EventKill)
}

// TriggerRun triggers the periodic task to run now.
Expand All @@ -105,7 +108,7 @@ func (r *Runner) TriggerRun() {
case <-r.stop:
case r.trigger <- struct{}{}:
}
r.metricExporter.Event(metrics.EventTrigger)
r.metric.Event(metrics.EventTrigger)
}

func (r *Runner) runLoop() {
Expand Down Expand Up @@ -133,7 +136,7 @@ func (r *Runner) onTick() {
ctx, cancelF := context.WithTimeout(r.ctx, r.timeout)
start := time.Now()
r.task.Run(ctx)
r.metricExporter.Runtime(time.Since(start))
r.metric.Runtime(time.Since(start))
cancelF()
}
}
Loading

0 comments on commit c69f362

Please sign in to comment.