Skip to content

Commit

Permalink
Tests for consul pipe router.
Browse files Browse the repository at this point in the history
Fix a few bugs in the consul pipe router:
- Don't share a pointer
- Write nil to pipe when closing a bridge connection to ensure the connection shutdown.
  • Loading branch information
tomwilkie authored and Tom Wilkie committed Apr 7, 2016
1 parent 5fba5d8 commit 324d929
Show file tree
Hide file tree
Showing 6 changed files with 386 additions and 73 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ releases
# Architecture specific extensions/prefixes
*.[568vq]
[568vq].out
.DS_Store

*.cgo1.go
*.cgo2.c
Expand Down
47 changes: 26 additions & 21 deletions app/multitenant/consul_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type ConsulClient interface {
Get(key string, out interface{}) error
CAS(key string, out interface{}, f CASCallback) error
Watch(key string, deadline time.Time, out interface{}, f func(interface{}) (bool, error)) error
WatchPrefix(prefix string, out interface{}, f func(string, interface{}) bool)
WatchPrefix(prefix string, out interface{}, done chan struct{}, f func(string, interface{}) bool)
}

// CASCallback is the type of the callback to CAS. If err is nil, out must be non-nil.
Expand All @@ -37,7 +37,7 @@ func NewConsulClient(addr string) (ConsulClient, error) {
if err != nil {
return nil, err
}
return (*consulClient)(client), nil
return &consulClient{client.KV()}, nil
}

