Skip to content

Commit

Permalink
Add some tests, split things into util file
Browse files Browse the repository at this point in the history
  • Loading branch information
cdalke-havoc committed Nov 23, 2024
1 parent 1151fda commit a85be7a
Show file tree
Hide file tree
Showing 3 changed files with 223 additions and 136 deletions.
155 changes: 20 additions & 135 deletions plugins/inputs/mavlink/mavlink.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,7 @@ package mavlink

import (
_ "embed"
"fmt"
"log"
"reflect"
"regexp"
"strconv"
"strings"
"time"

"github.com/bluenviron/gomavlib/v3"
Expand All @@ -17,27 +12,6 @@ import (
"github.com/influxdata/telegraf/plugins/inputs"
)

// Convert from CamelCase to snake_case
func ConvertToSnakeCase(input string) string {
re := regexp.MustCompile(`([a-z0-9])([A-Z])`)
snake := re.ReplaceAllString(input, `${1}_${2}`)
snake = strings.ToLower(snake)
return snake
}

// Function to check if a string is in a slice
func Contains(slice []string, str string) bool {
for _, item := range slice {
if item == str {
return true
}
}
return false
}

//go:embed sample.conf
var sampleConfig string

// Plugin state
type Mavlink struct {
// Config param
Expand All @@ -54,93 +28,29 @@ type Mavlink struct {
terminated bool
}

//go:embed sample.conf
var sampleConfig string

func (*Mavlink) SampleConfig() string {
return sampleConfig
}

// Container for a parsed Mavlink frame
type MetricFrameData struct {
name string
tags map[string]string
fields map[string]any
}

func (s *Mavlink) Start(acc telegraf.Accumulator) error {
s.acc = acc

// Start goroutine to connect to Mavlink and stream out data async
// Start routine to connect to Mavlink and stream out data async
go func() {
endpointConfig := []gomavlib.EndpointConf{}
if strings.HasPrefix(s.FcuUrl, "serial://") {
tmpStr := strings.TrimPrefix(s.FcuUrl, "serial://")
tmpStrParts := strings.Split(tmpStr, ":")
deviceName := tmpStrParts[0]
baudRate := 57600
if len(tmpStrParts) == 2 {
newBaudRate, err := strconv.Atoi(tmpStrParts[1])
if err != nil {
log.Printf("Mavlink setup error: serial baud rate not valid!")
return
}
baudRate = newBaudRate
}

log.Printf("Mavlink serial client: device %s, baud rate %d", deviceName, baudRate)
endpointConfig = []gomavlib.EndpointConf{
gomavlib.EndpointSerial{
Device: deviceName,
Baud: baudRate,
},
}
} else if strings.HasPrefix(s.FcuUrl, "tcp://") {
// TCP client
tmpStr := strings.TrimPrefix(s.FcuUrl, "tcp://")
tmpStrParts := strings.Split(tmpStr, ":")
if len(tmpStrParts) != 2 {
log.Printf("Mavlink setup error: TCP requires a port!")
return
}

hostname := tmpStrParts[0]
port := 14550
port, err := strconv.Atoi(tmpStrParts[1])
if err != nil {
log.Printf("Mavlink setup error: TCP port is invalid!")
return
}

if len(hostname) > 0 {
log.Printf("Mavlink TCP client: hostname %s, port %d", hostname, port)
endpointConfig = []gomavlib.EndpointConf{
gomavlib.EndpointTCPClient{fmt.Sprintf("%s:%d", hostname, port)},
}
} else {
log.Printf("Mavlink TCP server: port %d", port)
endpointConfig = []gomavlib.EndpointConf{
gomavlib.EndpointTCPServer{fmt.Sprintf(":%d", port)},
}
}
} else if strings.HasPrefix(s.FcuUrl, "udp://") {
// UDP client or server
tmpStr := strings.TrimPrefix(s.FcuUrl, "udp://")
tmpStrParts := strings.Split(tmpStr, ":")
if len(tmpStrParts) != 2 {
log.Printf("Mavlink setup error: UDP requires a port!")
return
}

hostname := tmpStrParts[0]
port := 14550
port, err := strconv.Atoi(tmpStrParts[1])
if err != nil {
log.Printf("Mavlink setup error: UDP port is invalid!")
return
}

if len(hostname) > 0 {
log.Printf("Mavlink UDP client: hostname %s, port %d", hostname, port)
endpointConfig = []gomavlib.EndpointConf{
gomavlib.EndpointUDPClient{fmt.Sprintf("%s:%d", hostname, port)},
}
} else {
log.Printf("Mavlink UDP server: port %d", port)
endpointConfig = []gomavlib.EndpointConf{
gomavlib.EndpointUDPServer{fmt.Sprintf(":%d", port)},
}
}
endpointConfig, err := ParseMavlinkEndpointConfig(s)
if err != nil {
log.Printf("%s", err.Error())
return
}

// Start MAVLink endpoint
Expand All @@ -164,7 +74,6 @@ func (s *Mavlink) Start(acc telegraf.Accumulator) error {
s.connection = connection
}
defer s.connection.Close()

if s.terminated {
return
}
Expand All @@ -175,35 +84,12 @@ func (s *Mavlink) Start(acc telegraf.Accumulator) error {
switch evt.(type) {
case *gomavlib.EventFrame:
if frm, ok := evt.(*gomavlib.EventFrame); ok {
tags := map[string]string{}
var fields = make(map[string]interface{})
tags["sys_id"] = fmt.Sprintf("%d", frm.SystemID())
tags["fcu_url"] = s.FcuUrl

m := frm.Message()
t := reflect.TypeOf(m)
v := reflect.ValueOf(m)
if t.Kind() == reflect.Ptr {
t = t.Elem()
v = v.Elem()
}

for i := 0; i < t.NumField(); i++ {
field := t.Field(i)
value := v.Field(i)
fields[ConvertToSnakeCase(field.Name)] = value.Interface()
}

msg_name := ConvertToSnakeCase(t.Name())
if strings.HasPrefix(msg_name, "message_") {
msg_name = strings.TrimPrefix(msg_name, "message_")

if len(s.MessageFilter) > 0 && Contains(s.MessageFilter, msg_name) {
log.Printf("%s did not match filter\n", msg_name)
continue
}
s.acc.AddFields(msg_name, fields, tags)
result := MavlinkEventFrameToMetric(frm)
if len(s.MessageFilter) > 0 && Contains(s.MessageFilter, result.name) {
continue
}
result.tags["fcu_url"] = s.FcuUrl
s.acc.AddFields(result.name, result.fields, result.tags)
}

case *gomavlib.EventChannelOpen:
Expand All @@ -219,7 +105,6 @@ func (s *Mavlink) Start(acc telegraf.Accumulator) error {
}

func (s *Mavlink) Gather(_ telegraf.Accumulator) error {
// Nothing to do when gathering metrics; fields are accumulated async.
return nil
}

Expand Down
64 changes: 63 additions & 1 deletion plugins/inputs/mavlink/mavlink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,67 @@ import (
"testing"
)

func TestMavlinkClientInit(t *testing.T) {
// Test that a serial port URL can be parsed.
func TestParseSerialFcuUrl(t *testing.T) {
testConfig = Mavlink{
FcuUrl: "serial://dev/ttyACM0:115200",
}

conf, error := ParseMavlinkEndpointConfig(&testConfig)
require.Equal(t, error, nil)
}

// Test that a UDP client URL can be parsed.
func TestParseUDPClientFcuUrl(t *testing.T) {
testConfig = Mavlink{
FcuUrl: "udp://192.168.1.12:14550",
}

conf, error := ParseMavlinkEndpointConfig(&testConfig)
require.Equal(t, error, nil)
}

// Test that a UDP server URL can be parsed.
func TestParseUDPServerFcuUrl(t *testing.T) {
testConfig = Mavlink{
FcuUrl: "udp://:14540",
}

conf, error := ParseMavlinkEndpointConfig(&testConfig)
require.Equal(t, error, nil)
}

// Test that a TCP client URL can be parsed.
func TestParseTCPClientFcuUrl(t *testing.T) {
testConfig = Mavlink{
FcuUrl: "tcp://192.168.1.12:14550",
}

conf, error := ParseMavlinkEndpointConfig(&testConfig)
require.Equal(t, error, nil)
}

// Test that an invalid URL is caught.
func TestParseInvalidFcuUrl(t *testing.T) {
testConfig = Mavlink{
FcuUrl: "ftp://not-a-valid-fcu-url",
}

conf, error := ParseMavlinkEndpointConfig(&testConfig)
require.Equal(t, error, "")
}

func TestStringContains(t *testing.T) {
testArr := []string{"test1", "test2", "test3"}
require.Equal(t, true, Contains(testArr, "test1"))
require.Equal(t, true, Contains(testArr, "test2"))
require.Equal(t, true, Contains(testArr, "test3"))
require.Equal(t, false, Contains(testArr, "test4"))
}

func TestConvertToSnakeCase(t *testing.T) {
require.Equal(t, "", ConvertToSnakeCase(""))
require.Equal(t, "CamelCase", ConvertToSnakeCase("camel_case"))
require.Equal(t, "CamelCamelCase", ConvertToSnakeCase("camel_camel_case"))
require.Equal(t, "snake_case", ConvertToSnakeCase("snake_case"))
}
Loading

0 comments on commit a85be7a

Please sign in to comment.