Skip to content

Commit

Permalink
Fix missing timeouts in vsphere input (influxdata#4840)
Browse files Browse the repository at this point in the history
  • Loading branch information
prydin authored and rgitzel committed Oct 17, 2018
1 parent 5e698b4 commit 9c18a89
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 31 deletions.
36 changes: 23 additions & 13 deletions plugins/inputs/vsphere/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Client struct {
Root *view.ContainerView
Perf *performance.Manager
Valid bool
Timeout time.Duration
closeGate sync.Once
}

Expand All @@ -53,17 +54,21 @@ func (cf *ClientFactory) GetClient(ctx context.Context) (*Client, error) {
defer cf.mux.Unlock()
if cf.client == nil {
var err error
if cf.client, err = NewClient(cf.url, cf.parent); err != nil {
if cf.client, err = NewClient(ctx, cf.url, cf.parent); err != nil {
return nil, err
}
}

// Execute a dummy call against the server to make sure the client is
// still functional. If not, try to log back in. If that doesn't work,
// we give up.
if _, err := methods.GetCurrentTime(ctx, cf.client.Client); err != nil {
ctx1, cancel1 := context.WithTimeout(ctx, cf.parent.Timeout.Duration)
defer cancel1()
if _, err := methods.GetCurrentTime(ctx1, cf.client.Client); err != nil {
log.Printf("I! [input.vsphere]: Client session seems to have time out. Reauthenticating!")
if cf.client.Client.SessionManager.Login(ctx, url.UserPassword(cf.parent.Username, cf.parent.Password)) != nil {
ctx2, cancel2 := context.WithTimeout(ctx, cf.parent.Timeout.Duration)
defer cancel2()
if cf.client.Client.SessionManager.Login(ctx2, url.UserPassword(cf.parent.Username, cf.parent.Password)) != nil {
return nil, err
}
}
Expand All @@ -72,7 +77,7 @@ func (cf *ClientFactory) GetClient(ctx context.Context) (*Client, error) {
}

// NewClient creates a new vSphere client based on the url and setting passed as parameters.
func NewClient(u *url.URL, vs *VSphere) (*Client, error) {
func NewClient(ctx context.Context, u *url.URL, vs *VSphere) (*Client, error) {
sw := NewStopwatch("connect", u.Host)
tlsCfg, err := vs.ClientConfig.TLSConfig()
if err != nil {
Expand All @@ -85,7 +90,6 @@ func NewClient(u *url.URL, vs *VSphere) (*Client, error) {
if vs.Username != "" {
u.User = url.UserPassword(vs.Username, vs.Password)
}
ctx := context.Background()

log.Printf("D! [input.vsphere]: Creating client: %s", u.Host)
soapClient := soap.NewClient(u, tlsCfg.InsecureSkipVerify)
Expand All @@ -103,15 +107,19 @@ func NewClient(u *url.URL, vs *VSphere) (*Client, error) {
}
}

vimClient, err := vim25.NewClient(ctx, soapClient)
ctx1, cancel1 := context.WithTimeout(ctx, vs.Timeout.Duration)
defer cancel1()
vimClient, err := vim25.NewClient(ctx1, soapClient)
if err != nil {
return nil, err
}
sm := session.NewManager(vimClient)

// If TSLKey is specified, try to log in as an extension using a cert.
if vs.TLSKey != "" {
if err := sm.LoginExtensionByCertificate(ctx, vs.TLSKey); err != nil {
ctx2, cancel2 := context.WithTimeout(ctx, vs.Timeout.Duration)
defer cancel2()
if err := sm.LoginExtensionByCertificate(ctx2, vs.TLSKey); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -142,11 +150,12 @@ func NewClient(u *url.URL, vs *VSphere) (*Client, error) {
sw.Stop()

return &Client{
Client: c,
Views: m,
Root: v,
Perf: p,
Valid: true,
Client: c,
Views: m,
Root: v,
Perf: p,
Valid: true,
Timeout: vs.Timeout.Duration,
}, nil
}

Expand All @@ -164,7 +173,8 @@ func (c *Client) close() {
// Use a Once to prevent us from panics stemming from trying
// to close it multiple times.
c.closeGate.Do(func() {
ctx := context.Background()
ctx, cancel := context.WithTimeout(context.Background(), c.Timeout)
defer cancel()
if c.Client != nil {
if err := c.Client.Logout(ctx); err != nil {
log.Printf("E! [input.vsphere]: Error during logout: %s", err)
Expand Down
58 changes: 40 additions & 18 deletions plugins/inputs/vsphere/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type resourceKind struct {
objects objectMap
filters filter.Filter
collectInstances bool
getObjects func(context.Context, *view.ContainerView) (objectMap, error)
getObjects func(context.Context, *Endpoint, *view.ContainerView) (objectMap, error)
}

type metricEntry struct {
Expand Down Expand Up @@ -253,7 +253,9 @@ func (e *Endpoint) getMetricNameMap(ctx context.Context) (map[int32]string, erro
return nil, err
}

mn, err := client.Perf.CounterInfoByName(ctx)
ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel1()
mn, err := client.Perf.CounterInfoByName(ctx1)

if err != nil {
return nil, err
Expand All @@ -272,7 +274,9 @@ func (e *Endpoint) getMetadata(ctx context.Context, in interface{}) interface{}
}

rq := in.(*metricQRequest)
metrics, err := client.Perf.AvailableMetric(ctx, rq.obj.ref.Reference(), rq.res.sampling)
ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel1()
metrics, err := client.Perf.AvailableMetric(ctx1, rq.obj.ref.Reference(), rq.res.sampling)
if err != nil && err != context.Canceled {
log.Printf("E! [input.vsphere]: Error while getting metric metadata. Discovery will be incomplete. Error: %s", err)
}
Expand All @@ -292,7 +296,9 @@ func (e *Endpoint) getDatacenterName(ctx context.Context, client *Client, cache
path = append(path, here.Reference().String())
o := object.NewCommon(client.Client.Client, r)
var result mo.ManagedEntity
err := o.Properties(ctx, here, []string{"parent", "name"}, &result)
ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel1()
err := o.Properties(ctx1, here, []string{"parent", "name"}, &result)
if err != nil {
log.Printf("W! [input.vsphere]: Error while resolving parent. Assuming no parent exists. Error: %s", err)
break
Expand Down Expand Up @@ -344,7 +350,7 @@ func (e *Endpoint) discover(ctx context.Context) error {
log.Printf("D! [input.vsphere] Discovering resources for %s", res.name)
// Need to do this for all resource types even if they are not enabled (but datastore)
if res.enabled || (k != "datastore" && k != "vm") {
objects, err := res.getObjects(ctx, client.Root)
objects, err := res.getObjects(ctx, e, client.Root)
if err != nil {
return err
}
Expand Down Expand Up @@ -411,9 +417,11 @@ func (e *Endpoint) discover(ctx context.Context) error {
return nil
}

func getDatacenters(ctx context.Context, root *view.ContainerView) (objectMap, error) {
func getDatacenters(ctx context.Context, e *Endpoint, root *view.ContainerView) (objectMap, error) {
var resources []mo.Datacenter
err := root.Retrieve(ctx, []string{"Datacenter"}, []string{"name", "parent"}, &resources)
ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel1()
err := root.Retrieve(ctx1, []string{"Datacenter"}, []string{"name", "parent"}, &resources)
if err != nil {
return nil, err
}
Expand All @@ -425,9 +433,11 @@ func getDatacenters(ctx context.Context, root *view.ContainerView) (objectMap, e
return m, nil
}

func getClusters(ctx context.Context, root *view.ContainerView) (objectMap, error) {
func getClusters(ctx context.Context, e *Endpoint, root *view.ContainerView) (objectMap, error) {
var resources []mo.ClusterComputeResource
err := root.Retrieve(ctx, []string{"ClusterComputeResource"}, []string{"name", "parent"}, &resources)
ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel1()
err := root.Retrieve(ctx1, []string{"ClusterComputeResource"}, []string{"name", "parent"}, &resources)
if err != nil {
return nil, err
}
Expand All @@ -439,7 +449,9 @@ func getClusters(ctx context.Context, root *view.ContainerView) (objectMap, erro
if !ok {
o := object.NewFolder(root.Client(), *r.Parent)
var folder mo.Folder
err := o.Properties(ctx, *r.Parent, []string{"parent"}, &folder)
ctx2, cancel2 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel2()
err := o.Properties(ctx2, *r.Parent, []string{"parent"}, &folder)
if err != nil {
log.Printf("W! [input.vsphere] Error while getting folder parent: %e", err)
p = nil
Expand All @@ -455,7 +467,7 @@ func getClusters(ctx context.Context, root *view.ContainerView) (objectMap, erro
return m, nil
}

func getHosts(ctx context.Context, root *view.ContainerView) (objectMap, error) {
func getHosts(ctx context.Context, e *Endpoint, root *view.ContainerView) (objectMap, error) {
var resources []mo.HostSystem
err := root.Retrieve(ctx, []string{"HostSystem"}, []string{"name", "parent"}, &resources)
if err != nil {
Expand All @@ -469,9 +481,11 @@ func getHosts(ctx context.Context, root *view.ContainerView) (objectMap, error)
return m, nil
}

func getVMs(ctx context.Context, root *view.ContainerView) (objectMap, error) {
func getVMs(ctx context.Context, e *Endpoint, root *view.ContainerView) (objectMap, error) {
var resources []mo.VirtualMachine
err := root.Retrieve(ctx, []string{"VirtualMachine"}, []string{"name", "runtime.host", "config.guestId", "config.uuid"}, &resources)
ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel1()
err := root.Retrieve(ctx1, []string{"VirtualMachine"}, []string{"name", "runtime.host", "config.guestId", "config.uuid"}, &resources)
if err != nil {
return nil, err
}
Expand All @@ -491,9 +505,11 @@ func getVMs(ctx context.Context, root *view.ContainerView) (objectMap, error) {
return m, nil
}

func getDatastores(ctx context.Context, root *view.ContainerView) (objectMap, error) {
func getDatastores(ctx context.Context, e *Endpoint, root *view.ContainerView) (objectMap, error) {
var resources []mo.Datastore
err := root.Retrieve(ctx, []string{"Datastore"}, []string{"name", "parent"}, &resources)
ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel1()
err := root.Retrieve(ctx1, []string{"Datastore"}, []string{"name", "parent"}, &resources)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -696,17 +712,23 @@ func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec,
return 0, err
}

metricInfo, err := client.Perf.CounterInfoByName(ctx)
ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel1()
metricInfo, err := client.Perf.CounterInfoByName(ctx1)
if err != nil {
return count, err
}

metrics, err := client.Perf.Query(ctx, pqs)
ctx2, cancel2 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel2()
metrics, err := client.Perf.Query(ctx2, pqs)
if err != nil {
return count, err
}

ems, err := client.Perf.ToMetricSeries(ctx, metrics)
ctx3, cancel3 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel3()
ems, err := client.Perf.ToMetricSeries(ctx3, metrics)
if err != nil {
return count, err
}
Expand Down
22 changes: 22 additions & 0 deletions plugins/inputs/vsphere/vsphere_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"regexp"
"sort"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -229,6 +230,27 @@ func TestWorkerPool(t *testing.T) {
}
}

func TestTimeout(t *testing.T) {
m, s, err := createSim()
if err != nil {
t.Fatal(err)
}
defer m.Remove()
defer s.Close()

var acc testutil.Accumulator
v := defaultVSphere()
v.Vcenters = []string{s.URL.String()}
v.Timeout = internal.Duration{Duration: 1 * time.Nanosecond}
require.NoError(t, v.Start(nil)) // We're not using the Accumulator, so it can be nil.
defer v.Stop()
require.NoError(t, v.Gather(&acc))

// The accumulator must contain exactly one error and it must be a deadline exceeded.
require.Equal(t, 1, len(acc.Errors))
require.True(t, strings.Contains(acc.Errors[0].Error(), "context deadline exceeded"))
}

func TestAll(t *testing.T) {
m, s, err := createSim()
if err != nil {
Expand Down

0 comments on commit 9c18a89

Please sign in to comment.