-
Notifications
You must be signed in to change notification settings - Fork 12
/
handlers.go
118 lines (98 loc) · 3.07 KB
/
handlers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package main
import (
"github.com/hailocab/ctop/types"
"fmt"
"github.com/nsf/termbox-go"
)
// logToChannel offers a log entry with the given severity and message to
// messageChannel. The send is non-blocking: if the channel cannot accept the
// entry right now, the entry is dropped and the caller carries on.
func logToChannel(severity string, message string) {
	var entry types.LogMessage
	entry.Severity = severity
	entry.Message = message

	select {
	case messageChannel <- entry:
		// Handed over to whoever drains messageChannel.
	default:
		// Channel not ready to receive: discard rather than stall the caller.
	}
}
// Takes metrics off the channel and adds them up:
func handleMetrics() {
var cfStats types.CFStats
for {
// Get a metric from the channel:
cfMetric := <-metricsChannel
logToChannel("debug", fmt.Sprintf("Received a metric! %s", cfMetric.MetricName))
// Build the key:
statName := cfMetric.KeySpace + ":" + cfMetric.ColumnFamily
statsMutex.Lock()
defer statsMutex.Unlock()
// See if we already have a stats-entry:
if _, ok := stats[statName]; ok {
// Use the existing stats-entry:
logToChannel("debug", fmt.Sprintf("Updating existing stat (%s)", statName))
cfStats = stats[statName]
} else {
// Add a new entry to the map:
logToChannel("debug", fmt.Sprintf("Adding new stat (%s)", statName))
cfStats = types.CFStats{
ReadCount: 0,
ReadCountTS: 0,
ReadLatency: 0.0,
ReadRate: 0.0,
WriteCount: 0,
WriteCountTS: 0,
WriteLatency: 0.0,
WriteRate: 0.0,
KeySpace: cfMetric.KeySpace,
ColumnFamily: cfMetric.ColumnFamily,
}
}
// Figure out which metric we need to update:
if cfMetric.MetricName == "ReadCount" {
// Total read count:
interval := cfMetric.MetricTimeStamp - cfStats.ReadCountTS
if cfStats.ReadCountTS == 0 {
cfStats.ReadRate = 0.0
} else {
cfStats.ReadRate = float64(cfMetric.MetricIntValue-cfStats.ReadCount) / float64(interval)
}
cfStats.ReadCount = cfMetric.MetricIntValue
cfStats.ReadCountTS = cfMetric.MetricTimeStamp
stats[statName] = cfStats
} else if cfMetric.MetricName == "WriteCount" {
// Total write count:
interval := cfMetric.MetricTimeStamp - cfStats.WriteCountTS
if cfStats.WriteCountTS == 0 {
cfStats.WriteRate = 0.0
} else {
cfStats.WriteRate = float64(cfMetric.MetricIntValue-cfStats.WriteCount) / float64(interval)
}
cfStats.WriteCount = cfMetric.MetricIntValue
cfStats.WriteCountTS = cfMetric.MetricTimeStamp
stats[statName] = cfStats
} else if cfMetric.MetricName == "LiveDiskSpaceUsed" {
// Total disk space used(k):
cfStats.LiveDiskSpaceUsed = cfMetric.MetricIntValue
stats[statName] = cfStats
} else if cfMetric.MetricName == "RecentReadLatencyMicros" {
// ReadLatency (MicroSeconds):
if cfMetric.MetricFloatValue > 0 {
cfStats.ReadLatency = cfMetric.MetricFloatValue / 1000
stats[statName] = cfStats
}
} else if cfMetric.MetricName == "RecentWriteLatencyMicros" {
// WriteLatency (MicroSeconds):
if cfMetric.MetricFloatValue > 0 {
cfStats.WriteLatency = cfMetric.MetricFloatValue / 1000
stats[statName] = cfStats
}
}
statsMutex.Unlock()
}
}
// handleKeypress writes a debug log entry naming the character carried by the
// given termbox event. It returns nothing.
//
// ev.Ch is a rune, so it is formatted with %q (a quoted character literal such
// as 'a'); %s on a rune would print "%!s(int32=97)" instead of the character.
func handleKeypress(ev *termbox.Event) {
	logToChannel("debug", fmt.Sprintf("Key pressed: %q", ev.Ch))
}