Skip to content

Commit

Permalink
Declare metrics at raft balloon level
Browse files Browse the repository at this point in the history
  • Loading branch information
aalda committed Apr 5, 2019
1 parent 79ae579 commit 1d61f47
Show file tree
Hide file tree
Showing 5 changed files with 1,157 additions and 55 deletions.
92 changes: 92 additions & 0 deletions raftwal/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
Copyright 2018-2019 Banco Bilbao Vizcaya Argentaria, S.A.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package raftwal

import "github.com/prometheus/client_golang/prometheus"

// namespace is the leading part of all published metrics.
const namespace = "qed"

// subsystem associated with metrics for raft balloon
const subSystem = "raft_balloon"

type raftBalloonMetrics struct {
Version prometheus.GaugeFunc
Adds prometheus.Counter
MembershipQueries prometheus.Counter
DigestMembershipQueries prometheus.Counter
IncrementalQueries prometheus.Counter
}

func newRaftBalloonMetrics(b *RaftBalloon) *raftBalloonMetrics {
return &raftBalloonMetrics{
Version: prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subSystem,
Name: "version",
Help: "Current balloon version.",
},
func() float64 {
return float64(b.fsm.balloon.Version())
},
),
Adds: prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subSystem,
Name: "adds",
Help: "Number of add operations",
},
),
MembershipQueries: prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subSystem,
Name: "membership_queries",
Help: "Number of membership queries.",
},
),
DigestMembershipQueries: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subSystem,
Name: "digest_membership_queries",
Help: "Number of membership by digest queries.",
},
),
IncrementalQueries: prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subSystem,
Name: "incremental_queries",
Help: "Number of incremental queries.",
},
),
}
}

// collectors satisfies the prom.PrometheusCollector interface.
func (m *raftBalloonMetrics) collectors() []prometheus.Collector {
return []prometheus.Collector{
m.Version,
m.Adds,
m.MembershipQueries,
m.DigestMembershipQueries,
m.IncrementalQueries,
}
}
18 changes: 17 additions & 1 deletion raftwal/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/bbva/qed/balloon"
"github.com/bbva/qed/hashing"
"github.com/bbva/qed/log"
"github.com/bbva/qed/metrics"
"github.com/bbva/qed/protocol"
"github.com/bbva/qed/raftwal/commands"
"github.com/bbva/qed/raftwal/raftrocks"
Expand Down Expand Up @@ -90,13 +91,14 @@ type RaftBalloon struct {
fsm *BalloonFSM // balloon's finite state machine
snapshotsCh chan *protocol.Snapshot // channel to publish snapshots

metrics *raftBalloonMetrics
}

