Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eBPF proc fallback #2327

Merged
merged 4 commits into from
Mar 14, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#! /bin/bash

# shellcheck disable=SC1091
. ./config.sh

start_suite "Test short lived connections between containers, with ebpf proc fallback"

weave_on "$HOST1" launch
# Manually start scope in order to set
# `WEAVESCOPE_DOCKER_ARGS="-v /tmp:/sys/kernel/debug/tracing:ro"`
# to make ebpf fail and test the proc fallback.
DOCKER_HOST=tcp://${HOST1}:${DOCKER_PORT} CHECKPOINT_DISABLE=true \
WEAVESCOPE_DOCKER_ARGS="-v /tmp:/sys/kernel/debug/tracing:ro" \
"${SCOPE}" launch --probe.ebpf.connections=true
weave_on "$HOST1" run -d --name nginx nginx
weave_on "$HOST1" run -d --name client alpine /bin/sh -c "while true; do \
wget http://nginx.weave.local:80/ -O - >/dev/null || true; \
sleep 1; \
done"

wait_for_containers "$HOST1" 60 nginx client

has_container "$HOST1" nginx
has_container "$HOST1" client
has_connection containers "$HOST1" client nginx

# Save stdout for debugging output
exec 3>&1
assert_raises "docker_on $HOST1 logs weavescope 2>&1 | grep 'Error setting up the eBPF tracker, falling back to proc scanning' || (docker_on $HOST1 logs weavescope 2>&3 ; false)"

scope_end_suite
56 changes: 27 additions & 29 deletions probe/endpoint/connection_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type connectionTrackerConfig struct {
UseEbpfConn bool
ProcRoot string
BufferSize int
ProcessCache *process.CachingWalker
Scanner procspy.ConnectionScanner
DNSSnooper *DNSSnooper
}
Expand All @@ -28,43 +29,36 @@ type connectionTracker struct {
flowWalker flowWalker // Interface
ebpfTracker eventTracker
reverseResolver *reverseResolver
processCache *process.CachingWalker
}

func newProcfsConnectionTracker(conf connectionTrackerConfig) connectionTracker {
if conf.WalkProc && conf.Scanner == nil {
conf.Scanner = procspy.NewConnectionScanner(conf.ProcessCache)
}
return connectionTracker{
conf: conf,
flowWalker: newConntrackFlowWalker(conf.UseConntrack, conf.ProcRoot, conf.BufferSize, "--any-nat"),
ebpfTracker: nil,
reverseResolver: newReverseResolver(),
}
}

