Skip to content

Commit

Permalink
Merge pull request #2699 from weaveworks/ebpf-refactor
Browse files: browse the repository at this point in the history
EbpfTracker refactoring / cleanup
  • Loading branch information
rade authored Jul 11, 2017
2 parents 1abc886 + d568c50 commit 25a1a5f
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 66 deletions.
47 changes: 29 additions & 18 deletions probe/endpoint/connection_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type connectionTrackerConfig struct {
type connectionTracker struct {
conf connectionTrackerConfig
flowWalker flowWalker // Interface
ebpfTracker eventTracker
ebpfTracker *EbpfTracker
reverseResolver *reverseResolver
}

Expand Down Expand Up @@ -91,16 +91,18 @@ func (t *connectionTracker) ReportConnections(rpt *report.Report) {
t.useProcfs()
}

// seenTuples contains information about connections seen by conntrack and it will be passed to the /proc parser
seenTuples := map[string]fourTuple{}
t.performFlowWalk(rpt, seenTuples)
// seenTuples contains information about connections seen by
// conntrack
seenTuples := t.performFlowWalk(rpt)

if t.conf.WalkProc && t.conf.Scanner != nil {
t.performWalkProc(rpt, hostNodeID, seenTuples)
}
}

func (t *connectionTracker) performFlowWalk(rpt *report.Report, seenTuples map[string]fourTuple) {
// Consult the flowWalker for short-lived connections
// performFlowWalk consults the flowWalker for short-lived connections
func (t *connectionTracker) performFlowWalk(rpt *report.Report) map[string]fourTuple {
seenTuples := map[string]fourTuple{}
extraNodeInfo := map[string]string{
Conntracked: "true",
}
Expand All @@ -109,6 +111,24 @@ func (t *connectionTracker) performFlowWalk(rpt *report.Report, seenTuples map[s
seenTuples[tuple.key()] = tuple
t.addConnection(rpt, tuple, "", extraNodeInfo, extraNodeInfo)
})
return seenTuples
}

// existingFlows asks conntrack for the flows it currently knows about and
// returns them as tuples indexed by tuple.key().
//
// The returned map is never nil. It is empty when conntrack is disabled in
// the configuration, when IsConntrackSupported reports an error, or when
// listing the existing connections fails; the latter two cases are logged.
func (t *connectionTracker) existingFlows() map[string]fourTuple {
	tuples := map[string]fourTuple{}

	// Conntrack switched off by configuration: deliberately silent.
	if !t.conf.UseConntrack {
		return tuples
	}

	if err := IsConntrackSupported(t.conf.ProcRoot); err != nil {
		log.Warnf("Not using conntrack: not supported by the kernel: %s", err)
		return tuples
	}

	flows, err := existingConnections([]string{"--any-nat"})
	if err != nil {
		log.Errorf("conntrack existingConnections error: %v", err)
		return tuples
	}

	for _, flow := range flows {
		tuple := flowToTuple(flow)
		tuples[tuple.key()] = tuple
	}
	return tuples
}

func (t *connectionTracker) performWalkProc(rpt *report.Report, hostNodeID string, seenTuples map[string]fourTuple) error {
Expand Down Expand Up @@ -144,18 +164,9 @@ func (t *connectionTracker) getInitialState() {
processCache.Tick()

scanner := procspy.NewSyncConnectionScanner(processCache, t.conf.SpyProcs)
seenTuples := map[string]fourTuple{}
// Consult the flowWalker to get the initial state
if err := IsConntrackSupported(t.conf.ProcRoot); t.conf.UseConntrack && err != nil {
log.Warnf("Not using conntrack: not supported by the kernel: %s", err)
} else if existingFlows, err := existingConnections([]string{"--any-nat"}); err != nil {
log.Errorf("conntrack existingConnections error: %v", err)
} else {
for _, f := range existingFlows {
tuple := flowToTuple(f)
seenTuples[tuple.key()] = tuple
}
}

// Consult conntrack to get the initial state
seenTuples := t.existingFlows()

conns, err := scanner.Connections()
if err != nil {
Expand Down
74 changes: 36 additions & 38 deletions probe/endpoint/ebpf.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,16 @@ type ebpfConnection struct {
pid int
}

// eventTracker lists the operations the connection tracker uses on an
// eBPF-backed tracker. newEbpfTracker returns an *EbpfTracker through this
// interface.
type eventTracker interface {
// handleConnection processes one connection event for tuple. In EbpfTracker,
// connect and accept events add an open connection and close events move the
// matching open connection to the closed list.
handleConnection(ev tracer.EventType, tuple fourTuple, pid int, networkNamespace string)
// walkConnections presumably calls f for each tracked connection; the
// implementation is not visible here - TODO confirm.
walkConnections(f func(ebpfConnection))
// feedInitialConnections seeds the tracker with the connections yielded by ci
// and, in EbpfTracker, installs an fd-install watcher for every pid in
// processesWaitingInAccept.
feedInitialConnections(ci procspy.ConnIter, seenTuples map[string]fourTuple, processesWaitingInAccept []int, hostNodeID string)
// isReadyToHandleConnections presumably reports whether the initial
// connections have been fed - TODO confirm against the implementation.
isReadyToHandleConnections() bool
// isDead reports whether the tracker has been marked dead.
isDead() bool
// stop stops the underlying tracer (when there is one) and marks the tracker
// dead.
stop()
}

// EbpfTracker contains the sets of open and closed TCP connections.
// Closed connections are kept in the `closedConnections` slice for one iteration of `walkConnections`.
type EbpfTracker struct {
sync.Mutex
tracer *tracer.Tracer
readyToHandleConnections bool
dead bool
lastTimestampV4 uint64

openConnections map[string]ebpfConnection
openConnections map[fourTuple]ebpfConnection
closedConnections []ebpfConnection
}

Expand Down Expand Up @@ -79,13 +71,13 @@ func isKernelSupported() error {
return nil
}

func newEbpfTracker() (eventTracker, error) {
func newEbpfTracker() (*EbpfTracker, error) {
if err := isKernelSupported(); err != nil {
return nil, fmt.Errorf("kernel not supported: %v", err)
}

tracker := &EbpfTracker{
openConnections: map[string]ebpfConnection{},
openConnections: map[fourTuple]ebpfConnection{},
}

tracer, err := tracer.NewTracer(tracker.tcpEventCbV4, tracker.tcpEventCbV6, tracker.lostCb)
Expand All @@ -99,20 +91,17 @@ func newEbpfTracker() (eventTracker, error) {
return tracker, nil
}

var lastTimestampV4 uint64

func (t *EbpfTracker) tcpEventCbV4(e tracer.TcpV4) {
if lastTimestampV4 > e.Timestamp {
if t.lastTimestampV4 > e.Timestamp {
// A kernel bug can cause the timestamps to be wrong (e.g. on Ubuntu with Linux 4.4.0-47.68)
// Upgrading the kernel will fix the problem. For further info see:
// https://github.com/iovisor/bcc/issues/790#issuecomment-263704235
// https://github.com/weaveworks/scope/issues/2334
log.Errorf("tcp tracer received event with timestamp %v even though the last timestamp was %v. Stopping the eBPF tracker.", e.Timestamp, lastTimestampV4)
t.dead = true
log.Errorf("tcp tracer received event with timestamp %v even though the last timestamp was %v. Stopping the eBPF tracker.", e.Timestamp, t.lastTimestampV4)
t.stop()
}

lastTimestampV4 = e.Timestamp
t.lastTimestampV4 = e.Timestamp

if e.Type == tracer.EventFdInstall {
t.handleFdInstall(e.Type, int(e.Pid), int(e.Fd))
Expand All @@ -128,7 +117,6 @@ func (t *EbpfTracker) tcpEventCbV6(e tracer.TcpV6) {

func (t *EbpfTracker) lostCb(count uint64) {
log.Errorf("tcp tracer lost %d events. Stopping the eBPF tracker", count)
t.dead = true
t.stop()
}

Expand Down Expand Up @@ -182,21 +170,24 @@ func tupleFromPidFd(pid int, fd int) (tuple fourTuple, netns string, ok bool) {
}

func (t *EbpfTracker) handleFdInstall(ev tracer.EventType, pid int, fd int) {
if !process.IsProcInAccept("/proc", strconv.Itoa(pid)) {
t.tracer.RemoveFdInstallWatcher(uint32(pid))
}
tuple, netns, ok := tupleFromPidFd(pid, fd)
log.Debugf("EbpfTracker: got fd-install event: pid=%d fd=%d -> tuple=%s netns=%s ok=%v", pid, fd, tuple, netns, ok)
if !ok {
return
}
conn := ebpfConnection{

t.Lock()
defer t.Unlock()

t.openConnections[tuple] = ebpfConnection{
incoming: true,
tuple: tuple,
pid: pid,
networkNamespace: netns,
}
t.openConnections[tuple.String()] = conn
if !process.IsProcInAccept("/proc", strconv.Itoa(pid)) {
t.tracer.RemoveFdInstallWatcher(uint32(pid))
}
}

func (t *EbpfTracker) handleConnection(ev tracer.EventType, tuple fourTuple, pid int, networkNamespace string) {
Expand All @@ -212,27 +203,25 @@ func (t *EbpfTracker) handleConnection(ev tracer.EventType, tuple fourTuple, pid

switch ev {
case tracer.EventConnect:
conn := ebpfConnection{
t.openConnections[tuple] = ebpfConnection{
incoming: false,
tuple: tuple,
pid: pid,
networkNamespace: networkNamespace,
}
t.openConnections[tuple.String()] = conn
case tracer.EventAccept:
conn := ebpfConnection{
t.openConnections[tuple] = ebpfConnection{
incoming: true,
tuple: tuple,
pid: pid,
networkNamespace: networkNamespace,
}
t.openConnections[tuple.String()] = conn
case tracer.EventClose:
if deadConn, ok := t.openConnections[tuple.String()]; ok {
delete(t.openConnections, tuple.String())
if deadConn, ok := t.openConnections[tuple]; ok {
delete(t.openConnections, tuple)
t.closedConnections = append(t.closedConnections, deadConn)
} else {
log.Debugf("EbpfTracker: unmatched close event: %s pid=%d netns=%s", tuple.String(), pid, networkNamespace)
log.Debugf("EbpfTracker: unmatched close event: %s pid=%d netns=%s", tuple, pid, networkNamespace)
}
default:
log.Debugf("EbpfTracker: unknown event: %s (%d)", ev, ev)
Expand All @@ -255,15 +244,19 @@ func (t *EbpfTracker) walkConnections(f func(ebpfConnection)) {
}

func (t *EbpfTracker) feedInitialConnections(conns procspy.ConnIter, seenTuples map[string]fourTuple, processesWaitingInAccept []int, hostNodeID string) {
t.readyToHandleConnections = true
t.Lock()
for conn := conns.Next(); conn != nil; conn = conns.Next() {
tuple, namespaceID, incoming := connectionTuple(conn, seenTuples)
if incoming {
t.handleConnection(tracer.EventAccept, tuple, int(conn.Proc.PID), namespaceID)
} else {
t.handleConnection(tracer.EventConnect, tuple, int(conn.Proc.PID), namespaceID)
t.openConnections[tuple] = ebpfConnection{
incoming: incoming,
tuple: tuple,
pid: int(conn.Proc.PID),
networkNamespace: namespaceID,
}
}
t.readyToHandleConnections = true
t.Unlock()

for _, p := range processesWaitingInAccept {
t.tracer.AddFdInstallWatcher(uint32(p))
log.Debugf("EbpfTracker: install fd-install watcher: pid=%d", p)
Expand All @@ -275,12 +268,17 @@ func (t *EbpfTracker) isReadyToHandleConnections() bool {
}

// isDead reports whether the tracker has been marked dead. The flag is read
// while holding the tracker's mutex.
func (t *EbpfTracker) isDead() bool {
	t.Lock()
	dead := t.dead
	t.Unlock()
	return dead
}

func (t *EbpfTracker) stop() {
if t.tracer != nil {
t.Lock()
alreadyDead := t.dead
t.dead = true
t.Unlock()
if !alreadyDead && t.tracer != nil {
t.tracer.Stop()
}
t.dead = true
}
20 changes: 10 additions & 10 deletions probe/endpoint/ebpf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,37 +96,37 @@ func TestHandleConnection(t *testing.T) {
readyToHandleConnections: true,
dead: false,

openConnections: map[string]ebpfConnection{},
openConnections: map[fourTuple]ebpfConnection{},
closedConnections: []ebpfConnection{},
}

tuple := fourTuple{IPv4ConnectEvent.SAddr.String(), IPv4ConnectEvent.DAddr.String(), uint16(IPv4ConnectEvent.SPort), uint16(IPv4ConnectEvent.DPort)}
mockEbpfTracker.handleConnection(IPv4ConnectEvent.Type, tuple, int(IPv4ConnectEvent.Pid), strconv.FormatUint(uint64(IPv4ConnectEvent.NetNS), 10))
if !reflect.DeepEqual(mockEbpfTracker.openConnections[tuple.String()], IPv4ConnectEbpfConnection) {
if !reflect.DeepEqual(mockEbpfTracker.openConnections[tuple], IPv4ConnectEbpfConnection) {
t.Errorf("Connection mismatch connect event\nTarget connection:%v\nParsed connection:%v",
IPv4ConnectEbpfConnection, mockEbpfTracker.openConnections[tuple.String()])
IPv4ConnectEbpfConnection, mockEbpfTracker.openConnections[tuple])
}

tuple = fourTuple{IPv4ConnectCloseEvent.SAddr.String(), IPv4ConnectCloseEvent.DAddr.String(), uint16(IPv4ConnectCloseEvent.SPort), uint16(IPv4ConnectCloseEvent.DPort)}
mockEbpfTracker.handleConnection(IPv4ConnectCloseEvent.Type, tuple, int(IPv4ConnectCloseEvent.Pid), strconv.FormatUint(uint64(IPv4ConnectCloseEvent.NetNS), 10))
if len(mockEbpfTracker.openConnections) != 0 {
t.Errorf("Connection mismatch close event\nConnection to close:%v",
mockEbpfTracker.openConnections[tuple.String()])
mockEbpfTracker.openConnections[tuple])
}

mockEbpfTracker = &EbpfTracker{
readyToHandleConnections: true,
dead: false,

openConnections: map[string]ebpfConnection{},
openConnections: map[fourTuple]ebpfConnection{},
closedConnections: []ebpfConnection{},
}

tuple = fourTuple{IPv4AcceptEvent.SAddr.String(), IPv4AcceptEvent.DAddr.String(), uint16(IPv4AcceptEvent.SPort), uint16(IPv4AcceptEvent.DPort)}
mockEbpfTracker.handleConnection(IPv4AcceptEvent.Type, tuple, int(IPv4AcceptEvent.Pid), strconv.FormatUint(uint64(IPv4AcceptEvent.NetNS), 10))
if !reflect.DeepEqual(mockEbpfTracker.openConnections[tuple.String()], IPv4AcceptEbpfConnection) {
if !reflect.DeepEqual(mockEbpfTracker.openConnections[tuple], IPv4AcceptEbpfConnection) {
t.Errorf("Connection mismatch connect event\nTarget connection:%v\nParsed connection:%v",
IPv4AcceptEbpfConnection, mockEbpfTracker.openConnections[tuple.String()])
IPv4AcceptEbpfConnection, mockEbpfTracker.openConnections[tuple])
}

tuple = fourTuple{IPv4AcceptCloseEvent.SAddr.String(), IPv4AcceptCloseEvent.DAddr.String(), uint16(IPv4AcceptCloseEvent.SPort), uint16(IPv4AcceptCloseEvent.DPort)}
Expand Down Expand Up @@ -158,8 +158,8 @@ func TestWalkConnections(t *testing.T) {
mockEbpfTracker := &EbpfTracker{
readyToHandleConnections: true,
dead: false,
openConnections: map[string]ebpfConnection{
activeTuple.String(): {
openConnections: map[fourTuple]ebpfConnection{
activeTuple: {
tuple: activeTuple,
networkNamespace: "12345",
incoming: true,
Expand Down Expand Up @@ -207,7 +207,7 @@ func TestInvalidTimeStampDead(t *testing.T) {
mockEbpfTracker := &EbpfTracker{
readyToHandleConnections: true,
dead: false,
openConnections: map[string]ebpfConnection{},
openConnections: map[fourTuple]ebpfConnection{},
}
event.Timestamp = 0
mockEbpfTracker.tcpEventCbV4(event)
Expand Down

0 comments on commit 25a1a5f

Please sign in to comment.