From 324d9290686f7cd3836335235aedb9d2d0caf54f Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Thu, 24 Mar 2016 17:00:20 +0000 Subject: [PATCH] Tests for consul pipe router. Fix a few bugs in the consul pipe router: - Don't share a pointer - Write nil to pipe when closing a bridge connection to ensure the connection shutdown. --- .gitignore | 1 + app/multitenant/consul_client.go | 47 ++-- app/multitenant/consul_pipe_router.go | 82 +++---- .../consul_pipe_router_internal_test.go | 225 ++++++++++++++++++ .../mock_consul_client_internal_test.go | 96 ++++++++ prog/app.go | 8 +- 6 files changed, 386 insertions(+), 73 deletions(-) create mode 100644 app/multitenant/consul_pipe_router_internal_test.go create mode 100644 app/multitenant/mock_consul_client_internal_test.go diff --git a/.gitignore b/.gitignore index 3744f27838..e34814e677 100644 --- a/.gitignore +++ b/.gitignore @@ -12,6 +12,7 @@ releases # Architecture specific extensions/prefixes *.[568vq] [568vq].out +.DS_Store *.cgo1.go *.cgo2.c diff --git a/app/multitenant/consul_client.go b/app/multitenant/consul_client.go index 9bb4370b0d..e824988bc6 100644 --- a/app/multitenant/consul_client.go +++ b/app/multitenant/consul_client.go @@ -22,7 +22,7 @@ type ConsulClient interface { Get(key string, out interface{}) error CAS(key string, out interface{}, f CASCallback) error Watch(key string, deadline time.Time, out interface{}, f func(interface{}) (bool, error)) error - WatchPrefix(prefix string, out interface{}, f func(string, interface{}) bool) + WatchPrefix(prefix string, out interface{}, done chan struct{}, f func(string, interface{}) bool) } // CASCallback is the type of the callback to CAS. If err is nil, out must be non-nil. @@ -37,7 +37,7 @@ func NewConsulClient(addr string) (ConsulClient, error) { if err != nil { return nil, err } - return (*consulClient)(client), nil + return &consulClient{client.KV()}, nil } var ( @@ -50,12 +50,19 @@ var ( ErrNotFound = fmt.Errorf("Not found") ) -type consulClient consul.Client +type kv interface { + CAS(p *consul.KVPair, q *consul.WriteOptions) (bool, *consul.WriteMeta, error) + Get(key string, q *consul.QueryOptions) (*consul.KVPair, *consul.QueryMeta, error) + List(prefix string, q *consul.QueryOptions) (consul.KVPairs, *consul.QueryMeta, error) +} + +type consulClient struct { + kv +} // Get and deserialise a JSON value from consul. func (c *consulClient) Get(key string, out interface{}) error { - kv := (*consul.Client)(c).KV() - kvp, _, err := kv.Get(key, queryOptions) + kvp, _, err := c.kv.Get(key, queryOptions) if err != nil { return err } @@ -73,13 +80,12 @@ func (c *consulClient) Get(key string, out interface{}) error { func (c *consulClient) CAS(key string, out interface{}, f CASCallback) error { var ( index = uint64(0) - kv = (*consul.Client)(c).KV() retries = 10 retry = true intermediate interface{} ) for i := 0; i < retries; i++ { - kvp, _, err := kv.Get(key, queryOptions) + kvp, _, err := c.kv.Get(key, queryOptions) if err != nil { log.Errorf("Error getting %s: %v", key, err) continue @@ -111,7 +117,7 @@ func (c *consulClient) CAS(key string, out interface{}, f CASCallback) error { log.Errorf("Error serialising value for %s: %v", key, err) continue } - ok, _, err := kv.CAS(&consul.KVPair{ + ok, _, err := c.kv.CAS(&consul.KVPair{ Key: key, Value: value.Bytes(), ModifyIndex: index, @@ -132,17 +138,14 @@ func (c *consulClient) CAS(key string, out interface{}, f CASCallback) error { // Watch a given key value and trigger a callback when it changes. // if callback returns false or error, exit (with the error). func (c *consulClient) Watch(key string, deadline time.Time, out interface{}, f func(interface{}) (bool, error)) error { - var ( - index = uint64(0) - kv = (*consul.Client)(c).KV() - ) + index := uint64(0) for deadline.After(mtime.Now()) { // Do a (blocking) long poll waiting for the entry to get updated. index here // is really a version number; this call will wait for the key to be updated // past said version. As we always start from version 0, we're guaranteed // not to miss any updates - in fact we will always call the callback with // the current value of the key immediately. - kvp, meta, err := kv.Get(key, &consul.QueryOptions{ + kvp, _, err := c.kv.Get(key, &consul.QueryOptions{ RequireConsistent: true, WaitIndex: index, WaitTime: longPollDuration, @@ -162,19 +165,21 @@ func (c *consulClient) Watch(key string, deadline time.Time, out interface{}, f } else if err != nil { return err } - - index = meta.LastIndex + index = kvp.ModifyIndex } return fmt.Errorf("Timed out waiting on %s", key) } -func (c *consulClient) WatchPrefix(prefix string, out interface{}, f func(string, interface{}) bool) { - var ( - index = uint64(0) - kv = (*consul.Client)(c).KV() - ) +func (c *consulClient) WatchPrefix(prefix string, out interface{}, done chan struct{}, f func(string, interface{}) bool) { + index := uint64(0) for { - kvps, meta, err := kv.List(prefix, &consul.QueryOptions{ + select { + case <-done: + return + default: + } + + kvps, meta, err := c.kv.List(prefix, &consul.QueryOptions{ RequireConsistent: true, WaitIndex: index, WaitTime: longPollDuration, diff --git a/app/multitenant/consul_pipe_router.go b/app/multitenant/consul_pipe_router.go index e8592577a7..d31bad585b 100644 --- a/app/multitenant/consul_pipe_router.go +++ b/app/multitenant/consul_pipe_router.go @@ -15,7 +15,6 @@ import ( "github.com/weaveworks/scope/app" "github.com/weaveworks/scope/common/mtime" - "github.com/weaveworks/scope/common/network" "github.com/weaveworks/scope/common/xfer" ) @@ -23,8 +22,6 @@ const ( gcInterval = 30 * time.Second // we check all the pipes every 30s pipeTimeout = 1 * time.Minute // pipes are closed when a client hasn't been connected for 1 minute gcTimeout = 10 * time.Minute // after another 10 minutes, tombstoned pipes are forgotten - - privateAPIPort = 4444 ) var ( @@ -91,11 +88,7 @@ type consulPipeRouter struct { } // NewConsulPipeRouter returns a new consul based router -func NewConsulPipeRouter(client ConsulClient, prefix, inf string, userIDer UserIDer) (app.PipeRouter, error) { - advertise, err := network.GetFirstAddressOf(inf) - if err != nil { - return nil, err - } +func NewConsulPipeRouter(client ConsulClient, prefix, advertise string, userIDer UserIDer) app.PipeRouter { pipeRouter := &consulPipeRouter{ prefix: prefix, advertise: advertise, @@ -109,9 +102,9 @@ func NewConsulPipeRouter(client ConsulClient, prefix, inf string, userIDer UserI } pipeRouter.wait.Add(2) go pipeRouter.watchAll() - go pipeRouter.privateAPI() go pipeRouter.actor() - return pipeRouter, nil + go pipeRouter.privateAPI() + return pipeRouter } func (pr *consulPipeRouter) Stop() { @@ -138,25 +131,20 @@ func (pr *consulPipeRouter) actor() { // trigger an event in this loop. func (pr *consulPipeRouter) watchAll() { defer pr.wait.Done() - pr.client.WatchPrefix(pr.prefix, &consulPipe{}, func(key string, value interface{}) bool { + pr.client.WatchPrefix(pr.prefix, &consulPipe{}, pr.quit, func(key string, value interface{}) bool { select { + case pr.actorChan <- func() { pr.handlePipeUpdate(key, *value.(*consulPipe)) }: + return true case <-pr.quit: return false - default: } - - pr.actorChan <- func() { pr.handlePipeUpdate(key, value.(*consulPipe)) } - return true }) } -func (pr *consulPipeRouter) handlePipeUpdate(key string, cp *consulPipe) { - log.Infof("Got update to pipe %s", key) - +func (pr *consulPipeRouter) handlePipeUpdate(key string, cp consulPipe) { // 1. If this pipe is closed, or we're not one of the ends, we // should ensure our local pipe (and bridge) is closed. if !cp.DeletedAt.IsZero() || !cp.eitherEndFor(pr.advertise) { - log.Infof("Pipe %s not in use on this node.", key) pipe, ok := pr.activePipes[key] delete(pr.activePipes, key) if ok { @@ -207,24 +195,25 @@ func (pr *consulPipeRouter) handlePipeUpdate(key string, cp *consulPipe) { } } +func (pr *consulPipeRouter) getOrCreatePipe(key string) xfer.Pipe { + pc := make(chan xfer.Pipe) + pr.actorChan <- func() { + pipe, ok := pr.activePipes[key] + if !ok { + pipe = xfer.NewPipe() + pr.activePipes[key] = pipe + } + pc <- pipe + } + return <-pc +} + func (pr *consulPipeRouter) privateAPI() { router := mux.NewRouter() router.Methods("GET"). MatcherFunc(app.URLMatcher("/private/api/pipe/{key}")). HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - var ( - key = mux.Vars(r)["key"] - pc = make(chan xfer.Pipe) - ) - pr.actorChan <- func() { - pc <- pr.activePipes[key] - } - pipe := <-pc - if pipe == nil { - http.NotFound(w, r) - return - } - + key := mux.Vars(r)["key"] conn, err := xfer.Upgrade(w, r, nil) if err != nil { log.Errorf("Error upgrading pipe %s websocket: %v", key, err) @@ -232,15 +221,15 @@ func (pr *consulPipeRouter) privateAPI() { } defer conn.Close() + pipe := pr.getOrCreatePipe(key) end, _ := pipe.Ends() if err := pipe.CopyToWebsocket(end, conn); err != nil && !xfer.IsExpectedWSCloseError(err) { log.Printf("Error copying to pipe %s websocket: %v", key, err) } }) - addr := fmt.Sprintf("%s:%d", pr.advertise, privateAPIPort) - log.Infof("Serving private API on endpoint %s.", addr) - log.Infof("Private API terminated: %v", http.ListenAndServe(addr, router)) + log.Infof("Serving private API on endpoint %s.", pr.advertise) + log.Infof("Private API terminated: %v", http.ListenAndServe(pr.advertise, router)) } func (pr *consulPipeRouter) Exists(ctx context.Context, id string) (bool, error) { @@ -293,18 +282,7 @@ func (pr *consulPipeRouter) Get(ctx context.Context, id string, e app.End) (xfer return nil, nil, err } - // next see if we already have a active pipe - pc := make(chan xfer.Pipe) - pr.actorChan <- func() { - pipe, ok := pr.activePipes[key] - if !ok { - pipe = xfer.NewPipe() - pr.activePipes[key] = pipe - } - pc <- pipe - } - pipe := <-pc - + pipe := pr.getOrCreatePipe(key) myEnd, _ := pipe.Ends() if e == app.ProbeEnd { _, myEnd = pipe.Ends() @@ -385,18 +363,20 @@ func (bc *bridgeConnection) stop() { bc.stopped = true if bc.conn != nil { bc.conn.Close() + end, _ := bc.pipe.Ends() + end.Write(nil) // this will cause the other end of wake up and exit } bc.mtx.Unlock() bc.wait.Wait() } func (bc *bridgeConnection) loop() { - log.Infof("Making bridge connection for pipe %s to %s", bc.key, bc.addr) + log.Infof("Bridge connection for pipe %s to %s started", bc.key, bc.addr) defer bc.wait.Done() - defer log.Infof("Stopping bridge connection for pipe %s to %s", bc.key, bc.addr) + defer log.Infof("Bridge connection for pipe %s to %s stopped", bc.key, bc.addr) _, end := bc.pipe.Ends() - url := fmt.Sprintf("ws://%s:%d/private/api/pipe/%s", bc.addr, privateAPIPort, url.QueryEscape(bc.key)) + url := fmt.Sprintf("ws://%s/private/api/pipe/%s", bc.addr, url.QueryEscape(bc.key)) for { bc.mtx.Lock() @@ -425,7 +405,7 @@ func (bc *bridgeConnection) loop() { bc.mtx.Unlock() if err := bc.pipe.CopyToWebsocket(end, conn); err != nil && !xfer.IsExpectedWSCloseError(err) { - log.Printf("Error copying to pipe %s websocket: %v", bc.key, err) + log.Errorf("Error copying to pipe %s websocket: %v", bc.key, err) } } } diff --git a/app/multitenant/consul_pipe_router_internal_test.go b/app/multitenant/consul_pipe_router_internal_test.go new file mode 100644 index 0000000000..eb3b9673cf --- /dev/null +++ b/app/multitenant/consul_pipe_router_internal_test.go @@ -0,0 +1,225 @@ +package multitenant + +import ( + "bytes" + "fmt" + "io" + "log" + "math/rand" + "sync" + "testing" + "time" + + "golang.org/x/net/context" + + "github.com/weaveworks/scope/app" + "github.com/weaveworks/scope/common/xfer" + "github.com/weaveworks/scope/probe/appclient" +) + +type adapter struct { + c appclient.AppClient +} + +func (a adapter) PipeConnection(_, pipeID string, pipe xfer.Pipe) error { + a.c.PipeConnection(pipeID, pipe) + return nil +} + +func (a adapter) PipeClose(_, pipeID string) error { + return a.c.PipeClose(pipeID) +} + +type pipeconn struct { + id string + uiPR, probePR app.PipeRouter + uiPipe, probePipe xfer.Pipe + uiIO, probeIO io.ReadWriter + + done chan struct{} + wait sync.WaitGroup +} + +func (p *pipeconn) start(t *testing.T) { + // write something to the pipe every second + // fail if we don't read something from the pipe every second + p.wait = sync.WaitGroup{} + p.wait.Add(2) + p.done = make(chan struct{}) + + msg := []byte("hello " + p.id) + go func() { + defer p.wait.Done() + + for { + select { + case <-p.done: + return + default: + } + + // write something to the probe end + _, err := p.probeIO.Write(msg) + if err != nil { + t.Fatal(err) + } + + time.Sleep(1 * time.Second) + } + }() + + go func() { + defer p.wait.Done() + + for { + select { + case <-p.done: + return + default: + } + + // read it back off the other end + buf := make([]byte, len(msg)) + n, err := p.uiIO.Read(buf) + if n != len(buf) { + t.Fatalf("only read %d", n) + } + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(buf, msg) { + t.Fatalf("Got: %v, Expected: %v", buf, msg) + } + } + }() +} + +func (p *pipeconn) stop() { + close(p.done) + p.wait.Wait() +} + +func TestPipeRouter(t *testing.T) { + var ( + consul = newMockConsulClient() + replicas = 2 + iterations = 100 + prs = []app.PipeRouter{} + pipes = []*pipeconn{} + ) + + for i := 0; i < replicas; i++ { + pr := NewConsulPipeRouter(consul, "", fmt.Sprintf("127.0.0.1:44%02d", i), NoopUserIDer) + defer pr.Stop() + prs = append(prs, pr) + } + + newPipe := func() { + // make a new pipe id + id := fmt.Sprintf("pipe-%d", rand.Int63()) + log.Printf(">>>> newPipe %s", id) + + // pick a random PR to connect app to + uiPR := prs[rand.Intn(replicas)] + uiPipe, uiIO, err := uiPR.Get(context.Background(), id, app.UIEnd) + if err != nil { + t.Fatal(err) + } + + // pick a random PR to connect probe to + probePR := prs[rand.Intn(replicas)] + probePipe, probeIO, err := probePR.Get(context.Background(), id, app.ProbeEnd) + if err != nil { + t.Fatal(err) + } + + pipe := &pipeconn{ + id: id, + uiPR: uiPR, + uiPipe: uiPipe, + uiIO: uiIO, + probePR: probePR, + probePipe: probePipe, + probeIO: probeIO, + } + pipe.start(t) + + pipes = append(pipes, pipe) + } + + deletePipe := func() { + // pick a random pipe + i := rand.Intn(len(pipes)) + pipe := pipes[i] + log.Printf(">>>> deletePipe %s", pipe.id) + pipe.stop() + + if err := pipe.uiPR.Release(context.Background(), pipe.id, app.UIEnd); err != nil { + t.Fatal(err) + } + + if err := pipe.probePR.Release(context.Background(), pipe.id, app.ProbeEnd); err != nil { + t.Fatal(err) + } + + // remove from list + pipes = pipes[:i+copy(pipes[i:], pipes[i+1:])] + } + + reconnectPipe := func() { + // pick a random pipe + pipe := pipes[rand.Intn(len(pipes))] + log.Printf(">>>> reconnectPipe %s", pipe.id) + pipe.stop() + + // pick a random PR to connect to + newPR := prs[rand.Intn(replicas)] + + // pick a random end + if rand.Float32() < 0.5 { + if err := pipe.uiPR.Release(context.Background(), pipe.id, app.UIEnd); err != nil { + t.Fatal(err) + } + + uiPipe, uiIO, err := newPR.Get(context.Background(), pipe.id, app.UIEnd) + if err != nil { + t.Fatal(err) + } + + pipe.uiPR, pipe.uiPipe, pipe.uiIO = newPR, uiPipe, uiIO + } else { + if err := pipe.probePR.Release(context.Background(), pipe.id, app.ProbeEnd); err != nil { + t.Fatal(err) + } + + probePipe, probeIO, err := newPR.Get(context.Background(), pipe.id, app.ProbeEnd) + if err != nil { + t.Fatal(err) + } + + pipe.probePR, pipe.probePipe, pipe.probeIO = newPR, probePipe, probeIO + } + + pipe.start(t) + } + + for i := 0; i < iterations; i++ { + if len(pipes) <= 0 { + newPipe() + continue + } + r := rand.Float32() + switch { + case 0.0 < r && r <= 0.3: + newPipe() + case 0.3 < r && r <= 0.6: + deletePipe() + case 0.6 < r && r <= 1.0: + reconnectPipe() + } + } + + for len(pipes) > 0 { + deletePipe() + } +} diff --git a/app/multitenant/mock_consul_client_internal_test.go b/app/multitenant/mock_consul_client_internal_test.go new file mode 100644 index 0000000000..e05a0293dd --- /dev/null +++ b/app/multitenant/mock_consul_client_internal_test.go @@ -0,0 +1,96 @@ +package multitenant + +import ( + "sync" + "time" + + consul "github.com/hashicorp/consul/api" +) + +type mockKV struct { + mtx sync.Mutex + cond *sync.Cond + kvps map[string]*consul.KVPair + next uint64 // the next update will have this 'index in the the log' +} + +func newMockConsulClient() ConsulClient { + m := mockKV{ + kvps: map[string]*consul.KVPair{}, + } + m.cond = sync.NewCond(&m.mtx) + go m.loop() + return &consulClient{&m} +} + +func copyKVPair(in *consul.KVPair) *consul.KVPair { + return &consul.KVPair{ + Key: in.Key, + CreateIndex: in.CreateIndex, + ModifyIndex: in.ModifyIndex, + LockIndex: in.LockIndex, + Flags: in.Flags, + Value: in.Value, + Session: in.Session, + } +} + +// periodic loop to wake people up, so they can honour timeouts +func (m *mockKV) loop() { + for range time.Tick(1 * time.Second) { + m.mtx.Lock() + m.cond.Broadcast() + m.mtx.Unlock() + } +} + +func (m *mockKV) CAS(p *consul.KVPair, q *consul.WriteOptions) (bool, *consul.WriteMeta, error) { + m.mtx.Lock() + defer m.mtx.Unlock() + existing, ok := m.kvps[p.Key] + if ok && existing.ModifyIndex != p.ModifyIndex { + return false, nil, nil + } + if ok { + existing.Value = p.Value + } else { + m.kvps[p.Key] = copyKVPair(p) + } + m.kvps[p.Key].ModifyIndex++ + m.kvps[p.Key].LockIndex = m.next + m.next++ + m.cond.Broadcast() + return true, nil, nil +} + +func (m *mockKV) Get(key string, q *consul.QueryOptions) (*consul.KVPair, *consul.QueryMeta, error) { + m.mtx.Lock() + defer m.mtx.Unlock() + value, ok := m.kvps[key] + if !ok { + return nil, nil, nil + } + for q.WaitIndex >= value.ModifyIndex { + m.cond.Wait() + } + return copyKVPair(value), nil, nil +} + +func (m *mockKV) List(prefix string, q *consul.QueryOptions) (consul.KVPairs, *consul.QueryMeta, error) { + m.mtx.Lock() + defer m.mtx.Unlock() + deadline := time.Now().Add(q.WaitTime) + for m.next <= q.WaitIndex && time.Now().Before(deadline) { + m.cond.Wait() + } + if time.Now().After(deadline) { + return nil, &consul.QueryMeta{LastIndex: q.WaitIndex}, nil + } + result := consul.KVPairs{} + for _, kvp := range m.kvps { + if kvp.LockIndex >= q.WaitIndex { + result = append(result, copyKVPair(kvp)) + } + } + return result, &consul.QueryMeta{LastIndex: m.next}, nil +} diff --git a/prog/app.go b/prog/app.go index 31acf3f64c..6fa06fb137 100644 --- a/prog/app.go +++ b/prog/app.go @@ -23,6 +23,7 @@ import ( "github.com/weaveworks/scope/app" "github.com/weaveworks/scope/app/multitenant" "github.com/weaveworks/scope/common/middleware" + "github.com/weaveworks/scope/common/network" "github.com/weaveworks/scope/common/weave" "github.com/weaveworks/scope/common/xfer" "github.com/weaveworks/scope/probe/docker" @@ -129,7 +130,12 @@ func pipeRouterFactory(userIDer multitenant.UserIDer, pipeRouterURL, consulInf s if err != nil { return nil, err } - return multitenant.NewConsulPipeRouter(consulClient, strings.TrimPrefix(parsed.Path, "/"), consulInf, userIDer) + advertise, err := network.GetFirstAddressOf(consulInf) + if err != nil { + return nil, err + } + addr := fmt.Sprintf("%s:4444", advertise) + return multitenant.NewConsulPipeRouter(consulClient, strings.TrimPrefix(parsed.Path, "/"), addr, userIDer), nil } return nil, fmt.Errorf("Invalid pipe router '%s'", pipeRouterURL)