Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filter by Kubernetes Namespaces #1386

Merged
merged 6 commits into from
May 3, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 97 additions & 53 deletions app/api_topologies.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package app

import (
"fmt"
"net/http"
"net/url"
"sort"
"sync"

"github.com/gorilla/mux"
"golang.org/x/net/context"

"github.com/weaveworks/scope/probe/kubernetes"
"github.com/weaveworks/scope/render"
"github.com/weaveworks/scope/report"
)
Expand All @@ -21,38 +24,14 @@ var (
)

func init() {
serviceFilters := []APITopologyOptionGroup{
{
ID: "system",
Default: "application",
Options: []APITopologyOption{
{"system", "System services", render.IsSystem},
{"application", "Application services", render.IsApplication},
{"both", "Both", render.Noop},
},
},
}

podFilters := []APITopologyOptionGroup{
{
ID: "system",
Default: "application",
Options: []APITopologyOption{
{"system", "System pods", render.IsSystem},
{"application", "Application pods", render.IsApplication},
{"both", "Both", render.Noop},
},
},
}

containerFilters := []APITopologyOptionGroup{
{
ID: "system",
Default: "application",
Options: []APITopologyOption{
{"system", "System containers", render.IsSystem},
{"application", "Application containers", render.IsApplication},
{"both", "Both", render.Noop},
{"both", "Both", nil},
},
},
{
Expand All @@ -61,7 +40,7 @@ func init() {
Options: []APITopologyOption{
{"stopped", "Stopped containers", render.IsStopped},
{"running", "Running containers", render.IsRunning},
{"both", "Both", render.Noop},
{"both", "Both", nil},
},
},
}
Expand Down Expand Up @@ -116,31 +95,76 @@ func init() {
Name: "by DNS name",
Options: containerFilters,
},
APITopologyDesc{
id: "hosts",
renderer: render.HostRenderer,
Name: "Hosts",
Rank: 4,
},
APITopologyDesc{
id: "pods",
renderer: render.PodRenderer,
Name: "Pods",
Rank: 3,
HideIfEmpty: true,
Options: podFilters,
},
APITopologyDesc{
id: "pods-by-service",
parent: "pods",
renderer: render.PodServiceRenderer,
Name: "by service",
HideIfEmpty: true,
Options: serviceFilters,
},
APITopologyDesc{
id: "hosts",
renderer: render.HostRenderer,
Name: "Hosts",
Rank: 4,
},
)
}

// kubernetesFilters generates the current kubernetes filters based on the
// available k8s topologies.
func kubernetesFilters(namespaces ...string) APITopologyOptionGroup {
options := APITopologyOptionGroup{ID: "namespace", Default: "all"}
for _, namespace := range namespaces {
options.Options = append(options.Options, APITopologyOption{namespace, namespace, render.IsNamespace(namespace)})
}
options.Options = append(options.Options, APITopologyOption{"all", "All Namespaces", nil})
return options
}

// updateFilters updates the available filters based on the current report.
// Currently only kubernetes changes.
func updateFilters(rpt report.Report, topologies []APITopologyDesc) []APITopologyDesc {
namespaces := map[string]struct{}{}
for _, t := range []report.Topology{rpt.Pod, rpt.Service} {
for _, n := range t.Nodes {
if state, ok := n.Latest.Lookup(kubernetes.PodState); ok && state == kubernetes.StateDeleted {
continue
}
if namespace, ok := n.Latest.Lookup(kubernetes.Namespace); ok {
namespaces[namespace] = struct{}{}
}
}
}
var ns []string
for namespace := range namespaces {
ns = append(ns, namespace)
}
sort.Strings(ns)
for i, t := range topologies {
if t.id == "pods" || t.id == "pods-by-service" {
topologies[i] = updateTopologyFilters(t, []APITopologyOptionGroup{kubernetesFilters(ns...)})
}
}
return topologies
}

// updateTopologyFilters recursively sets the options on a topology description
func updateTopologyFilters(t APITopologyDesc, options []APITopologyOptionGroup) APITopologyDesc {
t.Options = options
for i, sub := range t.SubTopologies {
t.SubTopologies[i] = updateTopologyFilters(sub, options)
}
return t
}