var (
Expand All @@ -50,12 +50,19 @@ var (
ErrNotFound = fmt.Errorf("Not found")
)

type consulClient consul.Client
type kv interface {
CAS(p *consul.KVPair, q *consul.WriteOptions) (bool, *consul.WriteMeta, error)
Get(key string, q *consul.QueryOptions) (*consul.KVPair, *consul.QueryMeta, error)
List(prefix string, q *consul.QueryOptions) (consul.KVPairs, *consul.QueryMeta, error)
}

type consulClient struct {
kv
}

// Get and deserialise a JSON value from consul.
func (c *consulClient) Get(key string, out interface{}) error {
kv := (*consul.Client)(c).KV()
kvp, _, err := kv.Get(key, queryOptions)
kvp, _, err := c.kv.Get(key, queryOptions)
if err != nil {
return err
}
Expand All @@ -73,13 +80,12 @@ func (c *consulClient) Get(key string, out interface{}) error {
func (c *consulClient) CAS(key string, out interface{}, f CASCallback) error {
var (
index = uint64(0)
kv = (*consul.Client)(c).KV()
retries = 10
retry = true
intermediate interface{}
)
for i := 0; i < retries; i++ {
kvp, _, err := kv.Get(key, queryOptions)
kvp, _, err := c.kv.Get(key, queryOptions)
if err != nil {
log.Errorf("Error getting %s: %v", key, err)
continue
Expand Down Expand Up @@ -111,7 +117,7 @@ func (c *consulClient) CAS(key string, out interface{}, f CASCallback) error {
log.Errorf("Error serialising value for %s: %v", key, err)
continue
}
ok, _, err := kv.CAS(&consul.KVPair{
ok, _, err := c.kv.CAS(&consul.KVPair{
Key: key,
Value: value.Bytes(),
ModifyIndex: index,
Expand All @@ -132,17 +138,14 @@ func (c *consulClient) CAS(key string, out interface{}, f CASCallback) error {
// Watch a given key value and trigger a callback when it changes.
// if callback returns false or error, exit (with the error).
func (c *consulClient) Watch(key string, deadline time.Time, out interface{}, f func(interface{}) (bool, error)) error {
var (
index = uint64(0)
kv = (*consul.Client)(c).KV()
)
index := uint64(0)
for deadline.After(mtime.Now()) {
// Do a (blocking) long poll waiting for the entry to get updated. index here
// is really a version number; this call will wait for the key to be updated
// past said version. As we always start from version 0, we're guaranteed
// not to miss any updates - in fact we will always call the callback with
// the current value of the key immediately.
kvp, meta, err := kv.Get(key, &consul.QueryOptions{
kvp, _, err := c.kv.Get(key, &consul.QueryOptions{
RequireConsistent: true,
WaitIndex: index,
WaitTime: longPollDuration,
Expand All @@ -162,19 +165,21 @@ func (c *consulClient) Watch(key string, deadline time.Time, out interface{}, f
} else if err != nil {
return err
}

index = meta.LastIndex
index = kvp.ModifyIndex
}
return fmt.Errorf("Timed out waiting on %s", key)
}

func (c *consulClient) WatchPrefix(prefix string, out interface{}, f func(string, interface{}) bool) {
var (
index = uint64(0)
kv = (*consul.Client)(c).KV()
)
func (c *consulClient) WatchPrefix(prefix string, out interface{}, done chan struct{}, f func(string, interface{}) bool) {
index := uint64(0)
for {
kvps, meta, err := kv.List(prefix, &consul.QueryOptions{
select {
case <-done:
return
default:
}

kvps, meta, err := c.kv.List(prefix, &consul.QueryOptions{
RequireConsistent: true,
WaitIndex: index,
WaitTime: longPollDuration,
Expand Down
82 changes: 31 additions & 51 deletions app/multitenant/consul_pipe_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,13 @@ import (

"github.com/weaveworks/scope/app"
"github.com/weaveworks/scope/common/mtime"
"github.com/weaveworks/scope/common/network"
"github.com/weaveworks/scope/common/xfer"
)

const (
gcInterval = 30 * time.Second // we check all the pipes every 30s
pipeTimeout = 1 * time.Minute // pipes are closed when a client hasn't been connected for 1 minute
gcTimeout = 10 * time.Minute // after another 10 minutes, tombstoned pipes are forgotten

privateAPIPort = 4444
)

var (
Expand Down Expand Up @@ -91,11 +88,7 @@ type consulPipeRouter struct {
}

// NewConsulPipeRouter returns a new consul based router
func NewConsulPipeRouter(client ConsulClient, prefix, inf string, userIDer UserIDer) (app.PipeRouter, error) {
advertise, err := network.GetFirstAddressOf(inf)
if err != nil {
return nil, err
}
func NewConsulPipeRouter(client ConsulClient, prefix, advertise string, userIDer UserIDer) app.PipeRouter {
pipeRouter := &consulPipeRouter{
prefix: prefix,
advertise: advertise,
Expand All @@ -109,9 +102,9 @@ func NewConsulPipeRouter(client ConsulClient, prefix, inf string, userIDer UserI
}
pipeRouter.wait.Add(2)
go pipeRouter.watchAll()
go pipeRouter.privateAPI()
go pipeRouter.actor()
return pipeRouter, nil
go pipeRouter.privateAPI()
return pipeRouter
}

func (pr *consulPipeRouter) Stop() {
Expand All @@ -138,25 +131,20 @@ func (pr *consulPipeRouter) actor() {
// trigger an event in this loop.
func (pr *consulPipeRouter) watchAll() {
defer pr.wait.Done()
pr.client.WatchPrefix(pr.prefix, &consulPipe{}, func(key string, value interface{}) bool {
pr.client.WatchPrefix(pr.prefix, &consulPipe{}, pr.quit, func(key string, value interface{}) bool {
select {
case pr.actorChan <- func() { pr.handlePipeUpdate(key, *value.(*consulPipe)) }:
return true
case <-pr.quit:
return false
default:
}

pr.actorChan <- func() { pr.handlePipeUpdate(key, value.(*consulPipe)) }
return true
})
}

func (pr *consulPipeRouter) handlePipeUpdate(key string, cp *consulPipe) {
log.Infof("Got update to pipe %s", key)

func (pr *consulPipeRouter) handlePipeUpdate(key string, cp consulPipe) {
// 1. If this pipe is closed, or we're not one of the ends, we
// should ensure our local pipe (and bridge) is closed.
if !cp.DeletedAt.IsZero() || !cp.eitherEndFor(pr.advertise) {
log.Infof("Pipe %s not in use on this node.", key)
pipe, ok := pr.activePipes[key]
delete(pr.activePipes, key)
if ok {
Expand Down Expand Up @@ -207,40 +195,41 @@ func (pr *consulPipeRouter) handlePipeUpdate(key string, cp *consulPipe) {
}
}

func (pr *consulPipeRouter) getOrCreatePipe(key string) xfer.Pipe {
pc := make(chan xfer.Pipe)
pr.actorChan <- func() {
pipe, ok := pr.activePipes[key]
if !ok {
pipe = xfer.NewPipe()
pr.activePipes[key] = pipe
}
pc <- pipe
}
return <-pc
}

func (pr *consulPipeRouter) privateAPI() {
router := mux.NewRouter()
router.Methods("GET").
MatcherFunc(app.URLMatcher("/private/api/pipe/{key}")).
HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var (
key = mux.Vars(r)["key"]
pc = make(chan xfer.Pipe)
)
pr.actorChan <- func() {
pc <- pr.activePipes[key]
}
pipe := <-pc
if pipe == nil {
http.NotFound(w, r)
return
}

key := mux.Vars(r)["key"]
conn, err := xfer.Upgrade(w, r, nil)
if err != nil {
log.Errorf("Error upgrading pipe %s websocket: %v", key, err)
return
}
defer conn.Close()

pipe := pr.getOrCreatePipe(key)
end, _ := pipe.Ends()
if err := pipe.CopyToWebsocket(end, conn); err != nil && !xfer.IsExpectedWSCloseError(err) {
log.Printf("Error copying to pipe %s websocket: %v", key, err)
}
})

addr := fmt.Sprintf("%s:%d", pr.advertise, privateAPIPort)
log.Infof("Serving private API on endpoint %s.", addr)
log.Infof("Private API terminated: %v", http.ListenAndServe(addr, router))
log.Infof("Serving private API on endpoint %s.", pr.advertise)
log.Infof("Private API terminated: %v", http.ListenAndServe(pr.advertise, router))
}

func (pr *consulPipeRouter) Exists(ctx context.Context, id string) (bool, error) {
Expand Down Expand Up @@ -293,18 +282,7 @@ func (pr *consulPipeRouter) Get(ctx context.Context, id string, e app.End) (xfer
return nil, nil, err
}

// next see if we already have a active pipe
pc := make(chan xfer.Pipe)
pr.actorChan <- func() {
pipe, ok := pr.activePipes[key]
if !ok {
pipe = xfer.NewPipe()
pr.activePipes[key] = pipe
}
pc <- pipe
}
pipe := <-pc

pipe := pr.getOrCreatePipe(key)
myEnd, _ := pipe.Ends()
if e == app.ProbeEnd {
_, myEnd = pipe.Ends()
Expand Down Expand Up @@ -385,18 +363,20 @@ func (bc *bridgeConnection) stop() {
bc.stopped = true
if bc.conn != nil {
bc.conn.Close()
end, _ := bc.pipe.Ends()
end.Write(nil) // this will cause the other end of wake up and exit
}
bc.mtx.Unlock()
bc.wait.Wait()
}

func (bc *bridgeConnection) loop() {
log.Infof("Making bridge connection for pipe %s to %s", bc.key, bc.addr)
log.Infof("Bridge connection for pipe %s to %s started", bc.key, bc.addr)
defer bc.wait.Done()
defer log.Infof("Stopping bridge connection for pipe %s to %s", bc.key, bc.addr)
defer log.Infof("Bridge connection for pipe %s to %s stopped", bc.key, bc.addr)

_, end := bc.pipe.Ends()
url := fmt.Sprintf("ws://%s:%d/private/api/pipe/%s", bc.addr, privateAPIPort, url.QueryEscape(bc.key))
url := fmt.Sprintf("ws://%s/private/api/pipe/%s", bc.addr, url.QueryEscape(bc.key))

for {
bc.mtx.Lock()
Expand Down Expand Up @@ -425,7 +405,7 @@ func (bc *bridgeConnection) loop() {
bc.mtx.Unlock()

if err := bc.pipe.CopyToWebsocket(end, conn); err != nil && !xfer.IsExpectedWSCloseError(err) {
log.Printf("Error copying to pipe %s websocket: %v", bc.key, err)
log.Errorf("Error copying to pipe %s websocket: %v", bc.key, err)
}
}
}
Loading

0 comments on commit 324d929

Please sign in to comment.