Skip to content

Commit

Permalink
eBPF-based resolving of actual TCP connection destinations (conntrack)
Browse files Browse the repository at this point in the history
  • Loading branch information
def committed Nov 29, 2024
1 parent e02bf28 commit 1279365
Show file tree
Hide file tree
Showing 11 changed files with 285 additions and 288 deletions.
96 changes: 0 additions & 96 deletions containers/conntrack.go

This file was deleted.

75 changes: 9 additions & 66 deletions containers/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,18 +137,14 @@ type Container struct {

logParsers map[string]*LogParser

hostConntrack *Conntrack
nsConntrack *Conntrack
lbConntracks []*Conntrack

registry *Registry

lock sync.RWMutex

done chan struct{}
}

func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32, registry *Registry) (*Container, error) {
func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, pid uint32, registry *Registry) (*Container, error) {
netNs, err := proc.GetNetNs(pid)
if err != nil {
return nil, err
Expand Down Expand Up @@ -177,24 +173,10 @@ func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, host

logParsers: map[string]*LogParser{},

hostConntrack: hostConntrack,

registry: registry,

done: make(chan struct{}),
}

for _, n := range md.networks {
if nsHandle := FindNetworkLoadBalancerNs(n.NetworkID); nsHandle.IsOpen() {
if ct, err := NewConntrack(nsHandle); err != nil {
klog.Warningln(err)
} else {
c.lbConntracks = append(c.lbConntracks, ct)
}
_ = nsHandle.Close()
}
}

c.runLogParser("")

go func() {
Expand All @@ -217,12 +199,6 @@ func (c *Container) Close() {
for _, p := range c.logParsers {
p.Stop()
}
for _, ct := range c.lbConntracks {
_ = ct.Close()
}
if c.nsConntrack != nil {
_ = c.nsConntrack.Close()
}
close(c.done)
}

Expand Down Expand Up @@ -519,7 +495,7 @@ func (c *Container) onListenClose(pid uint32, addr netaddr.IPPort) {
}
}

func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool, duration time.Duration) {
func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst, actualDst netaddr.IPPort, timestamp uint64, failed bool, duration time.Duration) {
if common.PortFilter.ShouldBeSkipped(dst.Port()) {
return
}
Expand All @@ -530,23 +506,20 @@ func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPP
if dst.IP().IsLoopback() && !p.isHostNs() {
return
}
actualDst, err := c.getActualDestination(p, src, dst)
if err != nil {
if !common.IsNotExist(err) {
klog.Warningf("cannot open NetNs for pid %d: %s", pid, err)
if actualDst.Port() == 0 {
if a := lookupCiliumConntrackTable(src, dst); a != nil {
actualDst = *a
} else {
actualDst = dst
}
return
}
switch {
case actualDst == nil:
actualDst = &dst
case actualDst.IP().IsLoopback() && !p.isHostNs():
if actualDst.IP().IsLoopback() && !p.isHostNs() {
return
}
if common.ConnectionFilter.ShouldBeSkipped(dst.IP(), actualDst.IP()) {
return
}
key := common.NewDestinationKey(dst, *actualDst, c.registry.getFQDN(dst.IP()))
key := common.NewDestinationKey(dst, actualDst, c.registry.getFQDN(dst.IP()))
c.lock.Lock()
defer c.lock.Unlock()
if failed {
Expand All @@ -571,36 +544,6 @@ func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPP
c.lastConnectionAttempts[key.Destination()] = time.Now()
}

func (c *Container) getActualDestination(p *Process, src, dst netaddr.IPPort) (*netaddr.IPPort, error) {
if actualDst := lookupCiliumConntrackTable(src, dst); actualDst != nil {
return actualDst, nil
}
for _, lb := range c.lbConntracks {
if actualDst := lb.GetActualDestination(src, dst); actualDst != nil {
return actualDst, nil
}
}
actualDst := c.hostConntrack.GetActualDestination(src, dst)
if actualDst != nil {
return actualDst, nil
}
if !p.isHostNs() {
if c.nsConntrack == nil {
netNs, err := proc.GetNetNs(p.Pid)
if err != nil {
return nil, err
}
defer netNs.Close()
c.nsConntrack, err = NewConntrack(netNs)
if err != nil {
return nil, err
}
}
return c.nsConntrack.GetActualDestination(src, dst), nil
}
return nil, nil
}

func (c *Container) onConnectionClose(e ebpftracer.Event) {
c.lock.Lock()
conn := c.connectionsByPidFd[PidFd{Pid: e.Pid, Fd: e.Fd}]
Expand Down
22 changes: 0 additions & 22 deletions containers/dockerd.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,13 @@ package containers
import (
"context"
"fmt"
"os"
"path"
"strings"
"time"

"github.com/coroot/coroot-node-agent/common"
"github.com/coroot/coroot-node-agent/proc"
"github.com/coroot/logparser"
"github.com/docker/docker/client"
"github.com/vishvananda/netns"
"inet.af/netaddr"
)

Expand Down Expand Up @@ -106,22 +103,3 @@ func DockerdInspect(containerID string) (*ContainerMetadata, error) {
}
return res, nil
}

func FindNetworkLoadBalancerNs(networkId string) netns.NsHandle {
basePath := "/run/docker/netns"
files, err := os.ReadDir(proc.HostPath(basePath))
if err != nil {
return -1
}
for _, f := range files {
if !f.Type().IsRegular() || !strings.HasPrefix(f.Name(), "lb_") {
continue
}
idPrefix := strings.Split(f.Name(), "_")[1]
if strings.HasPrefix(networkId, idPrefix) {
ns, _ := netns.GetFromPath(proc.HostPath(path.Join(basePath, f.Name())))
return ns
}
}
return -1
}
25 changes: 6 additions & 19 deletions containers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ type Registry struct {
tracer *ebpftracer.Tracer
events chan ebpftracer.Event

hostConntrack *Conntrack

containersById map[ContainerID]*Container
containersByCgroupId map[string]*Container
containersByPid map[uint32]*Container
Expand Down Expand Up @@ -96,25 +94,18 @@ func NewRegistry(reg prometheus.Registerer, processInfoCh chan<- ProcessInfo) (*
if err := JournaldInit(); err != nil {
klog.Warningln(err)
}
ct, err := NewConntrack(hostNetNs)
if err != nil {
return nil, err
}

r := &Registry{
reg: reg,
events: make(chan ebpftracer.Event, 10000),

hostConntrack: ct,

reg: reg,
events: make(chan ebpftracer.Event, 10000),
containersById: map[ContainerID]*Container{},
containersByCgroupId: map[string]*Container{},
containersByPid: map[uint32]*Container{},
ip2fqdn: map[netaddr.IP]string{},

processInfoCh: processInfoCh,

tracer: ebpftracer.NewTracer(*flags.DisableL7Tracing),
tracer: ebpftracer.NewTracer(hostNetNs, *flags.DisableL7Tracing),

trafficStatsUpdateCh: make(chan *TrafficStatsUpdate),
}
Expand Down Expand Up @@ -253,14 +244,14 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {

case ebpftracer.EventTypeConnectionOpen:
if c := r.getOrCreateContainer(e.Pid); c != nil {
c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, e.Timestamp, false, e.Duration)
c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, e.ActualDstAddr, e.Timestamp, false, e.Duration)
c.attachTlsUprobes(r.tracer, e.Pid)
} else {
klog.Infoln("TCP connection from unknown container", e)
}
case ebpftracer.EventTypeConnectionError:
if c := r.getOrCreateContainer(e.Pid); c != nil {
c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, 0, true, e.Duration)
c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, e.ActualDstAddr, 0, true, e.Duration)
} else {
klog.Infoln("TCP connection error from unknown container", e)
}
Expand Down Expand Up @@ -345,16 +336,12 @@ func (r *Registry) getOrCreateContainer(pid uint32) *Container {
c.cgroup = cg
c.metadata = md
c.runLogParser("")
if c.nsConntrack != nil {
_ = c.nsConntrack.Close()
c.nsConntrack = nil
}
}
r.containersByPid[pid] = c
r.containersByCgroupId[cg.Id] = c
return c
}
c, err := NewContainer(id, cg, md, r.hostConntrack, pid, r)
c, err := NewContainer(id, cg, md, pid, r)
if err != nil {
klog.Warningf("failed to create container pid=%d cg=%s id=%s: %s", pid, cg.Id, id, err)
return nil
Expand Down
20 changes: 10 additions & 10 deletions ebpftracer/ebpf.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions ebpftracer/ebpf/ebpf.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ struct trace_event_raw_sys_exit__stub {

#include "proc.c"
#include "file.c"
#include "tcp/conntrack.c"
#include "tcp/state.c"
#include "tcp/retransmit.c"
#include "l7/l7.c"
Expand Down
Loading

0 comments on commit 1279365

Please sign in to comment.