diff --git a/app/multitenant/consul_client.go b/app/multitenant/consul_client.go index 9bb4370b0d..e824988bc6 100644 --- a/app/multitenant/consul_client.go +++ b/app/multitenant/consul_client.go @@ -22,7 +22,7 @@ type ConsulClient interface { Get(key string, out interface{}) error CAS(key string, out interface{}, f CASCallback) error Watch(key string, deadline time.Time, out interface{}, f func(interface{}) (bool, error)) error - WatchPrefix(prefix string, out interface{}, f func(string, interface{}) bool) + WatchPrefix(prefix string, out interface{}, done chan struct{}, f func(string, interface{}) bool) } // CASCallback is the type of the callback to CAS. If err is nil, out must be non-nil. @@ -37,7 +37,7 @@ func NewConsulClient(addr string) (ConsulClient, error) { if err != nil { return nil, err } - return (*consulClient)(client), nil + return &consulClient{client.KV()}, nil } var ( @@ -50,12 +50,19 @@ var ( ErrNotFound = fmt.Errorf("Not found") ) -type consulClient consul.Client +type kv interface { + CAS(p *consul.KVPair, q *consul.WriteOptions) (bool, *consul.WriteMeta, error) + Get(key string, q *consul.QueryOptions) (*consul.KVPair, *consul.QueryMeta, error) + List(prefix string, q *consul.QueryOptions) (consul.KVPairs, *consul.QueryMeta, error) +} + +type consulClient struct { + kv +} // Get and deserialise a JSON value from consul. func (c *consulClient) Get(key string, out interface{}) error { - kv := (*consul.Client)(c).KV() - kvp, _, err := kv.Get(key, queryOptions) + kvp, _, err := c.kv.Get(key, queryOptions) if err != nil { return err } @@ -73,13 +80,12 @@ func (c *consulClient) Get(key string, out interface{}) error { func (c *consulClient) CAS(key string, out interface{}, f CASCallback) error { var ( index = uint64(0) - kv = (*consul.Client)(c).KV() retries = 10 retry = true intermediate interface{} ) for i := 0; i < retries; i++ { - kvp, _, err := kv.Get(key, queryOptions) + kvp, _, err := c.kv.Get(key, queryOptions) if err != nil { log.Errorf("Error getting %s: %v", key, err) continue @@ -111,7 +117,7 @@ func (c *consulClient) CAS(key string, out interface{}, f CASCallback) error { log.Errorf("Error serialising value for %s: %v", key, err) continue } - ok, _, err := kv.CAS(&consul.KVPair{ + ok, _, err := c.kv.CAS(&consul.KVPair{ Key: key, Value: value.Bytes(), ModifyIndex: index, @@ -132,17 +138,14 @@ func (c *consulClient) CAS(key string, out interface{}, f CASCallback) error { // Watch a given key value and trigger a callback when it changes. // if callback returns false or error, exit (with the error). func (c *consulClient) Watch(key string, deadline time.Time, out interface{}, f func(interface{}) (bool, error)) error { - var ( - index = uint64(0) - kv = (*consul.Client)(c).KV() - ) + index := uint64(0) for deadline.After(mtime.Now()) { // Do a (blocking) long poll waiting for the entry to get updated. index here // is really a version number; this call will wait for the key to be updated // past said version. As we always start from version 0, we're guaranteed // not to miss any updates - in fact we will always call the callback with // the current value of the key immediately. - kvp, meta, err := kv.Get(key, &consul.QueryOptions{ + kvp, _, err := c.kv.Get(key, &consul.QueryOptions{ RequireConsistent: true, WaitIndex: index, WaitTime: longPollDuration, @@ -162,19 +165,21 @@ func (c *consulClient) Watch(key string, deadline time.Time, out interface{}, f } else if err != nil { return err } - - index = meta.LastIndex + index = kvp.ModifyIndex } return fmt.Errorf("Timed out waiting on %s", key) } -func (c *consulClient) WatchPrefix(prefix string, out interface{}, f func(string, interface{}) bool) { - var ( - index = uint64(0) - kv = (*consul.Client)(c).KV() - ) +func (c *consulClient) WatchPrefix(prefix string, out interface{}, done chan struct{}, f func(string, interface{}) bool) { + index := uint64(0) for { - kvps, meta, err := kv.List(prefix, &consul.QueryOptions{ + select { + case <-done: + return + default: + } + + kvps, meta, err := c.kv.List(prefix, &consul.QueryOptions{ RequireConsistent: true, WaitIndex: index, WaitTime: longPollDuration, diff --git a/app/multitenant/consul_pipe_router.go b/app/multitenant/consul_pipe_router.go index e8592577a7..a0e6d77098 100644 --- a/app/multitenant/consul_pipe_router.go +++ b/app/multitenant/consul_pipe_router.go @@ -15,7 +15,6 @@ import ( "github.com/weaveworks/scope/app" "github.com/weaveworks/scope/common/mtime" - "github.com/weaveworks/scope/common/network" "github.com/weaveworks/scope/common/xfer" ) @@ -23,8 +22,6 @@ const ( gcInterval = 30 * time.Second // we check all the pipes every 30s pipeTimeout = 1 * time.Minute // pipes are closed when a client hasn't been connected for 1 minute gcTimeout = 10 * time.Minute // after another 10 minutes, tombstoned pipes are forgotten - - privateAPIPort = 4444 ) var ( @@ -91,11 +88,7 @@ type consulPipeRouter struct { } // NewConsulPipeRouter returns a new consul based router -func NewConsulPipeRouter(client ConsulClient, prefix, inf string, userIDer UserIDer) (app.PipeRouter, error) { - advertise, err := network.GetFirstAddressOf(inf) - if err != nil { - return nil, err - } +func NewConsulPipeRouter(client ConsulClient, prefix, advertise string, userIDer UserIDer) app.PipeRouter { pipeRouter := &consulPipeRouter{ prefix: prefix, advertise: advertise, @@ -109,9 +102,9 @@ func NewConsulPipeRouter(client ConsulClient, prefix, inf string, userIDer UserI } pipeRouter.wait.Add(2) go pipeRouter.watchAll() - go pipeRouter.privateAPI() go pipeRouter.actor() - return pipeRouter, nil + go pipeRouter.privateAPI() + return pipeRouter } func (pr *consulPipeRouter) Stop() { @@ -138,25 +131,23 @@ func (pr *consulPipeRouter) actor() { // trigger an event in this loop. func (pr *consulPipeRouter) watchAll() { defer pr.wait.Done() - pr.client.WatchPrefix(pr.prefix, &consulPipe{}, func(key string, value interface{}) bool { + pr.client.WatchPrefix(pr.prefix, &consulPipe{}, pr.quit, func(key string, value interface{}) bool { select { + case pr.actorChan <- func() { pr.handlePipeUpdate(key, value.(*consulPipe)) }: + return true case <-pr.quit: return false - default: } - - pr.actorChan <- func() { pr.handlePipeUpdate(key, value.(*consulPipe)) } - return true }) } func (pr *consulPipeRouter) handlePipeUpdate(key string, cp *consulPipe) { - log.Infof("Got update to pipe %s", key) + //log.Infof("Got update to pipe %s", key) // 1. If this pipe is closed, or we're not one of the ends, we // should ensure our local pipe (and bridge) is closed. if !cp.DeletedAt.IsZero() || !cp.eitherEndFor(pr.advertise) { - log.Infof("Pipe %s not in use on this node.", key) + //log.Infof("Pipe %s not in use on this node.", key) pipe, ok := pr.activePipes[key] delete(pr.activePipes, key) if ok { @@ -238,9 +229,8 @@ func (pr *consulPipeRouter) privateAPI() { } }) - addr := fmt.Sprintf("%s:%d", pr.advertise, privateAPIPort) - log.Infof("Serving private API on endpoint %s.", addr) - log.Infof("Private API terminated: %v", http.ListenAndServe(addr, router)) + log.Infof("Serving private API on endpoint %s.", pr.advertise) + log.Infof("Private API terminated: %v", http.ListenAndServe(pr.advertise, router)) } func (pr *consulPipeRouter) Exists(ctx context.Context, id string) (bool, error) { @@ -396,7 +386,7 @@ func (bc *bridgeConnection) loop() { defer log.Infof("Stopping bridge connection for pipe %s to %s", bc.key, bc.addr) _, end := bc.pipe.Ends() - url := fmt.Sprintf("ws://%s:%d/private/api/pipe/%s", bc.addr, privateAPIPort, url.QueryEscape(bc.key)) + url := fmt.Sprintf("ws://%s/private/api/pipe/%s", bc.addr, url.QueryEscape(bc.key)) for { bc.mtx.Lock() diff --git a/app/multitenant/consul_pipe_router_internal_test.go b/app/multitenant/consul_pipe_router_internal_test.go new file mode 100644 index 0000000000..a161f2fb23 --- /dev/null +++ b/app/multitenant/consul_pipe_router_internal_test.go @@ -0,0 +1,114 @@ +package multitenant + +import ( + "fmt" + "io" + "math/rand" + "testing" + + "golang.org/x/net/context" + + "github.com/weaveworks/scope/app" + "github.com/weaveworks/scope/common/xfer" + "github.com/weaveworks/scope/probe/appclient" +) + +type adapter struct { + c appclient.AppClient +} + +func (a adapter) PipeConnection(_, pipeID string, pipe xfer.Pipe) error { + a.c.PipeConnection(pipeID, pipe) + return nil +} + +func (a adapter) PipeClose(_, pipeID string) error { + return a.c.PipeClose(pipeID) +} + +func doPipe(t *testing.T, pr1, pr2 app.PipeRouter) { + +} + +type pipeconn struct { + id string + uiPR, probePR app.PipeRouter + uiPipe, probePipe xfer.Pipe + uiIO, probeIO io.ReadWriter +} + +func TestPipeRouter(t *testing.T) { + var ( + consul = newMockConsulClient() + replicas = 2 + iterations = 100 + prs = []app.PipeRouter{} + pipes = []pipeconn{} + ) + + for i := 0; i < replicas; i++ { + pr := NewConsulPipeRouter(consul, "", fmt.Sprintf("127.0.0.1:44%02d", i), NoopUserIDer) + defer pr.Stop() + prs = append(prs, pr) + } + + newPipe := func(i int) { + // make a new pipe id + id := fmt.Sprintf("pipe-%d", rand.Int63()) + + // pick a random PR to connect app to + uiPR := prs[rand.Intn(replicas)] + uiPipe, uiIO, err := uiPR.Get(context.Background(), id, app.UIEnd) + if err != nil { + t.Fatal(err) + } + + // pick a random PR to connect probe to + probePR := prs[rand.Intn(replicas)] + probePipe, probeIO, err := probePR.Get(context.Background(), id, app.ProbeEnd) + if err != nil { + t.Fatal(err) + } + + pipes = append(pipes, pipeconn{ + id: id, + uiPR: uiPR, + uiPipe: uiPipe, + uiIO: uiIO, + probePR: probePR, + probePipe: probePipe, + probeIO: probeIO, + }) + } + + deletePipe := func() { + // pick a random pipe + i := rand.Intn(len(pipes)) + pipe := pipes[i] + + if err := pipe.uiPR.Release(context.Background(), pipe.id, app.UIEnd); err != nil { + t.Fatal(err) + } + + if err := pipe.probePR.Release(context.Background(), pipe.id, app.ProbeEnd); err != nil { + t.Fatal(err) + } + + // remove from list + pipes = pipes[:i+copy(pipes[i:], pipes[i+1:])] + } + + for i := 0; i < iterations; i++ { + if len(pipes) <= 0 { + newPipe(i) + continue + } + r := rand.Float32() + switch { + case r < 0.5: + newPipe(i) + case r > 0.5: + deletePipe() + } + } +} diff --git a/app/multitenant/mock_consul_client_internal_test.go b/app/multitenant/mock_consul_client_internal_test.go new file mode 100644 index 0000000000..6d0dc6ecdf --- /dev/null +++ b/app/multitenant/mock_consul_client_internal_test.go @@ -0,0 +1,96 @@ +package multitenant + +import ( + "sync" + "time" + + consul "github.com/hashicorp/consul/api" +) + +type mockKV struct { + mtx sync.Mutex + cond *sync.Cond + kvps map[string]*consul.KVPair + next uint64 // the next update will have this 'index in the the log' +} + +func newMockConsulClient() ConsulClient { + m := mockKV{ + kvps: map[string]*consul.KVPair{}, + } + m.cond = sync.NewCond(&m.mtx) + go m.loop() + return &consulClient{&m} +} + +func copyKVPair(in *consul.KVPair) *consul.KVPair { + return &consul.KVPair{ + Key: in.Key, + CreateIndex: in.CreateIndex, + ModifyIndex: in.ModifyIndex, + LockIndex: in.LockIndex, + Flags: in.Flags, + Value: in.Value, + Session: in.Session, + } +} + +// periodic loop to wake people up, so they can honour timeouts +func (m *mockKV) loop() { + for range time.Tick(1 * time.Second) { + m.mtx.Lock() + m.cond.Broadcast() + m.mtx.Unlock() + } +} + +func (m *mockKV) CAS(p *consul.KVPair, q *consul.WriteOptions) (bool, *consul.WriteMeta, error) { + m.mtx.Lock() + defer m.mtx.Unlock() + existing, ok := m.kvps[p.Key] + if ok && existing.ModifyIndex != p.ModifyIndex { + return false, nil, nil + } + if ok { + existing.Value = p.Value + } else { + m.kvps[p.Key] = copyKVPair(p) + } + m.kvps[p.Key].ModifyIndex++ + m.kvps[p.Key].LockIndex = m.next + m.next++ + m.cond.Broadcast() + return true, nil, nil +} + +func (m *mockKV) Get(key string, q *consul.QueryOptions) (*consul.KVPair, *consul.QueryMeta, error) { + m.mtx.Lock() + defer m.mtx.Unlock() + value, ok := m.kvps[key] + if !ok { + return nil, nil, nil + } + for q.WaitIndex >= value.ModifyIndex { + m.cond.Wait() + } + return copyKVPair(value), nil, nil +} + +func (m *mockKV) List(prefix string, q *consul.QueryOptions) (consul.KVPairs, *consul.QueryMeta, error) { + m.mtx.Lock() + defer m.mtx.Unlock() + deadline := time.Now().Add(q.WaitTime) + for m.next <= q.WaitIndex && time.Now().Before(deadline) { + m.cond.Wait() + } + if time.Now().After(deadline) { + return nil, &consul.QueryMeta{LastIndex: q.WaitIndex}, nil + } + result := consul.KVPairs{} + for _, kvp := range m.kvps { + if kvp.LockIndex <= q.WaitIndex { + result = append(result, copyKVPair(kvp)) + } + } + return result, &consul.QueryMeta{LastIndex: m.next}, nil +} diff --git a/prog/app.go b/prog/app.go index 937d4c04c4..613169cca8 100644 --- a/prog/app.go +++ b/prog/app.go @@ -20,6 +20,7 @@ import ( "github.com/weaveworks/scope/app" "github.com/weaveworks/scope/app/multitenant" + "github.com/weaveworks/scope/common/network" "github.com/weaveworks/scope/common/weave" "github.com/weaveworks/scope/common/xfer" "github.com/weaveworks/scope/probe/docker" @@ -101,7 +102,12 @@ func pipeRouterFactory(userIDer multitenant.UserIDer, pipeRouterURL, consulInf s if err != nil { return nil, err } - return multitenant.NewConsulPipeRouter(consulClient, strings.TrimPrefix(parsed.Path, "/"), consulInf, userIDer) + advertise, err := network.GetFirstAddressOf(consulInf) + if err != nil { + return nil, err + } + addr := fmt.Sprintf("%s:4444", advertise) + return multitenant.NewConsulPipeRouter(consulClient, strings.TrimPrefix(parsed.Path, "/"), addr, userIDer), nil } return nil, fmt.Errorf("Invalid pipe router '%s'", pipeRouterURL)