From e94c36471fceb1deefc6c7980da58a1f742f4e0d Mon Sep 17 00:00:00 2001 From: Anders Chen Date: Sat, 19 May 2018 15:49:34 +0800 Subject: [PATCH 01/18] First pass at etcd3 locker --- .scripts/test_all.sh | 6 +- etcd3locker/lock.go | 47 +++++++++++++++ etcd3locker/locker.go | 120 +++++++++++++++++++++++++++++++++++++ etcd3locker/locker_test.go | 43 +++++++++++++ 4 files changed, 215 insertions(+), 1 deletion(-) create mode 100644 etcd3locker/lock.go create mode 100644 etcd3locker/locker.go create mode 100644 etcd3locker/locker_test.go diff --git a/.scripts/test_all.sh b/.scripts/test_all.sh index d61b0ec30..baacf718c 100755 --- a/.scripts/test_all.sh +++ b/.scripts/test_all.sh @@ -5,7 +5,7 @@ set -e # Find all packages containing Go source code inside the current directory packages=$(find ./ -maxdepth 2 -name '*.go' -printf '%h\n' | sort | uniq) -# The consul package only supports Go1.8+ and therefore we will only run the +# The consul package only supports Go1.10+ and therefore we will only run the # corresponding tests on these versions. goversion=$(go version) if [[ "$goversion" == *"go1.5"* ]] || @@ -24,6 +24,10 @@ if [[ "$goversion" == *"go1.5"* ]] || else # Install the Consul packages which are not vendored. go get -u github.com/hashicorp/consul/... + + # Install the etcd packages which are not vendored. + go get -u github.com/coreos/etcd + go get -u github.com/mwitkow/go-etcd-harness fi # Install the AWS SDK and Prometheus client which is explicitly not vendored diff --git a/etcd3locker/lock.go b/etcd3locker/lock.go new file mode 100644 index 000000000..6cf77427c --- /dev/null +++ b/etcd3locker/lock.go @@ -0,0 +1,47 @@ +// Tested on etcd 3.1+ +package etcd3locker + +import ( + "context" + "time" + + "github.com/coreos/etcd/clientv3/concurrency" + "github.com/tus/tusd" +) + +type Etcd3Lock struct { + Id string + Mutex *concurrency.Mutex + Session *concurrency.Session +} + +func NewEtcd3Lock(session *concurrency.Session, id string) *Etcd3Lock { + return &Etcd3Lock{ + Mutex: concurrency.NewMutex(session, id), + Session: session, + } +} + +func (lock *Etcd3Lock) Acquire() error { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // this is a blocking call; if we receive DeadlineExceeded + // the lock is most likely already taken + if err := lock.Mutex.Lock(ctx); err != nil { + if err == context.DeadlineExceeded { + return tusd.ErrFileLocked + } else { + return err + } + } + return nil +} + +func (lock *Etcd3Lock) Release() error { + return lock.Mutex.Unlock(context.Background()) +} + +func (lock *Etcd3Lock) CloseSession() error { + return lock.Session.Close() +} diff --git a/etcd3locker/locker.go b/etcd3locker/locker.go new file mode 100644 index 000000000..0660d4a09 --- /dev/null +++ b/etcd3locker/locker.go @@ -0,0 +1,120 @@ +// Tested on etcd 3.1/3.2/3.3 +package etcd3locker + +import ( + "context" + "errors" + "log" + "strings" + "sync" + + etcd3 "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/clientv3/concurrency" + "github.com/tus/tusd" +) + +var ( + ErrLockNotHeld = errors.New("Lock not held") + DefaultTtl = int64(60) +) + +type Etcd3Locker struct { + // etcd3 client session + Client *etcd3.Client + + // locks is used for storing Etcd3Locks before they are + // unlocked. If you want to release a lock, you need the same locker + // instance and therefore we need to save them temporarily. + locks map[string]*Etcd3Lock + mutex sync.Mutex + prefix string +} + +// New constructs a new locker using the provided client. +func New(client *etcd3.Client) (*Etcd3Locker, error) { + return NewWithPrefix(client, "/tusd") +} + +// This method may be used if a different prefix is required for multi-tenant etcd clusters +func NewWithPrefix(client *etcd3.Client, prefix string) (*Etcd3Locker, error) { + locksMap := map[string]*Etcd3Lock{} + + if !strings.HasPrefix(prefix, "/") { + prefix = "/" + prefix + } + + return &Etcd3Locker{Client: client, prefix: prefix, locks: locksMap, mutex: sync.Mutex{}}, nil +} + +// UseIn adds this locker to the passed composer. +func (locker *Etcd3Locker) UseIn(composer *tusd.StoreComposer) { + composer.UseLocker(locker) +} + +// LockUpload tries to obtain the exclusive lock. +func (locker *Etcd3Locker) LockUpload(id string) error { + session, err := locker.createSession() + if err != nil { + return err + } + + lock := NewEtcd3Lock(session, locker.getId(id)) + + err = lock.Acquire() + if err != nil { + return err + } + + locker.mutex.Lock() + defer locker.mutex.Unlock() + // Only add the lock to our list if the acquire was successful and no error appeared. + locker.locks[locker.getId(id)] = lock + + return nil +} + +// UnlockUpload releases a lock. If no such lock exists, no error will be returned. +func (locker *Etcd3Locker) UnlockUpload(id string) error { + locker.mutex.Lock() + defer locker.mutex.Unlock() + + // Complain if no lock has been found. This can only happen if LockUpload + // has not been invoked before or UnlockUpload multiple times. + lock, ok := locker.locks[locker.getId(id)] + if !ok { + return ErrLockNotHeld + } + + err := lock.Release() + if err != nil { + return err + } + + defer delete(locker.locks, locker.getId(id)) + return lock.CloseSession() +} + +func (locker *Etcd3Locker) createSession() (*concurrency.Session, error) { + lease, err := locker.Client.Grant(context.TODO(), DefaultTtl) + if err != nil { + log.Fatal(err) + } + + // Keep lease alive until this process dies + ch, keepAliveErr := locker.Client.KeepAlive(context.TODO(), lease.ID) + if keepAliveErr != nil { + log.Fatal(keepAliveErr) + } + + go func() { + for _ = range ch { + // do nothing + } + }() + + return concurrency.NewSession(locker.Client, concurrency.WithLease(lease.ID)) +} + +func (locker *Etcd3Locker) getId(id string) string { + return locker.prefix + id +} diff --git a/etcd3locker/locker_test.go b/etcd3locker/locker_test.go new file mode 100644 index 000000000..483479473 --- /dev/null +++ b/etcd3locker/locker_test.go @@ -0,0 +1,43 @@ +package etcd3locker + +import ( + "github.com/coreos/etcd/clientv3" + etcd_harness "github.com/mwitkow/go-etcd-harness" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/tus/tusd" +) + +func TestEtcd3Locker(t *testing.T) { + a := assert.New(t) + + harness, err := etcd_harness.New(os.Stderr) + if err != nil { + t.Fatalf("failed starting etcd harness: %v", err) + } + t.Logf("will use etcd harness endpoint: %v", harness.Endpoint) + defer func() { + harness.Stop() + t.Logf("cleaned up etcd harness") + }() + + client, err := clientv3.New(clientv3.Config{ + Endpoints: []string{harness.Endpoint}, + DialTimeout: 5 * time.Second, + }) + if err != nil { + t.Fatalf("Unable to connect to etcd3: %v", err) + } + defer client.Close() + + locker, err := New(client) + a.NoError(err) + a.NoError(locker.LockUpload("one")) + a.Equal(tusd.ErrFileLocked, locker.LockUpload("one")) + a.NoError(locker.UnlockUpload("one")) + a.Equal(ErrLockNotHeld, locker.UnlockUpload("one")) +} From 1a393fb03fb55558c3beeb3bdde42153182a9aaf Mon Sep 17 00:00:00 2001 From: Anders Chen Date: Wed, 29 Aug 2018 10:23:01 -0700 Subject: [PATCH 02/18] Add etcd packages to vendor ignore attribute --- vendor/vendor.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vendor/vendor.json b/vendor/vendor.json index 39b546d52..83d5be03f 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -1,6 +1,6 @@ { "comment": "", - "ignore": "test github.com/hashicorp/consul/ github.com/aws/aws-sdk-go/ github.com/prometheus/client_golang/prometheus/", + "ignore": "test github.com/hashicorp/consul/ github.com/aws/aws-sdk-go/ github.com/prometheus/client_golang/prometheus/ github.com/coreos/etcd github.com/mwitkow/go-etcd-harness", "package": [ { "path": "appengine", From a95c1cd735eb92ed44f96f89589c343cc6a47ed7 Mon Sep 17 00:00:00 2001 From: Anders Chen Date: Wed, 29 Aug 2018 10:32:15 -0700 Subject: [PATCH 03/18] Properly exclude etcd3locker from earlier versions of golang --- .scripts/test_all.sh | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/.scripts/test_all.sh b/.scripts/test_all.sh index baacf718c..a535971a8 100755 --- a/.scripts/test_all.sh +++ b/.scripts/test_all.sh @@ -21,13 +21,22 @@ if [[ "$goversion" == *"go1.5"* ]] || echo "Skipping tests requiring GCSStore, which is not supported on $goversion" packages=$(echo "$packages" | sed '/gcsstore/d') + + echo "Skipping tests requiring etcd3locker, which is not supported on $goversion" + packages=$(echo "$packages" | sed '/etcd3locker/d') else # Install the Consul packages which are not vendored. go get -u github.com/hashicorp/consul/... # Install the etcd packages which are not vendored. go get -u github.com/coreos/etcd + # use release 3.3 as master branches are not stable for etcd + (cd ../../coreos/etcd && git fetch origin && git checkout release-3.3) + go get -u google.golang.org/grpc + go get -u github.com/coreos/go-semver + go get -u github.com/ugorji/go/codec go get -u github.com/mwitkow/go-etcd-harness + fi # Install the AWS SDK and Prometheus client which is explicitly not vendored From 3e41283188da2e921ac165815df3f475b6c2fea6 Mon Sep 17 00:00:00 2001 From: Anders Chen Date: Wed, 29 Aug 2018 17:10:12 -0700 Subject: [PATCH 04/18] Etcd test to only run on go1.9 and go1.10 --- .scripts/test_all.sh | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/.scripts/test_all.sh b/.scripts/test_all.sh index a535971a8..042974a2f 100755 --- a/.scripts/test_all.sh +++ b/.scripts/test_all.sh @@ -21,22 +21,32 @@ if [[ "$goversion" == *"go1.5"* ]] || echo "Skipping tests requiring GCSStore, which is not supported on $goversion" packages=$(echo "$packages" | sed '/gcsstore/d') - - echo "Skipping tests requiring etcd3locker, which is not supported on $goversion" - packages=$(echo "$packages" | sed '/etcd3locker/d') else # Install the Consul packages which are not vendored. go get -u github.com/hashicorp/consul/... +fi - # Install the etcd packages which are not vendored. +install_etcd_pkgs() { go get -u github.com/coreos/etcd - # use release 3.3 as master branches are not stable for etcd - (cd ../../coreos/etcd && git fetch origin && git checkout release-3.3) go get -u google.golang.org/grpc go get -u github.com/coreos/go-semver go get -u github.com/ugorji/go/codec go get -u github.com/mwitkow/go-etcd-harness +} +# The etcd 3.3.x package only supports Go1.9+ and therefore +# we will only run the corresponding tests on these versions. +if [[ "$goversion" == *"go1.5"* ]] || + [[ "$goversion" == *"go1.6"* ]] || + [[ "$goversion" == *"go1.7"* ]] || + [[ "$goversion" == *"go1.8"* ]]; then + echo "Skipping tests requiring etcd3locker, which is not supported on $goversion" + packages=$(echo "$packages" | sed '/etcd3locker/d') +else + # Install the etcd packages which are not vendored. + install_etcd_pkgs + # use release 3.3 as master branches are not stable for etcd + (cd ../../coreos/etcd && git fetch origin && git checkout release-3.3) fi # Install the AWS SDK and Prometheus client which is explicitly not vendored From d41c87f5975b8ac664890c1f2b60f7d924b56ce3 Mon Sep 17 00:00:00 2001 From: Anders Chen Date: Thu, 13 Sep 2018 13:29:37 -0400 Subject: [PATCH 05/18] Do not export internal etcd3locker methods --- etcd3locker/lock.go | 12 ++++++------ etcd3locker/locker.go | 24 ++++++++++++------------ 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/etcd3locker/lock.go b/etcd3locker/lock.go index 6cf77427c..d4f6b5291 100644 --- a/etcd3locker/lock.go +++ b/etcd3locker/lock.go @@ -9,20 +9,20 @@ import ( "github.com/tus/tusd" ) -type Etcd3Lock struct { +type etcd3Lock struct { Id string Mutex *concurrency.Mutex Session *concurrency.Session } -func NewEtcd3Lock(session *concurrency.Session, id string) *Etcd3Lock { - return &Etcd3Lock{ +func newEtcd3Lock(session *concurrency.Session, id string) *etcd3Lock { + return &etcd3Lock{ Mutex: concurrency.NewMutex(session, id), Session: session, } } -func (lock *Etcd3Lock) Acquire() error { +func (lock *etcd3Lock) Acquire() error { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -38,10 +38,10 @@ func (lock *Etcd3Lock) Acquire() error { return nil } -func (lock *Etcd3Lock) Release() error { +func (lock *etcd3Lock) Release() error { return lock.Mutex.Unlock(context.Background()) } -func (lock *Etcd3Lock) CloseSession() error { +func (lock *etcd3Lock) CloseSession() error { return lock.Session.Close() } diff --git a/etcd3locker/locker.go b/etcd3locker/locker.go index 0660d4a09..0138761e1 100644 --- a/etcd3locker/locker.go +++ b/etcd3locker/locker.go @@ -18,47 +18,47 @@ var ( DefaultTtl = int64(60) ) -type Etcd3Locker struct { +type etcd3Locker struct { // etcd3 client session Client *etcd3.Client // locks is used for storing Etcd3Locks before they are // unlocked. If you want to release a lock, you need the same locker // instance and therefore we need to save them temporarily. - locks map[string]*Etcd3Lock + locks map[string]*etcd3Lock mutex sync.Mutex prefix string } // New constructs a new locker using the provided client. -func New(client *etcd3.Client) (*Etcd3Locker, error) { +func New(client *etcd3.Client) (*etcd3Locker, error) { return NewWithPrefix(client, "/tusd") } // This method may be used if a different prefix is required for multi-tenant etcd clusters -func NewWithPrefix(client *etcd3.Client, prefix string) (*Etcd3Locker, error) { - locksMap := map[string]*Etcd3Lock{} +func NewWithPrefix(client *etcd3.Client, prefix string) (*etcd3Locker, error) { + locksMap := map[string]*etcd3Lock{} if !strings.HasPrefix(prefix, "/") { prefix = "/" + prefix } - return &Etcd3Locker{Client: client, prefix: prefix, locks: locksMap, mutex: sync.Mutex{}}, nil + return &etcd3Locker{Client: client, prefix: prefix, locks: locksMap, mutex: sync.Mutex{}}, nil } // UseIn adds this locker to the passed composer. -func (locker *Etcd3Locker) UseIn(composer *tusd.StoreComposer) { +func (locker *etcd3Locker) UseIn(composer *tusd.StoreComposer) { composer.UseLocker(locker) } // LockUpload tries to obtain the exclusive lock. -func (locker *Etcd3Locker) LockUpload(id string) error { +func (locker *etcd3Locker) LockUpload(id string) error { session, err := locker.createSession() if err != nil { return err } - lock := NewEtcd3Lock(session, locker.getId(id)) + lock := newEtcd3Lock(session, locker.getId(id)) err = lock.Acquire() if err != nil { @@ -74,7 +74,7 @@ func (locker *Etcd3Locker) LockUpload(id string) error { } // UnlockUpload releases a lock. If no such lock exists, no error will be returned. -func (locker *Etcd3Locker) UnlockUpload(id string) error { +func (locker *etcd3Locker) UnlockUpload(id string) error { locker.mutex.Lock() defer locker.mutex.Unlock() @@ -94,7 +94,7 @@ func (locker *Etcd3Locker) UnlockUpload(id string) error { return lock.CloseSession() } -func (locker *Etcd3Locker) createSession() (*concurrency.Session, error) { +func (locker *etcd3Locker) createSession() (*concurrency.Session, error) { lease, err := locker.Client.Grant(context.TODO(), DefaultTtl) if err != nil { log.Fatal(err) @@ -115,6 +115,6 @@ func (locker *Etcd3Locker) createSession() (*concurrency.Session, error) { return concurrency.NewSession(locker.Client, concurrency.WithLease(lease.ID)) } -func (locker *Etcd3Locker) getId(id string) string { +func (locker *etcd3Locker) getId(id string) string { return locker.prefix + id } From 299e54dff5610553df5f7614cd7820839c715cc0 Mon Sep 17 00:00:00 2001 From: Anders Chen Date: Thu, 13 Sep 2018 13:34:43 -0400 Subject: [PATCH 06/18] context.TODO -> context.Background --- etcd3locker/locker.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/etcd3locker/locker.go b/etcd3locker/locker.go index 0138761e1..60940aa97 100644 --- a/etcd3locker/locker.go +++ b/etcd3locker/locker.go @@ -95,13 +95,13 @@ func (locker *etcd3Locker) UnlockUpload(id string) error { } func (locker *etcd3Locker) createSession() (*concurrency.Session, error) { - lease, err := locker.Client.Grant(context.TODO(), DefaultTtl) + lease, err := locker.Client.Grant(context.Background(), DefaultTtl) if err != nil { log.Fatal(err) } // Keep lease alive until this process dies - ch, keepAliveErr := locker.Client.KeepAlive(context.TODO(), lease.ID) + ch, keepAliveErr := locker.Client.KeepAlive(context.Background(), lease.ID) if keepAliveErr != nil { log.Fatal(keepAliveErr) } From d9dfd2482c156e9bc315cdec0d8a822cfd028b99 Mon Sep 17 00:00:00 2001 From: Anders Chen Date: Thu, 13 Sep 2018 13:36:26 -0400 Subject: [PATCH 07/18] Return errors instead of logging --- etcd3locker/locker.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/etcd3locker/locker.go b/etcd3locker/locker.go index 60940aa97..6588b97f1 100644 --- a/etcd3locker/locker.go +++ b/etcd3locker/locker.go @@ -4,7 +4,6 @@ package etcd3locker import ( "context" "errors" - "log" "strings" "sync" @@ -97,13 +96,13 @@ func (locker *etcd3Locker) UnlockUpload(id string) error { func (locker *etcd3Locker) createSession() (*concurrency.Session, error) { lease, err := locker.Client.Grant(context.Background(), DefaultTtl) if err != nil { - log.Fatal(err) + return nil, err } // Keep lease alive until this process dies ch, keepAliveErr := locker.Client.KeepAlive(context.Background(), lease.ID) if keepAliveErr != nil { - log.Fatal(keepAliveErr) + return nil, keepAliveErr } go func() { From 4fbff31f81dbd45339b92aac9653f75b3e2eb00d Mon Sep 17 00:00:00 2001 From: Anders Chen Date: Thu, 13 Sep 2018 13:47:21 -0400 Subject: [PATCH 08/18] Enforce a 1.5s timeout on acquiring a lease from etcd --- etcd3locker/locker.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/etcd3locker/locker.go b/etcd3locker/locker.go index 6588b97f1..823b74660 100644 --- a/etcd3locker/locker.go +++ b/etcd3locker/locker.go @@ -6,6 +6,7 @@ import ( "errors" "strings" "sync" + "time" etcd3 "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3/concurrency" @@ -15,6 +16,7 @@ import ( var ( ErrLockNotHeld = errors.New("Lock not held") DefaultTtl = int64(60) + GrantTimeout = 1500 * time.Millisecond ) type etcd3Locker struct { @@ -94,7 +96,10 @@ func (locker *etcd3Locker) UnlockUpload(id string) error { } func (locker *etcd3Locker) createSession() (*concurrency.Session, error) { - lease, err := locker.Client.Grant(context.Background(), DefaultTtl) + ctx, cancel := context.WithTimeout(context.Background(), GrantTimeout) + defer cancel() + + lease, err := locker.Client.Grant(ctx, DefaultTtl) if err != nil { return nil, err } From ab667be7af45f03b104678090d154b535a2cc525 Mon Sep 17 00:00:00 2001 From: Anders Chen Date: Thu, 20 Sep 2018 21:33:39 +0000 Subject: [PATCH 09/18] Allow etcd3 concurreny.NewSession manage KeepAlive connection - Introduce LockerOptions to allow a custom session TTL (default: 60s); etcd3Locker can be initialized with NewWithLockerOptions if one wants granular control over TTL and prefix used for etcd3 keys - Keep NewWithPrefix backwards compatible by calling NewWithLockerOptions --- etcd3locker/locker.go | 48 +++++++++-------------------- etcd3locker/locker_options.go | 58 +++++++++++++++++++++++++++++++++++ etcd3locker/locker_test.go | 17 +++++++++- 3 files changed, 88 insertions(+), 35 deletions(-) create mode 100644 etcd3locker/locker_options.go diff --git a/etcd3locker/locker.go b/etcd3locker/locker.go index 823b74660..1a22965e1 100644 --- a/etcd3locker/locker.go +++ b/etcd3locker/locker.go @@ -2,9 +2,7 @@ package etcd3locker import ( - "context" "errors" - "strings" "sync" "time" @@ -15,7 +13,6 @@ import ( var ( ErrLockNotHeld = errors.New("Lock not held") - DefaultTtl = int64(60) GrantTimeout = 1500 * time.Millisecond ) @@ -26,25 +23,28 @@ type etcd3Locker struct { // locks is used for storing Etcd3Locks before they are // unlocked. If you want to release a lock, you need the same locker // instance and therefore we need to save them temporarily. - locks map[string]*etcd3Lock - mutex sync.Mutex - prefix string + locks map[string]*etcd3Lock + mutex sync.Mutex + prefix string + sessionTimeout int } // New constructs a new locker using the provided client. func New(client *etcd3.Client) (*etcd3Locker, error) { - return NewWithPrefix(client, "/tusd") + return NewWithLockerOptions(client, DefaultLockerOptions()) } // This method may be used if a different prefix is required for multi-tenant etcd clusters func NewWithPrefix(client *etcd3.Client, prefix string) (*etcd3Locker, error) { - locksMap := map[string]*etcd3Lock{} - - if !strings.HasPrefix(prefix, "/") { - prefix = "/" + prefix - } + lockerOptions := DefaultLockerOptions() + lockerOptions.SetPrefix(prefix) + return NewWithLockerOptions(client, lockerOptions) +} - return &etcd3Locker{Client: client, prefix: prefix, locks: locksMap, mutex: sync.Mutex{}}, nil +// This method may be used if we want control over both prefix/session TTLs. This is used for testing in particular. +func NewWithLockerOptions(client *etcd3.Client, opts *LockerOptions) (*etcd3Locker, error) { + locksMap := map[string]*etcd3Lock{} + return &etcd3Locker{Client: client, prefix: opts.Prefix(), sessionTimeout: opts.Timeout(), locks: locksMap, mutex: sync.Mutex{}}, nil } // UseIn adds this locker to the passed composer. @@ -96,27 +96,7 @@ func (locker *etcd3Locker) UnlockUpload(id string) error { } func (locker *etcd3Locker) createSession() (*concurrency.Session, error) { - ctx, cancel := context.WithTimeout(context.Background(), GrantTimeout) - defer cancel() - - lease, err := locker.Client.Grant(ctx, DefaultTtl) - if err != nil { - return nil, err - } - - // Keep lease alive until this process dies - ch, keepAliveErr := locker.Client.KeepAlive(context.Background(), lease.ID) - if keepAliveErr != nil { - return nil, keepAliveErr - } - - go func() { - for _ = range ch { - // do nothing - } - }() - - return concurrency.NewSession(locker.Client, concurrency.WithLease(lease.ID)) + return concurrency.NewSession(locker.Client, concurrency.WithTTL(locker.sessionTimeout)) } func (locker *etcd3Locker) getId(id string) string { diff --git a/etcd3locker/locker_options.go b/etcd3locker/locker_options.go new file mode 100644 index 000000000..00d8bb8f9 --- /dev/null +++ b/etcd3locker/locker_options.go @@ -0,0 +1,58 @@ +package etcd3locker + +import ( + "strings" +) + +var ( + DefaultTtl = 60 + DefaultPrefix = "/tusd" +) + +type LockerOptions struct { + timeoutSeconds int + prefix string +} + +func DefaultLockerOptions() *LockerOptions { + return &LockerOptions{ + timeoutSeconds: 60, + prefix: "/tusd", + } +} + +func NewLockerOptions(timeout int, prefix string) *LockerOptions { + return &LockerOptions{ + timeoutSeconds: timeout, + prefix: prefix, + } +} + +func (l *LockerOptions) Timeout() int { + if l.timeoutSeconds == 0 { + return DefaultTtl + } else { + return l.timeoutSeconds + } +} + +func (l *LockerOptions) Prefix() string { + prefix := l.prefix + if !strings.HasPrefix(prefix, "/") { + prefix = "/" + prefix + } + + if prefix == "" { + return DefaultPrefix + } else { + return prefix + } +} + +func (l *LockerOptions) SetTimeout(timeout int) { + l.timeoutSeconds = timeout +} + +func (l *LockerOptions) SetPrefix(prefix string) { + l.prefix = prefix +} diff --git a/etcd3locker/locker_test.go b/etcd3locker/locker_test.go index 483479473..acfc33041 100644 --- a/etcd3locker/locker_test.go +++ b/etcd3locker/locker_test.go @@ -34,10 +34,25 @@ func TestEtcd3Locker(t *testing.T) { } defer client.Close() - locker, err := New(client) + shortTTL := 3 + testPrefix := "/test-tusd" + + lockerOptions := NewLockerOptions(shortTTL, testPrefix) + locker, err := NewWithLockerOptions(client, lockerOptions) a.NoError(err) a.NoError(locker.LockUpload("one")) a.Equal(tusd.ErrFileLocked, locker.LockUpload("one")) + time.Sleep(5 * time.Second) + a.Equal(tusd.ErrFileLocked, locker.LockUpload("one")) a.NoError(locker.UnlockUpload("one")) a.Equal(ErrLockNotHeld, locker.UnlockUpload("one")) + + testPrefix = "/test-tusd2" + locker2, err := NewWithPrefix(client, testPrefix) + a.NoError(err) + a.NoError(locker2.LockUpload("one")) + a.Equal(tusd.ErrFileLocked, locker2.LockUpload("one")) + a.Equal(tusd.ErrFileLocked, locker2.LockUpload("one")) + a.NoError(locker2.UnlockUpload("one")) + a.Equal(ErrLockNotHeld, locker2.UnlockUpload("one")) } From cb149419bf9a91b7e5c1e2a1c6c16749d915d69f Mon Sep 17 00:00:00 2001 From: Anders Chen Date: Thu, 20 Sep 2018 21:44:41 +0000 Subject: [PATCH 10/18] Add comment on locker_test.go regarding testing the session KeepAlive --- etcd3locker/locker.go | 2 +- etcd3locker/locker_options.go | 8 ++++---- etcd3locker/locker_test.go | 2 ++ 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/etcd3locker/locker.go b/etcd3locker/locker.go index 1a22965e1..0733ee187 100644 --- a/etcd3locker/locker.go +++ b/etcd3locker/locker.go @@ -42,7 +42,7 @@ func NewWithPrefix(client *etcd3.Client, prefix string) (*etcd3Locker, error) { } // This method may be used if we want control over both prefix/session TTLs. This is used for testing in particular. -func NewWithLockerOptions(client *etcd3.Client, opts *LockerOptions) (*etcd3Locker, error) { +func NewWithLockerOptions(client *etcd3.Client, opts LockerOptions) (*etcd3Locker, error) { locksMap := map[string]*etcd3Lock{} return &etcd3Locker{Client: client, prefix: opts.Prefix(), sessionTimeout: opts.Timeout(), locks: locksMap, mutex: sync.Mutex{}}, nil } diff --git a/etcd3locker/locker_options.go b/etcd3locker/locker_options.go index 00d8bb8f9..6852fa7f5 100644 --- a/etcd3locker/locker_options.go +++ b/etcd3locker/locker_options.go @@ -14,15 +14,15 @@ type LockerOptions struct { prefix string } -func DefaultLockerOptions() *LockerOptions { - return &LockerOptions{ +func DefaultLockerOptions() LockerOptions { + return LockerOptions{ timeoutSeconds: 60, prefix: "/tusd", } } -func NewLockerOptions(timeout int, prefix string) *LockerOptions { - return &LockerOptions{ +func NewLockerOptions(timeout int, prefix string) LockerOptions { + return LockerOptions{ timeoutSeconds: timeout, prefix: prefix, } diff --git a/etcd3locker/locker_test.go b/etcd3locker/locker_test.go index acfc33041..5cd5e5083 100644 --- a/etcd3locker/locker_test.go +++ b/etcd3locker/locker_test.go @@ -43,6 +43,8 @@ func TestEtcd3Locker(t *testing.T) { a.NoError(locker.LockUpload("one")) a.Equal(tusd.ErrFileLocked, locker.LockUpload("one")) time.Sleep(5 * time.Second) + // test that we can't take over the upload via a different etcd3 session + // while an upload is already taking place; testing etcd3 session KeepAlive a.Equal(tusd.ErrFileLocked, locker.LockUpload("one")) a.NoError(locker.UnlockUpload("one")) a.Equal(ErrLockNotHeld, locker.UnlockUpload("one")) From d2f300ffe54a18b8cd0918656362bb01f699f9ff Mon Sep 17 00:00:00 2001 From: Anders Chen Date: Thu, 20 Sep 2018 22:08:59 +0000 Subject: [PATCH 11/18] Re-export main type --- etcd3locker/locker.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/etcd3locker/locker.go b/etcd3locker/locker.go index 0733ee187..6ef341bee 100644 --- a/etcd3locker/locker.go +++ b/etcd3locker/locker.go @@ -16,7 +16,7 @@ var ( GrantTimeout = 1500 * time.Millisecond ) -type etcd3Locker struct { +type Etcd3Locker struct { // etcd3 client session Client *etcd3.Client @@ -30,30 +30,30 @@ type etcd3Locker struct { } // New constructs a new locker using the provided client. -func New(client *etcd3.Client) (*etcd3Locker, error) { +func New(client *etcd3.Client) (*Etcd3Locker, error) { return NewWithLockerOptions(client, DefaultLockerOptions()) } // This method may be used if a different prefix is required for multi-tenant etcd clusters -func NewWithPrefix(client *etcd3.Client, prefix string) (*etcd3Locker, error) { +func NewWithPrefix(client *etcd3.Client, prefix string) (*Etcd3Locker, error) { lockerOptions := DefaultLockerOptions() lockerOptions.SetPrefix(prefix) return NewWithLockerOptions(client, lockerOptions) } // This method may be used if we want control over both prefix/session TTLs. This is used for testing in particular. -func NewWithLockerOptions(client *etcd3.Client, opts LockerOptions) (*etcd3Locker, error) { +func NewWithLockerOptions(client *etcd3.Client, opts LockerOptions) (*Etcd3Locker, error) { locksMap := map[string]*etcd3Lock{} - return &etcd3Locker{Client: client, prefix: opts.Prefix(), sessionTimeout: opts.Timeout(), locks: locksMap, mutex: sync.Mutex{}}, nil + return &Etcd3Locker{Client: client, prefix: opts.Prefix(), sessionTimeout: opts.Timeout(), locks: locksMap, mutex: sync.Mutex{}}, nil } // UseIn adds this locker to the passed composer. -func (locker *etcd3Locker) UseIn(composer *tusd.StoreComposer) { +func (locker *Etcd3Locker) UseIn(composer *tusd.StoreComposer) { composer.UseLocker(locker) } // LockUpload tries to obtain the exclusive lock. -func (locker *etcd3Locker) LockUpload(id string) error { +func (locker *Etcd3Locker) LockUpload(id string) error { session, err := locker.createSession() if err != nil { return err @@ -75,7 +75,7 @@ func (locker *etcd3Locker) LockUpload(id string) error { } // UnlockUpload releases a lock. If no such lock exists, no error will be returned. -func (locker *etcd3Locker) UnlockUpload(id string) error { +func (locker *Etcd3Locker) UnlockUpload(id string) error { locker.mutex.Lock() defer locker.mutex.Unlock() @@ -95,10 +95,10 @@ func (locker *etcd3Locker) UnlockUpload(id string) error { return lock.CloseSession() } -func (locker *etcd3Locker) createSession() (*concurrency.Session, error) { +func (locker *Etcd3Locker) createSession() (*concurrency.Session, error) { return concurrency.NewSession(locker.Client, concurrency.WithTTL(locker.sessionTimeout)) } -func (locker *etcd3Locker) getId(id string) string { +func (locker *Etcd3Locker) getId(id string) string { return locker.prefix + id } From a1b03a1dbc9375bdbe891c2f0ece0bc947ba3927 Mon Sep 17 00:00:00 2001 From: Anders Chen Date: Fri, 28 Sep 2018 15:48:01 +0200 Subject: [PATCH 12/18] Try to address missing github.com/gogo/protobuf/gogoproto package --- .scripts/test_all.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/.scripts/test_all.sh b/.scripts/test_all.sh index 042974a2f..dcf264620 100755 --- a/.scripts/test_all.sh +++ b/.scripts/test_all.sh @@ -28,6 +28,7 @@ fi install_etcd_pkgs() { go get -u github.com/coreos/etcd + go get -u github.com/gogo/protobuf/gogoproto go get -u google.golang.org/grpc go get -u github.com/coreos/go-semver go get -u github.com/ugorji/go/codec From 4fc6cb311f63e496b39de145182648d47c5f9251 Mon Sep 17 00:00:00 2001 From: Anders Chen Date: Wed, 7 Nov 2018 22:19:19 +0100 Subject: [PATCH 13/18] Update etcd package import to use go.etcd.io/etcd/clientv3 --- .scripts/test_all.sh | 6 +----- etcd3locker/lock.go | 2 +- etcd3locker/locker.go | 4 ++-- etcd3locker/locker_test.go | 5 ++--- 4 files changed, 6 insertions(+), 11 deletions(-) diff --git a/.scripts/test_all.sh b/.scripts/test_all.sh index dcf264620..fd6c107fe 100755 --- a/.scripts/test_all.sh +++ b/.scripts/test_all.sh @@ -27,11 +27,7 @@ else fi install_etcd_pkgs() { - go get -u github.com/coreos/etcd - go get -u github.com/gogo/protobuf/gogoproto - go get -u google.golang.org/grpc - go get -u github.com/coreos/go-semver - go get -u github.com/ugorji/go/codec + go get -u go.etcd.io/etcd/clientv3 go get -u github.com/mwitkow/go-etcd-harness } diff --git a/etcd3locker/lock.go b/etcd3locker/lock.go index d4f6b5291..a6a8bc447 100644 --- a/etcd3locker/lock.go +++ b/etcd3locker/lock.go @@ -5,7 +5,7 @@ import ( "context" "time" - "github.com/coreos/etcd/clientv3/concurrency" + "go.etcd.io/etcd/clientv3/concurrency" "github.com/tus/tusd" ) diff --git a/etcd3locker/locker.go b/etcd3locker/locker.go index 6ef341bee..9149a092d 100644 --- a/etcd3locker/locker.go +++ b/etcd3locker/locker.go @@ -6,8 +6,8 @@ import ( "sync" "time" - etcd3 "github.com/coreos/etcd/clientv3" - "github.com/coreos/etcd/clientv3/concurrency" + etcd3 "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/clientv3/concurrency" "github.com/tus/tusd" ) diff --git a/etcd3locker/locker_test.go b/etcd3locker/locker_test.go index 5cd5e5083..48d777bd9 100644 --- a/etcd3locker/locker_test.go +++ b/etcd3locker/locker_test.go @@ -1,14 +1,13 @@ package etcd3locker import ( - "github.com/coreos/etcd/clientv3" - etcd_harness "github.com/mwitkow/go-etcd-harness" + "go.etcd.io/etcd/clientv3" + etcd_harness "github.com/chen-anders/go-etcd-harness" "os" "testing" "time" "github.com/stretchr/testify/assert" - "github.com/tus/tusd" ) From e1de81aa74cae58c8a5adb6aed92807181b080ed Mon Sep 17 00:00:00 2001 From: Anders Chen Date: Wed, 7 Nov 2018 22:20:29 +0100 Subject: [PATCH 14/18] Use forked go-etcd-harness for testing --- .scripts/test_all.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.scripts/test_all.sh b/.scripts/test_all.sh index fd6c107fe..db39ac6a7 100755 --- a/.scripts/test_all.sh +++ b/.scripts/test_all.sh @@ -28,7 +28,7 @@ fi install_etcd_pkgs() { go get -u go.etcd.io/etcd/clientv3 - go get -u github.com/mwitkow/go-etcd-harness + go get -u github.com/chen-anders/go-etcd-harness } # The etcd 3.3.x package only supports Go1.9+ and therefore From afe6887aed00c0d19e1720011626fe31f728cbad Mon Sep 17 00:00:00 2001 From: Anders Chen Date: Wed, 7 Nov 2018 22:33:23 +0100 Subject: [PATCH 15/18] Add more extensive package overview / docs --- etcd3locker/locker.go | 43 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 42 insertions(+), 1 deletion(-) diff --git a/etcd3locker/locker.go b/etcd3locker/locker.go index 9149a092d..997efd391 100644 --- a/etcd3locker/locker.go +++ b/etcd3locker/locker.go @@ -1,4 +1,45 @@ +// Package etcd3locker provides a locking mechanism using an etcd3 cluster +// +// To initialize a locker, a pre-existing connected etcd3 client must be present +// +// client, err := clientv3.New(clientv3.Config{ +// Endpoints: []string{harness.Endpoint}, +// DialTimeout: 5 * time.Second, +// }) +// +// For the most basic locker (e.g. non-shared etcd3 cluster / use default TTLs), +// a locker can be instantiated like the following: +// +// locker, err := etcd3locker.New(client) +// if err != nil { +// return nil, fmt.Errorf("Failed to create etcd locker: %v", err.Error()) +// } +// +// The locker will need to be included in composer that is used by tusd: +// +// composer := tusd.NewStoreComposer() +// locker.UseIn(composer) +// +// For a shared etcd3 cluster, you may want to modify the prefix that etcd3locker uses: +// +// locker, err := etcd3locker.NewWithPrefix(client, "my-prefix") +// if err != nil { +// return nil, fmt.Errorf("Failed to create etcd locker: %v", err.Error()) +// } +// +// +// For full control over all options, an etcd3.LockerOptions may be passed into +// etcd3.NewWithLockerOptions like the following example: +// +// ttl := 15 // seconds +// options := etcd3locker.NewLockerOptions(ttl, "my-prefix") +// locker, err := etcd3locker.NewWithLockerOptions(client, options) +// if err != nil { +// return nil, fmt.Errorf("Failed to create etcd locker: %v", err.Error()) +// } +// // Tested on etcd 3.1/3.2/3.3 +// package etcd3locker import ( @@ -6,9 +47,9 @@ import ( "sync" "time" + "github.com/tus/tusd" etcd3 "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/clientv3/concurrency" - "github.com/tus/tusd" ) var ( From 9191262c0264edf2995654a03590d75b2a9aee9f Mon Sep 17 00:00:00 2001 From: Anders Chen Date: Wed, 7 Nov 2018 22:33:30 +0100 Subject: [PATCH 16/18] go fmt --- etcd3locker/lock.go | 2 +- etcd3locker/locker_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/etcd3locker/lock.go b/etcd3locker/lock.go index a6a8bc447..1f672387f 100644 --- a/etcd3locker/lock.go +++ b/etcd3locker/lock.go @@ -5,8 +5,8 @@ import ( "context" "time" - "go.etcd.io/etcd/clientv3/concurrency" "github.com/tus/tusd" + "go.etcd.io/etcd/clientv3/concurrency" ) type etcd3Lock struct { diff --git a/etcd3locker/locker_test.go b/etcd3locker/locker_test.go index 48d777bd9..912554bda 100644 --- a/etcd3locker/locker_test.go +++ b/etcd3locker/locker_test.go @@ -1,8 +1,8 @@ package etcd3locker import ( - "go.etcd.io/etcd/clientv3" etcd_harness "github.com/chen-anders/go-etcd-harness" + "go.etcd.io/etcd/clientv3" "os" "testing" "time" From 09dfd635e4985b4f9b4b26e84f77589926547eef Mon Sep 17 00:00:00 2001 From: Anders Chen Date: Wed, 7 Nov 2018 22:36:08 +0100 Subject: [PATCH 17/18] Fix test script --- .scripts/test_all.sh | 2 -- 1 file changed, 2 deletions(-) diff --git a/.scripts/test_all.sh b/.scripts/test_all.sh index db39ac6a7..6746e22f3 100755 --- a/.scripts/test_all.sh +++ b/.scripts/test_all.sh @@ -42,8 +42,6 @@ if [[ "$goversion" == *"go1.5"* ]] || else # Install the etcd packages which are not vendored. install_etcd_pkgs - # use release 3.3 as master branches are not stable for etcd - (cd ../../coreos/etcd && git fetch origin && git checkout release-3.3) fi # Install the AWS SDK and Prometheus client which is explicitly not vendored From f0f040d34cf682d2b60198640707904d24887ec6 Mon Sep 17 00:00:00 2001 From: Anders Chen Date: Thu, 8 Nov 2018 15:52:38 +0100 Subject: [PATCH 18/18] Add downloaded etcd binary to path --- .scripts/test_all.sh | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.scripts/test_all.sh b/.scripts/test_all.sh index 6746e22f3..652cb1423 100755 --- a/.scripts/test_all.sh +++ b/.scripts/test_all.sh @@ -27,8 +27,12 @@ else fi install_etcd_pkgs() { + ETCD_VERSION="3.3.10" go get -u go.etcd.io/etcd/clientv3 go get -u github.com/chen-anders/go-etcd-harness + wget -q -O /tmp/etcd.tar.gz "https://github.com/etcd-io/etcd/releases/download/v$ETCD_VERSION/etcd-v$ETCD_VERSION-linux-amd64.tar.gz" + tar xvzf /tmp/etcd.tar.gz -C /tmp + export PATH="$PATH:/tmp/etcd-v$ETCD_VERSION-linux-amd64" } # The etcd 3.3.x package only supports Go1.9+ and therefore