Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Probe re-org #294

Merged
merged 2 commits into from
Jun 30, 2015
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ $(SCOPE_EXPORT): $(APP_EXE) $(PROBE_EXE) docker/*

$(APP_EXE): app/*.go render/*.go report/*.go xfer/*.go

$(PROBE_EXE): probe/*.go probe/docker/*.go probe/endpoint/*.go probe/host/*.go probe/process/*.go probe/tag/*.go report/*.go xfer/*.go
$(PROBE_EXE): probe/*.go probe/docker/*.go probe/endpoint/*.go probe/host/*.go probe/process/*.go probe/overlay/*.go report/*.go xfer/*.go

$(APP_EXE) $(PROBE_EXE):
go get -tags netgo ./$(@D)
Expand Down
5 changes: 2 additions & 3 deletions probe/docker/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,8 @@ func TestContainer(t *testing.T) {
// Send some stats to the docker container
stats := &client.Stats{}
stats.MemoryStats.Usage = 12345
err = json.NewEncoder(writer).Encode(&stats)
if err != nil {
t.Errorf("%v", err)
if err = json.NewEncoder(writer).Encode(&stats); err != nil {
t.Error(err)
}
runtime.Gosched() // wait for StartGatheringStats goroutine to receive the stats

Expand Down
2 changes: 1 addition & 1 deletion probe/endpoint/nat.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func applyNAT(rpt report.Report, scope string) error {
func conntrackModulePresent() bool {
f, err := os.Open(modules)
if err != nil {
log.Printf("conntrack error: %v", err)
//log.Printf("conntrack error: %v", err) // /proc doesn't exist on Darwin

This comment was marked as abuse.

return false
}
defer f.Close()
Expand Down
62 changes: 32 additions & 30 deletions probe/endpoint/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import (
"github.com/prometheus/client_golang/prometheus"

"github.com/weaveworks/procspy"
"github.com/weaveworks/scope/probe/tag"
"github.com/weaveworks/scope/report"
)

type reporter struct {
// Reporter generates Reports containing the Endpoint topology.
type Reporter struct {
hostID string
hostName string
includeProcesses bool
Expand All @@ -33,86 +33,88 @@ var SpyDuration = prometheus.NewSummaryVec(

// NewReporter creates a new Reporter that invokes procspy.Connections to
// generate a report.Report that contains every discovered (spied) connection
// on the host machine, at the granularity of host and port. It optionally
// enriches that topology with process (PID) information.
func NewReporter(hostID, hostName string, includeProcesses bool) tag.Reporter {
return &reporter{
// on the host machine, at the granularity of host and port. That information
// is stored in the Endpoint topology. It optionally enriches that topology
// with process (PID) information.
func NewReporter(hostID, hostName string, includeProcesses bool) *Reporter {

This comment was marked as abuse.

This comment was marked as abuse.

This comment was marked as abuse.

return &Reporter{
hostID: hostID,
hostName: hostName,
includeProcesses: includeProcesses,
includeNAT: conntrackModulePresent(),
}
}

func (rep *reporter) Report() (report.Report, error) {
// Report implements Reporter.
func (r *Reporter) Report() (report.Report, error) {
defer func(begin time.Time) {
SpyDuration.WithLabelValues().Observe(float64(time.Since(begin)))
}(time.Now())

r := report.MakeReport()
conns, err := procspy.Connections(rep.includeProcesses)
rpt := report.MakeReport()
conns, err := procspy.Connections(r.includeProcesses)
if err != nil {
return r, err
return rpt, err
}

for conn := conns.Next(); conn != nil; conn = conns.Next() {
rep.addConnection(&r, conn)
r.addConnection(&rpt, conn)
}

if rep.includeNAT {
err = applyNAT(r, rep.hostID)
if r.includeNAT {
err = applyNAT(rpt, r.hostID)
}

return r, err
return rpt, err
}

func (rep *reporter) addConnection(r *report.Report, c *procspy.Connection) {
func (r *Reporter) addConnection(rpt *report.Report, c *procspy.Connection) {
var (
scopedLocal = report.MakeAddressNodeID(rep.hostID, c.LocalAddress.String())
scopedRemote = report.MakeAddressNodeID(rep.hostID, c.RemoteAddress.String())
scopedLocal = report.MakeAddressNodeID(r.hostID, c.LocalAddress.String())
scopedRemote = report.MakeAddressNodeID(r.hostID, c.RemoteAddress.String())
key = report.MakeAdjacencyID(scopedLocal)
edgeKey = report.MakeEdgeID(scopedLocal, scopedRemote)
)

r.Address.Adjacency[key] = r.Address.Adjacency[key].Add(scopedRemote)
rpt.Address.Adjacency[key] = rpt.Address.Adjacency[key].Add(scopedRemote)

if _, ok := r.Address.NodeMetadatas[scopedLocal]; !ok {
r.Address.NodeMetadatas[scopedLocal] = report.NodeMetadata{
"name": rep.hostName,
if _, ok := rpt.Address.NodeMetadatas[scopedLocal]; !ok {
rpt.Address.NodeMetadatas[scopedLocal] = report.NodeMetadata{
"name": r.hostName,
"addr": c.LocalAddress.String(),
}
}

// Count the TCP connection.
edgeMeta := r.Address.EdgeMetadatas[edgeKey]
edgeMeta := rpt.Address.EdgeMetadatas[edgeKey]
edgeMeta.WithConnCountTCP = true
edgeMeta.MaxConnCountTCP++
r.Address.EdgeMetadatas[edgeKey] = edgeMeta
rpt.Address.EdgeMetadatas[edgeKey] = edgeMeta

if c.Proc.PID > 0 {
var (
scopedLocal = report.MakeEndpointNodeID(rep.hostID, c.LocalAddress.String(), strconv.Itoa(int(c.LocalPort)))
scopedRemote = report.MakeEndpointNodeID(rep.hostID, c.RemoteAddress.String(), strconv.Itoa(int(c.RemotePort)))
scopedLocal = report.MakeEndpointNodeID(r.hostID, c.LocalAddress.String(), strconv.Itoa(int(c.LocalPort)))
scopedRemote = report.MakeEndpointNodeID(r.hostID, c.RemoteAddress.String(), strconv.Itoa(int(c.RemotePort)))
key = report.MakeAdjacencyID(scopedLocal)
edgeKey = report.MakeEdgeID(scopedLocal, scopedRemote)
)

r.Endpoint.Adjacency[key] = r.Endpoint.Adjacency[key].Add(scopedRemote)
rpt.Endpoint.Adjacency[key] = rpt.Endpoint.Adjacency[key].Add(scopedRemote)

if _, ok := r.Endpoint.NodeMetadatas[scopedLocal]; !ok {
if _, ok := rpt.Endpoint.NodeMetadatas[scopedLocal]; !ok {
// First hit establishes NodeMetadata for scoped local address + port
md := report.NodeMetadata{
"addr": c.LocalAddress.String(),
"port": strconv.Itoa(int(c.LocalPort)),
"pid": fmt.Sprintf("%d", c.Proc.PID),
}

r.Endpoint.NodeMetadatas[scopedLocal] = md
rpt.Endpoint.NodeMetadatas[scopedLocal] = md
}
// Count the TCP connection.
edgeMeta := r.Endpoint.EdgeMetadatas[edgeKey]
edgeMeta := rpt.Endpoint.EdgeMetadatas[edgeKey]
edgeMeta.WithConnCountTCP = true
edgeMeta.MaxConnCountTCP++
r.Endpoint.EdgeMetadatas[edgeKey] = edgeMeta
rpt.Endpoint.EdgeMetadatas[edgeKey] = edgeMeta
}
}
11 changes: 6 additions & 5 deletions probe/host/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"strings"
"time"

"github.com/weaveworks/scope/probe/tag"
"github.com/weaveworks/scope/report"
)

Expand All @@ -33,21 +32,23 @@ var (
Now = func() string { return time.Now().UTC().Format(time.RFC3339Nano) }
)

type reporter struct {
// Reporter generates Reports containing the host topology.
type Reporter struct {
hostID string
hostName string
}

// NewReporter returns a Reporter which produces a report containing host
// topology for this host.
func NewReporter(hostID, hostName string) tag.Reporter {
return &reporter{
func NewReporter(hostID, hostName string) *Reporter {
return &Reporter{
hostID: hostID,
hostName: hostName,
}
}

func (r *reporter) Report() (report.Report, error) {
// Report implements Reporter.
func (r *Reporter) Report() (report.Report, error) {
var (
rep = report.MakeReport()
localCIDRs []string
Expand Down
27 changes: 27 additions & 0 deletions probe/host/tagger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package host

import (
"github.com/weaveworks/scope/report"
)

// Tagger tags each node in each topology of a report with the origin host
// node ID of this (probe) host. Effectively, a foreign key linking every node
// in every topology to an origin host node in the host topology.
type Tagger struct{ hostNodeID string }

// NewTagger tags each node with a foreign key linking it to its origin host
// in the host topology.
func NewTagger(hostID string) Tagger {
return Tagger{hostNodeID: report.MakeHostNodeID(hostID)}
}

// Tag implements Tagger.
func (t Tagger) Tag(r report.Report) (report.Report, error) {
md := report.NodeMetadata{report.HostNodeID: t.hostNodeID}
for _, topology := range r.Topologies() {
for nodeID := range topology.NodeMetadatas {
topology.NodeMetadatas[nodeID].Merge(md)
}
}
return r, nil
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package tag_test
package host_test

import (
"reflect"
"testing"

"github.com/weaveworks/scope/probe/tag"
"github.com/weaveworks/scope/probe/host"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/test"
)

func TestOriginHostTagger(t *testing.T) {
func TestTagger(t *testing.T) {
var (
hostID = "foo"
endpointNodeID = report.MakeEndpointNodeID(hostID, "1.2.3.4", "56789") // hostID ignored
Expand All @@ -18,9 +19,9 @@ func TestOriginHostTagger(t *testing.T) {
r := report.MakeReport()
r.Endpoint.NodeMetadatas[endpointNodeID] = nodeMetadata
want := nodeMetadata.Merge(report.NodeMetadata{report.HostNodeID: report.MakeHostNodeID(hostID)})
rpt, _ := tag.NewOriginHostTagger(hostID).Tag(r)
rpt, _ := host.NewTagger(hostID).Tag(r)
have := rpt.Endpoint.NodeMetadatas[endpointNodeID].Copy()
if !reflect.DeepEqual(want, have) {
t.Errorf("\nwant %+v\nhave %+v", want, have)
t.Error(test.Diff(want, have))
}
}
16 changes: 8 additions & 8 deletions probe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (
"github.com/weaveworks/scope/probe/docker"
"github.com/weaveworks/scope/probe/endpoint"
"github.com/weaveworks/scope/probe/host"
"github.com/weaveworks/scope/probe/overlay"
"github.com/weaveworks/scope/probe/process"
"github.com/weaveworks/scope/probe/tag"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/xfer"
)
Expand Down Expand Up @@ -80,16 +80,16 @@ func main() {
)

var (
weaveTagger *tag.WeaveTagger
weaveTagger *overlay.WeaveTagger

This comment was marked as abuse.

This comment was marked as abuse.

processCache *process.CachingWalker
)

taggers := []tag.Tagger{
tag.NewTopologyTagger(),
tag.NewOriginHostTagger(hostID),
taggers := []Tagger{
newTopologyTagger(),
host.NewTagger(hostID),
}

reporters := []tag.Reporter{
reporters := []Reporter{
host.NewReporter(hostID, hostName),
endpoint.NewReporter(hostID, hostName, *spyProcs),
}
Expand Down Expand Up @@ -117,7 +117,7 @@ func main() {

if *weaveRouterAddr != "" {
var err error
weaveTagger, err = tag.NewWeaveTagger(*weaveRouterAddr)
weaveTagger, err = overlay.NewWeaveTagger(*weaveRouterAddr)
if err != nil {
log.Fatalf("failed to start Weave tagger: %v", err)
}
Expand Down Expand Up @@ -161,7 +161,7 @@ func main() {
r.Overlay.Merge(weaveTagger.OverlayTopology())
}

r = tag.Apply(r, taggers)
r = Apply(r, taggers)

case <-quit:
return
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package tag
package overlay

import (
"encoding/json"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package tag_test
package overlay_test

import (
"encoding/json"
Expand All @@ -7,15 +7,15 @@ import (
"reflect"
"testing"

"github.com/weaveworks/scope/probe/tag"
"github.com/weaveworks/scope/probe/overlay"
"github.com/weaveworks/scope/report"
)

func TestWeaveTaggerOverlayTopology(t *testing.T) {
s := httptest.NewServer(http.HandlerFunc(mockWeaveRouter))
defer s.Close()

w, err := tag.NewWeaveTagger(s.URL)
w, err := overlay.NewWeaveTagger(s.URL)
if err != nil {
t.Fatal(err)
}
Expand All @@ -25,8 +25,8 @@ func TestWeaveTaggerOverlayTopology(t *testing.T) {
EdgeMetadatas: report.EdgeMetadatas{},
NodeMetadatas: report.NodeMetadatas{
report.MakeOverlayNodeID(mockWeavePeerName): {
tag.WeavePeerName: mockWeavePeerName,
tag.WeavePeerNickName: mockWeavePeerNickName,
overlay.WeavePeerName: mockWeavePeerName,
overlay.WeavePeerNickName: mockWeavePeerNickName,
},
},
}), w.OverlayTopology(); !reflect.DeepEqual(want, have) {
Expand Down
17 changes: 8 additions & 9 deletions probe/process/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package process
import (
"strconv"

"github.com/weaveworks/scope/probe/tag"
"github.com/weaveworks/scope/report"
)

Expand All @@ -16,22 +15,22 @@ const (
Threads = "threads"
)

// Reporter generate Reports containing the Process topology
type reporter struct {
// Reporter generates Reports containing the Process topology.
type Reporter struct {
scope string
walker Walker
}

// NewReporter makes a new Reporter
func NewReporter(walker Walker, scope string) tag.Reporter {
return &reporter{
// NewReporter makes a new Reporter.
func NewReporter(walker Walker, scope string) *Reporter {
return &Reporter{
scope: scope,
walker: walker,
}
}

// Report generates a Report containing the Process topology
func (r *reporter) Report() (report.Report, error) {
// Report implements Reporter.
func (r *Reporter) Report() (report.Report, error) {
result := report.MakeReport()
processes, err := r.processTopology()
if err != nil {
Expand All @@ -41,7 +40,7 @@ func (r *reporter) Report() (report.Report, error) {
return result, nil
}

func (r *reporter) processTopology() (report.Topology, error) {
func (r *Reporter) processTopology() (report.Topology, error) {
t := report.NewTopology()
err := r.walker.Walk(func(p *Process) {
pidstr := strconv.Itoa(p.PID)
Expand Down
Loading