// registry is a threadsafe store of the available topologies
type registry struct {
sync.RWMutex
Expand Down Expand Up @@ -239,23 +263,23 @@ func (r *registry) makeTopologyList(rep Reporter) CtxHandlerFunc {
respondWith(w, http.StatusInternalServerError, err.Error())
return
}
topologies := r.renderTopologies(report, req)
respondWith(w, http.StatusOK, topologies)
respondWith(w, http.StatusOK, r.renderTopologies(report, req))
}
}

func (r *registry) renderTopologies(rpt report.Report, req *http.Request) []APITopologyDesc {
topologies := []APITopologyDesc{}
req.ParseForm()
r.walk(func(desc APITopologyDesc) {
renderer, decorator := renderedForRequest(req, desc)
renderer, decorator, _ := r.rendererForTopology(desc.id, req.Form, rpt)
desc.Stats = decorateWithStats(rpt, renderer, decorator)
for i := range desc.SubTopologies {
renderer, decorator := renderedForRequest(req, desc.SubTopologies[i])
for i, sub := range desc.SubTopologies {
renderer, decorator, _ := r.rendererForTopology(sub.id, req.Form, rpt)
desc.SubTopologies[i].Stats = decorateWithStats(rpt, renderer, decorator)
}
topologies = append(topologies, desc)
})
return topologies
return updateFilters(rpt, topologies)
}

func decorateWithStats(rpt report.Report, renderer render.Renderer, decorator render.Decorator) topologyStats {
Expand All @@ -280,10 +304,16 @@ func decorateWithStats(rpt report.Report, renderer render.Renderer, decorator re
}
}

func renderedForRequest(r *http.Request, topology APITopologyDesc) (render.Renderer, render.Decorator) {
func (r *registry) rendererForTopology(topologyID string, values url.Values, rpt report.Report) (render.Renderer, render.Decorator, error) {
topology, ok := r.get(topologyID)
if !ok {
return nil, nil, fmt.Errorf("topology not found: %s", topologyID)
}
topology = updateFilters(rpt, []APITopologyDesc{topology})[0]

var filters []render.FilterFunc
for _, group := range topology.Options {
value := r.FormValue(group.ID)
value := values.Get(group.ID)
for _, opt := range group.Options {
if opt.filter == nil {
continue
Expand All @@ -299,23 +329,37 @@ func renderedForRequest(r *http.Request, topology APITopologyDesc) (render.Rende
return render.MakeFilter(render.ComposeFilterFuncs(filters...), renderer)
}
}
return topology.renderer, decorator
return topology.renderer, decorator, nil
}

type reportRenderHandler func(
context.Context,
Reporter, render.Renderer, render.Decorator,
http.ResponseWriter, *http.Request,
)
type reporterHandler func(context.Context, Reporter, http.ResponseWriter, *http.Request)

func captureReporter(rep Reporter, f reporterHandler) CtxHandlerFunc {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
f(ctx, rep, w, r)
}
}

func (r *registry) captureRenderer(rep Reporter, f reportRenderHandler) CtxHandlerFunc {
type rendererHandler func(context.Context, render.Renderer, render.Decorator, report.Report, http.ResponseWriter, *http.Request)

func (r *registry) captureRenderer(rep Reporter, f rendererHandler) CtxHandlerFunc {
return func(ctx context.Context, w http.ResponseWriter, req *http.Request) {
topology, ok := r.get(mux.Vars(req)["topology"])
if !ok {
topologyID := mux.Vars(req)["topology"]
if _, ok := r.get(topologyID); !ok {
http.NotFound(w, req)
return
}
renderer, decorator := renderedForRequest(req, topology)
f(ctx, rep, renderer, decorator, w, req)
rpt, err := rep.Report(ctx)
if err != nil {
respondWith(w, http.StatusInternalServerError, err.Error())
return
}
req.ParseForm()
renderer, decorator, err := r.rendererForTopology(topologyID, req.Form, rpt)
if err != nil {
respondWith(w, http.StatusInternalServerError, err.Error())
return
}
f(ctx, renderer, decorator, rpt, w, req)
}
}
79 changes: 29 additions & 50 deletions app/api_topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/weaveworks/scope/common/xfer"
"github.com/weaveworks/scope/render"
"github.com/weaveworks/scope/render/detailed"
"github.com/weaveworks/scope/report"
)

const (
Expand All @@ -28,76 +29,48 @@ type APINode struct {
}

// Full topology.
func handleTopology(
ctx context.Context,
rep Reporter, renderer render.Renderer, decorator render.Decorator,
w http.ResponseWriter, r *http.Request,
) {
report, err := rep.Report(ctx)
if err != nil {
respondWith(w, http.StatusInternalServerError, err.Error())
return
}
func handleTopology(ctx context.Context, renderer render.Renderer, decorator render.Decorator, report report.Report, w http.ResponseWriter, r *http.Request) {
respondWith(w, http.StatusOK, APITopology{
Nodes: detailed.Summaries(report, renderer.Render(report, decorator)),
})
}

// Websocket for the full topology. This route overlaps with the next.
func handleWs(
ctx context.Context,
rep Reporter, renderer render.Renderer, decorator render.Decorator,
w http.ResponseWriter, r *http.Request,
) {
if err := r.ParseForm(); err != nil {
respondWith(w, http.StatusInternalServerError, err.Error())
return
}
loop := websocketLoop
if t := r.Form.Get("t"); t != "" {
var err error
if loop, err = time.ParseDuration(t); err != nil {
respondWith(w, http.StatusBadRequest, t)
return
}
}
handleWebsocket(ctx, w, r, rep, renderer, decorator, loop)
}

// Individual nodes.
func handleNode(
ctx context.Context,
rep Reporter, renderer render.Renderer, _ render.Decorator,
w http.ResponseWriter, r *http.Request,
) {
func handleNode(ctx context.Context, renderer render.Renderer, _ render.Decorator, report report.Report, w http.ResponseWriter, r *http.Request) {
var (
vars = mux.Vars(r)
topologyID = vars["topology"]
nodeID = vars["id"]
report, err = rep.Report(ctx)
rendered = renderer.Render(report, render.FilterNoop)
node, ok = rendered[nodeID]
vars = mux.Vars(r)
topologyID = vars["topology"]
nodeID = vars["id"]
rendered = renderer.Render(report, nil)
node, ok = rendered[nodeID]
)
if err != nil {
respondWith(w, http.StatusInternalServerError, err.Error())
return
}
if !ok {
http.NotFound(w, r)
return
}
respondWith(w, http.StatusOK, APINode{Node: detailed.MakeNode(topologyID, report, rendered, node)})
}

// Websocket for the full topology.
func handleWebsocket(
ctx context.Context,
rep Reporter,
w http.ResponseWriter,
r *http.Request,
rep Reporter,
renderer render.Renderer,
decorator render.Decorator,
loop time.Duration,
) {

This comment was marked as abuse.

if err := r.ParseForm(); err != nil {
respondWith(w, http.StatusInternalServerError, err.Error())
return
}
loop := websocketLoop
if t := r.Form.Get("t"); t != "" {
var err error
if loop, err = time.ParseDuration(t); err != nil {
respondWith(w, http.StatusBadRequest, t)
return
}
}

conn, err := xfer.Upgrade(w, r, nil)
if err != nil {
// log.Info("Upgrade:", err)
Expand All @@ -122,6 +95,7 @@ func handleWebsocket(
previousTopo detailed.NodeSummaries
tick = time.Tick(loop)
wait = make(chan struct{}, 1)
topologyID = mux.Vars(r)["topology"]
)
rep.WaitOn(ctx, wait)
defer rep.UnWait(ctx, wait)
Expand All @@ -132,6 +106,11 @@ func handleWebsocket(
log.Errorf("Error generating report: %v", err)
return
}
renderer, decorator, err := topologyRegistry.rendererForTopology(topologyID, r.Form, report)
if err != nil {
log.Errorf("Error generating report: %v", err)
return
}
newTopo := detailed.Summaries(report, renderer.Render(report, decorator))
diff := detailed.TopoDiff(previousTopo, newTopo)
previousTopo = newTopo
Expand Down
2 changes: 1 addition & 1 deletion app/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func RegisterTopologyRoutes(router *mux.Router, r Reporter) {
get.HandleFunc("/api/topology/{topology}",
gzipHandler(requestContextDecorator(topologyRegistry.captureRenderer(r, handleTopology))))
get.HandleFunc("/api/topology/{topology}/ws",
requestContextDecorator(topologyRegistry.captureRenderer(r, handleWs))) // NB not gzip!
requestContextDecorator(captureReporter(r, handleWebsocket))) // NB not gzip!
get.MatcherFunc(URLMatcher("/api/topology/{topology}/{id}")).HandlerFunc(
gzipHandler(requestContextDecorator(topologyRegistry.captureRenderer(r, handleNode))))
get.HandleFunc("/api/report",
Expand Down
Loading