// NewRaftBalloon returns a new RaftBalloon.
func NewRaftBalloon(path, addr, id string, store storage.ManagedStore, snapshotsCh chan *protocol.Snapshot) (*RaftBalloon, error) {

// Create the log store and stable store
rocksStore, err := raftrocks.New(raftrocks.Options{Path: path + "/wal", NoSync: true})
rocksStore, err := raftrocks.New(raftrocks.Options{Path: path + "/wal", NoSync: true, EnableStatistics: true})
if err != nil {
return nil, fmt.Errorf("cannot create a new rocksdb log store: %s", err)
}
Expand Down Expand Up @@ -128,6 +130,7 @@ func NewRaftBalloon(path, addr, id string, store storage.ManagedStore, snapshots
rb.store.db = store
rb.store.log = logStore
rb.store.rocksStore = rocksStore
rb.metrics = newRaftBalloonMetrics(rb)

return rb, nil
}
Expand Down Expand Up @@ -233,9 +236,11 @@ func (b *RaftBalloon) Close(wait bool) error {

b.store.rocksStore = nil
b.store.log = nil
b.metrics = nil

// Close FSM
b.fsm.Close()
b.fsm = nil

// close database
if err := b.store.db.Close(); err != nil {
Expand Down Expand Up @@ -369,6 +374,7 @@ func (b *RaftBalloon) Add(event []byte) (*balloon.Snapshot, error) {
if err != nil {
return nil, err
}
b.metrics.Adds.Inc()
snapshot := resp.(*fsmAddResponse).snapshot

//Send snapshot to the snapshot channel
Expand All @@ -383,14 +389,17 @@ func (b *RaftBalloon) Add(event []byte) (*balloon.Snapshot, error) {
}

func (b *RaftBalloon) QueryDigestMembership(keyDigest hashing.Digest, version uint64) (*balloon.MembershipProof, error) {
b.metrics.DigestMembershipQueries.Inc()
return b.fsm.QueryDigestMembership(keyDigest, version)
}

func (b *RaftBalloon) QueryMembership(event []byte, version uint64) (*balloon.MembershipProof, error) {
b.metrics.MembershipQueries.Inc()
return b.fsm.QueryMembership(event, version)
}

func (b *RaftBalloon) QueryConsistency(start, end uint64) (*balloon.IncrementalProof, error) {
b.metrics.IncrementalQueries.Inc()
return b.fsm.QueryConsistency(start, end)
}

Expand Down Expand Up @@ -467,3 +476,10 @@ func (b *RaftBalloon) Info() map[string]interface{} {
m["meta"] = b.fsm.meta
return m
}

func (b *RaftBalloon) RegisterMetrics(registry metrics.Registry) {
if registry != nil {
b.store.rocksStore.RegisterMetrics(registry)
}
registry.MustRegister(b.metrics.collectors()...)
}
29 changes: 18 additions & 11 deletions raftwal/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"github.com/bbva/qed/log"
"github.com/bbva/qed/storage/rocks"
metrics_utils "github.com/bbva/qed/testutils/metrics"
utilrand "github.com/bbva/qed/testutils/rand"
"github.com/hashicorp/raft"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -449,34 +450,40 @@ func mustTempDir() string {
}

func newNodeBench(b *testing.B, id int) (*RaftBalloon, func()) {
rocksdbPath := fmt.Sprintf("/var/tmp/raft-test/node%d/rocksdb", id)
storePath := fmt.Sprintf("/var/tmp/raft-test/node%d/db", id)

err := os.MkdirAll(rocksdbPath, os.FileMode(0755))
err := os.MkdirAll(storePath, os.FileMode(0755))
require.NoError(b, err)
rocksdb, err := rocks.NewRocksDBStore(rocksdbPath)
store, err := rocks.NewRocksDBStore(storePath)
require.NoError(b, err)

raftPath := fmt.Sprintf("/var/tmp/raft-test/node%d/raft", id)
err = os.MkdirAll(raftPath, os.FileMode(0755))
require.NoError(b, err)

snapshotsCh := make(chan *protocol.Snapshot, 10000)
startSnapshotsDrainer(snapshotsCh)
//defer close(snapshotsCh)
snapshotsDrainer(snapshotsCh)

r, err := NewRaftBalloon(raftPath, raftAddr(id), fmt.Sprintf("%d", id), rocksdb, snapshotsCh)
node, err := NewRaftBalloon(raftPath, raftAddr(id), fmt.Sprintf("%d", id), store, snapshotsCh)
require.NoError(b, err)

return r, func() {
srvCloseF := metrics_utils.StartMetricsServer(node, store)

return node, func() {
srvCloseF()
close(snapshotsCh)
os.RemoveAll(fmt.Sprintf("/var/tmp/raft-test/node%d", id))
}

}

func startSnapshotsDrainer(snapshotsCh chan *protocol.Snapshot) {
func snapshotsDrainer(snapshotsCh chan *protocol.Snapshot) {
go func() {
for range snapshotsCh {

for {
_, ok := <-snapshotsCh
if !ok {
return
}
}
}()
}
Expand All @@ -491,7 +498,7 @@ func BenchmarkRaftAdd(b *testing.B) {
err := raftNode.Open(true, map[string]string{"foo": "bar"})
require.NoError(b, err)

// b.N shoul be eq or greater than 500k to avoid benchmark framework spreding more than one goroutine.
// b.N shoul be eq or greater than 500k to avoid benchmark framework spreading more than one goroutine.
b.N = 2000000
b.ResetTimer()
b.SetParallelism(100)
Expand Down
Loading

0 comments on commit 1d61f47

Please sign in to comment.