feat(inputs.ceph): Use perf schema to determine metric type #15233

Merged (1 commit) on May 2, 2024
124 changes: 120 additions & 4 deletions plugins/inputs/ceph/ceph.go
@@ -44,7 +44,8 @@ type Ceph struct {
GatherAdminSocketStats bool `toml:"gather_admin_socket_stats"`
GatherClusterStats bool `toml:"gather_cluster_stats"`

Log telegraf.Logger `toml:"-"`
Log telegraf.Logger `toml:"-"`
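// schemaMaps caches the parsed perf schema for each admin socket so the
// schema only needs to be dumped and parsed once per socket.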
schemaMaps map[socket]perfSchemaMap
}

func (*Ceph) SampleConfig() string {
@@ -73,7 +74,21 @@ func (c *Ceph) gatherAdminSocketStats(acc telegraf.Accumulator) error {
return fmt.Errorf("failed to find sockets at path %q: %w", c.SocketDir, err)
}

if c.schemaMaps == nil {
c.schemaMaps = make(map[socket]perfSchemaMap)
}

for _, s := range sockets {
if _, ok := c.schemaMaps[*s]; !ok {
rawSchema, err := perfSchema(c.CephBinary, s)
if err != nil {
c.Log.Warnf("failed to dump perf schema from socket %q: %v", s.socket, err)
} else if schema, err := parseSchema(rawSchema); err != nil {
c.Log.Warnf("failed to parse perf schema from socket %q: %v", s.socket, err)
} else {
c.schemaMaps[*s] = schema
}
}
dump, err := perfDump(c.CephBinary, s)
if err != nil {
acc.AddError(fmt.Errorf("error reading from socket %q: %w", s.socket, err))
@@ -85,9 +100,25 @@ func (c *Ceph) gatherAdminSocketStats(acc telegraf.Accumulator) error {
continue
}
for tag, metrics := range data {
acc.AddFields(measurement,
metrics,
map[string]string{"type": s.sockType, "id": s.sockID, "collection": tag})
if schema, ok := c.schemaMaps[*s]; ok {
for name, metric := range metrics {
valueType := schema[tag][name]
switch valueType {
case telegraf.Counter:
acc.AddCounter(measurement,
map[string]interface{}{name: metric},
map[string]string{"type": s.sockType, "id": s.sockID, "collection": tag})
default:
acc.AddGauge(measurement,
map[string]interface{}{name: metric},
map[string]string{"type": s.sockType, "id": s.sockID, "collection": tag})
}
}
} else {
acc.AddFields(measurement,
metrics,
map[string]string{"type": s.sockType, "id": s.sockID, "collection": tag})
}
}
}
return nil
@@ -136,6 +167,37 @@ func init() {
})
}

// perfSchema runs "ceph perf schema" on the passed socket. The output is a
// JSON string mapping collection names to a map of counter names to counter
// information.
//
// The counter information includes the type of the counter, which determines
// the names of the final series produced. For example, a real-integer-pair
// valued metric produces three series: sum, avgcount and avgtime, holding the
// sum of all values, the count of values, and the sum divided by the count.
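// parseSchema (below) turns this raw schema into a perfSchemaMap so that, for
// example, an "osd" collection counter such as op_latency yields
// op_latency.sum and op_latency.avgcount as telegraf.Counter and
// op_latency.avgtime as telegraf.Gauge.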
func perfSchema(binary string, socket *socket) (string, error) {
cmdArgs := []string{"--admin-daemon", socket.socket}

switch socket.sockType {
case typeOsd, typeMds, typeRgw:
cmdArgs = append(cmdArgs, "perf", "schema")
case typeMon:
cmdArgs = append(cmdArgs, "perfcounters_schema")
default:
return "", fmt.Errorf("ignoring unknown socket type: %s", socket.sockType)
}

cmd := exec.Command(binary, cmdArgs...)
var out bytes.Buffer
cmd.Stdout = &out
err := cmd.Run()
if err != nil {
return "", fmt.Errorf("error running ceph schema: %w", err)
}

return out.String(), nil
}

var perfDump = func(binary string, socket *socket) (string, error) {
cmdArgs := []string{"--admin-daemon", socket.socket}

@@ -233,6 +295,60 @@ type metricMap map[string]interface{}

type taggedMetricMap map[string]metricMap

// Mask bits for perf counters
const (
perfCounterNone = 0
perfCounterTime = 0x1
perfCounterU64 = 0x2
perfCounterLongRunAvg = 0x4
perfCounterCounter = 0x8
perfCounterHistogram = 0x10
)
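// These bits are combined in the schema's "type" field. For example, a plain
// integer counter reports type 10 (perfCounterU64|perfCounterCounter), while a
// long-running time average reports type 5 (perfCounterTime|perfCounterLongRunAvg).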

type rawPerfCounter struct {
TypeMask int `json:"type"`
MetricType string `json:"metric_type"`
ValueType string `json:"value_type"`
Description string `json:"description"`
Nick string `json:"nick"`
Priority int `json:"priority"`
Units string `json:"units"`
}

type rawCollection map[string]rawPerfCounter

type perfSchemaMap map[string]map[string]telegraf.ValueType

// parseSchema parses the output of "ceph perf schema" into a perfSchemaMap,
// mapping each counter in each collection to its Telegraf value type. This is
// slightly complicated by the need to expand long-running averages into their
// component metrics (sum, avgcount and, for time-valued counters, avgtime).
func parseSchema(rawSchema string) (perfSchemaMap, error) {
rawMap := make(map[string]rawCollection)
err := json.Unmarshal([]byte(rawSchema), &rawMap)
if err != nil {
return nil, fmt.Errorf("failed to parse json: %q: %w", rawSchema, err)
}

schemaMap := make(perfSchemaMap)
for collection, counters := range rawMap {
schemaMap[collection] = make(map[string]telegraf.ValueType)
for counter, schema := range counters {
if schema.TypeMask&perfCounterLongRunAvg != 0 {
schemaMap[collection][counter+".sum"] = telegraf.Counter
schemaMap[collection][counter+".avgcount"] = telegraf.Counter
if schema.TypeMask&perfCounterTime != 0 {
schemaMap[collection][counter+".avgtime"] = telegraf.Gauge
}
} else if schema.TypeMask&perfCounterCounter != 0 {
schemaMap[collection][counter] = telegraf.Counter
} else {
schemaMap[collection][counter] = telegraf.Gauge
}
}
}
return schemaMap, nil
}

// Parses a raw JSON string into a taggedMetricMap
// Delegates the actual parsing to newTaggedMetricMap(..)
func (c *Ceph) parseDump(dump string) (taggedMetricMap, error) {
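For context, here is a minimal sketch (not part of the diff) of how the new helpers compose for a single admin socket. It assumes the plugin package's unexported socket type and typeOsd constant; the binary and socket paths are illustrative and error handling is trimmed:

// Sketch only: hypothetical OSD admin-socket path and binary location.
s := &socket{sockType: typeOsd, sockID: "0", socket: "/var/run/ceph/ceph-osd.0.asok"}

rawSchema, err := perfSchema("/usr/bin/ceph", s) // dump the perf schema over the admin socket
if err != nil {
	return err
}
schema, err := parseSchema(rawSchema) // collection -> counter name -> telegraf.ValueType
if err != nil {
	return err
}

// Counter-typed entries are then emitted with acc.AddCounter and everything
// else with acc.AddGauge; e.g. schema["osd"]["op"] is telegraf.Counter while
// schema["osd"]["op_latency.avgtime"] is telegraf.Gauge.

When the schema cannot be dumped or parsed for a socket, gatherAdminSocketStats logs a warning and falls back to the previous behaviour of emitting untyped fields with AddFields.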
88 changes: 88 additions & 0 deletions plugins/inputs/ceph/ceph_test.go
@@ -10,6 +10,7 @@ import (

"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
)

@@ -120,6 +121,25 @@ func TestGather(t *testing.T) {
require.NoError(t, c.Gather(acc))
}

func TestParseSchema(t *testing.T) {
schemaMap, err := parseSchema(osdRawSchema)

require.NoError(t, err)
// Test Counter
require.Equal(t, telegraf.Counter, schemaMap["osd"]["op"],
"op should be a Counter")
// Test Gauge
require.Equal(t, telegraf.Gauge, schemaMap["osd"]["op_wip"],
"op_wip should be a Gauge")
// Test LongRunAvg
require.Equal(t, telegraf.Counter, schemaMap["osd"]["op_latency.avgcount"],
"op_latency.avgcount should be a Counter")
require.Equal(t, telegraf.Counter, schemaMap["osd"]["op_latency.sum"],
"op_latency.sum should be a Counter")
require.Equal(t, telegraf.Gauge, schemaMap["osd"]["op_latency.avgtime"],
"op_latency.avgtime should be a Gauge")
}

func TestFindSockets(t *testing.T) {
tmpdir := t.TempDir()
c := &Ceph{
@@ -1767,6 +1787,74 @@ var rgwPerfDump = `
}
}
`
var osdRawSchema = `
{ "osd": {
"op_wip": {
"type": 2,
"metric_type": "gauge",
"value_type": "integer",
"description": "Replication operations currently being processed (primary)",
"nick": "",
"priority": 5,
"units": "none"
},
"op": {
"type": 10,
"metric_type": "counter",
"value_type": "integer",
"description": "Client operations",
"nick": "ops",
"priority": 10,
"units": "none"
},
"op_in_bytes": {
"type": 10,
"metric_type": "counter",
"value_type": "integer",
"description": "Client operations total write size",
"nick": "wr",
"priority": 8,
"units": "bytes"
},
"op_out_bytes": {
"type": 10,
"metric_type": "counter",
"value_type": "integer",
"description": "Client operations total read size",
"nick": "rd",
"priority": 8,
"units": "bytes"
},
"op_latency": {
"type": 5,
"metric_type": "gauge",
"value_type": "real-integer-pair",
"description": "Latency of client operations (including queue time)",
"nick": "l",
"priority": 9,
"units": "none"
},
"op_process_latency": {
"type": 5,
"metric_type": "gauge",
"value_type": "real-integer-pair",
"description": "Latency of client operations (excluding queue time)",
"nick": "",
"priority": 5,
"units": "none"
},
"op_prepare_latency": {
"type": 5,
"metric_type": "gauge",
"value_type": "real-integer-pair",
"description": "Latency of client operations (excluding queue time and wait for finished)",
"nick": "",
"priority": 5,
"units": "none"
}
}
}
`

var clusterStatusDump = `
{