Skip to content

Commit

Permalink
Fix: use metrics counters
Browse files Browse the repository at this point in the history
  • Loading branch information
gdiazlo committed Apr 11, 2019
1 parent 8fb9d65 commit 85b4389
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 1 deletion.
5 changes: 5 additions & 0 deletions cmd/agent_auditor.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ func runAgentAuditor(cmd *cobra.Command, args []string) error {
defer bp.Stop()

agent.Start()

QedAuditorInstancesCount.Inc()

util.AwaitTermSignal(agent.Shutdown)
return nil
}
Expand All @@ -149,6 +152,8 @@ func (i membershipFactory) New(ctx context.Context) gossip.Task {

s := b.Snapshots[0]

QedAuditorBatchesReceivedTotal.Inc()

return func() error {
timer := prometheus.NewTimer(QedAuditorBatchesProcessSeconds)
defer timer.ObserveDuration()
Expand Down
5 changes: 5 additions & 0 deletions cmd/agent_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ func runAgentMonitor(cmd *cobra.Command, args []string) error {
defer bp.Stop()

agent.Start()

QedMonitorInstancesCount.Inc()

util.AwaitTermSignal(agent.Shutdown)
return nil
}
Expand Down Expand Up @@ -225,6 +228,8 @@ func (l *lagFactory) New(ctx context.Context) gossip.Task {
counter := atomic.AddUint64(&l.counter, uint64(len(b.Snapshots)))
lastVersion := atomic.LoadUint64(&l.lastVersion)

QedMonitorBatchesReceivedTotal.Inc()

return func() error {
timer := prometheus.NewTimer(QedMonitorBatchesProcessSeconds)
defer timer.ObserveDuration()
Expand Down
6 changes: 5 additions & 1 deletion server/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ type Sender struct {
}

func NewSender(a *gossip.Agent, s sign.Signer, size, ttl, n int) *Sender {
QedSenderInstancesCount.Inc()
return &Sender{
agent: a,
Interval: 100 * time.Millisecond,
Expand All @@ -71,6 +70,7 @@ func NewSender(a *gossip.Agent, s sign.Signer, size, ttl, n int) *Sender {
// Start NumSenders concurrent senders and waits for them
// to finish
func (s Sender) Start(ch chan *protocol.Snapshot) {
QedSenderInstancesCount.Inc()
for i := 0; i < s.NumSenders; i++ {
log.Debugf("Starting sender %d", i)
go s.batcher(i, ch)
Expand Down Expand Up @@ -107,11 +107,14 @@ func (s Sender) batcher(id int, ch chan *protocol.Snapshot) {
log.Infof("Error encoding batch, dropping it")
continue
}

s.agent.Out.Publish(&gossip.Message{
Kind: gossip.BatchMessageType,
TTL: s.TTL,
Payload: payload,
})
QedSenderBatchesSentTotal.Inc()

batch = s.newBatch()
}
ss, err := s.doSign(snap)
Expand All @@ -133,6 +136,7 @@ func (s Sender) batcher(id int, ch chan *protocol.Snapshot) {
TTL: s.TTL,
Payload: payload,
})
QedSenderBatchesSentTotal.Inc()
batch = s.newBatch()
}
case <-s.quitCh:
Expand Down

0 comments on commit 85b4389

Please sign in to comment.