From 1d61f47937b4107f6d111a1f0a58beab63757e41 Mon Sep 17 00:00:00 2001 From: Alvaro Alda Date: Fri, 5 Apr 2019 19:49:01 +0200 Subject: [PATCH] Declare metrics at raft balloon level --- raftwal/metrics.go | 92 +++ raftwal/raft.go | 18 +- raftwal/raft_test.go | 29 +- raftwal/raftrocks/metrics.go | 945 +++++++++++++++++++++++++++++ raftwal/raftrocks/rocksdb_store.go | 128 ++-- 5 files changed, 1157 insertions(+), 55 deletions(-) create mode 100644 raftwal/metrics.go create mode 100644 raftwal/raftrocks/metrics.go diff --git a/raftwal/metrics.go b/raftwal/metrics.go new file mode 100644 index 000000000..ef0aff7ed --- /dev/null +++ b/raftwal/metrics.go @@ -0,0 +1,92 @@ +/* + Copyright 2018-2019 Banco Bilbao Vizcaya Argentaria, S.A. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package raftwal + +import "github.com/prometheus/client_golang/prometheus" + +// namespace is the leading part of all published metrics. 
+const namespace = "qed"
+
+// subSystem is the Prometheus subsystem under which every raft balloon
+// metric is published.
+const subSystem = "raft_balloon"
+
+// raftBalloonMetrics groups the Prometheus collectors owned by a RaftBalloon:
+// a gauge with the current balloon version and one counter per kind of
+// operation.
+type raftBalloonMetrics struct {
+	Version                 prometheus.GaugeFunc // current balloon version
+	Adds                    prometheus.Counter   // add operations
+	MembershipQueries       prometheus.Counter   // membership queries
+	DigestMembershipQueries prometheus.Counter   // membership by digest queries
+	IncrementalQueries      prometheus.Counter   // incremental queries
+}
+
+// newRaftBalloonMetrics builds the collectors for b without registering
+// them.
+//
+// The Version gauge is evaluated on every collection and reads the balloon
+// through b.fsm, which Close sets to nil; the callback therefore reports 0
+// once the FSM is gone instead of dereferencing a nil pointer.
+// NOTE(review): b.fsm is read without synchronization - confirm that is
+// acceptable for scrapes racing with Close.
+func newRaftBalloonMetrics(b *RaftBalloon) *raftBalloonMetrics {
+	return &raftBalloonMetrics{
+		Version: prometheus.NewGaugeFunc(
+			prometheus.GaugeOpts{
+				Namespace: namespace,
+				Subsystem: subSystem,
+				Name:      "version",
+				Help:      "Current balloon version.",
+			},
+			func() float64 {
+				fsm := b.fsm
+				if fsm == nil {
+					return 0
+				}
+				return float64(fsm.balloon.Version())
+			},
+		),
+		Adds: prometheus.NewCounter(
+			prometheus.CounterOpts{
+				Namespace: namespace,
+				Subsystem: subSystem,
+				Name:      "adds",
+				Help:      "Number of add operations",
+			},
+		),
+		MembershipQueries: prometheus.NewCounter(
+			prometheus.CounterOpts{
+				Namespace: namespace,
+				Subsystem: subSystem,
+				Name:      "membership_queries",
+				Help:      "Number of membership queries.",
+			},
+		),
+		// Declared as a counter like its siblings: the value is only ever
+		// incremented, and a gauge would be exported with the wrong type.
+		DigestMembershipQueries: prometheus.NewCounter(
+			prometheus.CounterOpts{
+				Namespace: namespace,
+				Subsystem: subSystem,
+				Name:      "digest_membership_queries",
+				Help:      "Number of membership by digest queries.",
+			},
+		),
+		IncrementalQueries: prometheus.NewCounter(
+			prometheus.CounterOpts{
+				Namespace: namespace,
+				Subsystem: subSystem,
+				Name:      "incremental_queries",
+				Help:      "Number of incremental queries.",
+			},
+		),
+	}
+}
+
+// collectors satisfies the prom.PrometheusCollector interface.
+func (m *raftBalloonMetrics) collectors() []prometheus.Collector { + return []prometheus.Collector{ + m.Version, + m.Adds, + m.MembershipQueries, + m.DigestMembershipQueries, + m.IncrementalQueries, + } +} diff --git a/raftwal/raft.go b/raftwal/raft.go index 046bb8cd3..2772af843 100644 --- a/raftwal/raft.go +++ b/raftwal/raft.go @@ -26,6 +26,7 @@ import ( "github.com/bbva/qed/balloon" "github.com/bbva/qed/hashing" "github.com/bbva/qed/log" + "github.com/bbva/qed/metrics" "github.com/bbva/qed/protocol" "github.com/bbva/qed/raftwal/commands" "github.com/bbva/qed/raftwal/raftrocks" @@ -90,13 +91,14 @@ type RaftBalloon struct { fsm *BalloonFSM // balloon's finite state machine snapshotsCh chan *protocol.Snapshot // channel to publish snapshots + metrics *raftBalloonMetrics } // NewRaftBalloon returns a new RaftBalloon. func NewRaftBalloon(path, addr, id string, store storage.ManagedStore, snapshotsCh chan *protocol.Snapshot) (*RaftBalloon, error) { // Create the log store and stable store - rocksStore, err := raftrocks.New(raftrocks.Options{Path: path + "/wal", NoSync: true}) + rocksStore, err := raftrocks.New(raftrocks.Options{Path: path + "/wal", NoSync: true, EnableStatistics: true}) if err != nil { return nil, fmt.Errorf("cannot create a new rocksdb log store: %s", err) } @@ -128,6 +130,7 @@ func NewRaftBalloon(path, addr, id string, store storage.ManagedStore, snapshots rb.store.db = store rb.store.log = logStore rb.store.rocksStore = rocksStore + rb.metrics = newRaftBalloonMetrics(rb) return rb, nil } @@ -233,9 +236,11 @@ func (b *RaftBalloon) Close(wait bool) error { b.store.rocksStore = nil b.store.log = nil + b.metrics = nil // Close FSM b.fsm.Close() + b.fsm = nil // close database if err := b.store.db.Close(); err != nil { @@ -369,6 +374,7 @@ func (b *RaftBalloon) Add(event []byte) (*balloon.Snapshot, error) { if err != nil { return nil, err } + b.metrics.Adds.Inc() snapshot := resp.(*fsmAddResponse).snapshot //Send snapshot to the snapshot channel @@ 
-383,14 +389,17 @@ func (b *RaftBalloon) Add(event []byte) (*balloon.Snapshot, error) {
 }
 
 func (b *RaftBalloon) QueryDigestMembership(keyDigest hashing.Digest, version uint64) (*balloon.MembershipProof, error) {
+	b.metrics.DigestMembershipQueries.Inc()
 	return b.fsm.QueryDigestMembership(keyDigest, version)
 }
 
 func (b *RaftBalloon) QueryMembership(event []byte, version uint64) (*balloon.MembershipProof, error) {
+	b.metrics.MembershipQueries.Inc()
 	return b.fsm.QueryMembership(event, version)
 }
 
 func (b *RaftBalloon) QueryConsistency(start, end uint64) (*balloon.IncrementalProof, error) {
+	b.metrics.IncrementalQueries.Inc()
 	return b.fsm.QueryConsistency(start, end)
 }
 
@@ -467,3 +476,12 @@ func (b *RaftBalloon) Info() map[string]interface{} {
 	m["meta"] = b.fsm.meta
 	return m
 }
+
+// RegisterMetrics registers the WAL store collectors and the raft balloon
+// collectors with registry. A nil registry makes the call a no-op.
+func (b *RaftBalloon) RegisterMetrics(registry metrics.Registry) {
+	if registry != nil {
+		b.store.rocksStore.RegisterMetrics(registry)
+		registry.MustRegister(b.metrics.collectors()...)
+	}
+}
diff --git a/raftwal/raft_test.go b/raftwal/raft_test.go
index 412f71afe..e21b430f0 100644
--- a/raftwal/raft_test.go
+++ b/raftwal/raft_test.go
@@ -31,6 +31,7 @@ import (
 
 	"github.com/bbva/qed/log"
 	"github.com/bbva/qed/storage/rocks"
+	metrics_utils "github.com/bbva/qed/testutils/metrics"
 	utilrand "github.com/bbva/qed/testutils/rand"
 	"github.com/hashicorp/raft"
 	"github.com/stretchr/testify/require"
@@ -449,11 +450,11 @@ func mustTempDir() string {
 }
 
 func newNodeBench(b *testing.B, id int) (*RaftBalloon, func()) {
-	rocksdbPath := fmt.Sprintf("/var/tmp/raft-test/node%d/rocksdb", id)
+	storePath := fmt.Sprintf("/var/tmp/raft-test/node%d/db", id)
 
-	err := os.MkdirAll(rocksdbPath, os.FileMode(0755))
+	err := os.MkdirAll(storePath, os.FileMode(0755))
 	require.NoError(b, err)
-	rocksdb, err := rocks.NewRocksDBStore(rocksdbPath)
+	store, err := rocks.NewRocksDBStore(storePath)
 	require.NoError(b, err)
 
 	raftPath := fmt.Sprintf("/var/tmp/raft-test/node%d/raft", id)
@@ -461,22 +462,28 @@ func newNodeBench(b
*testing.B, id int) (*RaftBalloon, func()) { require.NoError(b, err) snapshotsCh := make(chan *protocol.Snapshot, 10000) - startSnapshotsDrainer(snapshotsCh) - //defer close(snapshotsCh) + snapshotsDrainer(snapshotsCh) - r, err := NewRaftBalloon(raftPath, raftAddr(id), fmt.Sprintf("%d", id), rocksdb, snapshotsCh) + node, err := NewRaftBalloon(raftPath, raftAddr(id), fmt.Sprintf("%d", id), store, snapshotsCh) require.NoError(b, err) - return r, func() { + srvCloseF := metrics_utils.StartMetricsServer(node, store) + + return node, func() { + srvCloseF() + close(snapshotsCh) os.RemoveAll(fmt.Sprintf("/var/tmp/raft-test/node%d", id)) } } -func startSnapshotsDrainer(snapshotsCh chan *protocol.Snapshot) { +func snapshotsDrainer(snapshotsCh chan *protocol.Snapshot) { go func() { - for range snapshotsCh { - + for { + _, ok := <-snapshotsCh + if !ok { + return + } } }() } @@ -491,7 +498,7 @@ func BenchmarkRaftAdd(b *testing.B) { err := raftNode.Open(true, map[string]string{"foo": "bar"}) require.NoError(b, err) - // b.N shoul be eq or greater than 500k to avoid benchmark framework spreding more than one goroutine. + // b.N shoul be eq or greater than 500k to avoid benchmark framework spreading more than one goroutine. b.N = 2000000 b.ResetTimer() b.SetParallelism(100) diff --git a/raftwal/raftrocks/metrics.go b/raftwal/raftrocks/metrics.go new file mode 100644 index 000000000..8908c7734 --- /dev/null +++ b/raftwal/raftrocks/metrics.go @@ -0,0 +1,945 @@ +/* + Copyright 2018-2019 Banco Bilbao Vizcaya Argentaria, S.A. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package raftrocks
+
+import (
+	"fmt"
+	"strconv"
+
+	"github.com/bbva/qed/rocksdb"
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+// namespace is the leading part of all published metrics for the Storage service.
+const namespace = "qed_wal"
+
+// Prometheus sub-systems, one per family of metrics declared in this file.
+const (
+	blockCacheSubsystem = "block"    // metrics for block cache.
+	filterSubsystem     = "filter"   // metrics for bloom filters.
+	memtableSubsystem   = "memtable" // metrics for memtable.
+	getSubsystem        = "get"      // metrics for gets.
+	ioSubsystem         = "io"       // metrics for I/O.
+	compressSybsystem   = "compress" // metrics for compression.
+)
+
+// rocksDBMetrics aggregates every metric family of the store: the
+// statistics-backed families are embedded, and the per-table metrics are kept
+// in tables.
+type rocksDBMetrics struct {
+	*blockCacheMetrics
+	*bloomFilterMetrics
+	*memtableMetrics
+	*getsMetrics
+	*ioMetrics
+	*compressMetrics
+	tables []*perTableMetrics
+}
+
+// newRocksDBMetrics builds all metric families for store. Per-table metrics
+// are created first, for the stable table and then the log table.
+func newRocksDBMetrics(store *RocksDBStore) *rocksDBMetrics {
+	perTable := []*perTableMetrics{
+		newPerTableMetrics(stableTable, store),
+		newPerTableMetrics(logTable, store),
+	}
+	families := &rocksDBMetrics{tables: perTable}
+	families.blockCacheMetrics = newBlockCacheMetrics(store.stats, store.blockCache)
+	families.bloomFilterMetrics = newBloomFilterMetrics(store.stats)
+	families.memtableMetrics = newMemtableMetrics(store.stats)
+	families.getsMetrics = newGetsMetrics(store.stats)
+	families.ioMetrics = newIOMetrics(store.stats)
+	families.compressMetrics = newCompressMetrics(store.stats)
+	return families
+}
+
+// collectors satisfies the prom.PrometheusCollector interface. The result
+// lists the embedded families in declaration order followed by the collectors
+// of each table.
+func (m *rocksDBMetrics) collectors() []prometheus.Collector {
+	groups := [][]prometheus.Collector{
+		m.blockCacheMetrics.collectors(),
+		m.bloomFilterMetrics.collectors(),
+		m.memtableMetrics.collectors(),
+		m.getsMetrics.collectors(),
+		m.ioMetrics.collectors(),
+		m.compressMetrics.collectors(),
+	}
+	for _, perTable := range m.tables {
+		groups = append(groups, perTable.collectors())
+	}
+
+	var all []prometheus.Collector
+	for _, group := range groups {
+		all = append(all, group...)
+	}
+	return all
+}
+
+// blockCacheMetrics are a set of metrics concerned with the block cache.
+type blockCacheMetrics struct {
+	// Whole block cache.
+	Miss        prometheus.GaugeFunc
+	Hit         prometheus.GaugeFunc
+	Add         prometheus.GaugeFunc
+	AddFailures prometheus.GaugeFunc
+	// Index blocks.
+	IndexMiss        prometheus.GaugeFunc
+	IndexHit         prometheus.GaugeFunc
+	IndexAdd         prometheus.GaugeFunc
+	IndexBytesInsert prometheus.GaugeFunc
+	IndexBytesEvict  prometheus.GaugeFunc
+	// Filter blocks.
+	FilterMiss        prometheus.GaugeFunc
+	FilterHit         prometheus.GaugeFunc
+	FilterAdd         prometheus.GaugeFunc
+	FilterBytesInsert prometheus.GaugeFunc
+	FilterBytesEvict  prometheus.GaugeFunc
+	// Data blocks.
+	DataMiss        prometheus.GaugeFunc
+	DataHit         prometheus.GaugeFunc
+	DataAdd         prometheus.GaugeFunc
+	DataBytesInsert prometheus.GaugeFunc
+	// Bytes moved through the cache and memory it holds.
+	BytesRead   prometheus.GaugeFunc
+	BytesWrite  prometheus.GaugeFunc
+	Usage       prometheus.GaugeFunc
+	PinnedUsage prometheus.GaugeFunc
+}
+
+// newBlockCacheMetrics initialises the prometheus metrics for block cache.
+// Ticker-backed gauges read their value from stats; the two usage gauges ask
+// cache directly on every collection.
+func newBlockCacheMetrics(stats *rocksdb.Statistics, cache *rocksdb.Cache) *blockCacheMetrics {
+	// gauge declares one block-cache gauge whose value is produced by read.
+	gauge := func(name, help string, read func() float64) prometheus.GaugeFunc {
+		return prometheus.NewGaugeFunc(
+			prometheus.GaugeOpts{
+				Namespace: namespace,
+				Subsystem: blockCacheSubsystem,
+				Name:      name,
+				Help:      help,
+			},
+			read,
+		)
+	}
+
+	return &blockCacheMetrics{
+		Miss:        gauge("cache_miss", "Block cache misses.", extractMetric(stats, rocksdb.TickerBlockCacheMiss)),
+		Hit:         gauge("cache_hit", "Block cache hits.", extractMetric(stats, rocksdb.TickerBlockCacheHit)),
+		Add:         gauge("cache_add", "Block cache adds.", extractMetric(stats, rocksdb.TickerBlockCacheAdd)),
+		AddFailures: gauge("cache_add_failures", "Block cache add failures.", extractMetric(stats, rocksdb.TickerBlockCacheAddFailures)),
+
+		IndexMiss:        gauge("cache_index_miss", "Block cache index misses.", extractMetric(stats, rocksdb.TickerBlockCacheIndexMiss)),
+		IndexHit:         gauge("cache_index_hit", "Block cache index hits.", extractMetric(stats, rocksdb.TickerBlockCacheIndexHit)),
+		IndexAdd:         gauge("cache_index_add", "Block cache index adds.", extractMetric(stats, rocksdb.TickerBlockCacheIndexAdd)),
+		IndexBytesInsert: gauge("cache_index_bytes_insert", "Block cache index bytes inserted.", extractMetric(stats, rocksdb.TickerBlockCacheIndexBytesInsert)),
+		IndexBytesEvict:  gauge("cache_index_bytes_evict", "Block cache index bytes evicted.", extractMetric(stats, rocksdb.TickerBlockCacheIndexBytesEvict)),
+
+		FilterMiss:        gauge("cache_filter_miss", "Block cache filter misses.", extractMetric(stats, rocksdb.TickerBlockCacheFilterMiss)),
+		FilterHit:         gauge("cache_filter_hit", "Block cache filter hits.", extractMetric(stats, rocksdb.TickerBlockCacheFilterHit)),
+		FilterAdd:         gauge("cache_filter_add", "Block cache filter adds.", extractMetric(stats, rocksdb.TickerBlockCacheFilterAdd)),
+		FilterBytesInsert: gauge("cache_filter_bytes_insert", "Block cache filter bytes inserted.", extractMetric(stats, rocksdb.TickerBlockCacheFilterBytesInsert)),
+		FilterBytesEvict:  gauge("cache_filter_bytes_evict", "Block cache filter bytes evicted.", extractMetric(stats, rocksdb.TickerBlockCacheFilterBytesEvict)),
+
+		DataMiss:        gauge("cache_data_miss", "Block cache data misses.", extractMetric(stats, rocksdb.TickerBlockCacheDataMiss)),
+		DataHit:         gauge("cache_data_hit", "Block cache data hits.", extractMetric(stats, rocksdb.TickerBlockCacheDataHit)),
+		DataAdd:         gauge("cache_data_add", "Block cache data adds.", extractMetric(stats, rocksdb.TickerBlockCacheDataAdd)),
+		DataBytesInsert: gauge("cache_data_bytes_insert", "Block cache data bytes inserted.", extractMetric(stats, rocksdb.TickerBlockCacheDataBytesInsert)),
+
+		BytesRead:  gauge("cache_bytes_read", "Block cache bytes read.", extractMetric(stats, rocksdb.TickerBlockCacheBytesRead)),
+		BytesWrite: gauge("cache_bytes_write", "Block cache bytes written.", extractMetric(stats, rocksdb.TickerBlockCacheBytesWrite)),
+
+		Usage: gauge("cache_memory_usage", "Block cache memory usage.", func() float64 {
+			return float64(cache.GetUsage())
+		}),
+		PinnedUsage: gauge("cache_pinned_memory_usage", "Block cache pinned memory usage.", func() float64 {
+			return float64(cache.GetPinnedUsage())
+		}),
+	}
+}
+
+// collectors satisfies the prom.PrometheusCollector interface.
+// It lists every block cache gauge in field declaration order.
+func (m *blockCacheMetrics) collectors() []prometheus.Collector {
+	return []prometheus.Collector{
+		// whole cache
+		m.Miss, m.Hit, m.Add, m.AddFailures,
+		// index blocks
+		m.IndexMiss, m.IndexHit, m.IndexAdd, m.IndexBytesInsert, m.IndexBytesEvict,
+		// filter blocks
+		m.FilterMiss, m.FilterHit, m.FilterAdd, m.FilterBytesInsert, m.FilterBytesEvict,
+		// data blocks
+		m.DataMiss, m.DataHit, m.DataAdd, m.DataBytesInsert,
+		// traffic and memory
+		m.BytesRead, m.BytesWrite, m.Usage, m.PinnedUsage,
+	}
+}
+
+// bloomFilterMetrics are the metrics concerned with bloom filters.
+type bloomFilterMetrics struct {
+	Useful           prometheus.GaugeFunc
+	FullPositive     prometheus.GaugeFunc
+	FullTruePositive prometheus.GaugeFunc
+}
+
+// newBloomFilterMetrics declares the bloom filter gauges, each one reading a
+// ticker from stats.
+func newBloomFilterMetrics(stats *rocksdb.Statistics) *bloomFilterMetrics {
+	// gauge declares one gauge of the filter sub-system.
+	gauge := func(name, help string, read func() float64) prometheus.GaugeFunc {
+		opts := prometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: filterSubsystem,
+			Name:      name,
+			Help:      help,
+		}
+		return prometheus.NewGaugeFunc(opts, read)
+	}
+
+	return &bloomFilterMetrics{
+		Useful: gauge(
+			"useful",
+			"Number of times bloom filter avoided reads.",
+			extractMetric(stats, rocksdb.TickerBloomFilterUseful),
+		),
+		FullPositive: gauge(
+			"full_positive",
+			"Number of times bloom fullfilter did not avoid reads.",
+			extractMetric(stats, rocksdb.TickerBloomFilterFullPositive),
+		),
+		FullTruePositive: gauge(
+			"full_true_positive",
+			"Number of times bloom fullfilter did not avoid reads and data actually exist.",
+			extractMetric(stats, rocksdb.TickerBloomFilterFullTruePositive),
+		),
+	}
+}
+
+// collectors satisfies the prom.PrometheusCollector interface.
+func (m *bloomFilterMetrics) collectors() []prometheus.Collector {
+	return []prometheus.Collector{m.Useful, m.FullPositive, m.FullTruePositive}
+}
+
+// memtableMetrics are the metrics concerned with the memtable.
+type memtableMetrics struct {
+	Hit  prometheus.GaugeFunc
+	Miss prometheus.GaugeFunc
+}
+
+// newMemtableMetrics declares the memtable gauges, each one reading a ticker
+// from stats.
+func newMemtableMetrics(stats *rocksdb.Statistics) *memtableMetrics {
+	// gauge declares one gauge of the memtable sub-system.
+	gauge := func(name, help string, read func() float64) prometheus.GaugeFunc {
+		opts := prometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: memtableSubsystem,
+			Name:      name,
+			Help:      help,
+		}
+		return prometheus.NewGaugeFunc(opts, read)
+	}
+
+	return &memtableMetrics{
+		Hit:  gauge("hit", "Number of memtable hits.", extractMetric(stats, rocksdb.TickerMemtableHit)),
+		Miss: gauge("miss", "Number of memtable misses.", extractMetric(stats, rocksdb.TickerMemtableMiss)),
+	}
+}
+
+// collectors satisfies the prom.PrometheusCollector interface.
+func (m *memtableMetrics) collectors() []prometheus.Collector {
+	return []prometheus.Collector{m.Hit, m.Miss}
+}
+
+// getsMetrics are the metrics concerned with gets, split by LSM level.
+type getsMetrics struct {
+	HitL0      prometheus.GaugeFunc
+	HitL1      prometheus.GaugeFunc
+	HitL2AndUp prometheus.GaugeFunc
+}
+
+// newGetsMetrics declares the per-level get gauges, each one reading a ticker
+// from stats.
+func newGetsMetrics(stats *rocksdb.Statistics) *getsMetrics {
+	// gauge declares one gauge of the get sub-system.
+	gauge := func(name, help string, read func() float64) prometheus.GaugeFunc {
+		opts := prometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: getSubsystem,
+			Name:      name,
+			Help:      help,
+		}
+		return prometheus.NewGaugeFunc(opts, read)
+	}
+
+	return &getsMetrics{
+		HitL0:      gauge("hits_l0", "Number of Get() queries server by L0.", extractMetric(stats, rocksdb.TickerGetHitL0)),
+		HitL1:      gauge("hits_l1", "Number of Get() queries server by L1.", extractMetric(stats, rocksdb.TickerGetHitL1)),
+		HitL2AndUp: gauge("hits_l2_up", "Number of Get() queries server by L2 and up.", extractMetric(stats, rocksdb.TickerGetHitL2AndUp)),
+	}
+}
+
+// collectors satisfies the prom.PrometheusCollector interface.
+func (m *getsMetrics) collectors() []prometheus.Collector {
+	return []prometheus.Collector{m.HitL0, m.HitL1, m.HitL2AndUp}
+}
+
+// ioMetrics are the metrics concerned with I/O: keys and bytes moved, write
+// stalls, WAL activity, compaction and flush traffic.
+type ioMetrics struct {
+	KeysWritten       prometheus.GaugeFunc
+	KeysRead          prometheus.GaugeFunc
+	KeysUpdated       prometheus.GaugeFunc
+	BytesRead         prometheus.GaugeFunc
+	BytesWritten      prometheus.GaugeFunc
+	StallMicros       prometheus.GaugeFunc
+	WALFileSynced     prometheus.GaugeFunc
+	WALFileBytes      prometheus.GaugeFunc
+	CompactReadBytes  prometheus.GaugeFunc
+	CompactWriteBytes prometheus.GaugeFunc
+	FlushWriteBytes   prometheus.GaugeFunc
+}
+
+// newIOMetrics declares the I/O gauges, each one reading a ticker from stats.
+func newIOMetrics(stats *rocksdb.Statistics) *ioMetrics {
+	// gauge declares one gauge of the io sub-system.
+	gauge := func(name, help string, read func() float64) prometheus.GaugeFunc {
+		opts := prometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: ioSubsystem,
+			Name:      name,
+			Help:      help,
+		}
+		return prometheus.NewGaugeFunc(opts, read)
+	}
+
+	return &ioMetrics{
+		KeysWritten: gauge(
+			"keys_written",
+			"Number of keys written via puts and writes.",
+			extractMetric(stats, rocksdb.TickerNumberKeysWritten),
+		),
+		KeysRead: gauge(
+			"keys_read",
+			"Number of keys read.",
+			extractMetric(stats, rocksdb.TickerNumberKeysRead),
+		),
+		KeysUpdated: gauge(
+			"keys_updated",
+			"Number of keys updated.",
+			extractMetric(stats, rocksdb.TickerNumberKeysUpdated),
+		),
+		BytesRead: gauge(
+			"bytes_read",
+			"Number of uncompressed bytes read.",
+			extractMetric(stats, rocksdb.TickerBytesRead),
+		),
+		BytesWritten: gauge(
+			"bytes_written",
+			"Number of uncompressed bytes written.",
+			extractMetric(stats, rocksdb.TickerBytesWritten),
+		),
+		StallMicros: gauge(
+			"stall_micros",
+			"Number of microseconds waiting for compaction or flush to finish.",
+			extractMetric(stats, rocksdb.TickerStallMicros),
+		),
+		WALFileSynced: gauge(
+			"wal_files_synced",
+			"Number of times WAL sync is done.",
+			extractMetric(stats, rocksdb.TickerWALFileSynced),
+		),
+		WALFileBytes: gauge(
+			"wal_file_bytes",
+			"Number of bytes written to WAL.",
+			extractMetric(stats, rocksdb.TickerWALFileBytes),
+		),
+		CompactReadBytes: gauge(
+			"compact_read_bytes",
+			"Number of bytes read during compaction.",
+			extractMetric(stats, rocksdb.TickerCompactReadBytes),
+		),
+		CompactWriteBytes: gauge(
+			"compact_write_bytes",
+			"Number of bytes written during compaction.",
+			extractMetric(stats, rocksdb.TickerCompactWriteBytes),
+		),
+		FlushWriteBytes: gauge(
+			"compact_flush_bytes",
+			"Number of bytes written during flush.",
+			extractMetric(stats, rocksdb.TickerFlushWriteBytes),
+		),
+	}
+}
+
+// collectors satisfies the prom.PrometheusCollector interface.
+// It lists every I/O gauge in field declaration order.
+func (m *ioMetrics) collectors() []prometheus.Collector {
+	return []prometheus.Collector{
+		// keys and bytes
+		m.KeysWritten, m.KeysRead, m.KeysUpdated, m.BytesRead, m.BytesWritten,
+		// stalls and WAL
+		m.StallMicros, m.WALFileSynced, m.WALFileBytes,
+		// compaction and flush
+		m.CompactReadBytes, m.CompactWriteBytes, m.FlushWriteBytes,
+	}
+}
+
+// compressMetrics are the metrics concerned with block compression.
+type compressMetrics struct {
+	NumberBlockCompressed    prometheus.GaugeFunc
+	NumberBlockDecompressed  prometheus.GaugeFunc
+	NumberBlockNotCompressed prometheus.GaugeFunc
+}
+
+// newCompressMetrics declares the compression gauges, each one reading a
+// ticker from stats.
+func newCompressMetrics(stats *rocksdb.Statistics) *compressMetrics {
+	// gauge declares one gauge of the compress sub-system.
+	gauge := func(name, help string, read func() float64) prometheus.GaugeFunc {
+		opts := prometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: compressSybsystem,
+			Name:      name,
+			Help:      help,
+		}
+		return prometheus.NewGaugeFunc(opts, read)
+	}
+
+	return &compressMetrics{
+		NumberBlockCompressed: gauge(
+			"block_compressed",
+			"Number of compressions executed",
+			extractMetric(stats, rocksdb.TickerNumberBlockCompressed),
+		),
+		NumberBlockDecompressed: gauge(
+			"block_decompressed",
+			"Number of decompressions executed",
+			extractMetric(stats, rocksdb.TickerNumberBlockDecompressed),
+		),
+		NumberBlockNotCompressed: gauge(
+			"block_not_compressed",
+			"Number of blocks not compressed.",
+			extractMetric(stats, rocksdb.TickerNumberBlockNotCompressed),
+		),
+	}
+}
+
+// collectors satisfies the prom.PrometheusCollector interface.
+func (m *compressMetrics) collectors() []prometheus.Collector {
+	return []prometheus.Collector{
+		m.NumberBlockCompressed, m.NumberBlockDecompressed, m.NumberBlockNotCompressed,
+	}
+}
+
+// perTableMetrics are the metrics published for a single table (column
+// family); every value is read from a RocksDB property of that table.
+type perTableMetrics struct {
+	NumFilesAtLevelN                []prometheus.GaugeFunc // one gauge per level, 0 to 5
+	NumImmutableMemtables           prometheus.GaugeFunc
+	NumImmutableMemtablesFlushed    prometheus.GaugeFunc
+	NumRunningFlushes               prometheus.GaugeFunc
+	NumRunningCompactions           prometheus.GaugeFunc
+	CurrentSizeActiveMemtable       prometheus.GaugeFunc
+	CurrentSizeAllMemtables         prometheus.GaugeFunc
+	SizeAllMemtables                prometheus.GaugeFunc
+	NumEntriesActiveMemtable        prometheus.GaugeFunc
+	NumEntriesImmutableMemtables    prometheus.GaugeFunc
+	EstimatedNumKeys                prometheus.GaugeFunc
+	EstimateTableReadersMem         prometheus.GaugeFunc
+	NumLiveVersions                 prometheus.GaugeFunc
+	EstimatedLiveDataSize           prometheus.GaugeFunc
+	TotalSSTFilesSize               prometheus.GaugeFunc
+	TotalLiveSSTFilesSize           prometheus.GaugeFunc
+	EstimatedPendingCompactionBytes prometheus.GaugeFunc
+	ActualDelayedWriteRate          prometheus.GaugeFunc
+	BlockCacheUsage                 prometheus.GaugeFunc
+	BlockCachePinnedUsage           prometheus.GaugeFunc
+}
+
+// newPerTableMetrics declares the gauges for table. All of them live in the
+// "cf_<table>" sub-system and query store.db with the table's column family
+// handle each time they are collected.
+func newPerTableMetrics(table table, store *RocksDBStore) *perTableMetrics {
+	// opts builds the options shared by every gauge of this table.
+	opts := func(name, help string) prometheus.GaugeOpts {
+		return prometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: "cf_" + table.String(),
+			Name:      name,
+			Help:      help,
+		}
+	}
+	// uintProperty declares a gauge backed by an integer RocksDB property.
+	uintProperty := func(name, help, property string) prometheus.GaugeFunc {
+		return prometheus.NewGaugeFunc(opts(name, help), func() float64 {
+			return float64(store.db.GetUint64PropertyCF(property, store.cfHandles[table]))
+		})
+	}
+
+	m := &perTableMetrics{
+		NumImmutableMemtables: uintProperty(
+			"num_immutable_memtables",
+			"Number of immutable memtables that have not yet been flushed.",
+			"rocksdb.num-immutable-mem-table",
+		),
+		NumImmutableMemtablesFlushed: uintProperty(
+			"num_immutable_memtables_flushed",
+			"Number of immutable memtables that have already been flushed.",
+			"rocksdb.num-immutable-mem-table-flushed",
+		),
+		NumRunningFlushes: uintProperty(
+			"num_running_flushes",
+			"Number of currently running flushes.",
+			"rocksdb.num-running-flushes",
+		),
+		NumRunningCompactions: uintProperty(
+			"num_running_compactions",
+			"Number of currently running compactions.",
+			"rocksdb.num-running-compactions",
+		),
+		CurrentSizeActiveMemtable: uintProperty(
+			"cur_size_active_memtable",
+			"Approximate size of active memtable (bytes).",
+			"rocksdb.cur-size-active-mem-table",
+		),
+		CurrentSizeAllMemtables: uintProperty(
+			"cur_size_all_memtables",
+			"Approximate size of active and unflushed immutable memtables (bytes).",
+			"rocksdb.cur-size-all-mem-tables",
+		),
+		SizeAllMemtables: uintProperty(
+			"size_all_memtables",
+			"Approximate size of active, unflushed immutable, and pinned immutable memtables (bytes).",
+			"rocksdb.size-all-mem-tables",
+		),
+		NumEntriesActiveMemtable: uintProperty(
+			"num_entries_active_memtable",
+			"Total number of entries in the active memtable.",
+			"rocksdb.num-entries-active-mem-table",
+		),
+		NumEntriesImmutableMemtables: uintProperty(
+			"num_entries_imm_memtables",
+			"Total number of entries in the unflushed immutable memtables.",
+			"rocksdb.num-entries-imm-mem-tables",
+		),
+		EstimatedNumKeys: uintProperty(
+			"estimated_num_keys",
+			"Estimated number of total keys in the active and unflushed immutable memtables and storage.",
+			"rocksdb.estimate-num-keys",
+		),
+		EstimateTableReadersMem: uintProperty(
+			"estimated_table_readers_mem",
+			"Estimated memory used for reading SST tables, excluding memory used in block cache (e.g., filter and index blocks).",
+			"rocksdb.estimate-table-readers-mem",
+		),
+		// A `Version` is an internal data structure. More live versions often
+		// mean more SST files are held from being deleted, by iterators or
+		// unfinished compactions.
+		NumLiveVersions: uintProperty(
+			"num_live_versions",
+			"Number of live versions.",
+			"rocksdb.num-live-versions",
+		),
+		EstimatedLiveDataSize: uintProperty(
+			"estimated_live_data_size",
+			"Estimate of the amount of live data (bytes).",
+			"rocksdb.estimate-live-data-size",
+		),
+		TotalSSTFilesSize: uintProperty(
+			"total_sst_files_size",
+			"Total size (bytes) of all SST files.",
+			"rocksdb.total-sst-files-size",
+		),
+		TotalLiveSSTFilesSize: uintProperty(
+			"total_live_sst_files_size",
+			"Total size (bytes) of all live SST files that belongs to theh last LSM tree.",
+			"rocksdb.live-sst-files-size",
+		),
+		EstimatedPendingCompactionBytes: uintProperty(
+			"estimated_pending_compaction_bytes",
+			"Estimated total number of bytes compaction needs to rewrite to get all levels down to under target size.",
+			"rocksdb.estimate-pending-compaction-bytes",
+		),
+		ActualDelayedWriteRate: uintProperty(
+			"actual_delayed_write_rate",
+			"Current actual delayed write rate. 0 means no delay.",
+			"rocksdb.actual-delayed-write-rate",
+		),
+		BlockCacheUsage: uintProperty(
+			"block_cache_usage",
+			"Memory size (bytes) for the entries residing in block cache.",
+			"rocksdb.block-cache-usage",
+		),
+		BlockCachePinnedUsage: uintProperty(
+			"block_cache_pinned_usage",
+			"Memory size (bytes) for the entries being pinned in block cache.",
+			"rocksdb.block-cache-pinned-usage",
+		),
+	}
+
+	// The per-level file counts are read as string properties, so they are
+	// parsed on every collection; an empty property is reported as 0.
+	for level := 0; level < 6; level++ {
+		property := fmt.Sprintf("rocksdb.num-files-at-level%d", level)
+		levelOpts := opts(
+			fmt.Sprintf("num_files_at_level_%d", level),
+			fmt.Sprintf("Number of files at level %d.", level),
+		)
+		m.NumFilesAtLevelN = append(m.NumFilesAtLevelN, prometheus.NewGaugeFunc(levelOpts, func() float64 {
+			raw := store.db.GetPropertyCF(property, store.cfHandles[table])
+			if raw == "" {
+				return 0.0
+			}
+			// A parse failure is deliberately ignored; whatever ParseFloat
+			// returned alongside the error is reported.
+			parsed, _ := strconv.ParseFloat(raw, 64)
+			return parsed
+		}))
+	}
+	return m
+}
+
+// collectors satisfies the prom.PrometheusCollector interface.
+func (m *perTableMetrics) collectors() []prometheus.Collector { + c := []prometheus.Collector{ + m.NumImmutableMemtables, + m.NumImmutableMemtablesFlushed, + m.NumRunningFlushes, + m.NumRunningCompactions, + m.CurrentSizeActiveMemtable, + m.CurrentSizeAllMemtables, + m.SizeAllMemtables, + m.NumEntriesActiveMemtable, + m.NumEntriesImmutableMemtables, + m.EstimatedNumKeys, + m.EstimateTableReadersMem, + m.NumLiveVersions, + m.EstimatedLiveDataSize, + m.TotalSSTFilesSize, + m.TotalLiveSSTFilesSize, + m.EstimatedPendingCompactionBytes, + m.ActualDelayedWriteRate, + m.BlockCacheUsage, + m.BlockCachePinnedUsage, + } + for _, metric := range m.NumFilesAtLevelN { + c = append(c, metric) + } + return c +} + +func extractMetric(stats *rocksdb.Statistics, ticker rocksdb.TickerType) func() float64 { + return func() float64 { + return float64(stats.GetAndResetTickerCount(ticker)) + } +} diff --git a/raftwal/raftrocks/rocksdb_store.go b/raftwal/raftrocks/rocksdb_store.go index 5a6f9c96e..6d8228af6 100644 --- a/raftwal/raftrocks/rocksdb_store.go +++ b/raftwal/raftrocks/rocksdb_store.go @@ -20,6 +20,7 @@ import ( "bytes" "errors" + "github.com/bbva/qed/metrics" "github.com/bbva/qed/rocksdb" "github.com/bbva/qed/util" "github.com/hashicorp/go-msgpack/codec" @@ -31,11 +32,30 @@ var ( ErrKeyNotFound = errors.New("not found") ) +// table groups related key-value pairs under a +// consistent space. +type table uint32 + const ( - stableCF string = "stable" - logCF string = "log" + defaultTable table = iota + stableTable + logTable ) +// String returns the column family name of the table. +func (t table) String() string { + var s string + switch t { + case defaultTable: + s = "default" + case stableTable: + s = "stable" + case logTable: + s = "log" + } + return s +} + // RocksDBStore provides access to RocksDB for Raft to store and retrieve // log entries. It also provides key/value storage, and can be used as // a LogStore and StableStore. @@ -43,12 +60,14 @@ type RocksDBStore struct { // db is the underlying handle to the db.
db *rocksdb.DB + stats *rocksdb.Statistics + // The path to the RocksDB database directory. - path string - ro *rocksdb.ReadOptions - wo *rocksdb.WriteOptions - stableCFHandle *rocksdb.ColumnFamilyHandle - logCFHandle *rocksdb.ColumnFamilyHandle + path string + ro *rocksdb.ReadOptions + wo *rocksdb.WriteOptions + // column family handlers + cfHandles rocksdb.ColumnFamilyHandles // global options globalOpts *rocksdb.Options @@ -56,9 +75,13 @@ type RocksDBStore struct { stableBbto *rocksdb.BlockBasedTableOptions stableOpts *rocksdb.Options // log options - logBbto *rocksdb.BlockBasedTableOptions - logOpts *rocksdb.Options - logCache *rocksdb.Cache + logBbto *rocksdb.BlockBasedTableOptions + logOpts *rocksdb.Options + // block cache + blockCache *rocksdb.Cache + + // metrics + metrics *rocksDBMetrics } // Options contains all the configuration used to open the RocksDB instance. @@ -71,6 +94,8 @@ type Options struct { // write to the log. This is unsafe, so it should be used // with caution. NoSync bool + + EnableStatistics bool } // NewRocksDBStore takes a file path and returns a connected Raft backend. @@ -85,7 +110,7 @@ func New(options Options) (*RocksDBStore, error) { // we need two column families, one for stable store and one for log store: // stable : used for storing key configurations. // log : used for storing logs in a durable fashion. 
- cfNames := []string{stableCF, logCF, "default"} + cfNames := []string{"default", stableTable.String(), logTable.String()} defaultOpts := rocksdb.NewDefaultOptions() @@ -93,6 +118,12 @@ func New(options Options) (*RocksDBStore, error) { globalOpts := rocksdb.NewDefaultOptions() globalOpts.SetCreateIfMissing(true) globalOpts.SetCreateIfMissingColumnFamilies(true) + blockCache := rocksdb.NewDefaultLRUCache(512 * 1024 * 1024) + var stats *rocksdb.Statistics + if options.EnableStatistics { + stats = rocksdb.NewStatistics() + globalOpts.SetStatistics(stats) + } // stable store options stableBbto := rocksdb.NewDefaultBlockBasedTableOptions() @@ -103,8 +134,7 @@ func New(options Options) (*RocksDBStore, error) { logBbto := rocksdb.NewDefaultBlockBasedTableOptions() logBbto.SetBlockSize(32 * 1024) logBbto.SetCacheIndexAndFilterBlocks(true) - logCache := rocksdb.NewDefaultLRUCache(512 * 1024 * 1024) - logBbto.SetBlockCache(logCache) + logBbto.SetBlockCache(blockCache) logOpts := rocksdb.NewDefaultOptions() logOpts.SetUseFsync(!options.NoSync) // dio := directIOSupported(options.Path) @@ -146,7 +176,7 @@ func New(options Options) (*RocksDBStore, error) { logOpts.SetMaxBackgroundCompactions(2) logOpts.SetMaxBackgroundFlushes(2) - cfOpts := []*rocksdb.Options{stableOpts, logOpts, defaultOpts} + cfOpts := []*rocksdb.Options{defaultOpts, stableOpts, logOpts} db, cfHandles, err := rocksdb.OpenDBColumnFamilies(options.Path, globalOpts, cfNames, cfOpts) if err != nil { return nil, err @@ -158,29 +188,32 @@ func New(options Options) (*RocksDBStore, error) { ro := rocksdb.NewDefaultReadOptions() ro.SetFillCache(false) - return &RocksDBStore{ - db: db, - path: options.Path, - stableCFHandle: cfHandles[0], - logCFHandle: cfHandles[1], - stableBbto: stableBbto, - stableOpts: stableOpts, - logBbto: logBbto, - logOpts: logOpts, - logCache: logCache, - globalOpts: globalOpts, - ro: ro, - wo: wo, - }, nil + store := &RocksDBStore{ + db: db, + stats: stats, + path: options.Path, + 
cfHandles: cfHandles, + stableBbto: stableBbto, + stableOpts: stableOpts, + logBbto: logBbto, + logOpts: logOpts, + blockCache: blockCache, + globalOpts: globalOpts, + ro: ro, + wo: wo, + } + + if stats != nil { + store.metrics = newRocksDBMetrics(store) + } + + return store, nil } // Close is used to gracefully close the DB connection. func (s *RocksDBStore) Close() error { - if s.stableCFHandle != nil { - s.stableCFHandle.Destroy() - } - if s.logCFHandle != nil { - s.logCFHandle.Destroy() + for _, cf := range s.cfHandles { + cf.Destroy() } if s.db != nil { s.db.Close() @@ -191,8 +224,8 @@ func (s *RocksDBStore) Close() error { if s.stableOpts != nil { s.stableOpts.Destroy() } - if s.logCache != nil { - s.logCache.Destroy() + if s.blockCache != nil { + s.blockCache.Destroy() } if s.logBbto != nil { s.logBbto.Destroy() @@ -200,6 +233,9 @@ func (s *RocksDBStore) Close() error { if s.logOpts != nil { s.logOpts.Destroy() } + if s.stats != nil { + s.stats.Destroy() + } if s.wo != nil { s.wo.Destroy() } @@ -212,7 +248,7 @@ func (s *RocksDBStore) Close() error { // FirstIndex returns the first known index from the Raft log. func (s *RocksDBStore) FirstIndex() (uint64, error) { - it := s.db.NewIteratorCF(rocksdb.NewDefaultReadOptions(), s.logCFHandle) + it := s.db.NewIteratorCF(rocksdb.NewDefaultReadOptions(), s.cfHandles[logTable]) defer it.Close() it.SeekToFirst() if it.Valid() { @@ -227,7 +263,7 @@ func (s *RocksDBStore) FirstIndex() (uint64, error) { // LastIndex returns the last known index from the Raft log. func (s *RocksDBStore) LastIndex() (uint64, error) { - it := s.db.NewIteratorCF(rocksdb.NewDefaultReadOptions(), s.logCFHandle) + it := s.db.NewIteratorCF(rocksdb.NewDefaultReadOptions(), s.cfHandles[logTable]) defer it.Close() it.SeekToLast() if it.Valid() { @@ -242,7 +278,7 @@ func (s *RocksDBStore) LastIndex() (uint64, error) { // GetLog gets a log entry at a given index. 
func (s *RocksDBStore) GetLog(index uint64, log *raft.Log) error { - val, err := s.db.GetBytesCF(s.ro, s.logCFHandle, util.Uint64AsBytes(index)) + val, err := s.db.GetBytesCF(s.ro, s.cfHandles[logTable], util.Uint64AsBytes(index)) if err != nil { return err } @@ -258,7 +294,7 @@ func (s *RocksDBStore) StoreLog(log *raft.Log) error { if err != nil { return err } - return s.db.PutCF(s.wo, s.logCFHandle, util.Uint64AsBytes(log.Index), val.Bytes()) + return s.db.PutCF(s.wo, s.cfHandles[logTable], util.Uint64AsBytes(log.Index), val.Bytes()) } // StoreLogs stores a set of raft logs. @@ -270,7 +306,7 @@ func (s *RocksDBStore) StoreLogs(logs []*raft.Log) error { if err != nil { return err } - batch.PutCF(s.logCFHandle, key, val.Bytes()) + batch.PutCF(s.cfHandles[logTable], key, val.Bytes()) } return s.db.Write(s.wo, batch) } @@ -278,13 +314,15 @@ func (s *RocksDBStore) StoreLogs(logs []*raft.Log) error { // DeleteRange deletes logs within a given range inclusively. func (s *RocksDBStore) DeleteRange(min, max uint64) error { batch := rocksdb.NewWriteBatch() + // Release the write batch once the write call has returned. + defer batch.Destroy() - batch.DeleteRangeCF(s.logCFHandle, util.Uint64AsBytes(min), util.Uint64AsBytes(max+1)) + batch.DeleteRangeCF(s.cfHandles[logTable], util.Uint64AsBytes(min), util.Uint64AsBytes(max+1)) return s.db.Write(s.wo, batch) } // Set is used to set a key/value set outside of the raft log.
func (s *RocksDBStore) Set(key []byte, val []byte) error { - if err := s.db.PutCF(s.wo, s.stableCFHandle, key, val); err != nil { + if err := s.db.PutCF(s.wo, s.cfHandles[stableTable], key, val); err != nil { return err } return nil @@ -292,7 +328,7 @@ func (s *RocksDBStore) Set(key []byte, val []byte) error { // Get is used to retrieve a value from the k/v store by key func (s *RocksDBStore) Get(key []byte) ([]byte, error) { - val, err := s.db.GetBytesCF(s.ro, s.stableCFHandle, key) + val, err := s.db.GetBytesCF(s.ro, s.cfHandles[stableTable], key) if err != nil { return nil, err } @@ -316,6 +352,14 @@ func (s *RocksDBStore) GetUint64(key []byte) (uint64, error) { return util.BytesAsUint64(val), nil } +// RegisterMetrics registers the store's metric collectors in the given registry. +// It is a no-op when the registry is nil or the store was opened without statistics. +func (s *RocksDBStore) RegisterMetrics(registry metrics.Registry) { + if registry != nil && s.metrics != nil { + registry.MustRegister(s.metrics.collectors()...) + } +} + // Decode reverses the encode operation on a byte slice input func decodeMsgPack(buf []byte, out interface{}) error { r := bytes.NewBuffer(buf)