func newConnectionTracker(conf connectionTrackerConfig) connectionTracker {
if !conf.UseEbpfConn {
// ebpf OFF, use flowWalker
return connectionTracker{
conf: conf,
flowWalker: newConntrackFlowWalker(conf.UseConntrack, conf.ProcRoot, conf.BufferSize, "--any-nat"),
ebpfTracker: nil,
reverseResolver: newReverseResolver(),
}
// ebpf off, use proc scanning for connection tracking
return newProcfsConnectionTracker(conf)
}
// When ebpf will be active by default, check if it starts correctly otherwise fallback to flowWalk
et, err := newEbpfTracker(conf.UseEbpfConn)
et, err := newEbpfTracker()
if err != nil {
// TODO: fallback to flowWalker, when ebpf is enabled by default
log.Errorf("Error setting up the ebpfTracker, connections will not be reported: %s", err)
noopConnectionTracker := connectionTracker{
conf: conf,
flowWalker: nil,
ebpfTracker: nil,
reverseResolver: nil,
}
return noopConnectionTracker
// ebpf failed, fallback to proc scanning for connection tracking
log.Warnf("Error setting up the eBPF tracker, falling back to proc scanning: %v", err)
return newProcfsConnectionTracker(conf)
}

var processCache *process.CachingWalker
processCache = process.NewCachingWalker(process.NewWalker(conf.ProcRoot))
processCache.Tick()

ct := connectionTracker{
conf: conf,
flowWalker: nil,
ebpfTracker: et,
reverseResolver: newReverseResolver(),
processCache: processCache,
}
go ct.getInitialState()
return ct
Expand All @@ -89,8 +83,7 @@ func flowToTuple(f flow) (ft fourTuple) {
return ft
}

// ReportConnections calls trackers accordingly to the configuration.
// When ebpf is enabled, only performEbpfTrack() is called
// ReportConnections calls trackers according to the configuration.
func (t *connectionTracker) ReportConnections(rpt *report.Report) {
hostNodeID := report.MakeHostNodeID(t.conf.HostID)

Expand Down Expand Up @@ -164,9 +157,14 @@ func (t *connectionTracker) performWalkProc(rpt *report.Report, hostNodeID strin
return nil
}

// getInitialState runs conntrack and proc parsing synchronously only
// once to initialize ebpfTracker
func (t *connectionTracker) getInitialState() {
scanner := procspy.NewSyncConnectionScanner(t.processCache)
// Run conntrack and proc parsing synchronously only once to initialize ebpfTracker
var processCache *process.CachingWalker
processCache = process.NewCachingWalker(process.NewWalker(t.conf.ProcRoot))
processCache.Tick()

scanner := procspy.NewSyncConnectionScanner(processCache)
seenTuples := map[string]fourTuple{}
// Consult the flowWalker to get the initial state
if err := IsConntrackSupported(t.conf.ProcRoot); t.conf.UseConntrack && err != nil {
Expand Down
7 changes: 1 addition & 6 deletions probe/endpoint/ebpf.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package endpoint

import (
"errors"
"fmt"
"regexp"
"strconv"
Expand Down Expand Up @@ -77,11 +76,7 @@ func isKernelSupported() error {
return nil
}

func newEbpfTracker(useEbpfConn bool) (eventTracker, error) {
if !useEbpfConn {
return nil, errors.New("ebpf tracker not enabled")
}

func newEbpfTracker() (eventTracker, error) {
if err := isKernelSupported(); err != nil {
return nil, fmt.Errorf("kernel not supported: %v", err)
}
Expand Down
7 changes: 6 additions & 1 deletion probe/endpoint/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/scope/probe/endpoint/procspy"
"github.com/weaveworks/scope/probe/process"
"github.com/weaveworks/scope/report"
)

Expand All @@ -29,6 +30,7 @@ type ReporterConfig struct {
UseEbpfConn bool
ProcRoot string
BufferSize int
ProcessCache *process.CachingWalker
Scanner procspy.ConnectionScanner
DNSSnooper *DNSSnooper
}
Expand Down Expand Up @@ -69,6 +71,7 @@ func NewReporter(conf ReporterConfig) *Reporter {
UseEbpfConn: conf.UseEbpfConn,
ProcRoot: conf.ProcRoot,
BufferSize: conf.BufferSize,
ProcessCache: conf.ProcessCache,
Scanner: conf.Scanner,
DNSSnooper: conf.DNSSnooper,
}),
Expand All @@ -83,7 +86,9 @@ func (Reporter) Name() string { return "Endpoint" }
func (r *Reporter) Stop() {
r.connectionTracker.Stop()
r.natMapper.stop()
r.conf.Scanner.Stop()
if r.conf.Scanner != nil {
r.conf.Scanner.Stop()
}
}

// Report implements Reporter.
Expand Down
8 changes: 1 addition & 7 deletions prog/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/weaveworks/scope/probe/controls"
"github.com/weaveworks/scope/probe/docker"
"github.com/weaveworks/scope/probe/endpoint"
"github.com/weaveworks/scope/probe/endpoint/procspy"
"github.com/weaveworks/scope/probe/host"
"github.com/weaveworks/scope/probe/kubernetes"
"github.com/weaveworks/scope/probe/overlay"
Expand Down Expand Up @@ -158,13 +157,8 @@ func probeMain(flags probeFlags, targets []appclient.Target) {
p.AddTagger(probe.NewTopologyTagger(), host.NewTagger(hostID))

var processCache *process.CachingWalker
var scanner procspy.ConnectionScanner
if flags.procEnabled {
processCache = process.NewCachingWalker(process.NewWalker(flags.procRoot))
// The eBPF tracker finds connections itself and does not need the connection scanner
if !flags.useEbpfConn {
scanner = procspy.NewConnectionScanner(processCache)
}
p.AddTicker(processCache)
p.AddReporter(process.NewReporter(processCache, hostID, process.GetDeltaTotalJiffies, flags.noCommandLineArguments))
}
Expand All @@ -185,7 +179,7 @@ func probeMain(flags probeFlags, targets []appclient.Target) {
UseEbpfConn: flags.useEbpfConn,
ProcRoot: flags.procRoot,
BufferSize: flags.conntrackBufferSize,
Scanner: scanner,
ProcessCache: processCache,
DNSSnooper: dnsSnooper,
})
defer endpointReporter.Stop()
Expand Down