Skip to content

Commit

Permalink
Metrics for snapshots
Browse files Browse the repository at this point in the history
  • Loading branch information
Juliusan committed Aug 16, 2023
1 parent feed26a commit 5aa85a7
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 3 deletions.
10 changes: 10 additions & 0 deletions packages/chain/statemanager/sm_snapshots/snapshot_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/runtime/ioutils"
"github.com/iotaledger/wasp/packages/isc"
"github.com/iotaledger/wasp/packages/metrics"
"github.com/iotaledger/wasp/packages/shutdown"
"github.com/iotaledger/wasp/packages/state"
"github.com/iotaledger/wasp/packages/util"
Expand All @@ -34,6 +35,7 @@ type snapshotManagerImpl struct {
log *logger.Logger
ctx context.Context
chainID isc.ChainID
metrics *metrics.ChainSnapshotsMetrics

lastIndexSnapshotted uint32
lastIndexSnapshottedMutex sync.Mutex
Expand Down Expand Up @@ -69,6 +71,7 @@ func NewSnapshotManager(
baseLocalPath string,
baseNetworkPaths []string,
store state.Store,
metrics *metrics.ChainSnapshotsMetrics,
log *logger.Logger,
) (SnapshotManager, error) {
chainIDString := chainID.String()
Expand All @@ -86,6 +89,7 @@ func NewSnapshotManager(
log: snapMLog,
ctx: ctx,
chainID: chainID,
metrics: metrics,
lastIndexSnapshotted: 0,
lastIndexSnapshottedMutex: sync.Mutex{},
createPeriod: createPeriod,
Expand Down Expand Up @@ -134,13 +138,15 @@ func (smiT *snapshotManagerImpl) createSnapshotsNeeded() bool {
}

// handleUpdate rebuilds the map of available snapshots from scratch and
// swaps it in under availableSnapshotsMutex. A fresh map is handed first to
// handleUpdateLocal and then to handleUpdateNetwork before it replaces the
// previous one. The wall-clock time of the whole operation is reported to the
// snapshots metrics.
func (smiT *snapshotManagerImpl) handleUpdate() {
	startedAt := time.Now()

	available := shrinkingmap.New[uint32, *util.SliceStruct[*commitmentSources]]()
	smiT.handleUpdateLocal(available)
	smiT.handleUpdateNetwork(available)

	// Publish the freshly built map; the lock is held only for the swap itself.
	func() {
		smiT.availableSnapshotsMutex.Lock()
		defer smiT.availableSnapshotsMutex.Unlock()
		smiT.availableSnapshots = available
	}()

	elapsed := time.Since(startedAt)
	smiT.metrics.SnapshotsUpdated(elapsed)
}

// Snapshot manager makes snapshot of every `period`th state only, if this state hasn't
Expand All @@ -152,6 +158,7 @@ func (smiT *snapshotManagerImpl) handleUpdate() {
// that another go routine is already making a snapshot and returns. For this reason
// it is important to delete all temporary files on snapshot manager start.
func (smiT *snapshotManagerImpl) handleBlockCommitted(snapshotInfo SnapshotInfo) {
start := time.Now()
stateIndex := snapshotInfo.StateIndex()
var lastIndexSnapshotted uint32
smiT.lastIndexSnapshottedMutex.Lock()
Expand Down Expand Up @@ -198,11 +205,13 @@ func (smiT *snapshotManagerImpl) handleBlockCommitted(snapshotInfo SnapshotInfo)
}
smiT.lastIndexSnapshottedMutex.Unlock()
smiT.log.Infof("Creating snapshot %v %s: snapshot created in %s", stateIndex, commitment, finalFilePath)
smiT.metrics.SnapshotCreated(time.Since(start), stateIndex)
}()
}
}

func (smiT *snapshotManagerImpl) handleLoadSnapshot(snapshotInfo SnapshotInfo, callback chan<- error) {
start := time.Now()
smiT.log.Debugf("Loading snapshot %s", snapshotInfo)
// smiT.availableSnapshotsMutex.RLock() // Probably locking is not needed as it happens on the same thread as editing available snapshots
commitments, exists := smiT.availableSnapshots.Get(snapshotInfo.StateIndex())
Expand Down Expand Up @@ -262,6 +271,7 @@ func (smiT *snapshotManagerImpl) handleLoadSnapshot(snapshotInfo SnapshotInfo, c
if e == nil {
smiT.log.Debugf("Loading snapshot %s succeeded", snapshotInfo)
callback <- nil
smiT.metrics.SnapshotLoaded(time.Since(start))
return
}
smiT.log.Errorf("Loading snapshot %s: %v", snapshotInfo, e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/iotaledger/hive.go/runtime/ioutils"
"github.com/iotaledger/wasp/packages/chain/statemanager/sm_gpa/sm_gpa_utils"
"github.com/iotaledger/wasp/packages/isc"
"github.com/iotaledger/wasp/packages/metrics"
"github.com/iotaledger/wasp/packages/state"
"github.com/iotaledger/wasp/packages/testutil/testlogger"
)
Expand All @@ -25,7 +26,7 @@ const localSnapshotsPathConst = "testSnapshots"

func TestSnapshotManagerLocal(t *testing.T) {
createFun := func(chainID isc.ChainID, store state.Store, log *logger.Logger) SnapshotManager {
snapshotManager, err := NewSnapshotManager(context.Background(), nil, chainID, 0, localSnapshotsPathConst, []string{}, store, log)
snapshotManager, err := NewSnapshotManager(context.Background(), nil, chainID, 0, localSnapshotsPathConst, []string{}, store, mockSnapshotsMetrics(), log)
require.NoError(t, err)
return snapshotManager
}
Expand All @@ -47,7 +48,7 @@ func TestSnapshotManagerNetwork(t *testing.T) {

createFun := func(chainID isc.ChainID, store state.Store, log *logger.Logger) SnapshotManager {
networkPaths := []string{"http://localhost" + port + "/"}
snapshotManager, err := NewSnapshotManager(context.Background(), nil, chainID, 0, "nonexistent", networkPaths, store, log)
snapshotManager, err := NewSnapshotManager(context.Background(), nil, chainID, 0, "nonexistent", networkPaths, store, mockSnapshotsMetrics(), log)
require.NoError(t, err)
return snapshotManager
}
Expand Down Expand Up @@ -82,7 +83,7 @@ func testSnapshotManagerSimple(
factory := sm_gpa_utils.NewBlockFactory(t)
blocks := factory.GetBlocks(numberOfBlocks, 1)
storeOrig := factory.GetStore()
snapshotManagerOrig, err := NewSnapshotManager(context.Background(), nil, factory.GetChainID(), uint32(snapshotCreatePeriod), localSnapshotsPathConst, []string{}, storeOrig, log)
snapshotManagerOrig, err := NewSnapshotManager(context.Background(), nil, factory.GetChainID(), uint32(snapshotCreatePeriod), localSnapshotsPathConst, []string{}, storeOrig, mockSnapshotsMetrics(), log)
require.NoError(t, err)

// "Running" node, making snapshots
Expand Down Expand Up @@ -165,3 +166,7 @@ func cleanupAfterTest(t *testing.T) {
err := os.RemoveAll(localSnapshotsPathConst)
require.NoError(t, err)
}

// mockSnapshotsMetrics builds a snapshots metrics instance for tests: it
// creates a new chain metrics provider (without registering it anywhere),
// asks it for the metrics of the empty chain ID and returns their Snapshots
// part.
func mockSnapshotsMetrics() *metrics.ChainSnapshotsMetrics {
	provider := metrics.NewChainMetricsProvider()
	chainMetrics := provider.GetChainMetrics(isc.EmptyChainID())
	return chainMetrics.Snapshots
}
1 change: 1 addition & 0 deletions packages/chains/chains.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ func (c *Chains) activateWithoutLocking(chainID isc.ChainID) error { //nolint:fu
c.snapshotFolderPath,
c.snapshotNetworkPaths,
chainStore,
chainMetrics.Snapshots,
chainLog,
)
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions packages/metrics/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type ChainMetrics struct {
Mempool *ChainMempoolMetrics
Message *ChainMessageMetrics
StateManager *ChainStateManagerMetrics
Snapshots *ChainSnapshotsMetrics
NodeConn *ChainNodeConnMetrics
WebAPI *ChainWebAPIMetrics
State *ChainStateMetrics
Expand All @@ -33,6 +34,7 @@ type ChainMetricsProvider struct {
Mempool *ChainMempoolMetricsProvider
Message *ChainMessageMetricsProvider
StateManager *ChainStateManagerMetricsProvider
Snapshots *ChainSnapshotsMetricsProvider
NodeConn *ChainNodeConnMetricsProvider
WebAPI *ChainWebAPIMetricsProvider
State *ChainStateMetricsProvider
Expand All @@ -48,6 +50,7 @@ func NewChainMetricsProvider() *ChainMetricsProvider {
Mempool: newChainMempoolMetricsProvider(),
Message: newChainMessageMetricsProvider(),
StateManager: newChainStateManagerMetricsProvider(),
Snapshots: newChainSnapshotsMetricsProvider(),
NodeConn: newChainNodeConnMetricsProvider(),
WebAPI: newChainWebAPIMetricsProvider(),
State: newChainStateMetricsProvider(),
Expand All @@ -61,6 +64,7 @@ func (m *ChainMetricsProvider) Register(reg prometheus.Registerer) {
m.Mempool.register(reg)
m.Message.register(reg)
m.StateManager.register(reg)
m.Snapshots.register(reg)
m.NodeConn.register(reg)
m.WebAPI.register(reg)
m.State.register(reg)
Expand All @@ -80,6 +84,7 @@ func (m *ChainMetricsProvider) GetChainMetrics(chainID isc.ChainID) *ChainMetric
Mempool: m.Mempool.createForChain(chainID),
Message: m.Message.createForChain(chainID),
StateManager: m.StateManager.createForChain(chainID),
Snapshots: m.Snapshots.createForChain(chainID),
NodeConn: m.NodeConn.createForChain(chainID),
WebAPI: m.WebAPI.createForChain(chainID),
State: m.State.createForChain(chainID),
Expand Down
102 changes: 102 additions & 0 deletions packages/metrics/chain_snapshots.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package metrics

import (
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/iotaledger/wasp/packages/isc"
)

// ChainSnapshotsMetricsProvider holds the Prometheus collectors used for
// snapshot related metrics. Every collector is a vector labelled by chain;
// per-chain accessors are obtained through createForChain.
type ChainSnapshotsMetricsProvider struct {
// created combines the "created" counter and the "max_index" collector.
created *countAndMaxMetrics
// updateDuration: duration (s) of updating the available snapshots list.
updateDuration *prometheus.HistogramVec
// createDuration: duration (s) of creating a snapshot and storing it in the file system.
createDuration *prometheus.HistogramVec
// loadDuration: duration (s) of (down)loading a snapshot and writing it into the store.
loadDuration *prometheus.HistogramVec
}

// newChainSnapshotsMetricsProvider creates all snapshot related collectors in
// the "iota_wasp" namespace, "snapshots" subsystem. Each collector carries a
// single chain label. The collectors are not registered here; that is done by
// register.
func newChainSnapshotsMetricsProvider() *ChainSnapshotsMetricsProvider {
	return &ChainSnapshotsMetricsProvider{
		// NOTE(review): max_index is declared as a counter although its help
		// text describes a maximum; how it is updated depends on
		// countAndMaxMetrics, which is defined elsewhere — confirm.
		created: newCountAndMaxMetrics(
			prometheus.NewCounterVec(prometheus.CounterOpts{
				Namespace: "iota_wasp",
				Subsystem: "snapshots",
				Name:      "created",
				Help:      "Total number of snapshots created",
			}, []string{labelNameChain}),
			prometheus.NewCounterVec(prometheus.CounterOpts{
				Namespace: "iota_wasp",
				Subsystem: "snapshots",
				Name:      "max_index",
				Help:      "Largest index of created snapshot",
			}, []string{labelNameChain}),
		),
		updateDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
			Namespace: "iota_wasp",
			Subsystem: "snapshots",
			Name:      "update_duration",
			Help:      "The duration (s) of updating available snapshots list",
			Buckets:   execTimeBuckets,
		}, []string{labelNameChain}),
		createDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
			Namespace: "iota_wasp",
			Subsystem: "snapshots",
			Name:      "create_duration",
			Help:      "The duration (s) of creating snapshot and storing it in file system",
			Buckets:   execTimeBuckets,
		}, []string{labelNameChain}),
		loadDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
			Namespace: "iota_wasp",
			Subsystem: "snapshots",
			Name:      "load_duration",
			Help:      "The duration (s) of (down)loading snapshot and writing it into the store",
			Buckets:   execTimeBuckets,
		}, []string{labelNameChain}),
	}
}

// register adds every snapshot collector to reg: first the three duration
// histograms, then the collectors exposed by the created count-and-max
// metrics. MustRegister is used, so a failed registration panics.
func (p *ChainSnapshotsMetricsProvider) register(reg prometheus.Registerer) {
	durations := []prometheus.Collector{
		p.updateDuration,
		p.createDuration,
		p.loadDuration,
	}
	reg.MustRegister(durations...)

	countAndMax := p.created.collectors()
	reg.MustRegister(countAndMax...)
}

// createForChain returns the snapshot metrics accessor bound to chainID and
// backed by this provider's collectors.
func (p *ChainSnapshotsMetricsProvider) createForChain(chainID isc.ChainID) *ChainSnapshotsMetrics {
	chainMetrics := newChainSnapshotsMetrics(p, chainID)
	return chainMetrics
}

// ChainSnapshotsMetrics reports snapshot metrics for a single chain. It pairs
// the shared collectors with the label set identifying that chain.
type ChainSnapshotsMetrics struct {
// labels is the label set obtained from getChainLabels for this chain.
labels prometheus.Labels
// collectors is the provider whose collector vectors are written to.
collectors *ChainSnapshotsMetricsProvider
}

// newChainSnapshotsMetrics creates the per-chain view over the given
// collectors. Before returning, it touches every collector with the chain's
// labels so that the series show up in Prometheus even if nothing has been
// observed yet.
func newChainSnapshotsMetrics(collectors *ChainSnapshotsMetricsProvider, chainID isc.ChainID) *ChainSnapshotsMetrics {
	chainLabels := getChainLabels(chainID)

	// Instantiate the labelled series up front (same order as declared).
	collectors.created.with(chainLabels)
	for _, histogram := range []*prometheus.HistogramVec{
		collectors.updateDuration,
		collectors.createDuration,
		collectors.loadDuration,
	} {
		histogram.With(chainLabels)
	}

	result := new(ChainSnapshotsMetrics)
	result.labels = chainLabels
	result.collectors = collectors
	return result
}

// SnapshotsUpdated records, in seconds, how long an update of the available
// snapshots list took for this chain.
func (m *ChainSnapshotsMetrics) SnapshotsUpdated(duration time.Duration) {
	seconds := duration.Seconds()
	observer := m.collectors.updateDuration.With(m.labels)
	observer.Observe(seconds)
}

// SnapshotCreated records, in seconds, how long creating a snapshot took for
// this chain and then passes the snapshot's state index to the created
// count-and-max metrics (presumably counting the snapshot and tracking the
// largest index; countValue is defined elsewhere).
func (m *ChainSnapshotsMetrics) SnapshotCreated(duration time.Duration, stateIndex uint32) {
	seconds := duration.Seconds()
	m.collectors.createDuration.With(m.labels).Observe(seconds)

	index := float64(stateIndex)
	m.collectors.created.countValue(m.labels, index)
}

// SnapshotLoaded records, in seconds, how long loading a snapshot took for
// this chain.
func (m *ChainSnapshotsMetrics) SnapshotLoaded(duration time.Duration) {
	seconds := duration.Seconds()
	observer := m.collectors.loadDuration.With(m.labels)
	observer.Observe(seconds)
}

0 comments on commit 5aa85a7

Please sign in to comment.