diff --git a/plugins/inputs/phpfpm/fcgi_client.go b/plugins/inputs/phpfpm/fcgi_client.go index 87b757dcf1e90..38f210b118f4a 100644 --- a/plugins/inputs/phpfpm/fcgi_client.go +++ b/plugins/inputs/phpfpm/fcgi_client.go @@ -6,10 +6,11 @@ import ( "net" "strconv" "strings" + "time" ) // Create an fcgi client -func newFcgiClient(h string, args ...interface{}) (*conn, error) { +func newFcgiClient(timeout time.Duration, h string, args ...interface{}) (*conn, error) { var con net.Conn if len(args) != 1 { return nil, errors.New("fcgi: not enough params") @@ -19,13 +20,24 @@ func newFcgiClient(h string, args ...interface{}) (*conn, error) { switch args[0].(type) { case int: addr := h + ":" + strconv.FormatInt(int64(args[0].(int)), 10) - con, err = net.Dial("tcp", addr) + if timeout == 0 { + con, err = net.Dial("tcp", addr) + } else { + con, err = net.DialTimeout("tcp", addr, timeout) + } case string: laddr := net.UnixAddr{Name: args[0].(string), Net: h} con, err = net.DialUnix(h, nil, &laddr) default: err = errors.New("fcgi: we only accept int (port) or string (socket) params") } + + if timeout != 0 { + if err := con.SetDeadline(time.Now().Add(timeout)); err != nil { + return nil, err + } + } + fcgi := &conn{ rwc: con, } @@ -66,6 +78,12 @@ READ_LOOP: } break } + if err1 != nil && strings.Contains(err1.Error(), "i/o timeout") { + if !errors.Is(err1, io.EOF) { + err = err1 + } + break + } switch { case rec.h.Type == typeStdout: diff --git a/plugins/inputs/phpfpm/phpfpm.go b/plugins/inputs/phpfpm/phpfpm.go index f96b6184c68bd..75b6420379f42 100644 --- a/plugins/inputs/phpfpm/phpfpm.go +++ b/plugins/inputs/phpfpm/phpfpm.go @@ -159,7 +159,7 @@ func (p *phpfpm) gatherServer(addr string, acc telegraf.Accumulator) error { } fcgiIP := socketAddr[0] fcgiPort, _ := strconv.Atoi(socketAddr[1]) - fcgi, err = newFcgiClient(fcgiIP, fcgiPort) + fcgi, err = newFcgiClient(time.Duration(p.Timeout), fcgiIP, fcgiPort) if err != nil { return err } @@ -173,7 +173,7 @@ func (p *phpfpm) gatherServer(addr string, acc telegraf.Accumulator) error { if statusPath == "" { statusPath = "status" } - fcgi, err = newFcgiClient("unix", socketPath) + fcgi, err = newFcgiClient(time.Duration(p.Timeout), "unix", socketPath) } if err != nil { diff --git a/plugins/inputs/phpfpm/phpfpm_test.go b/plugins/inputs/phpfpm/phpfpm_test.go index 38edc92289814..65510b302d6c0 100644 --- a/plugins/inputs/phpfpm/phpfpm_test.go +++ b/plugins/inputs/phpfpm/phpfpm_test.go @@ -24,6 +24,7 @@ import ( "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/common/shim" "github.com/influxdata/telegraf/plugins/parsers/influx" @@ -150,6 +151,42 @@ func TestPhpFpmGeneratesMetrics_From_Fcgi(t *testing.T) { acc.AssertContainsTaggedFields(t, "phpfpm", fields, tags) } +func TestPhpFpmTimeout_From_Fcgi(t *testing.T) { + // Let OS find an available port + tcp, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err, "Cannot initialize test server") + defer tcp.Close() + + const timeout = 200 * time.Millisecond + + go func() { + conn, err := tcp.Accept() + if err != nil { + return // ignore the returned error as we cannot do anything about it anyway + } + defer conn.Close() + + // Sleep longer than the timeout + time.Sleep(2 * timeout) + }() + + //Now we tested again above server + r := &phpfpm{ + Urls: []string{"fcgi://" + tcp.Addr().String() + "/status"}, + Timeout: config.Duration(timeout), + Log: &testutil.Logger{}, + } + require.NoError(t, r.Init()) + + start := time.Now() + + var acc testutil.Accumulator + require.Error(t, acc.GatherError(r.Gather)) + + require.Empty(t, acc.GetTelegrafMetrics()) + require.GreaterOrEqual(t, time.Since(start), timeout) +} + func TestPhpFpmGeneratesMetrics_From_Socket(t *testing.T) { // Create a socket in /tmp because we always have write permission and if the // removing of socket fail when system restart /tmp is clear so