Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

etcd3 Locker: First pass #202

Merged
merged 18 commits into from
Nov 10, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion .scripts/test_all.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ set -e
# Find all packages containing Go source code inside the current directory
packages=$(find ./ -maxdepth 2 -name '*.go' -printf '%h\n' | sort | uniq)

# The consul package only supports Go1.8+ and therefore we will only run the
# The consul package only supports Go1.10+ and therefore we will only run the
# corresponding tests on these versions.
goversion=$(go version)
if [[ "$goversion" == *"go1.5"* ]] ||
Expand All @@ -26,6 +26,28 @@ else
go get -u github.com/hashicorp/consul/...
fi

install_etcd_pkgs() {
ETCD_VERSION="3.3.10"
go get -u go.etcd.io/etcd/clientv3
go get -u github.com/chen-anders/go-etcd-harness
wget -q -O /tmp/etcd.tar.gz "https://github.com/etcd-io/etcd/releases/download/v$ETCD_VERSION/etcd-v$ETCD_VERSION-linux-amd64.tar.gz"
tar xvzf /tmp/etcd.tar.gz -C /tmp
export PATH="$PATH:/tmp/etcd-v$ETCD_VERSION-linux-amd64"
}

# The etcd 3.3.x package only supports Go1.9+ and therefore
# we will only run the corresponding tests on these versions.
if [[ "$goversion" == *"go1.5"* ]] ||
[[ "$goversion" == *"go1.6"* ]] ||
[[ "$goversion" == *"go1.7"* ]] ||
[[ "$goversion" == *"go1.8"* ]]; then
echo "Skipping tests requiring etcd3locker, which is not supported on $goversion"
packages=$(echo "$packages" | sed '/etcd3locker/d')
else
# Install the etcd packages which are not vendored.
install_etcd_pkgs
fi

# Install the AWS SDK and Prometheus client which is explicitly not vendored
go get -u github.com/aws/aws-sdk-go/...
go get -u github.com/prometheus/client_golang/prometheus
Expand Down
47 changes: 47 additions & 0 deletions etcd3locker/lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Tested on etcd 3.1+
package etcd3locker

import (
"context"
"time"

"github.com/tus/tusd"
"go.etcd.io/etcd/clientv3/concurrency"
)

type etcd3Lock struct {
Id string
Mutex *concurrency.Mutex
Session *concurrency.Session
}

func newEtcd3Lock(session *concurrency.Session, id string) *etcd3Lock {
return &etcd3Lock{
Mutex: concurrency.NewMutex(session, id),
Session: session,
}
}

func (lock *etcd3Lock) Acquire() error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// this is a blocking call; if we receive DeadlineExceeded
// the lock is most likely already taken
if err := lock.Mutex.Lock(ctx); err != nil {
if err == context.DeadlineExceeded {
return tusd.ErrFileLocked
} else {
return err
}
}
return nil
}

func (lock *etcd3Lock) Release() error {
return lock.Mutex.Unlock(context.Background())
}

func (lock *etcd3Lock) CloseSession() error {
return lock.Session.Close()
}
145 changes: 145 additions & 0 deletions etcd3locker/locker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Package etcd3locker provides a locking mechanism using an etcd3 cluster
//
// To initialize a locker, a pre-existing connected etcd3 client must be present
//
// client, err := clientv3.New(clientv3.Config{
// Endpoints: []string{harness.Endpoint},
// DialTimeout: 5 * time.Second,
// })
//
// For the most basic locker (e.g. non-shared etcd3 cluster / use default TTLs),
// a locker can be instantiated like the following:
//
// locker, err := etcd3locker.New(client)
// if err != nil {
// return nil, fmt.Errorf("Failed to create etcd locker: %v", err.Error())
// }
//
// The locker will need to be included in composer that is used by tusd:
//
// composer := tusd.NewStoreComposer()
// locker.UseIn(composer)
//
// For a shared etcd3 cluster, you may want to modify the prefix that etcd3locker uses:
//
// locker, err := etcd3locker.NewWithPrefix(client, "my-prefix")
// if err != nil {
// return nil, fmt.Errorf("Failed to create etcd locker: %v", err.Error())
// }
//
//
// For full control over all options, an etcd3.LockerOptions may be passed into
// etcd3.NewWithLockerOptions like the following example:
//
// ttl := 15 // seconds
// options := etcd3locker.NewLockerOptions(ttl, "my-prefix")
// locker, err := etcd3locker.NewWithLockerOptions(client, options)
// if err != nil {
// return nil, fmt.Errorf("Failed to create etcd locker: %v", err.Error())
// }
//
// Tested on etcd 3.1/3.2/3.3
//
package etcd3locker

import (
"errors"
"sync"
"time"

"github.com/tus/tusd"
etcd3 "go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/concurrency"
)

var (
ErrLockNotHeld = errors.New("Lock not held")
GrantTimeout = 1500 * time.Millisecond
)

type Etcd3Locker struct {
// etcd3 client session
Client *etcd3.Client

// locks is used for storing Etcd3Locks before they are
// unlocked. If you want to release a lock, you need the same locker
// instance and therefore we need to save them temporarily.
locks map[string]*etcd3Lock
mutex sync.Mutex
prefix string
sessionTimeout int
}

// New constructs a new locker using the provided client.
func New(client *etcd3.Client) (*Etcd3Locker, error) {
return NewWithLockerOptions(client, DefaultLockerOptions())
}

// This method may be used if a different prefix is required for multi-tenant etcd clusters
func NewWithPrefix(client *etcd3.Client, prefix string) (*Etcd3Locker, error) {
lockerOptions := DefaultLockerOptions()
lockerOptions.SetPrefix(prefix)
return NewWithLockerOptions(client, lockerOptions)
}

