Skip to content

Commit

Permalink
Probe re-org
Browse files Browse the repository at this point in the history
- tag/weave* -> package overlay
- tag/origin_host* -> package host
- tag/topology* -> package main
  • Loading branch information
peterbourgon committed Jun 30, 2015
1 parent 6978549 commit f4b3930
Show file tree
Hide file tree
Showing 17 changed files with 168 additions and 178 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ $(SCOPE_EXPORT): $(APP_EXE) $(PROBE_EXE) docker/*

$(APP_EXE): app/*.go render/*.go report/*.go xfer/*.go

$(PROBE_EXE): probe/*.go probe/docker/*.go probe/endpoint/*.go probe/host/*.go probe/process/*.go probe/tag/*.go report/*.go xfer/*.go
$(PROBE_EXE): probe/*.go probe/docker/*.go probe/endpoint/*.go probe/host/*.go probe/process/*.go probe/overlay/*.go report/*.go xfer/*.go

$(APP_EXE) $(PROBE_EXE):
go get -tags netgo ./$(@D)
Expand Down
5 changes: 2 additions & 3 deletions probe/docker/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,8 @@ func TestContainer(t *testing.T) {
// Send some stats to the docker container
stats := &client.Stats{}
stats.MemoryStats.Usage = 12345
err = json.NewEncoder(writer).Encode(&stats)
if err != nil {
t.Errorf("%v", err)
if err = json.NewEncoder(writer).Encode(&stats); err != nil {
t.Error(err)
}
runtime.Gosched() // wait for StartGatheringStats goroutine to receive the stats

Expand Down
2 changes: 1 addition & 1 deletion probe/endpoint/nat.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func applyNAT(rpt report.Report, scope string) error {
func conntrackModulePresent() bool {
f, err := os.Open(modules)
if err != nil {
log.Printf("conntrack error: %v", err)
//log.Printf("conntrack error: %v", err) // /proc doesn't exist on Darwin
return false
}
defer f.Close()
Expand Down
62 changes: 32 additions & 30 deletions probe/endpoint/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import (
"github.com/prometheus/client_golang/prometheus"

"github.com/weaveworks/procspy"
"github.com/weaveworks/scope/probe/tag"
"github.com/weaveworks/scope/report"
)

type reporter struct {
// Reporter generates Reports containing the Endpoint topology.
type Reporter struct {
hostID string
hostName string
includeProcesses bool
Expand All @@ -33,86 +33,88 @@ var SpyDuration = prometheus.NewSummaryVec(

// NewReporter creates a new Reporter that invokes procspy.Connections to
// generate a report.Report that contains every discovered (spied) connection
// on the host machine, at the granularity of host and port. It optionally
// enriches that topology with process (PID) information.
func NewReporter(hostID, hostName string, includeProcesses bool) tag.Reporter {
return &reporter{
// on the host machine, at the granularity of host and port. That information
// is stored in the Endpoint topology. It optionally enriches that topology
// with process (PID) information.
func NewReporter(hostID, hostName string, includeProcesses bool) *Reporter {
return &Reporter{
hostID: hostID,
hostName: hostName,
includeProcesses: includeProcesses,
includeNAT: conntrackModulePresent(),
}
}

func (rep *reporter) Report() (report.Report, error) {
// Report implements Reporter.
func (r *Reporter) Report() (report.Report, error) {
defer func(begin time.Time) {
SpyDuration.WithLabelValues().Observe(float64(time.Since(begin)))
}(time.Now())

r := report.MakeReport()
conns, err := procspy.Connections(rep.includeProcesses)
rpt := report.MakeReport()
conns, err := procspy.Connections(r.includeProcesses)
if err != nil {
return r, err
return rpt, err
}

for conn := conns.Next(); conn != nil; conn = conns.Next() {
rep.addConnection(&r, conn)
r.addConnection(&rpt, conn)
}

if rep.includeNAT {
err = applyNAT(r, rep.hostID)
if r.includeNAT {
err = applyNAT(rpt, r.hostID)
}

return r, err
return rpt, err
}

func (rep *reporter) addConnection(r *report.Report, c *procspy.Connection) {
func (r *Reporter) addConnection(rpt *report.Report, c *procspy.Connection) {
var (
scopedLocal = report.MakeAddressNodeID(rep.hostID, c.LocalAddress.String())
scopedRemote = report.MakeAddressNodeID(rep.hostID, c.RemoteAddress.String())
scopedLocal = report.MakeAddressNodeID(r.hostID, c.LocalAddress.String())
scopedRemote = report.MakeAddressNodeID(r.hostID, c.RemoteAddress.String())
key = report.MakeAdjacencyID(scopedLocal)
edgeKey = report.MakeEdgeID(scopedLocal, scopedRemote)
)

r.Address.Adjacency[key] = r.Address.Adjacency[key].Add(scopedRemote)
rpt.Address.Adjacency[key] = rpt.Address.Adjacency[key].Add(scopedRemote)

if _, ok := r.Address.NodeMetadatas[scopedLocal]; !ok {
r.Address.NodeMetadatas[scopedLocal] = report.NodeMetadata{
"name": rep.hostName,
if _, ok := rpt.Address.NodeMetadatas[scopedLocal]; !ok {
rpt.Address.NodeMetadatas[scopedLocal] = report.NodeMetadata{
"name": r.hostName,
"addr": c.LocalAddress.String(),
}
}

// Count the TCP connection.
edgeMeta := r.Address.EdgeMetadatas[edgeKey]
edgeMeta := rpt.Address.EdgeMetadatas[edgeKey]
edgeMeta.WithConnCountTCP = true
edgeMeta.MaxConnCountTCP++
r.Address.EdgeMetadatas[edgeKey] = edgeMeta
rpt.Address.EdgeMetadatas[edgeKey] = edgeMeta

if c.Proc.PID > 0 {
var (
scopedLocal = report.MakeEndpointNodeID(rep.hostID, c.LocalAddress.String(), strconv.Itoa(int(c.LocalPort)))
scopedRemote = report.MakeEndpointNodeID(rep.hostID, c.RemoteAddress.String(), strconv.Itoa(int(c.RemotePort)))
scopedLocal = report.MakeEndpointNodeID(r.hostID, c.LocalAddress.String(), strconv.Itoa(int(c.LocalPort)))
scopedRemote = report.MakeEndpointNodeID(r.hostID, c.RemoteAddress.String(), strconv.Itoa(int(c.RemotePort)))
key = report.MakeAdjacencyID(scopedLocal)
edgeKey = report.MakeEdgeID(scopedLocal, scopedRemote)
)

r.Endpoint.Adjacency[key] = r.Endpoint.Adjacency[key].Add(scopedRemote)
rpt.Endpoint.Adjacency[key] = rpt.Endpoint.Adjacency[key].Add(scopedRemote)

if _, ok := r.Endpoint.NodeMetadatas[scopedLocal]; !ok {
if _, ok := rpt.Endpoint.NodeMetadatas[scopedLocal]; !ok {
// First hit establishes NodeMetadata for scoped local address + port
md := report.NodeMetadata{
"addr": c.LocalAddress.String(),
"port": strconv.Itoa(int(c.LocalPort)),
"pid": fmt.Sprintf("%d", c.Proc.PID),
}

r.Endpoint.NodeMetadatas[scopedLocal] = md
rpt.Endpoint.NodeMetadatas[scopedLocal] = md
}
// Count the TCP connection.
edgeMeta := r.Endpoint.EdgeMetadatas[edgeKey]
edgeMeta := rpt.Endpoint.EdgeMetadatas[edgeKey]
edgeMeta.WithConnCountTCP = true
edgeMeta.MaxConnCountTCP++
r.Endpoint.EdgeMetadatas[edgeKey] = edgeMeta
rpt.Endpoint.EdgeMetadatas[edgeKey] = edgeMeta
}
}
11 changes: 6 additions & 5 deletions probe/host/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"strings"
"time"

"github.com/weaveworks/scope/probe/tag"
"github.com/weaveworks/scope/report"
)

Expand All @@ -33,21 +32,23 @@ var (
Now = func() string { return time.Now().UTC().Format(time.RFC3339Nano) }
)

type reporter struct {
// Reporter generates Reports containing the host topology.
type Reporter struct {
hostID string
hostName string
}

// NewReporter returns a Reporter which produces a report containing host
// topology for this host.
func NewReporter(hostID, hostName string) tag.Reporter {
return &reporter{
func NewReporter(hostID, hostName string) *Reporter {
return &Reporter{
hostID: hostID,
hostName: hostName,
}
}

func (r *reporter) Report() (report.Report, error) {
// Report implements Reporter.
func (r *Reporter) Report() (report.Report, error) {
var (
rep = report.MakeReport()
localCIDRs []string
Expand Down
27 changes: 27 additions & 0 deletions probe/host/tagger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package host

import (
"github.com/weaveworks/scope/report"
)

// Tagger tags each node in each topology of a report with the origin host
// node ID of this (probe) host. Effectively, a foreign key linking every node
// in every topology to an origin host node in the host topology.
type Tagger struct{ hostNodeID string }

// NewTagger tags each node with a foreign key linking it to its origin host
// in the host topology.
func NewTagger(hostID string) Tagger {
return Tagger{hostNodeID: report.MakeHostNodeID(hostID)}
}

// Tag implements Tagger.
func (t Tagger) Tag(r report.Report) (report.Report, error) {
md := report.NodeMetadata{report.HostNodeID: t.hostNodeID}
for _, topology := range r.Topologies() {
for nodeID := range topology.NodeMetadatas {
topology.NodeMetadatas[nodeID].Merge(md)
}
}
return r, nil
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package tag_test
package host_test

import (
"reflect"
"testing"

"github.com/weaveworks/scope/probe/tag"
"github.com/weaveworks/scope/probe/host"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/test"
)

func TestOriginHostTagger(t *testing.T) {
func TestTagger(t *testing.T) {
var (
hostID = "foo"
endpointNodeID = report.MakeEndpointNodeID(hostID, "1.2.3.4", "56789") // hostID ignored
Expand All @@ -18,9 +19,9 @@ func TestOriginHostTagger(t *testing.T) {
r := report.MakeReport()
r.Endpoint.NodeMetadatas[endpointNodeID] = nodeMetadata
want := nodeMetadata.Merge(report.NodeMetadata{report.HostNodeID: report.MakeHostNodeID(hostID)})
rpt, _ := tag.NewOriginHostTagger(hostID).Tag(r)
rpt, _ := host.NewTagger(hostID).Tag(r)
have := rpt.Endpoint.NodeMetadatas[endpointNodeID].Copy()
if !reflect.DeepEqual(want, have) {
t.Errorf("\nwant %+v\nhave %+v", want, have)
t.Error(test.Diff(want, have))
}
}
16 changes: 8 additions & 8 deletions probe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (
"github.com/weaveworks/scope/probe/docker"
"github.com/weaveworks/scope/probe/endpoint"
"github.com/weaveworks/scope/probe/host"
"github.com/weaveworks/scope/probe/overlay"
"github.com/weaveworks/scope/probe/process"
"github.com/weaveworks/scope/probe/tag"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/xfer"
)
Expand Down Expand Up @@ -80,16 +80,16 @@ func main() {
)

var (
weaveTagger *tag.WeaveTagger
weaveTagger *overlay.WeaveTagger
processCache *process.CachingWalker
)

taggers := []tag.Tagger{
tag.NewTopologyTagger(),
tag.NewOriginHostTagger(hostID),
taggers := []Tagger{
newTopologyTagger(),
host.NewTagger(hostID),
}

reporters := []tag.Reporter{
reporters := []Reporter{
host.NewReporter(hostID, hostName),
endpoint.NewReporter(hostID, hostName, *spyProcs),
}
Expand Down Expand Up @@ -117,7 +117,7 @@ func main() {

if *weaveRouterAddr != "" {
var err error
weaveTagger, err = tag.NewWeaveTagger(*weaveRouterAddr)
weaveTagger, err = overlay.NewWeaveTagger(*weaveRouterAddr)
if err != nil {
log.Fatalf("failed to start Weave tagger: %v", err)
}
Expand Down Expand Up @@ -161,7 +161,7 @@ func main() {
r.Overlay.Merge(weaveTagger.OverlayTopology())
}

r = tag.Apply(r, taggers)
r = Apply(r, taggers)

case <-quit:
return
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package tag
package overlay

import (
"encoding/json"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package tag_test
package overlay_test

import (
"encoding/json"
Expand All @@ -7,15 +7,15 @@ import (
"reflect"
"testing"

"github.com/weaveworks/scope/probe/tag"
"github.com/weaveworks/scope/probe/overlay"
"github.com/weaveworks/scope/report"
)

func TestWeaveTaggerOverlayTopology(t *testing.T) {
s := httptest.NewServer(http.HandlerFunc(mockWeaveRouter))
defer s.Close()

w, err := tag.NewWeaveTagger(s.URL)
w, err := overlay.NewWeaveTagger(s.URL)
if err != nil {
t.Fatal(err)
}
Expand All @@ -25,8 +25,8 @@ func TestWeaveTaggerOverlayTopology(t *testing.T) {
EdgeMetadatas: report.EdgeMetadatas{},
NodeMetadatas: report.NodeMetadatas{
report.MakeOverlayNodeID(mockWeavePeerName): {
tag.WeavePeerName: mockWeavePeerName,
tag.WeavePeerNickName: mockWeavePeerNickName,
overlay.WeavePeerName: mockWeavePeerName,
overlay.WeavePeerNickName: mockWeavePeerNickName,
},
},
}), w.OverlayTopology(); !reflect.DeepEqual(want, have) {
Expand Down
17 changes: 8 additions & 9 deletions probe/process/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package process
import (
"strconv"

"github.com/weaveworks/scope/probe/tag"
"github.com/weaveworks/scope/report"
)

Expand All @@ -16,22 +15,22 @@ const (
Threads = "threads"
)

// Reporter generate Reports containing the Process topology
type reporter struct {
// Reporter generates Reports containing the Process topology.
type Reporter struct {
scope string
walker Walker
}

// NewReporter makes a new Reporter
func NewReporter(walker Walker, scope string) tag.Reporter {
return &reporter{
// NewReporter makes a new Reporter.
func NewReporter(walker Walker, scope string) *Reporter {
return &Reporter{
scope: scope,
walker: walker,
}
}

// Report generates a Report containing the Process topology
func (r *reporter) Report() (report.Report, error) {
// Report implements Reporter.
func (r *Reporter) Report() (report.Report, error) {
result := report.MakeReport()
processes, err := r.processTopology()
if err != nil {
Expand All @@ -41,7 +40,7 @@ func (r *reporter) Report() (report.Report, error) {
return result, nil
}

func (r *reporter) processTopology() (report.Topology, error) {
func (r *Reporter) processTopology() (report.Topology, error) {
t := report.NewTopology()
err := r.walker.Walk(func(p *Process) {
pidstr := strconv.Itoa(p.PID)
Expand Down
Loading

0 comments on commit f4b3930

Please sign in to comment.