Skip to content

Commit

Permalink
Merge pull request #1386 from weaveworks/1340-k8s-filter-namespace
Browse files Browse the repository at this point in the history
Filter by Kubernetes Namespaces
  • Loading branch information
paulbellamy committed May 3, 2016
2 parents 0456df9 + 0e70f70 commit 57bc6f7
Show file tree
Hide file tree
Showing 10 changed files with 204 additions and 176 deletions.
150 changes: 97 additions & 53 deletions app/api_topologies.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package app

import (
"fmt"
"net/http"
"net/url"
"sort"
"sync"

"github.com/gorilla/mux"
"golang.org/x/net/context"

"github.com/weaveworks/scope/probe/kubernetes"
"github.com/weaveworks/scope/render"
"github.com/weaveworks/scope/report"
)
Expand All @@ -21,38 +24,14 @@ var (
)

func init() {
serviceFilters := []APITopologyOptionGroup{
{
ID: "system",
Default: "application",
Options: []APITopologyOption{
{"system", "System services", render.IsSystem},
{"application", "Application services", render.IsApplication},
{"both", "Both", render.Noop},
},
},
}

podFilters := []APITopologyOptionGroup{
{
ID: "system",
Default: "application",
Options: []APITopologyOption{
{"system", "System pods", render.IsSystem},
{"application", "Application pods", render.IsApplication},
{"both", "Both", render.Noop},
},
},
}

containerFilters := []APITopologyOptionGroup{
{
ID: "system",
Default: "application",
Options: []APITopologyOption{
{"system", "System containers", render.IsSystem},
{"application", "Application containers", render.IsApplication},
{"both", "Both", render.Noop},
{"both", "Both", nil},
},
},
{
Expand All @@ -61,7 +40,7 @@ func init() {
Options: []APITopologyOption{
{"stopped", "Stopped containers", render.IsStopped},
{"running", "Running containers", render.IsRunning},
{"both", "Both", render.Noop},
{"both", "Both", nil},
},
},
}
Expand Down Expand Up @@ -116,31 +95,76 @@ func init() {
Name: "by DNS name",
Options: containerFilters,
},
APITopologyDesc{
id: "hosts",
renderer: render.HostRenderer,
Name: "Hosts",
Rank: 4,
},
APITopologyDesc{
id: "pods",
renderer: render.PodRenderer,
Name: "Pods",
Rank: 3,
HideIfEmpty: true,
Options: podFilters,
},
APITopologyDesc{
id: "pods-by-service",
parent: "pods",
renderer: render.PodServiceRenderer,
Name: "by service",
HideIfEmpty: true,
Options: serviceFilters,
},
APITopologyDesc{
id: "hosts",
renderer: render.HostRenderer,
Name: "Hosts",
Rank: 4,
},
)
}

// kubernetesFilters generates the current kubernetes filters based on the
// available k8s topologies.
func kubernetesFilters(namespaces ...string) APITopologyOptionGroup {
options := APITopologyOptionGroup{ID: "namespace", Default: "all"}
for _, namespace := range namespaces {
options.Options = append(options.Options, APITopologyOption{namespace, namespace, render.IsNamespace(namespace)})
}
options.Options = append(options.Options, APITopologyOption{"all", "All Namespaces", nil})
return options
}

// updateFilters updates the available filters based on the current report.
// Currently only kubernetes changes.
func updateFilters(rpt report.Report, topologies []APITopologyDesc) []APITopologyDesc {
namespaces := map[string]struct{}{}
for _, t := range []report.Topology{rpt.Pod, rpt.Service} {
for _, n := range t.Nodes {
if state, ok := n.Latest.Lookup(kubernetes.PodState); ok && state == kubernetes.StateDeleted {
continue
}
if namespace, ok := n.Latest.Lookup(kubernetes.Namespace); ok {
namespaces[namespace] = struct{}{}
}
}
}
var ns []string
for namespace := range namespaces {
ns = append(ns, namespace)
}
sort.Strings(ns)
for i, t := range topologies {
if t.id == "pods" || t.id == "pods-by-service" {
topologies[i] = updateTopologyFilters(t, []APITopologyOptionGroup{kubernetesFilters(ns...)})
}
}
return topologies
}

// updateTopologyFilters recursively sets the options on a topology description
func updateTopologyFilters(t APITopologyDesc, options []APITopologyOptionGroup) APITopologyDesc {
t.Options = options
for i, sub := range t.SubTopologies {
t.SubTopologies[i] = updateTopologyFilters(sub, options)
}
return t
}

