Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(inputs.dpdk): Add options to customize error-behavior and metric layout #14308

Merged
merged 16 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 69 additions & 15 deletions plugins/inputs/dpdk/README.md

Large diffs are not rendered by default.

301 changes: 182 additions & 119 deletions plugins/inputs/dpdk/dpdk.go

Large diffs are not rendered by default.

55 changes: 55 additions & 0 deletions plugins/inputs/dpdk/dpdk_cmds.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
//go:build linux

package dpdk

import (
"fmt"
"strings"
)

// linkStatus is the numeric form of a link state.
type linkStatus int64

// Numeric link states; DOWN is the zero value.
const (
	DOWN linkStatus = 0
	UP   linkStatus = 1
)

const (
	// ethdevLinkStatusCommand is the command whose response gets extra processing.
	ethdevLinkStatusCommand = "/ethdev/link_status"
	// linkStatusStringFieldName is the response field carrying the textual status.
	linkStatusStringFieldName = "status"
	// linkStatusIntegerFieldName is the field that receives the numeric status.
	linkStatusIntegerFieldName = "link_status"
)

// linkStatusMap translates a lower-case textual status into its linkStatus value.
var linkStatusMap = map[string]linkStatus{
	"up":   UP,
	"down": DOWN,
}

// processCommandResponse applies command-specific post-processing to the
// flattened response fields in data. Only the link status command has a
// handler; for any other command data is left untouched and nil is returned.
func processCommandResponse(command string, data map[string]interface{}) error {
	switch command {
	case ethdevLinkStatusCommand:
		return processLinkStatusCmd(data)
	default:
		return nil
	}
}

// processLinkStatusCmd reads the textual link status from data and, when it is
// a known value, stores its numeric form under linkStatusIntegerFieldName.
// An error is returned when the status field is absent, is not a string, or
// holds an unknown value; data is not modified in those cases.
func processLinkStatusCmd(data map[string]interface{}) error {
	rawStatus, isString := data[linkStatusStringFieldName].(string)
	if !isString {
		return fmt.Errorf("can't find or parse %q field", linkStatusStringFieldName)
	}

	if numericStatus, known := parseLinkStatus(rawStatus); known {
		data[linkStatusIntegerFieldName] = int64(numericStatus)
		return nil
	}

	return fmt.Errorf("can't parse linkStatus: unknown value: %q", rawStatus)
}

// parseLinkStatus looks up s case-insensitively in linkStatusMap.
// The boolean reports whether s was a known status; on a miss the returned
// linkStatus is the zero value.
func parseLinkStatus(s string) (linkStatus, bool) {
	normalized := strings.ToLower(s)
	if parsed, known := linkStatusMap[normalized]; known {
		return parsed, true
	}

	var unknown linkStatus
	return unknown, false
}
131 changes: 131 additions & 0 deletions plugins/inputs/dpdk/dpdk_cmds_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
//go:build linux

package dpdk

import (
"fmt"
"testing"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
)

func Test_LinkStatusCommand(t *testing.T) {
t.Run("when 'status' field is DOWN then return 'link_status'=0", func(t *testing.T) {
mockConn, dpdk, mockAcc := prepareEnvironment()
defer mockConn.AssertExpectations(t)
response := fmt.Sprintf(`{%q:{%q: "DOWN"}}`, ethdevLinkStatusCommand, linkStatusStringFieldName)
simulateResponse(mockConn, response, nil)
dpdkConn := dpdk.connectors[0]
dpdkConn.processCommand(mockAcc, testutil.Logger{}, fmt.Sprintf("%s,1", ethdevLinkStatusCommand), nil)

expected := []telegraf.Metric{
testutil.MustMetric(
"dpdk",
map[string]string{
"command": ethdevLinkStatusCommand,
"params": "1",
},
map[string]interface{}{
linkStatusStringFieldName: "DOWN",
linkStatusIntegerFieldName: int64(0),
},
time.Unix(0, 0),
),
}

actual := mockAcc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime())
})

