diff --git a/cmd/agent_auditor.go b/cmd/agent_auditor.go index 7f25435e5..659708df6 100644 --- a/cmd/agent_auditor.go +++ b/cmd/agent_auditor.go @@ -128,6 +128,9 @@ func runAgentAuditor(cmd *cobra.Command, args []string) error { defer bp.Stop() agent.Start() + + QedAuditorInstancesCount.Inc() + util.AwaitTermSignal(agent.Shutdown) return nil } @@ -149,6 +152,8 @@ func (i membershipFactory) New(ctx context.Context) gossip.Task { s := b.Snapshots[0] + QedAuditorBatchesReceivedTotal.Inc() + return func() error { timer := prometheus.NewTimer(QedAuditorBatchesProcessSeconds) defer timer.ObserveDuration() diff --git a/cmd/agent_monitor.go b/cmd/agent_monitor.go index aedd3570b..84b55c06a 100644 --- a/cmd/agent_monitor.go +++ b/cmd/agent_monitor.go @@ -137,6 +137,9 @@ func runAgentMonitor(cmd *cobra.Command, args []string) error { defer bp.Stop() agent.Start() + + QedMonitorInstancesCount.Inc() + util.AwaitTermSignal(agent.Shutdown) return nil } @@ -225,6 +228,8 @@ func (l *lagFactory) New(ctx context.Context) gossip.Task { counter := atomic.AddUint64(&l.counter, uint64(len(b.Snapshots))) lastVersion := atomic.LoadUint64(&l.lastVersion) + QedMonitorBatchesReceivedTotal.Inc() + return func() error { timer := prometheus.NewTimer(QedMonitorBatchesProcessSeconds) defer timer.ObserveDuration() diff --git a/server/sender.go b/server/sender.go index 7e7e17651..c4657173c 100644 --- a/server/sender.go +++ b/server/sender.go @@ -56,7 +56,6 @@ type Sender struct { } func NewSender(a *gossip.Agent, s sign.Signer, size, ttl, n int) *Sender { - QedSenderInstancesCount.Inc() return &Sender{ agent: a, Interval: 100 * time.Millisecond, @@ -71,6 +70,7 @@ func NewSender(a *gossip.Agent, s sign.Signer, size, ttl, n int) *Sender { // Start NumSenders concurrent senders and waits for them // to finish func (s Sender) Start(ch chan *protocol.Snapshot) { + QedSenderInstancesCount.Inc() for i := 0; i < s.NumSenders; i++ { log.Debugf("Starting sender %d", i) go s.batcher(i, ch) @@ -107,11 +107,14 @@ func (s Sender) batcher(id int, ch chan *protocol.Snapshot) { log.Infof("Error encoding batch, dropping it") continue } + s.agent.Out.Publish(&gossip.Message{ Kind: gossip.BatchMessageType, TTL: s.TTL, Payload: payload, }) + QedSenderBatchesSentTotal.Inc() + batch = s.newBatch() } ss, err := s.doSign(snap) @@ -133,6 +136,7 @@ func (s Sender) batcher(id int, ch chan *protocol.Snapshot) { TTL: s.TTL, Payload: payload, }) + QedSenderBatchesSentTotal.Inc() batch = s.newBatch() } case <-s.quitCh: