Skip to content

Commit

Permalink
fix(inputs.phpfpm): Continue despite erroneous sockets (#14852)
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan authored Feb 21, 2024
1 parent 7aed6d2 commit c475771
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 44 deletions.
33 changes: 14 additions & 19 deletions plugins/inputs/phpfpm/phpfpm.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,24 +73,28 @@ type JSONMetrics struct {
} `json:"processes"`
}

type metric map[string]int64
type poolStat map[string]metric
type metricStat map[string]int64
type poolStat map[string]metricStat

type phpfpm struct {
Format string `toml:"format"`
Timeout config.Duration `toml:"timeout"`
Urls []string `toml:"urls"`

Log telegraf.Logger `toml:"-"`
tls.ClientConfig

client *http.Client
Log telegraf.Logger
}

func (*phpfpm) SampleConfig() string {
return sampleConfig
}

func (p *phpfpm) Init() error {
if len(p.Urls) == 0 {
p.Urls = []string{"http://127.0.0.1/status"}
}

tlsCfg, err := p.ClientConfig.TLSConfig()
if err != nil {
return err
Expand All @@ -117,18 +121,8 @@ func (p *phpfpm) Init() error {
// Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any).
func (p *phpfpm) Gather(acc telegraf.Accumulator) error {
if len(p.Urls) == 0 {
return p.gatherServer("http://127.0.0.1/status", acc)
}

var wg sync.WaitGroup

urls, err := expandUrls(p.Urls)
if err != nil {
return err
}

for _, serv := range urls {
for _, serv := range expandUrls(acc, p.Urls) {
wg.Add(1)
go func(serv string) {
defer wg.Done()
Expand Down Expand Up @@ -259,7 +253,7 @@ func parseLines(r io.Reader, acc telegraf.Accumulator, addr string) {
// We start to gather data for a new pool here
if fieldName == PfPool {
currentPool = strings.Trim(keyvalue[1], " ")
stats[currentPool] = make(metric)
stats[currentPool] = make(metricStat)
continue
}

Expand Down Expand Up @@ -347,7 +341,7 @@ func (p *phpfpm) parseJSON(r io.Reader, acc telegraf.Accumulator, addr string) {
}
}

func expandUrls(urls []string) ([]string, error) {
func expandUrls(acc telegraf.Accumulator, urls []string) []string {
addrs := make([]string, 0, len(urls))
for _, address := range urls {
if isNetworkURL(address) {
Expand All @@ -356,11 +350,12 @@ func expandUrls(urls []string) ([]string, error) {
}
paths, err := globUnixSocket(address)
if err != nil {
return nil, err
acc.AddError(err)
continue
}
addrs = append(addrs, paths...)
}
return addrs, nil
return addrs
}

func globUnixSocket(address string) ([]string, error) {
Expand Down
89 changes: 64 additions & 25 deletions plugins/inputs/phpfpm/phpfpm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ import (
"os"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/common/shim"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/testutil"
Expand Down Expand Up @@ -49,6 +52,7 @@ func TestPhpFpmGeneratesMetrics_From_Http(t *testing.T) {
url := ts.URL + "?test=ok"
r := &phpfpm{
Urls: []string{url},
Log: &testutil.Logger{},
}

require.NoError(t, r.Init())
Expand Down Expand Up @@ -96,7 +100,7 @@ func TestPhpFpmGeneratesJSONMetrics_From_Http(t *testing.T) {
input := &phpfpm{
Urls: []string{server.URL + "?full&json"},
Format: "json",
Log: testutil.Logger{},
Log: &testutil.Logger{},
}
require.NoError(t, input.Init())

Expand All @@ -117,8 +121,8 @@ func TestPhpFpmGeneratesMetrics_From_Fcgi(t *testing.T) {
//Now we tested again above server
r := &phpfpm{
Urls: []string{"fcgi://" + tcp.Addr().String() + "/status"},
Log: &testutil.Logger{},
}

require.NoError(t, r.Init())

var acc testutil.Accumulator
Expand Down Expand Up @@ -161,12 +165,11 @@ func TestPhpFpmGeneratesMetrics_From_Socket(t *testing.T) {

r := &phpfpm{
Urls: []string{tcp.Addr().String()},
Log: &testutil.Logger{},
}

require.NoError(t, r.Init())

var acc testutil.Accumulator

require.NoError(t, acc.GatherError(r.Gather))

tags := map[string]string{
Expand Down Expand Up @@ -214,14 +217,12 @@ func TestPhpFpmGeneratesMetrics_From_Multiple_Sockets_With_Glob(t *testing.T) {

r := &phpfpm{
Urls: []string{"/tmp/test-fpm[\\-0-9]*.sock"},
Log: &testutil.Logger{},
}

require.NoError(t, r.Init())

var acc1, acc2 testutil.Accumulator

require.NoError(t, acc1.GatherError(r.Gather))

require.NoError(t, acc2.GatherError(r.Gather))

tags1 := map[string]string{
Expand Down Expand Up @@ -267,12 +268,11 @@ func TestPhpFpmGeneratesMetrics_From_Socket_Custom_Status_Path(t *testing.T) {

r := &phpfpm{
Urls: []string{tcp.Addr().String() + ":custom-status-path"},
Log: &testutil.Logger{},
}

require.NoError(t, r.Init())

var acc testutil.Accumulator

require.NoError(t, acc.GatherError(r.Gather))

tags := map[string]string{
Expand Down Expand Up @@ -300,15 +300,14 @@ func TestPhpFpmGeneratesMetrics_From_Socket_Custom_Status_Path(t *testing.T) {
// When not passing server config, we default to localhost
// We just want to make sure we did request stat from localhost
func TestPhpFpmDefaultGetFromLocalhost(t *testing.T) {
r := &phpfpm{Urls: []string{"http://bad.localhost:62001/status"}}

r := &phpfpm{
Urls: []string{"http://bad.localhost:62001/status"},
Log: &testutil.Logger{},
}
require.NoError(t, r.Init())

var acc testutil.Accumulator

err := acc.GatherError(r.Gather)
require.Error(t, err)
require.Contains(t, err.Error(), "/status")
require.ErrorContains(t, acc.GatherError(r.Gather), "/status")
}

func TestPhpFpmGeneratesMetrics_Throw_Error_When_Fpm_Status_Is_Not_Responding(t *testing.T) {
Expand All @@ -318,30 +317,25 @@ func TestPhpFpmGeneratesMetrics_Throw_Error_When_Fpm_Status_Is_Not_Responding(t

r := &phpfpm{
Urls: []string{"http://aninvalidone"},
Log: &testutil.Logger{},
}

require.NoError(t, r.Init())

var acc testutil.Accumulator

err := acc.GatherError(r.Gather)
require.Error(t, err)
require.Contains(t, err.Error(), `unable to connect to phpfpm status page 'http://aninvalidone'`)
require.Contains(t, err.Error(), `lookup aninvalidone`)
require.ErrorContains(t, err, `unable to connect to phpfpm status page 'http://aninvalidone'`)
require.ErrorContains(t, err, `lookup aninvalidone`)
}

func TestPhpFpmGeneratesMetrics_Throw_Error_When_Socket_Path_Is_Invalid(t *testing.T) {
r := &phpfpm{
Urls: []string{"/tmp/invalid.sock"},
Log: &testutil.Logger{},
}

require.NoError(t, r.Init())

var acc testutil.Accumulator

err := acc.GatherError(r.Gather)
require.Error(t, err)
require.Equal(t, `socket doesn't exist "/tmp/invalid.sock"`, err.Error())
require.ErrorContains(t, acc.GatherError(r.Gather), `socket doesn't exist "/tmp/invalid.sock"`)
}

const outputSample = `
Expand Down Expand Up @@ -389,3 +383,48 @@ func TestPhpFpmParseJSON_Log_Error_Without_Panic_When_When_JSON_Is_Invalid(t *te
require.NotPanics(t, func() { p.parseJSON(bytes.NewReader(invalidJSON), &testutil.NopAccumulator{}, "") })
require.Contains(t, logOutput.String(), "E! Unable to decode JSON response: invalid character 'X' looking for beginning of value")
}

func TestGatherDespiteUnavailable(t *testing.T) {
// Let OS find an available port
tcp, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err, "Cannot initialize test server")
defer tcp.Close()

s := statServer{}
go fcgi.Serve(tcp, s) //nolint:errcheck // ignore the returned error as we cannot do anything about it anyway

//Now we tested again above server
r := &phpfpm{
Urls: []string{"fcgi://" + tcp.Addr().String() + "/status", "/lala"},
Log: &testutil.Logger{},
}
require.NoError(t, r.Init())

expected := []telegraf.Metric{
metric.New(
"phpfpm",
map[string]string{
"pool": "www",
"url": r.Urls[0],
},
map[string]interface{}{
"start_since": int64(1991),
"accepted_conn": int64(3),
"listen_queue": int64(1),
"max_listen_queue": int64(0),
"listen_queue_len": int64(0),
"idle_processes": int64(1),
"active_processes": int64(1),
"total_processes": int64(2),
"max_active_processes": int64(1),
"max_children_reached": int64(2),
"slow_requests": int64(1),
},
time.Unix(0, 0),
),
}

var acc testutil.Accumulator
require.ErrorContains(t, acc.GatherError(r.Gather), "socket doesn't exist")
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
}

0 comments on commit c475771

Please sign in to comment.