Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring for multitenancy #1150

Merged
merged 1 commit into from
Mar 9, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion app/api_report.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ import (
// Raw report handler
func makeRawReportHandler(rep Reporter) CtxHandlerFunc {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
respondWith(w, http.StatusOK, rep.Report(ctx))
report, err := rep.Report(ctx)
if err != nil {
respondWith(w, http.StatusInternalServerError, err.Error())
return
}
respondWith(w, http.StatusOK, report)
}
}
7 changes: 6 additions & 1 deletion app/api_topologies.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,12 @@ func (r *registry) walk(f func(APITopologyDesc)) {
// makeTopologyList returns a handler that yields an APITopologyList.
func (r *registry) makeTopologyList(rep Reporter) CtxHandlerFunc {
return func(ctx context.Context, w http.ResponseWriter, req *http.Request) {
topologies := r.renderTopologies(rep.Report(ctx), req)
report, err := rep.Report(ctx)
if err != nil {
respondWith(w, http.StatusInternalServerError, err.Error())
return
}
topologies := r.renderTopologies(report, req)
respondWith(w, http.StatusOK, topologies)
}
}
Expand Down
26 changes: 20 additions & 6 deletions app/api_topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,13 @@ type APINode struct {

// Full topology.
func handleTopology(ctx context.Context, rep Reporter, renderer render.Renderer, w http.ResponseWriter, r *http.Request) {
report, err := rep.Report(ctx)
if err != nil {
respondWith(w, http.StatusInternalServerError, err.Error())
return
}
respondWith(w, http.StatusOK, APITopology{
Nodes: renderer.Render(rep.Report(ctx)).Prune(),
Nodes: renderer.Render(report).Prune(),
})
}

Expand All @@ -54,15 +59,19 @@ func handleWs(ctx context.Context, rep Reporter, renderer render.Renderer, w htt
func handleNode(topologyID, nodeID string) func(context.Context, Reporter, render.Renderer, http.ResponseWriter, *http.Request) {
return func(ctx context.Context, rep Reporter, renderer render.Renderer, w http.ResponseWriter, r *http.Request) {
var (
rpt = rep.Report(ctx)
rendered = renderer.Render(rep.Report(ctx))
node, ok = rendered[nodeID]
report, err = rep.Report(ctx)
rendered = renderer.Render(report)
node, ok = rendered[nodeID]
)
if err != nil {
respondWith(w, http.StatusInternalServerError, err.Error())
return
}
if !ok {
http.NotFound(w, r)
return
}
respondWith(w, http.StatusOK, APINode{Node: detailed.MakeNode(topologyID, rpt, rendered, node)})
respondWith(w, http.StatusOK, APINode{Node: detailed.MakeNode(topologyID, report, rendered, node)})
}
}

Expand Down Expand Up @@ -103,7 +112,12 @@ func handleWebsocket(
defer rep.UnWait(ctx, wait)

for {
newTopo := renderer.Render(rep.Report(ctx)).Prune()
report, err := rep.Report(ctx)
if err != nil {
log.Errorf("Error generating report: %v", err)
return
}
newTopo := renderer.Render(report).Prune()
diff := render.TopoDiff(previousTopo, newTopo)
previousTopo = newTopo

Expand Down
13 changes: 7 additions & 6 deletions app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ import (
// Reporter is something that can produce reports on demand. It's a convenient
// interface for parts of the app, and several experimental components.
type Reporter interface {
Report(context.Context) report.Report
Report(context.Context) (report.Report, error)
WaitOn(context.Context, chan struct{})
UnWait(context.Context, chan struct{})
}

// Adder is something that can accept reports. It's a convenient interface for
// parts of the app, and several experimental components.
type Adder interface {
Add(context.Context, report.Report)
Add(context.Context, report.Report) error
}

// A Collector is a Reporter and an Adder
Expand Down Expand Up @@ -83,7 +83,7 @@ func NewCollector(window time.Duration) Collector {
var now = time.Now

// Add adds a report to the collector's internal state. It implements Adder.
func (c *collector) Add(_ context.Context, rpt report.Report) {
func (c *collector) Add(_ context.Context, rpt report.Report) error {
c.mtx.Lock()
defer c.mtx.Unlock()
c.reports = append(c.reports, timestampReport{now(), rpt})
Expand All @@ -92,11 +92,12 @@ func (c *collector) Add(_ context.Context, rpt report.Report) {
if rpt.Shortcut {
c.Broadcast()
}
return nil
}

// Report returns a merged report over all added reports. It implements
// Reporter.
func (c *collector) Report(_ context.Context) report.Report {
func (c *collector) Report(_ context.Context) (report.Report, error) {
c.mtx.Lock()
defer c.mtx.Unlock()

Expand All @@ -105,7 +106,7 @@ func (c *collector) Report(_ context.Context) report.Report {
if c.cached != nil && len(c.reports) > 0 {
oldest := now().Add(-c.window)
if c.reports[0].timestamp.Before(oldest) {
return *c.cached
return *c.cached, nil
}
}
c.reports = clean(c.reports, c.window)
Expand All @@ -118,7 +119,7 @@ func (c *collector) Report(_ context.Context) report.Report {
}
rpt.ID = fmt.Sprintf("%x", id.Sum64())
c.cached = &rpt
return rpt
return rpt, nil
}

type timestampReport struct {
Expand Down
18 changes: 15 additions & 3 deletions app/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,32 @@ func TestCollector(t *testing.T) {
r2 := report.MakeReport()
r2.Endpoint.AddNode("bar", report.MakeNode())

if want, have := report.MakeReport(), c.Report(ctx); !reflect.DeepEqual(want, have) {
have, err := c.Report(ctx)
if err != nil {
t.Error(err)
}
if want := report.MakeReport(); !reflect.DeepEqual(want, have) {
t.Error(test.Diff(want, have))
}

c.Add(ctx, r1)
if want, have := r1, c.Report(ctx); !reflect.DeepEqual(want, have) {
have, err = c.Report(ctx)
if err != nil {
t.Error(err)
}
if want := r1; !reflect.DeepEqual(want, have) {
t.Error(test.Diff(want, have))
}

c.Add(ctx, r2)
merged := report.MakeReport()
merged = merged.Merge(r1)
merged = merged.Merge(r2)
if want, have := merged, c.Report(ctx); !reflect.DeepEqual(want, have) {
have, err = c.Report(ctx)
if err != nil {
t.Error(err)
}
if want := merged; !reflect.DeepEqual(want, have) {
t.Error(test.Diff(want, have))
}
}
Expand Down
8 changes: 4 additions & 4 deletions app/mock_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
// StaticReport is used as a fixture in tests. It emulates an xfer.Collector.
type StaticReport struct{}

func (s StaticReport) Report(context.Context) report.Report { return fixture.Report }
func (s StaticReport) Add(context.Context, report.Report) {}
func (s StaticReport) WaitOn(context.Context, chan struct{}) {}
func (s StaticReport) UnWait(context.Context, chan struct{}) {}
func (s StaticReport) Report(context.Context) (report.Report, error) { return fixture.Report, nil }
func (s StaticReport) Add(context.Context, report.Report) error { return nil }
func (s StaticReport) WaitOn(context.Context, chan struct{}) {}
func (s StaticReport) UnWait(context.Context, chan struct{}) {}
27 changes: 15 additions & 12 deletions app/pipe_router.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package app

import (
"fmt"
"io"
"sync"
"time"
Expand Down Expand Up @@ -29,9 +30,9 @@ const (

// PipeRouter stores pipes and allows you to connect to either end of them.
type PipeRouter interface {
Get(context.Context, string, End) (xfer.Pipe, io.ReadWriter, bool)
Release(context.Context, string, End)
Delete(context.Context, string)
Get(context.Context, string, End) (xfer.Pipe, io.ReadWriter, error)
Release(context.Context, string, End) error
Delete(context.Context, string) error
Stop()
}

Expand Down Expand Up @@ -77,7 +78,7 @@ func NewLocalPipeRouter() PipeRouter {
return pipeRouter
}

func (pr *localPipeRouter) Get(_ context.Context, id string, e End) (xfer.Pipe, io.ReadWriter, bool) {
func (pr *localPipeRouter) Get(_ context.Context, id string, e End) (xfer.Pipe, io.ReadWriter, error) {
pr.Lock()
defer pr.Unlock()
p, ok := pr.pipes[id]
Expand All @@ -91,43 +92,45 @@ func (pr *localPipeRouter) Get(_ context.Context, id string, e End) (xfer.Pipe,
pr.pipes[id] = p
}
if p.Closed() {
return nil, nil, false
return nil, nil, fmt.Errorf("Pipe %s closed", id)
}
end, endIO := p.end(e)
end.refCount++
return p, endIO, true
return p, endIO, nil
}

func (pr *localPipeRouter) Release(_ context.Context, id string, e End) {
func (pr *localPipeRouter) Release(_ context.Context, id string, e End) error {
pr.Lock()
defer pr.Unlock()

p, ok := pr.pipes[id]
if !ok {
// uh oh
return
return fmt.Errorf("Pipe %s not found", id)
}

end, _ := p.end(e)
end.refCount--
if end.refCount > 0 {
return
return nil
}

if !p.Closed() {
end.lastUsedTime = mtime.Now()
}

return nil
}

func (pr *localPipeRouter) Delete(_ context.Context, id string) {
func (pr *localPipeRouter) Delete(_ context.Context, id string) error {
pr.Lock()
defer pr.Unlock()
p, ok := pr.pipes[id]
if !ok {
return
return nil
}
p.Close()
p.tombstoneTime = mtime.Now()
return nil
}

func (pr *localPipeRouter) Stop() {
Expand Down
13 changes: 8 additions & 5 deletions app/pipes.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ func RegisterPipeRoutes(router *mux.Router, pr PipeRouter) {
func checkPipe(pr PipeRouter, end End) CtxHandlerFunc {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["pipeID"]
_, _, ok := pr.Get(ctx, id, end)
if !ok {
_, _, err := pr.Get(ctx, id, end)
if err != nil {
w.WriteHeader(http.StatusNoContent)
return
}
Expand All @@ -44,8 +44,9 @@ func checkPipe(pr PipeRouter, end End) CtxHandlerFunc {
func handlePipeWs(pr PipeRouter, end End) CtxHandlerFunc {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["pipeID"]
pipe, endIO, ok := pr.Get(ctx, id, end)
if !ok {
pipe, endIO, err := pr.Get(ctx, id, end)
if err != nil {
log.Errorf("Error getting pipe %s: %v", id, err)
http.NotFound(w, r)
return
}
Expand All @@ -69,6 +70,8 @@ func deletePipe(pr PipeRouter) CtxHandlerFunc {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
pipeID := mux.Vars(r)["pipeID"]
log.Infof("Closing pipe %s", pipeID)
pr.Delete(ctx, pipeID)
if err := pr.Delete(ctx, pipeID); err != nil {
respondWith(w, http.StatusInternalServerError, err.Error())
}
}
}
6 changes: 3 additions & 3 deletions app/pipes_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ func TestPipeTimeout(t *testing.T) {
// create a new pipe.
id := "foo"
ctx := context.Background()
pipe, _, ok := pr.Get(ctx, id, UIEnd)
if !ok {
t.Fatalf("not ok")
pipe, _, err := pr.Get(ctx, id, UIEnd)
if err != nil {
t.Fatalf("not ok: %v", err)
}

// move time forward such that the new pipe should timeout
Expand Down
6 changes: 5 additions & 1 deletion app/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,11 @@ func TestReportPostHandler(t *testing.T) {
}

ctx := context.Background()
if want, have := fixture.Report.Endpoint.Nodes, c.Report(ctx).Endpoint.Nodes; len(have) == 0 || len(want) != len(have) {
report, err := c.Report(ctx)
if err != nil {
t.Error(err)
}
if want, have := fixture.Report.Endpoint.Nodes, report.Endpoint.Nodes; len(have) == 0 || len(want) != len(have) {
t.Fatalf("Content-Type %s: %v", contentType, test.Diff(have, want))
}
}
Expand Down