Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow users to specify conntrack buffer size. #1896

Merged
merged 3 commits into from
Sep 26, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions probe/endpoint/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"os"
"path/filepath"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -81,12 +82,13 @@ type conntrackWalker struct {
cmd exec.Cmd
activeFlows map[int64]flow // active flows in state != TIME_WAIT
bufferedFlows []flow // flows coming out of activeFlows spend 1 walk cycle here
bufferSize int
args []string
quit chan struct{}
}

// newConntracker creates and starts a new conntracker.
func newConntrackFlowWalker(useConntrack bool, procRoot string, args ...string) flowWalker {
func newConntrackFlowWalker(useConntrack bool, procRoot string, bufferSize int, args ...string) flowWalker {
if !useConntrack {
return nilFlowWalker{}
} else if err := IsConntrackSupported(procRoot); err != nil {
Expand All @@ -95,6 +97,7 @@ func newConntrackFlowWalker(useConntrack bool, procRoot string, args ...string)
}
result := &conntrackWalker{
activeFlows: map[int64]flow{},
bufferSize: bufferSize,
args: args,
quit: make(chan struct{}),
}
Expand Down Expand Up @@ -160,7 +163,10 @@ func (c *conntrackWalker) run() {
c.handleFlow(flow, true)
}

args := append([]string{"-E", "-o", "xml", "-p", "tcp"}, c.args...)
args := append([]string{
"--buffer-size", strconv.Itoa(c.bufferSize), "-E",
"-o", "xml", "-p", "tcp"}, c.args...,
)
cmd := exec.Command("conntrack", args...)
stdout, err := cmd.StdoutPipe()
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion probe/endpoint/conntrack_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
)

const conntrackCloseTag = "</conntrack>\n"
const bufferSize = 1024 * 1024

This comment was marked as abuse.

This comment was marked as abuse.


func makeFlow(ty string) flow {
return flow{
Expand Down Expand Up @@ -91,7 +92,7 @@ func TestConntracker(t *testing.T) {
return testexec.NewMockCmd(reader)
}

flowWalker := newConntrackFlowWalker(true, "")
flowWalker := newConntrackFlowWalker(true, "", bufferSize)
defer flowWalker.stop()

// First write out some empty xml for the existing connections
Expand Down
47 changes: 25 additions & 22 deletions probe/endpoint/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,25 @@ const (
SnoopedDNSNames = "snooped_dns_names"
)

// ReporterConfig are the config options for the endpoint reporter.
type ReporterConfig struct {
HostID string
HostName string
SpyProcs bool
UseConntrack bool
WalkProc bool
ProcRoot string
BufferSize int
Scanner procspy.ConnectionScanner
DNSSnooper *DNSSnooper
}

// Reporter generates Reports containing the Endpoint topology.
type Reporter struct {
hostID string
hostName string
spyProcs bool
walkProc bool
conf ReporterConfig
flowWalker flowWalker // interface
scanner procspy.ConnectionScanner
natMapper natMapper
reverseResolver *reverseResolver
dnsSnooper *DNSSnooper
}

// SpyDuration is an exported prometheus metric
Expand All @@ -54,17 +62,12 @@ var SpyDuration = prometheus.NewSummaryVec(
// on the host machine, at the granularity of host and port. That information
// is stored in the Endpoint topology. It optionally enriches that topology
// with process (PID) information.
func NewReporter(hostID, hostName string, spyProcs, useConntrack, walkProc bool, procRoot string, scanner procspy.ConnectionScanner, dnsSnooper *DNSSnooper) *Reporter {
func NewReporter(conf ReporterConfig) *Reporter {
return &Reporter{
hostID: hostID,
hostName: hostName,
spyProcs: spyProcs,
walkProc: walkProc,
flowWalker: newConntrackFlowWalker(useConntrack, procRoot),
natMapper: makeNATMapper(newConntrackFlowWalker(useConntrack, procRoot, "--any-nat")),
conf: conf,
flowWalker: newConntrackFlowWalker(conf.UseConntrack, conf.ProcRoot, conf.BufferSize),
natMapper: makeNATMapper(newConntrackFlowWalker(conf.UseConntrack, conf.ProcRoot, conf.BufferSize, "--any-nat")),
reverseResolver: newReverseResolver(),
scanner: scanner,
dnsSnooper: dnsSnooper,
}
}

Expand All @@ -76,7 +79,7 @@ func (r *Reporter) Stop() {
r.flowWalker.stop()
r.natMapper.stop()
r.reverseResolver.stop()
r.scanner.Stop()
r.conf.Scanner.Stop()
}

type fourTuple struct {
Expand Down Expand Up @@ -106,7 +109,7 @@ func (r *Reporter) Report() (report.Report, error) {
SpyDuration.WithLabelValues().Observe(time.Since(begin).Seconds())
}(time.Now())

hostNodeID := report.MakeHostNodeID(r.hostID)
hostNodeID := report.MakeHostNodeID(r.conf.HostID)
rpt := report.MakeReport()
seenTuples := map[string]fourTuple{}

Expand Down Expand Up @@ -139,8 +142,8 @@ func (r *Reporter) Report() (report.Report, error) {
})
}

if r.walkProc {
conns, err := r.scanner.Connections(r.spyProcs)
if r.conf.WalkProc {
conns, err := r.conf.Scanner.Connections(r.conf.SpyProcs)
if err != nil {
return rpt, err
}
Expand Down Expand Up @@ -178,7 +181,7 @@ func (r *Reporter) Report() (report.Report, error) {
}
}

r.natMapper.applyNAT(rpt, r.hostID)
r.natMapper.applyNAT(rpt, r.conf.HostID)
return rpt, nil
}

Expand All @@ -194,9 +197,9 @@ func (r *Reporter) addConnection(rpt *report.Report, t fourTuple, namespaceID st
func (r *Reporter) makeEndpointNode(namespaceID string, addr string, port uint16, extra map[string]string) report.Node {
portStr := strconv.Itoa(int(port))
node := report.MakeNodeWith(
report.MakeEndpointNodeID(r.hostID, namespaceID, addr, portStr),
report.MakeEndpointNodeID(r.conf.HostID, namespaceID, addr, portStr),
map[string]string{Addr: addr, Port: portStr})
if names := r.dnsSnooper.CachedNamesForIP(addr); len(names) > 0 {
if names := r.conf.DNSSnooper.CachedNamesForIP(addr); len(names) > 0 {
node = node.WithSet(SnoopedDNSNames, report.MakeStringSet(names...))
}
if names, err := r.reverseResolver.get(addr); err == nil && len(names) > 0 {
Expand Down
18 changes: 16 additions & 2 deletions probe/endpoint/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,21 @@ var (
}
)

const bufferSize = 1024 * 1024

func TestSpyNoProcesses(t *testing.T) {
const (
nodeID = "heinz-tomato-ketchup" // TODO rename to hostID
nodeName = "frenchs-since-1904" // TODO rename to hostName
)

scanner := procspy.FixedScanner(fixConnections)
reporter := endpoint.NewReporter(nodeID, nodeName, false, false, false, "", scanner, nil)
reporter := endpoint.NewReporter(endpoint.ReporterConfig{
HostID: nodeID,
HostName: nodeName,
BufferSize: bufferSize,
Scanner: scanner,
})
r, _ := reporter.Report()
//buf, _ := json.MarshalIndent(r, "", " ")
//t.Logf("\n%s\n", buf)
Expand All @@ -86,7 +93,14 @@ func TestSpyWithProcesses(t *testing.T) {
)

scanner := procspy.FixedScanner(fixConnectionsWithProcesses)
reporter := endpoint.NewReporter(nodeID, nodeName, true, false, true, "", scanner, nil)
reporter := endpoint.NewReporter(endpoint.ReporterConfig{
HostID: nodeID,
HostName: nodeName,
SpyProcs: true,
WalkProc: true,
BufferSize: bufferSize,
Scanner: scanner,
})
r, _ := reporter.Report()
// buf, _ := json.MarshalIndent(r, "", " ") ; t.Logf("\n%s\n", buf)

Expand Down
11 changes: 7 additions & 4 deletions prog/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,12 @@ type probeFlags struct {
noApp bool
noControls bool

useConntrack bool // Use conntrack for endpoint topo
spyProcs bool // Associate endpoints with processes (must be root)
procEnabled bool // Produce process topology & process nodes in endpoint
procRoot string
useConntrack bool // Use conntrack for endpoint topo
conntrackBufferSize int // Sie of kernel buffer for conntrack

spyProcs bool // Associate endpoints with processes (must be root)
procEnabled bool // Produce process topology & process nodes in endpoint
procRoot string

dockerEnabled bool
dockerInterval time.Duration
Expand Down Expand Up @@ -197,6 +199,7 @@ func main() {

// Proc & endpoint
flag.BoolVar(&flags.probe.useConntrack, "probe.conntrack", true, "also use conntrack to track connections")
flag.IntVar(&flags.probe.conntrackBufferSize, "probe.conntrack.buffersize", 208*1024, "conntrack buffer size")

This comment was marked as abuse.

This comment was marked as abuse.

flag.BoolVar(&flags.probe.spyProcs, "probe.proc.spy", true, "associate endpoints with processes (needs root)")
flag.StringVar(&flags.probe.procRoot, "probe.proc.root", "/proc", "location of the proc filesystem")
flag.BoolVar(&flags.probe.procEnabled, "probe.processes", true, "produce process topology & include procspied connections")
Expand Down
12 changes: 11 additions & 1 deletion prog/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,17 @@ func probeMain(flags probeFlags) {
defer dnsSnooper.Stop()
}

endpointReporter := endpoint.NewReporter(hostID, hostName, flags.spyProcs, flags.useConntrack, flags.procEnabled, flags.procRoot, scanner, dnsSnooper)
endpointReporter := endpoint.NewReporter(endpoint.ReporterConfig{
HostID: hostID,
HostName: hostName,
SpyProcs: flags.spyProcs,
UseConntrack: flags.useConntrack,
WalkProc: flags.procEnabled,
ProcRoot: flags.procRoot,
BufferSize: flags.conntrackBufferSize,
Scanner: scanner,
DNSSnooper: dnsSnooper,
})
defer endpointReporter.Stop()
p.AddReporter(endpointReporter)

Expand Down