t.Run("when 'status' field is UP then return 'link_status'=1", func(t *testing.T) {
mockConn, dpdk, mockAcc := prepareEnvironment()
defer mockConn.AssertExpectations(t)
response := fmt.Sprintf(`{%q:{%q: "UP"}}`, ethdevLinkStatusCommand, linkStatusStringFieldName)
simulateResponse(mockConn, response, nil)
dpdkConn := dpdk.connectors[0]
dpdkConn.processCommand(mockAcc, testutil.Logger{}, fmt.Sprintf("%s,1", ethdevLinkStatusCommand), nil)

expected := []telegraf.Metric{
testutil.MustMetric(
"dpdk",
map[string]string{
"command": ethdevLinkStatusCommand,
"params": "1",
},
map[string]interface{}{
linkStatusStringFieldName: "UP",
linkStatusIntegerFieldName: int64(1),
},
time.Unix(0, 0),
),
}

actual := mockAcc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime())
})

t.Run("when link status output doesn't have any fields then don't return 'link_status' field", func(t *testing.T) {
mockConn, dpdk, mockAcc := prepareEnvironment()
defer mockConn.AssertExpectations(t)
response := fmt.Sprintf(`{%q:{}}`, ethdevLinkStatusCommand)
simulateResponse(mockConn, response, nil)
dpdkConn := dpdk.connectors[0]
dpdkConn.processCommand(mockAcc, testutil.Logger{}, fmt.Sprintf("%s,1", ethdevLinkStatusCommand), nil)

actual := mockAcc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, nil, actual, testutil.IgnoreTime())
})

t.Run("when link status output doesn't have status field then don't return 'link_status' field", func(t *testing.T) {
mockConn, dpdk, mockAcc := prepareEnvironment()
defer mockConn.AssertExpectations(t)
response := fmt.Sprintf(`{%q:{"tag1": 1}}`, ethdevLinkStatusCommand)
simulateResponse(mockConn, response, nil)
dpdkConn := dpdk.connectors[0]
dpdkConn.processCommand(mockAcc, testutil.Logger{}, fmt.Sprintf("%s,1", ethdevLinkStatusCommand), nil)
expected := []telegraf.Metric{
testutil.MustMetric(
"dpdk",
map[string]string{
"command": ethdevLinkStatusCommand,
"params": "1",
},
map[string]interface{}{
"tag1": float64(1),
},
time.Unix(0, 0),
),
}

actual := mockAcc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime())
})

t.Run("when link status output is invalid then don't return 'link_status' field", func(t *testing.T) {
mockConn, dpdk, mockAcc := prepareEnvironment()
defer mockConn.AssertExpectations(t)
response := fmt.Sprintf(`{%q:{%q: "BOB"}}`, ethdevLinkStatusCommand, linkStatusStringFieldName)
simulateResponse(mockConn, response, nil)
dpdkConn := dpdk.connectors[0]
dpdkConn.processCommand(mockAcc, testutil.Logger{}, fmt.Sprintf("%s,1", ethdevLinkStatusCommand), nil)

expected := []telegraf.Metric{
testutil.MustMetric(
"dpdk",
map[string]string{
"command": ethdevLinkStatusCommand,
"params": "1",
},
map[string]interface{}{
linkStatusStringFieldName: "BOB",
},
time.Unix(0, 0),
),
}

actual := mockAcc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime())
})
}
120 changes: 101 additions & 19 deletions plugins/inputs/dpdk/dpdk_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,15 @@ import (
"net"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
)

const maxInitMessageLength = 1024
const (
// maxInitMessageLength is the size in bytes of the buffer allocated to read the init message.
maxInitMessageLength = 1024 // based on https://github.com/DPDK/dpdk/blob/v22.07/lib/telemetry/telemetry.c#L352
// NOTE(review): presumably the base name of DPDK telemetry socket files; no usage is visible here - confirm.
dpdkSocketTemplateName = "dpdk_telemetry"
)

type initMessage struct {
Version string `json:"version"`
Expand All @@ -21,43 +26,82 @@ type initMessage struct {

// dpdkConnector keeps the state needed to talk to a single DPDK socket.
type dpdkConnector struct {
// pathToSocket is the socket path that connect dials.
pathToSocket string
// maxOutputLen is the response buffer size taken from the init message's MaxOutputLen.
maxOutputLen uint32
// messageShowed records that the init message has already been stored once.
messageShowed bool
// accessTimeout is the configured timeout.
// NOTE(review): presumably applied by setTimeout to reads and writes - confirm.
accessTimeout time.Duration
// connection is the currently open connection, assigned by connect.
connection net.Conn
// initMessage is the init message read right after connecting; nil until then.
initMessage *initMessage
}

// newDpdkConnector builds a connector for the socket at pathToSocket with the
// given access timeout. It only fills in the struct; no connection is opened.
func newDpdkConnector(pathToSocket string, accessTimeout config.Duration) *dpdkConnector {
	connector := new(dpdkConnector)
	connector.pathToSocket = pathToSocket
	connector.accessTimeout = time.Duration(accessTimeout)
	return connector
}

// connect dials the socket and reads the init message sent on the new connection.
// Since DPDK uses a local unix socket, dialing returns an error or a connection
// instantly, so there is no need to set a timeout for it.
// When reading the init message fails, tryClose is called on the connection
// before the error is returned; a close failure is wrapped together with the read error.
func (conn *dpdkConnector) connect() (*initMessage, error) {
// Fail early when the path does not pass the isSocket check.
if err := isSocket(conn.pathToSocket); err != nil {
return nil, err
}

connection, err := net.Dial("unixpacket", conn.pathToSocket)
if err != nil {
return nil, fmt.Errorf("failed to connect to the socket: %w", err)
}

conn.connection = connection
// NOTE(review): result is never used below and err is overwritten by the next
// call - this looks like a leftover line; confirm against the actual file.
result, err := conn.readMaxOutputLen()

// The init message is stored on the connector so later calls can use it.
conn.initMessage, err = conn.readInitMessage()
if err != nil {
if closeErr := conn.tryClose(); closeErr != nil {
return nil, fmt.Errorf("%w and failed to close connection: %w", err, closeErr)
}
return nil, err
}
return conn.initMessage, nil
}

// addMetadataFields copies the requested metadata values from the stored init
// message into data. Supported names are dpdkMetadataFieldPidName and
// dpdkMetadataFieldVersionName; any other name is ignored. Nothing is added
// when no init message has been stored yet.
func (conn *dpdkConnector) addMetadataFields(metadataFields []string, data map[string]interface{}) {
	msg := conn.initMessage
	if msg == nil {
		return
	}

	for i := range metadataFields {
		name := metadataFields[i]
		if name == dpdkMetadataFieldPidName {
			data[name] = msg.Pid
		} else if name == dpdkMetadataFieldVersionName {
			data[name] = msg.Version
		}
	}
}

// appendCommandsWithParamsFromList runs listCommand, converts its response to a
// list of params with jsonToArray, and returns every command/param combination
// built with commandWithParams, ordered by command first and param second.
// An error from fetching or converting the list is returned as is.
func (conn *dpdkConnector) appendCommandsWithParamsFromList(listCommand string, commands []string) ([]string, error) {
	rawList, err := conn.getCommandResponse(listCommand)
	if err != nil {
		return nil, err
	}

	identifiers, err := jsonToArray(rawList, listCommand)
	if err != nil {
		return nil, err
	}

	// Capacity is known up front: one entry per command/identifier pair.
	combined := make([]string, 0, len(commands)*len(identifiers))
	for i := range commands {
		for _, identifier := range identifiers {
			combined = append(combined, commandWithParams(commands[i], identifier))
		}
	}

	return combined, nil
}

// Executes command using provided connection and returns response
// If error (such as timeout) occurred, then connection is discarded and recreated
// because otherwise behaviour of connection is undefined (e.g. it could return result of timed out command instead of latest)
// because otherwise behavior of connection is undefined (e.g. it could return result of timed out command instead of latest)
func (conn *dpdkConnector) getCommandResponse(fullCommand string) ([]byte, error) {
connection, err := conn.getConnection()
if err != nil {
Expand All @@ -77,7 +121,7 @@ func (conn *dpdkConnector) getCommandResponse(fullCommand string) ([]byte, error
return nil, fmt.Errorf("failed to send %q command: %w", fullCommand, err)
}

buf := make([]byte, conn.maxOutputLen)
buf := make([]byte, conn.initMessage.MaxOutputLen)
messageLength, err := connection.Read(buf)
if err != nil {
if closeErr := conn.tryClose(); closeErr != nil {
Expand All @@ -92,6 +136,50 @@ func (conn *dpdkConnector) getCommandResponse(fullCommand string) ([]byte, error
return buf[:messageLength], nil
}

// processCommand runs commandWithParams, decodes and flattens the JSON response,
// and writes the resulting fields to acc tagged with the command and its params.
// Failures to fetch, decode or flatten are reported through acc.AddError and stop
// processing. An empty response, or a failure in command-specific
// post-processing, is only logged as a warning; in the latter case the metric is
// still emitted.
func (conn *dpdkConnector) processCommand(acc telegraf.Accumulator, log telegraf.Logger, commandWithParams string, metadataFields []string) {
	rawResponse, err := conn.getCommandResponse(commandWithParams)
	if err != nil {
		acc.AddError(err)
		return
	}

	var decoded map[string]interface{}
	if err = json.Unmarshal(rawResponse, &decoded); err != nil {
		acc.AddError(fmt.Errorf("failed to unmarshal json response from %q command: %w", commandWithParams, err))
		return
	}

	// The response is keyed by the command name without its params.
	command := stripParams(commandWithParams)
	payload := decoded[command]
	if isEmpty(payload) {
		log.Warnf("got empty json on %q command", commandWithParams)
		return
	}

	var flattener jsonparser.JSONFlattener
	if err = flattener.FullFlattenJSON("", payload, true, true); err != nil {
		acc.AddError(fmt.Errorf("failed to flatten response: %w", err))
		return
	}

	// Post-processing problems are not fatal: the raw fields are still reported.
	if err = processCommandResponse(command, flattener.Fields); err != nil {
		log.Warnf("Failed to process a response of the command: %s. Error: %v. Continue to handle data", command, err)
	}

	// Add metadata fields if required
	conn.addMetadataFields(metadataFields, flattener.Fields)

	// Add common fields
	tags := map[string]string{
		"command": command,
		"params":  getParams(commandWithParams),
	}
	acc.AddFields(pluginName, flattener.Fields, tags)
}

func (conn *dpdkConnector) tryClose() error {
if conn.connection == nil {
return nil
Expand Down Expand Up @@ -128,7 +216,7 @@ func (conn *dpdkConnector) getConnection() (net.Conn, error) {
}

// Reads InitMessage for connection. Should be read for each connection, otherwise InitMessage is returned as response for first command.
func (conn *dpdkConnector) readMaxOutputLen() (*initMessage, error) {
func (conn *dpdkConnector) readInitMessage() (*initMessage, error) {
buf := make([]byte, maxInitMessageLength)
err := conn.setTimeout()
if err != nil {
Expand All @@ -140,21 +228,15 @@ func (conn *dpdkConnector) readMaxOutputLen() (*initMessage, error) {
return nil, fmt.Errorf("failed to read InitMessage: %w", err)
}

var initMessage initMessage
err = json.Unmarshal(buf[:messageLength], &initMessage)
var connectionInitMessage initMessage
err = json.Unmarshal(buf[:messageLength], &connectionInitMessage)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal response: %w", err)
}

if initMessage.MaxOutputLen == 0 {
if connectionInitMessage.MaxOutputLen == 0 {
return nil, fmt.Errorf("failed to read maxOutputLen information")
}

if !conn.messageShowed {
conn.maxOutputLen = initMessage.MaxOutputLen
conn.messageShowed = true
return &initMessage, nil
}

return nil, nil
return &connectionInitMessage, nil
}
Loading
Loading