Skip to content

Commit

Permalink
physical: add default timeout for etcd3 requests (#3053)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiang90 authored and jefferai committed Jul 26, 2017
1 parent 6e311aa commit 64d412e
Showing 1 changed file with 28 additions and 10 deletions.
38 changes: 28 additions & 10 deletions physical/etcd3.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"time"

metrics "github.com/armon/go-metrics"
"github.com/coreos/etcd/client"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
"github.com/coreos/etcd/pkg/transport"
Expand All @@ -34,7 +33,10 @@ type EtcdBackend struct {
}

// etcd default lease duration is 60s. set to 15s for faster recovery.
const etcd3LockTimeoutInSeconds = 15
const (
etcd3LockTimeoutInSeconds = 15
etcd3RequestTimeoutInSeconds = 5
)

// newEtcd3Backend constructs a etcd3 backend.
func newEtcd3Backend(conf map[string]string, logger log.Logger) (Backend, error) {
Expand Down Expand Up @@ -118,7 +120,7 @@ func newEtcd3Backend(conf map[string]string, logger log.Logger) (Backend, error)
}

if sync {
ctx, cancel := context.WithTimeout(context.Background(), client.DefaultRequestTimeout)
ctx, cancel := context.WithTimeout(context.Background(), etcd3RequestTimeoutInSeconds)
err := etcd.Sync(ctx)
cancel()
if err != nil {
Expand All @@ -141,7 +143,9 @@ func (c *EtcdBackend) Put(entry *Entry) error {
c.permitPool.Acquire()
defer c.permitPool.Release()

_, err := c.etcd.Put(context.Background(), path.Join(c.path, entry.Key), string(entry.Value))
ctx, cancel := context.WithTimeout(context.Background(), etcd3RequestTimeoutInSeconds)
defer cancel()
_, err := c.etcd.Put(ctx, path.Join(c.path, entry.Key), string(entry.Value))
return err
}

Expand All @@ -151,7 +155,9 @@ func (c *EtcdBackend) Get(key string) (*Entry, error) {
c.permitPool.Acquire()
defer c.permitPool.Release()

resp, err := c.etcd.Get(context.Background(), path.Join(c.path, key))
ctx, cancel := context.WithTimeout(context.Background(), etcd3RequestTimeoutInSeconds)
defer cancel()
resp, err := c.etcd.Get(ctx, path.Join(c.path, key))
if err != nil {
return nil, err
}
Expand All @@ -174,7 +180,9 @@ func (c *EtcdBackend) Delete(key string) error {
c.permitPool.Acquire()
defer c.permitPool.Release()

_, err := c.etcd.Delete(context.Background(), path.Join(c.path, key))
ctx, cancel := context.WithTimeout(context.Background(), etcd3RequestTimeoutInSeconds)
defer cancel()
_, err := c.etcd.Delete(ctx, path.Join(c.path, key))
if err != nil {
return err
}
Expand All @@ -187,8 +195,10 @@ func (c *EtcdBackend) List(prefix string) ([]string, error) {
c.permitPool.Acquire()
defer c.permitPool.Release()

ctx, cancel := context.WithTimeout(context.Background(), etcd3RequestTimeoutInSeconds)
defer cancel()
prefix = path.Join(c.path, prefix)
resp, err := c.etcd.Get(context.Background(), prefix, clientv3.WithPrefix())
resp, err := c.etcd.Get(ctx, prefix, clientv3.WithPrefix())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -265,7 +275,10 @@ func (c *EtcdLock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
}
return nil, err
}
if _, err := c.etcd.Put(ctx, c.etcdMu.Key(), c.value, clientv3.WithLease(c.etcdSession.Lease())); err != nil {

pctx, cancel := context.WithTimeout(context.Background(), etcd3RequestTimeoutInSeconds)
defer cancel()
if _, err := c.etcd.Put(pctx, c.etcdMu.Key(), c.value, clientv3.WithLease(c.etcdSession.Lease())); err != nil {
return nil, err
}

Expand All @@ -282,11 +295,16 @@ func (c *EtcdLock) Unlock() error {
return EtcdLockNotHeldError
}

return c.etcdMu.Unlock(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), etcd3RequestTimeoutInSeconds)
defer cancel()
return c.etcdMu.Unlock(ctx)
}

func (c *EtcdLock) Value() (bool, string, error) {
resp, err := c.etcd.Get(context.Background(),
ctx, cancel := context.WithTimeout(context.Background(), etcd3RequestTimeoutInSeconds)
defer cancel()

resp, err := c.etcd.Get(ctx,
c.prefix, clientv3.WithPrefix(),
clientv3.WithSort(clientv3.SortByCreateRevision, clientv3.SortAscend))

Expand Down

0 comments on commit 64d412e

Please sign in to comment.