From 64d412e1112458e121ea381e628e4b0a37113705 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 26 Jul 2017 09:10:12 -0700 Subject: [PATCH] physical: add default timeout for etcd3 requests (#3053) --- physical/etcd3.go | 38 ++++++++++++++++++++++++++++---------- 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/physical/etcd3.go b/physical/etcd3.go index 7f86e4eed17e..c76de2e44035 100644 --- a/physical/etcd3.go +++ b/physical/etcd3.go @@ -11,7 +11,6 @@ import ( "time" metrics "github.com/armon/go-metrics" - "github.com/coreos/etcd/client" "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3/concurrency" "github.com/coreos/etcd/pkg/transport" @@ -34,7 +33,10 @@ type EtcdBackend struct { } // etcd default lease duration is 60s. set to 15s for faster recovery. -const etcd3LockTimeoutInSeconds = 15 +const ( + etcd3LockTimeoutInSeconds = 15 + etcd3RequestTimeoutInSeconds = 5 +) // newEtcd3Backend constructs a etcd3 backend. func newEtcd3Backend(conf map[string]string, logger log.Logger) (Backend, error) { @@ -118,7 +120,7 @@ func newEtcd3Backend(conf map[string]string, logger log.Logger) (Backend, error) } if sync { - ctx, cancel := context.WithTimeout(context.Background(), client.DefaultRequestTimeout) + ctx, cancel := context.WithTimeout(context.Background(), etcd3RequestTimeoutInSeconds) err := etcd.Sync(ctx) cancel() if err != nil { @@ -141,7 +143,9 @@ func (c *EtcdBackend) Put(entry *Entry) error { c.permitPool.Acquire() defer c.permitPool.Release() - _, err := c.etcd.Put(context.Background(), path.Join(c.path, entry.Key), string(entry.Value)) + ctx, cancel := context.WithTimeout(context.Background(), etcd3RequestTimeoutInSeconds) + defer cancel() + _, err := c.etcd.Put(ctx, path.Join(c.path, entry.Key), string(entry.Value)) return err } @@ -151,7 +155,9 @@ func (c *EtcdBackend) Get(key string) (*Entry, error) { c.permitPool.Acquire() defer c.permitPool.Release() - resp, err := c.etcd.Get(context.Background(), path.Join(c.path, key)) + ctx, cancel := context.WithTimeout(context.Background(), etcd3RequestTimeoutInSeconds) + defer cancel() + resp, err := c.etcd.Get(ctx, path.Join(c.path, key)) if err != nil { return nil, err } @@ -174,7 +180,9 @@ func (c *EtcdBackend) Delete(key string) error { c.permitPool.Acquire() defer c.permitPool.Release() - _, err := c.etcd.Delete(context.Background(), path.Join(c.path, key)) + ctx, cancel := context.WithTimeout(context.Background(), etcd3RequestTimeoutInSeconds) + defer cancel() + _, err := c.etcd.Delete(ctx, path.Join(c.path, key)) if err != nil { return err } @@ -187,8 +195,10 @@ func (c *EtcdBackend) List(prefix string) ([]string, error) { c.permitPool.Acquire() defer c.permitPool.Release() + ctx, cancel := context.WithTimeout(context.Background(), etcd3RequestTimeoutInSeconds) + defer cancel() prefix = path.Join(c.path, prefix) - resp, err := c.etcd.Get(context.Background(), prefix, clientv3.WithPrefix()) + resp, err := c.etcd.Get(ctx, prefix, clientv3.WithPrefix()) if err != nil { return nil, err } @@ -265,7 +275,10 @@ func (c *EtcdLock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) { } return nil, err } - if _, err := c.etcd.Put(ctx, c.etcdMu.Key(), c.value, clientv3.WithLease(c.etcdSession.Lease())); err != nil { + + pctx, cancel := context.WithTimeout(context.Background(), etcd3RequestTimeoutInSeconds) + defer cancel() + if _, err := c.etcd.Put(pctx, c.etcdMu.Key(), c.value, clientv3.WithLease(c.etcdSession.Lease())); err != nil { return nil, err } @@ -282,11 +295,16 @@ func (c *EtcdLock) Unlock() error { return EtcdLockNotHeldError } - return c.etcdMu.Unlock(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), etcd3RequestTimeoutInSeconds) + defer cancel() + return c.etcdMu.Unlock(ctx) } func (c *EtcdLock) Value() (bool, string, error) { - resp, err := c.etcd.Get(context.Background(), + ctx, cancel := context.WithTimeout(context.Background(), etcd3RequestTimeoutInSeconds) + defer cancel() + + resp, err := c.etcd.Get(ctx, c.prefix, clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByCreateRevision, clientv3.SortAscend))