Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow users to specify conntrack buffer size. #1896

Merged
merged 3 commits into from
Sep 26, 2016
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions probe/endpoint/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"os"
"path/filepath"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -81,12 +82,13 @@ type conntrackWalker struct {
cmd exec.Cmd
activeFlows map[int64]flow // active flows in state != TIME_WAIT
bufferedFlows []flow // flows coming out of activeFlows spend 1 walk cycle here
bufferSize int
args []string
quit chan struct{}
}

// newConntracker creates and starts a new conntracker.
func newConntrackFlowWalker(useConntrack bool, procRoot string, args ...string) flowWalker {
func newConntrackFlowWalker(useConntrack bool, procRoot string, bufferSize int, args ...string) flowWalker {
if !useConntrack {
return nilFlowWalker{}
} else if err := IsConntrackSupported(procRoot); err != nil {
Expand All @@ -95,6 +97,7 @@ func newConntrackFlowWalker(useConntrack bool, procRoot string, args ...string)
}
result := &conntrackWalker{
activeFlows: map[int64]flow{},
bufferSize: bufferSize,
args: args,
quit: make(chan struct{}),
}
Expand Down Expand Up @@ -160,7 +163,10 @@ func (c *conntrackWalker) run() {
c.handleFlow(flow, true)
}

args := append([]string{"-E", "-o", "xml", "-p", "tcp"}, c.args...)
args := append([]string{
"--buffer-size", strconv.Itoa(c.bufferSize), "-E",
"-o", "xml", "-p", "tcp"}, c.args...,
)
cmd := exec.Command("conntrack", args...)
stdout, err := cmd.StdoutPipe()
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion probe/endpoint/conntrack_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
)

const conntrackCloseTag = "</conntrack>\n"
const bufferSize = 1024 * 1024

This comment was marked as abuse.

This comment was marked as abuse.


func makeFlow(ty string) flow {
return flow{
Expand Down Expand Up @@ -91,7 +92,7 @@ func TestConntracker(t *testing.T) {
return testexec.NewMockCmd(reader)
}

flowWalker := newConntrackFlowWalker(true, "")
flowWalker := newConntrackFlowWalker(true, "", bufferSize)
defer flowWalker.stop()

// First write out some empty xml for the existing connections
Expand Down
10 changes: 7 additions & 3 deletions probe/endpoint/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,18 @@ var SpyDuration = prometheus.NewSummaryVec(
// on the host machine, at the granularity of host and port. That information
// is stored in the Endpoint topology. It optionally enriches that topology
// with process (PID) information.
func NewReporter(hostID, hostName string, spyProcs, useConntrack, walkProc bool, procRoot string, scanner procspy.ConnectionScanner, dnsSnooper *DNSSnooper) *Reporter {
func NewReporter(
hostID, hostName string,
spyProcs, useConntrack, walkProc bool,
procRoot string, bufferSize int,
scanner procspy.ConnectionScanner, dnsSnooper *DNSSnooper) *Reporter {
return &Reporter{
hostID: hostID,
hostName: hostName,
spyProcs: spyProcs,
walkProc: walkProc,
flowWalker: newConntrackFlowWalker(useConntrack, procRoot),
natMapper: makeNATMapper(newConntrackFlowWalker(useConntrack, procRoot, "--any-nat")),
flowWalker: newConntrackFlowWalker(useConntrack, procRoot, bufferSize),
natMapper: makeNATMapper(newConntrackFlowWalker(useConntrack, procRoot, bufferSize, "--any-nat")),
reverseResolver: newReverseResolver(),
scanner: scanner,
dnsSnooper: dnsSnooper,
Expand Down
11 changes: 7 additions & 4 deletions prog/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,12 @@ type probeFlags struct {
noApp bool
noControls bool

useConntrack bool // Use conntrack for endpoint topo
spyProcs bool // Associate endpoints with processes (must be root)
procEnabled bool // Produce process topology & process nodes in endpoint
procRoot string
useConntrack bool // Use conntrack for endpoint topo
conntrackBufferSize int // Sie of kernel buffer for conntrack

spyProcs bool // Associate endpoints with processes (must be root)
procEnabled bool // Produce process topology & process nodes in endpoint
procRoot string

dockerEnabled bool
dockerInterval time.Duration
Expand Down Expand Up @@ -197,6 +199,7 @@ func main() {

// Proc & endpoint
flag.BoolVar(&flags.probe.useConntrack, "probe.conntrack", true, "also use conntrack to track connections")
flag.IntVar(&flags.probe.conntrackBufferSize, "probe.conntrack.buffersize", 208*1024, "conntrack buffer size")

This comment was marked as abuse.

This comment was marked as abuse.

flag.BoolVar(&flags.probe.spyProcs, "probe.proc.spy", true, "associate endpoints with processes (needs root)")
flag.StringVar(&flags.probe.procRoot, "probe.proc.root", "/proc", "location of the proc filesystem")
flag.BoolVar(&flags.probe.procEnabled, "probe.processes", true, "produce process topology & include procspied connections")
Expand Down
2 changes: 1 addition & 1 deletion prog/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func probeMain(flags probeFlags) {
defer dnsSnooper.Stop()
}

endpointReporter := endpoint.NewReporter(hostID, hostName, flags.spyProcs, flags.useConntrack, flags.procEnabled, flags.procRoot, scanner, dnsSnooper)
endpointReporter := endpoint.NewReporter(hostID, hostName, flags.spyProcs, flags.useConntrack, flags.procEnabled, flags.procRoot, flags.conntrackBufferSize, scanner, dnsSnooper)

This comment was marked as abuse.

This comment was marked as abuse.

This comment was marked as abuse.

defer endpointReporter.Stop()
p.AddReporter(endpointReporter)

Expand Down