Skip to content

Commit

Permalink
Tests for consul pipe router.
Browse files Browse the repository at this point in the history
  • Loading branch information
tomwilkie committed Mar 25, 2016
1 parent 89139fe commit b808ac7
Show file tree
Hide file tree
Showing 5 changed files with 254 additions and 43 deletions.
47 changes: 26 additions & 21 deletions app/multitenant/consul_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type ConsulClient interface {
Get(key string, out interface{}) error
CAS(key string, out interface{}, f CASCallback) error
Watch(key string, deadline time.Time, out interface{}, f func(interface{}) (bool, error)) error
WatchPrefix(prefix string, out interface{}, f func(string, interface{}) bool)
WatchPrefix(prefix string, out interface{}, done chan struct{}, f func(string, interface{}) bool)
}

// CASCallback is the type of the callback to CAS. If err is nil, out must be non-nil.
Expand All @@ -37,7 +37,7 @@ func NewConsulClient(addr string) (ConsulClient, error) {
if err != nil {
return nil, err
}
return (*consulClient)(client), nil
return &consulClient{client.KV()}, nil
}

var (
Expand All @@ -50,12 +50,19 @@ var (
ErrNotFound = fmt.Errorf("Not found")
)

type consulClient consul.Client
type kv interface {
CAS(p *consul.KVPair, q *consul.WriteOptions) (bool, *consul.WriteMeta, error)
Get(key string, q *consul.QueryOptions) (*consul.KVPair, *consul.QueryMeta, error)
List(prefix string, q *consul.QueryOptions) (consul.KVPairs, *consul.QueryMeta, error)
}

type consulClient struct {
kv
}

// Get and deserialise a JSON value from consul.
func (c *consulClient) Get(key string, out interface{}) error {
kv := (*consul.Client)(c).KV()
kvp, _, err := kv.Get(key, queryOptions)
kvp, _, err := c.kv.Get(key, queryOptions)
if err != nil {
return err
}
Expand All @@ -73,13 +80,12 @@ func (c *consulClient) Get(key string, out interface{}) error {
func (c *consulClient) CAS(key string, out interface{}, f CASCallback) error {
var (
index = uint64(0)
kv = (*consul.Client)(c).KV()
retries = 10
retry = true
intermediate interface{}
)
for i := 0; i < retries; i++ {
kvp, _, err := kv.Get(key, queryOptions)
kvp, _, err := c.kv.Get(key, queryOptions)
if err != nil {
log.Errorf("Error getting %s: %v", key, err)
continue
Expand Down Expand Up @@ -111,7 +117,7 @@ func (c *consulClient) CAS(key string, out interface{}, f CASCallback) error {
log.Errorf("Error serialising value for %s: %v", key, err)
continue
}
ok, _, err := kv.CAS(&consul.KVPair{
ok, _, err := c.kv.CAS(&consul.KVPair{
Key: key,
Value: value.Bytes(),
ModifyIndex: index,
Expand All @@ -132,17 +138,14 @@ func (c *consulClient) CAS(key string, out interface{}, f CASCallback) error {
// Watch a given key value and trigger a callback when it changes.
// if callback returns false or error, exit (with the error).
func (c *consulClient) Watch(key string, deadline time.Time, out interface{}, f func(interface{}) (bool, error)) error {
var (
index = uint64(0)
kv = (*consul.Client)(c).KV()
)
index := uint64(0)
for deadline.After(mtime.Now()) {
// Do a (blocking) long poll waiting for the entry to get updated. index here
// is really a version number; this call will wait for the key to be updated
// past said version. As we always start from version 0, we're guaranteed
// not to miss any updates - in fact we will always call the callback with
// the current value of the key immediately.
kvp, meta, err := kv.Get(key, &consul.QueryOptions{
kvp, _, err := c.kv.Get(key, &consul.QueryOptions{
RequireConsistent: true,
WaitIndex: index,
WaitTime: longPollDuration,
Expand All @@ -162,19 +165,21 @@ func (c *consulClient) Watch(key string, deadline time.Time, out interface{}, f
} else if err != nil {
return err
}

index = meta.LastIndex
index = kvp.ModifyIndex
}
return fmt.Errorf("Timed out waiting on %s", key)
}

func (c *consulClient) WatchPrefix(prefix string, out interface{}, f func(string, interface{}) bool) {
var (
index = uint64(0)
kv = (*consul.Client)(c).KV()
)
func (c *consulClient) WatchPrefix(prefix string, out interface{}, done chan struct{}, f func(string, interface{}) bool) {
index := uint64(0)
for {
kvps, meta, err := kv.List(prefix, &consul.QueryOptions{
select {
case <-done:
return
default:
}

kvps, meta, err := c.kv.List(prefix, &consul.QueryOptions{
RequireConsistent: true,
WaitIndex: index,
WaitTime: longPollDuration,
Expand Down
32 changes: 11 additions & 21 deletions app/multitenant/consul_pipe_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,13 @@ import (

"github.com/weaveworks/scope/app"
"github.com/weaveworks/scope/common/mtime"
"github.com/weaveworks/scope/common/network"
"github.com/weaveworks/scope/common/xfer"
)

const (
gcInterval = 30 * time.Second // we check all the pipes every 30s
pipeTimeout = 1 * time.Minute // pipes are closed when a client hasn't been connected for 1 minute
gcTimeout = 10 * time.Minute // after another 10 minutes, tombstoned pipes are forgotten

privateAPIPort = 4444
)

var (
Expand Down Expand Up @@ -91,11 +88,7 @@ type consulPipeRouter struct {
}

// NewConsulPipeRouter returns a new consul based router
func NewConsulPipeRouter(client ConsulClient, prefix, inf string, userIDer UserIDer) (app.PipeRouter, error) {
advertise, err := network.GetFirstAddressOf(inf)
if err != nil {
return nil, err
}
func NewConsulPipeRouter(client ConsulClient, prefix, advertise string, userIDer UserIDer) app.PipeRouter {
pipeRouter := &consulPipeRouter{
prefix: prefix,
advertise: advertise,
Expand All @@ -109,9 +102,9 @@ func NewConsulPipeRouter(client ConsulClient, prefix, inf string, userIDer UserI
}
pipeRouter.wait.Add(2)
go pipeRouter.watchAll()
go pipeRouter.privateAPI()
go pipeRouter.actor()
return pipeRouter, nil
go pipeRouter.privateAPI()
return pipeRouter
}

func (pr *consulPipeRouter) Stop() {
Expand All @@ -138,25 +131,23 @@ func (pr *consulPipeRouter) actor() {
// trigger an event in this loop.
func (pr *consulPipeRouter) watchAll() {
defer pr.wait.Done()
pr.client.WatchPrefix(pr.prefix, &consulPipe{}, func(key string, value interface{}) bool {
pr.client.WatchPrefix(pr.prefix, &consulPipe{}, pr.quit, func(key string, value interface{}) bool {
select {
case pr.actorChan <- func() { pr.handlePipeUpdate(key, value.(*consulPipe)) }:
return true
case <-pr.quit:
return false
default:
}

pr.actorChan <- func() { pr.handlePipeUpdate(key, value.(*consulPipe)) }
return true
})
}

func (pr *consulPipeRouter) handlePipeUpdate(key string, cp *consulPipe) {
log.Infof("Got update to pipe %s", key)
//log.Infof("Got update to pipe %s", key)

// 1. If this pipe is closed, or we're not one of the ends, we
// should ensure our local pipe (and bridge) is closed.
if !cp.DeletedAt.IsZero() || !cp.eitherEndFor(pr.advertise) {
log.Infof("Pipe %s not in use on this node.", key)
//log.Infof("Pipe %s not in use on this node.", key)
pipe, ok := pr.activePipes[key]
delete(pr.activePipes, key)
if ok {
Expand Down Expand Up @@ -238,9 +229,8 @@ func (pr *consulPipeRouter) privateAPI() {
}
})

addr := fmt.Sprintf("%s:%d", pr.advertise, privateAPIPort)
log.Infof("Serving private API on endpoint %s.", addr)
log.Infof("Private API terminated: %v", http.ListenAndServe(addr, router))
log.Infof("Serving private API on endpoint %s.", pr.advertise)
log.Infof("Private API terminated: %v", http.ListenAndServe(pr.advertise, router))
}

func (pr *consulPipeRouter) Exists(ctx context.Context, id string) (bool, error) {
Expand Down Expand Up @@ -396,7 +386,7 @@ func (bc *bridgeConnection) loop() {
defer log.Infof("Stopping bridge connection for pipe %s to %s", bc.key, bc.addr)

_, end := bc.pipe.Ends()
url := fmt.Sprintf("ws://%s:%d/private/api/pipe/%s", bc.addr, privateAPIPort, url.QueryEscape(bc.key))
url := fmt.Sprintf("ws://%s/private/api/pipe/%s", bc.addr, url.QueryEscape(bc.key))

for {
bc.mtx.Lock()
Expand Down
114 changes: 114 additions & 0 deletions app/multitenant/consul_pipe_router_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package multitenant

import (
"fmt"
"io"
"math/rand"
"testing"

"golang.org/x/net/context"

"github.com/weaveworks/scope/app"
"github.com/weaveworks/scope/common/xfer"
"github.com/weaveworks/scope/probe/appclient"
)

type adapter struct {
c appclient.AppClient
}

func (a adapter) PipeConnection(_, pipeID string, pipe xfer.Pipe) error {
a.c.PipeConnection(pipeID, pipe)
return nil
}

func (a adapter) PipeClose(_, pipeID string) error {
return a.c.PipeClose(pipeID)
}

func doPipe(t *testing.T, pr1, pr2 app.PipeRouter) {

}

type pipeconn struct {
id string
uiPR, probePR app.PipeRouter
uiPipe, probePipe xfer.Pipe
uiIO, probeIO io.ReadWriter
}

func TestPipeRouter(t *testing.T) {
var (
consul = newMockConsulClient()
replicas = 2
iterations = 100
prs = []app.PipeRouter{}
pipes = []pipeconn{}
)

for i := 0; i < replicas; i++ {
pr := NewConsulPipeRouter(consul, "", fmt.Sprintf("127.0.0.1:44%02d", i), NoopUserIDer)
defer pr.Stop()
prs = append(prs, pr)
}

newPipe := func(i int) {
// make a new pipe id
id := fmt.Sprintf("pipe-%d", rand.Int63())

// pick a random PR to connect app to
uiPR := prs[rand.Intn(replicas)]
uiPipe, uiIO, err := uiPR.Get(context.Background(), id, app.UIEnd)
if err != nil {
t.Fatal(err)
}

// pick a random PR to connect probe to
probePR := prs[rand.Intn(replicas)]
probePipe, probeIO, err := probePR.Get(context.Background(), id, app.ProbeEnd)
if err != nil {
t.Fatal(err)
}

pipes = append(pipes, pipeconn{
id: id,
uiPR: uiPR,
uiPipe: uiPipe,
uiIO: uiIO,
probePR: probePR,
probePipe: probePipe,
probeIO: probeIO,
})
}

deletePipe := func() {
// pick a random pipe
i := rand.Intn(len(pipes))
pipe := pipes[i]

if err := pipe.uiPR.Release(context.Background(), pipe.id, app.UIEnd); err != nil {
t.Fatal(err)
}

if err := pipe.probePR.Release(context.Background(), pipe.id, app.ProbeEnd); err != nil {
t.Fatal(err)
}

// remove from list
pipes = pipes[:i+copy(pipes[i:], pipes[i+1:])]
}

for i := 0; i < iterations; i++ {
if len(pipes) <= 0 {
newPipe(i)
continue
}
r := rand.Float32()
switch {
case r < 0.5:
newPipe(i)
case r > 0.5:
deletePipe()
}
}
}
Loading

0 comments on commit b808ac7

Please sign in to comment.