Skip to content

Commit

Permalink
fix(inputs.phpfpm): Avoid blocking forever on FCGI protocol
Browse files Browse the repository at this point in the history
Use the timeout of php-fpm with fgci protocol

Signed-off-by: Pierre Fersing <[email protected]>
  • Loading branch information
PierreF committed Mar 21, 2024
1 parent 63c00fd commit 449e44c
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 4 deletions.
16 changes: 14 additions & 2 deletions plugins/inputs/phpfpm/fcgi_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import (
"net"
"strconv"
"strings"
"time"
)

// Create an fcgi client
func newFcgiClient(h string, args ...interface{}) (*conn, error) {
func newFcgiClient(timeout time.Duration, h string, args ...interface{}) (*conn, error) {
var con net.Conn
if len(args) != 1 {
return nil, errors.New("fcgi: not enough params")
Expand All @@ -19,13 +20,24 @@ func newFcgiClient(h string, args ...interface{}) (*conn, error) {
switch args[0].(type) {
case int:
addr := h + ":" + strconv.FormatInt(int64(args[0].(int)), 10)
con, err = net.Dial("tcp", addr)
if timeout == 0 {
con, err = net.Dial("tcp", addr)
} else {
con, err = net.DialTimeout("tcp", addr, timeout)
}
case string:
laddr := net.UnixAddr{Name: args[0].(string), Net: h}
con, err = net.DialUnix(h, nil, &laddr)
default:
err = errors.New("fcgi: we only accept int (port) or string (socket) params")
}

if timeout != 0 {
if err := con.SetDeadline(time.Now().Add(timeout)); err != nil {
return nil, err
}
}

fcgi := &conn{
rwc: con,
}
Expand Down
4 changes: 2 additions & 2 deletions plugins/inputs/phpfpm/phpfpm.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (p *phpfpm) gatherServer(addr string, acc telegraf.Accumulator) error {
}
fcgiIP := socketAddr[0]
fcgiPort, _ := strconv.Atoi(socketAddr[1])
fcgi, err = newFcgiClient(fcgiIP, fcgiPort)
fcgi, err = newFcgiClient(time.Duration(p.Timeout), fcgiIP, fcgiPort)
if err != nil {
return err
}
Expand All @@ -173,7 +173,7 @@ func (p *phpfpm) gatherServer(addr string, acc telegraf.Accumulator) error {
if statusPath == "" {
statusPath = "status"
}
fcgi, err = newFcgiClient("unix", socketPath)
fcgi, err = newFcgiClient(time.Duration(p.Timeout), "unix", socketPath)
}

if err != nil {
Expand Down
42 changes: 42 additions & 0 deletions plugins/inputs/phpfpm/phpfpm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/common/shim"
"github.com/influxdata/telegraf/plugins/parsers/influx"
Expand Down Expand Up @@ -150,6 +151,47 @@ func TestPhpFpmGeneratesMetrics_From_Fcgi(t *testing.T) {
acc.AssertContainsTaggedFields(t, "phpfpm", fields, tags)
}

func TestPhpFpmTimeout_From_Fcgi(t *testing.T) {
// Let OS find an available port
tcp, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err, "Cannot initialize test server")
defer tcp.Close()

go func() {
// Just accept the connection and do nothing with it
var clientSockets []net.Conn

for {
rw, err := tcp.Accept()
if err != nil {
return // ignore the returned error as we cannot do anything about it anyway
}

// Kept the socket reachable from GC to make sure GC don't close the connection.
clientSockets = append(clientSockets, rw)
_ = clientSockets // Kept linter happy about the fact we don't use clientSockets
}
}()

const timeout = time.Second

//Now we tested again above server
r := &phpfpm{
Urls: []string{"fcgi://" + tcp.Addr().String() + "/status"},
Timeout: config.Duration(timeout),
Log: &testutil.Logger{},
}
require.NoError(t, r.Init())

start := time.Now()

var acc testutil.Accumulator
require.NoError(t, acc.GatherError(r.Gather))

acc.AssertDoesNotContainMeasurement(t, "phpfpm")
require.GreaterOrEqual(t, time.Since(start), timeout)
}

func TestPhpFpmGeneratesMetrics_From_Socket(t *testing.T) {
// Create a socket in /tmp because we always have write permission and if the
// removing of socket fail when system restart /tmp is clear so
Expand Down

0 comments on commit 449e44c

Please sign in to comment.