// registry is a threadsafe store of the available topologies
type registry struct {
sync.RWMutex
Expand Down Expand Up @@ -239,23 +263,23 @@ func (r *registry) makeTopologyList(rep Reporter) CtxHandlerFunc {
respondWith(w, http.StatusInternalServerError, err.Error())
return
}
topologies := r.renderTopologies(report, req)
respondWith(w, http.StatusOK, topologies)
respondWith(w, http.StatusOK, r.renderTopologies(report, req))
}
}

func (r *registry) renderTopologies(rpt report.Report, req *http.Request) []APITopologyDesc {
topologies := []APITopologyDesc{}
req.ParseForm()
r.walk(func(desc APITopologyDesc) {
renderer, decorator := renderedForRequest(req, desc)
renderer, decorator, _ := r.rendererForTopology(desc.id, req.Form, rpt)
desc.Stats = decorateWithStats(rpt, renderer, decorator)
for i := range desc.SubTopologies {
renderer, decorator := renderedForRequest(req, desc.SubTopologies[i])
for i, sub := range desc.SubTopologies {
renderer, decorator, _ := r.rendererForTopology(sub.id, req.Form, rpt)
desc.SubTopologies[i].Stats = decorateWithStats(rpt, renderer, decorator)
}
topologies = append(topologies, desc)
})
return topologies
return updateFilters(rpt, topologies)
}

func decorateWithStats(rpt report.Report, renderer render.Renderer, decorator render.Decorator) topologyStats {
Expand All @@ -280,10 +304,16 @@ func decorateWithStats(rpt report.Report, renderer render.Renderer, decorator re
}
}

func renderedForRequest(r *http.Request, topology APITopologyDesc) (render.Renderer, render.Decorator) {
func (r *registry) rendererForTopology(topologyID string, values url.Values, rpt report.Report) (render.Renderer, render.Decorator, error) {
topology, ok := r.get(topologyID)
if !ok {
return nil, nil, fmt.Errorf("topology not found: %s", topologyID)
}
topology = updateFilters(rpt, []APITopologyDesc{topology})[0]

var filters []render.FilterFunc
for _, group := range topology.Options {
value := r.FormValue(group.ID)
value := values.Get(group.ID)
for _, opt := range group.Options {
if opt.filter == nil {
continue
Expand All @@ -299,23 +329,37 @@ func renderedForRequest(r *http.Request, topology APITopologyDesc) (render.Rende
return render.MakeFilter(render.ComposeFilterFuncs(filters...), renderer)
}
}
return topology.renderer, decorator
return topology.renderer, decorator, nil
}

type reportRenderHandler func(
context.Context,
Reporter, render.Renderer, render.Decorator,
http.ResponseWriter, *http.Request,
)
type reporterHandler func(context.Context, Reporter, http.ResponseWriter, *http.Request)

func captureReporter(rep Reporter, f reporterHandler) CtxHandlerFunc {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
f(ctx, rep, w, r)
}
}

func (r *registry) captureRenderer(rep Reporter, f reportRenderHandler) CtxHandlerFunc {
type rendererHandler func(context.Context, render.Renderer, render.Decorator, report.Report, http.ResponseWriter, *http.Request)

func (r *registry) captureRenderer(rep Reporter, f rendererHandler) CtxHandlerFunc {
return func(ctx context.Context, w http.ResponseWriter, req *http.Request) {
topology, ok := r.get(mux.Vars(req)["topology"])
if !ok {
topologyID := mux.Vars(req)["topology"]
if _, ok := r.get(topologyID); !ok {
http.NotFound(w, req)
return
}
renderer, decorator := renderedForRequest(req, topology)
f(ctx, rep, renderer, decorator, w, req)
rpt, err := rep.Report(ctx)
if err != nil {
respondWith(w, http.StatusInternalServerError, err.Error())
return
}
req.ParseForm()
renderer, decorator, err := r.rendererForTopology(topologyID, req.Form, rpt)
if err != nil {
respondWith(w, http.StatusInternalServerError, err.Error())
return
}
f(ctx, renderer, decorator, rpt, w, req)
}
}
79 changes: 29 additions & 50 deletions app/api_topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/weaveworks/scope/common/xfer"
"github.com/weaveworks/scope/render"
"github.com/weaveworks/scope/render/detailed"
"github.com/weaveworks/scope/report"
)

const (
Expand All @@ -28,76 +29,48 @@ type APINode struct {
}

// Full topology.
func handleTopology(
ctx context.Context,
rep Reporter, renderer render.Renderer, decorator render.Decorator,
w http.ResponseWriter, r *http.Request,
) {
report, err := rep.Report(ctx)
if err != nil {
respondWith(w, http.StatusInternalServerError, err.Error())
return
}
func handleTopology(ctx context.Context, renderer render.Renderer, decorator render.Decorator, report report.Report, w http.ResponseWriter, r *http.Request) {
respondWith(w, http.StatusOK, APITopology{
Nodes: detailed.Summaries(report, renderer.Render(report, decorator)),
})
}

// Websocket for the full topology. This route overlaps with the next.
func handleWs(
ctx context.Context,
rep Reporter, renderer render.Renderer, decorator render.Decorator,
w http.ResponseWriter, r *http.Request,
) {
if err := r.ParseForm(); err != nil {
respondWith(w, http.StatusInternalServerError, err.Error())
return
}
loop := websocketLoop
if t := r.Form.Get("t"); t != "" {
var err error
if loop, err = time.ParseDuration(t); err != nil {
respondWith(w, http.StatusBadRequest, t)
return
}
}
handleWebsocket(ctx, w, r, rep, renderer, decorator, loop)
}

// Individual nodes.
func handleNode(
ctx context.Context,
rep Reporter, renderer render.Renderer, _ render.Decorator,
w http.ResponseWriter, r *http.Request,
) {
func handleNode(ctx context.Context, renderer render.Renderer, _ render.Decorator, report report.Report, w http.ResponseWriter, r *http.Request) {
var (
vars = mux.Vars(r)
topologyID = vars["topology"]
nodeID = vars["id"]
report, err = rep.Report(ctx)
rendered = renderer.Render(report, render.FilterNoop)
node, ok = rendered[nodeID]
vars = mux.Vars(r)
topologyID = vars["topology"]
nodeID = vars["id"]
rendered = renderer.Render(report, nil)
node, ok = rendered[nodeID]
)
if err != nil {
respondWith(w, http.StatusInternalServerError, err.Error())
return
}
if !ok {
http.NotFound(w, r)
return
}
respondWith(w, http.StatusOK, APINode{Node: detailed.MakeNode(topologyID, report, rendered, node)})
}

// Websocket for the full topology.
func handleWebsocket(
ctx context.Context,
rep Reporter,
w http.ResponseWriter,
r *http.Request,
rep Reporter,
renderer render.Renderer,
decorator render.Decorator,
loop time.Duration,
) {
if err := r.ParseForm(); err != nil {
respondWith(w, http.StatusInternalServerError, err.Error())
return
}
loop := websocketLoop
if t := r.Form.Get("t"); t != "" {
var err error
if loop, err = time.ParseDuration(t); err != nil {
respondWith(w, http.StatusBadRequest, t)
return
}
}

conn, err := xfer.Upgrade(w, r, nil)
if err != nil {
// log.Info("Upgrade:", err)
Expand All @@ -122,6 +95,7 @@ func handleWebsocket(
previousTopo detailed.NodeSummaries
tick = time.Tick(loop)
wait = make(chan struct{}, 1)
topologyID = mux.Vars(r)["topology"]
)
rep.WaitOn(ctx, wait)
defer rep.UnWait(ctx, wait)
Expand All @@ -132,6 +106,11 @@ func handleWebsocket(
log.Errorf("Error generating report: %v", err)
return
}
renderer, decorator, err := topologyRegistry.rendererForTopology(topologyID, r.Form, report)
if err != nil {
log.Errorf("Error generating report: %v", err)
return
}
newTopo := detailed.Summaries(report, renderer.Render(report, decorator))
diff := detailed.TopoDiff(previousTopo, newTopo)
previousTopo = newTopo
Expand Down
2 changes: 1 addition & 1 deletion app/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func RegisterTopologyRoutes(router *mux.Router, r Reporter) {
get.HandleFunc("/api/topology/{topology}",
gzipHandler(requestContextDecorator(topologyRegistry.captureRenderer(r, handleTopology))))
get.HandleFunc("/api/topology/{topology}/ws",
requestContextDecorator(topologyRegistry.captureRenderer(r, handleWs))) // NB not gzip!
requestContextDecorator(captureReporter(r, handleWebsocket))) // NB not gzip!
get.MatcherFunc(URLMatcher("/api/topology/{topology}/{id}")).HandlerFunc(
gzipHandler(requestContextDecorator(topologyRegistry.captureRenderer(r, handleNode))))
get.HandleFunc("/api/report",
Expand Down
Loading

0 comments on commit 57bc6f7

Please sign in to comment.