// This method may be used if we want control over both prefix/session TTLs. This is used for testing in particular.
func NewWithLockerOptions(client *etcd3.Client, opts LockerOptions) (*Etcd3Locker, error) {
locksMap := map[string]*etcd3Lock{}
return &Etcd3Locker{Client: client, prefix: opts.Prefix(), sessionTimeout: opts.Timeout(), locks: locksMap, mutex: sync.Mutex{}}, nil
}

// UseIn adds this locker to the passed composer.
func (locker *Etcd3Locker) UseIn(composer *tusd.StoreComposer) {
composer.UseLocker(locker)
}

// LockUpload tries to obtain the exclusive lock.
func (locker *Etcd3Locker) LockUpload(id string) error {
session, err := locker.createSession()
if err != nil {
return err
}

lock := newEtcd3Lock(session, locker.getId(id))

err = lock.Acquire()
if err != nil {
return err
}

locker.mutex.Lock()
defer locker.mutex.Unlock()
// Only add the lock to our list if the acquire was successful and no error appeared.
locker.locks[locker.getId(id)] = lock

return nil
}

// UnlockUpload releases a lock. If no such lock exists, no error will be returned.
func (locker *Etcd3Locker) UnlockUpload(id string) error {
locker.mutex.Lock()
defer locker.mutex.Unlock()

// Complain if no lock has been found. This can only happen if LockUpload
// has not been invoked before or UnlockUpload multiple times.
lock, ok := locker.locks[locker.getId(id)]
if !ok {
return ErrLockNotHeld
}

err := lock.Release()
if err != nil {
return err
}

defer delete(locker.locks, locker.getId(id))
return lock.CloseSession()
}

func (locker *Etcd3Locker) createSession() (*concurrency.Session, error) {
return concurrency.NewSession(locker.Client, concurrency.WithTTL(locker.sessionTimeout))
}

func (locker *Etcd3Locker) getId(id string) string {
return locker.prefix + id
}
58 changes: 58 additions & 0 deletions etcd3locker/locker_options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package etcd3locker

import (
"strings"
)

var (
DefaultTtl = 60
DefaultPrefix = "/tusd"
)

type LockerOptions struct {
timeoutSeconds int
prefix string
}

func DefaultLockerOptions() LockerOptions {
return LockerOptions{
timeoutSeconds: 60,
prefix: "/tusd",
}
}

func NewLockerOptions(timeout int, prefix string) LockerOptions {
return LockerOptions{
timeoutSeconds: timeout,
prefix: prefix,
}
}

func (l *LockerOptions) Timeout() int {
if l.timeoutSeconds == 0 {
return DefaultTtl
} else {
return l.timeoutSeconds
}
}

func (l *LockerOptions) Prefix() string {
prefix := l.prefix
if !strings.HasPrefix(prefix, "/") {
prefix = "/" + prefix
}

if prefix == "" {
return DefaultPrefix
} else {
return prefix
}
}

func (l *LockerOptions) SetTimeout(timeout int) {
l.timeoutSeconds = timeout
}

func (l *LockerOptions) SetPrefix(prefix string) {
l.prefix = prefix
}
59 changes: 59 additions & 0 deletions etcd3locker/locker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package etcd3locker

import (
etcd_harness "github.com/chen-anders/go-etcd-harness"
"go.etcd.io/etcd/clientv3"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/tus/tusd"
)

func TestEtcd3Locker(t *testing.T) {
a := assert.New(t)

harness, err := etcd_harness.New(os.Stderr)
if err != nil {
t.Fatalf("failed starting etcd harness: %v", err)
}
t.Logf("will use etcd harness endpoint: %v", harness.Endpoint)
defer func() {
harness.Stop()
t.Logf("cleaned up etcd harness")
}()

client, err := clientv3.New(clientv3.Config{
Endpoints: []string{harness.Endpoint},
DialTimeout: 5 * time.Second,
})
if err != nil {
t.Fatalf("Unable to connect to etcd3: %v", err)
}
defer client.Close()

shortTTL := 3
testPrefix := "/test-tusd"

lockerOptions := NewLockerOptions(shortTTL, testPrefix)
locker, err := NewWithLockerOptions(client, lockerOptions)
a.NoError(err)
a.NoError(locker.LockUpload("one"))
a.Equal(tusd.ErrFileLocked, locker.LockUpload("one"))
time.Sleep(5 * time.Second)
// test that we can't take over the upload via a different etcd3 session
// while an upload is already taking place; testing etcd3 session KeepAlive
a.Equal(tusd.ErrFileLocked, locker.LockUpload("one"))
a.NoError(locker.UnlockUpload("one"))
a.Equal(ErrLockNotHeld, locker.UnlockUpload("one"))

testPrefix = "/test-tusd2"
locker2, err := NewWithPrefix(client, testPrefix)
a.NoError(err)
a.NoError(locker2.LockUpload("one"))
a.Equal(tusd.ErrFileLocked, locker2.LockUpload("one"))
a.Equal(tusd.ErrFileLocked, locker2.LockUpload("one"))
a.NoError(locker2.UnlockUpload("one"))
a.Equal(ErrLockNotHeld, locker2.UnlockUpload("one"))
}
2 changes: 1 addition & 1 deletion vendor/vendor.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"comment": "",
"ignore": "test github.com/hashicorp/consul/ github.com/aws/aws-sdk-go/ github.com/prometheus/client_golang/prometheus/",
"ignore": "test github.com/hashicorp/consul/ github.com/aws/aws-sdk-go/ github.com/prometheus/client_golang/prometheus/ github.com/coreos/etcd github.com/mwitkow/go-etcd-harness",
"package": [
{
"path": "appengine",
Expand Down