diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 8eefb2a421b..e4f03e9e1f8 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -312,6 +312,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff] - The process monitor now reports the command-line for all processes, under Linux and Windows. {pull}7135[7135] - Updated the TLS protocol parser with new cipher suites added to TLS 1.3. {issue}7455[7455] - Flows are enriched with process information using the process monitor. {pull}7507[7507] +- Added UDP support to process monitor. {pull}7571[7571] *Winlogbeat* diff --git a/packetbeat/flows/worker.go b/packetbeat/flows/worker.go index e6bb5df050f..8ce9bdbc901 100644 --- a/packetbeat/flows/worker.go +++ b/packetbeat/flows/worker.go @@ -27,6 +27,7 @@ import ( "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/packetbeat/procs" + "github.com/elastic/beats/packetbeat/protos/applayer" ) type flowsProcessor struct { @@ -216,6 +217,7 @@ func createEvent( source := common.MapStr{} dest := common.MapStr{} tuple := common.IPPortTuple{} + var proto applayer.Transport // add ethernet layer meta data if src, dst, ok := f.id.EthAddr(); ok { @@ -282,9 +284,11 @@ func createEvent( // udp layer meta data if src, dst, ok := f.id.UDPAddr(); ok { - source["port"] = binary.LittleEndian.Uint16(src) - dest["port"] = binary.LittleEndian.Uint16(dst) + tuple.SrcPort = binary.LittleEndian.Uint16(src) + tuple.DstPort = binary.LittleEndian.Uint16(dst) + source["port"], dest["port"] = tuple.SrcPort, tuple.DstPort fields["transport"] = "udp" + proto = applayer.TransportUDP } // tcp layer meta data @@ -293,6 +297,7 @@ func createEvent( tuple.DstPort = binary.LittleEndian.Uint16(dst) source["port"], dest["port"] = tuple.SrcPort, tuple.DstPort fields["transport"] = "tcp" + proto = applayer.TransportTCP } if id := f.id.ConnectionID(); id != nil { @@ -311,7 +316,7 @@ func createEvent( // Set process information if it's available if tuple.IPLength != 0 && tuple.SrcPort != 0 { - if cmdline := procs.ProcWatcher.FindProcessesTuple(&tuple); cmdline != nil { + if cmdline := procs.ProcWatcher.FindProcessesTuple(&tuple, proto); cmdline != nil { src, dst := common.MakeEndpointPair(tuple.BaseTuple, cmdline) for key, value := range map[string]string{ diff --git a/packetbeat/procs/procs.go b/packetbeat/procs/procs.go index 651e49967a6..649155dfa79 100644 --- a/packetbeat/procs/procs.go +++ b/packetbeat/procs/procs.go @@ -24,6 +24,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/packetbeat/protos/applayer" "github.com/elastic/gosigar" ) @@ -49,7 +50,7 @@ type process struct { type processWatcherImpl interface { // GetLocalPortToPIDMapping returns the list of local port numbers and the PID // that owns them. - GetLocalPortToPIDMapping() (ports map[uint16]int, err error) + GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[uint16]int, err error) // GetProcessCommandLine returns the command line for a given process. GetProcessCommandLine(pid int) string // GetLocalIPs returns the list of local addresses. @@ -57,7 +58,7 @@ type processWatcherImpl interface { } type ProcessesWatcher struct { - portProcMap map[uint16]portProcMapping + portProcMap map[applayer.Transport]map[uint16]portProcMapping localAddrs []net.IP processCache map[int]*process @@ -76,7 +77,11 @@ func (proc *ProcessesWatcher) Init(config ProcsConfig) error { func (proc *ProcessesWatcher) initWithImpl(config ProcsConfig, impl processWatcherImpl) error { proc.impl = impl - proc.portProcMap = make(map[uint16]portProcMapping) + proc.portProcMap = map[applayer.Transport]map[uint16]portProcMapping{ + applayer.TransportUDP: make(map[uint16]portProcMapping), + applayer.TransportTCP: make(map[uint16]portProcMapping), + } + proc.processCache = make(map[int]*process) proc.enabled = config.Enabled @@ -99,7 +104,21 @@ func (proc *ProcessesWatcher) initWithImpl(config ProcsConfig, impl processWatch return nil } -func (proc *ProcessesWatcher) FindProcessesTuple(tuple *common.IPPortTuple) (procTuple *common.CmdlineTuple) { +// FindProcessesTupleTCP looks up local process information for the source and +// destination addresses of TCP tuple +func (proc *ProcessesWatcher) FindProcessesTupleTCP(tuple *common.IPPortTuple) (procTuple *common.CmdlineTuple) { + return proc.FindProcessesTuple(tuple, applayer.TransportTCP) +} + +// FindProcessesTupleUDP looks up local process information for the source and +// destination addresses of UDP tuple +func (proc *ProcessesWatcher) FindProcessesTupleUDP(tuple *common.IPPortTuple) (procTuple *common.CmdlineTuple) { + return proc.FindProcessesTuple(tuple, applayer.TransportUDP) +} + +// FindProcessesTuple looks up local process information for the source and +// destination addresses of a tuple for the given transport protocol +func (proc *ProcessesWatcher) FindProcessesTuple(tuple *common.IPPortTuple, transport applayer.Transport) (procTuple *common.CmdlineTuple) { procTuple = &common.CmdlineTuple{} if !proc.enabled { @@ -107,35 +126,40 @@ func (proc *ProcessesWatcher) FindProcessesTuple(tuple *common.IPPortTuple) (pro } if proc.isLocalIP(tuple.SrcIP) { - if p := proc.findProc(tuple.SrcPort); p != nil { + if p := proc.findProc(tuple.SrcPort, transport); p != nil { procTuple.Src = []byte(p.name) procTuple.SrcCommand = []byte(p.commandLine) - logp.Debug("procs", "Found process '%s' (%s) for port %d", p.commandLine, p.name, tuple.SrcPort) + logp.Debug("procs", "Found process '%s' (%s) for port %d/%s", p.commandLine, p.name, tuple.SrcPort, transport) } } if proc.isLocalIP(tuple.DstIP) { - if p := proc.findProc(tuple.DstPort); p != nil { + if p := proc.findProc(tuple.DstPort, transport); p != nil { procTuple.Dst = []byte(p.name) procTuple.DstCommand = []byte(p.commandLine) - logp.Debug("procs", "Found process '%s' (%s) for port %d", p.commandLine, p.name, tuple.DstPort) + logp.Debug("procs", "Found process '%s' (%s) for port %d/%s", p.commandLine, p.name, tuple.DstPort, transport) } } return } -func (proc *ProcessesWatcher) findProc(port uint16) *process { +func (proc *ProcessesWatcher) findProc(port uint16, transport applayer.Transport) *process { defer logp.Recover("FindProc exception") - p, exists := proc.portProcMap[port] + procMap, ok := proc.portProcMap[transport] + if !ok { + return nil + } + + p, exists := procMap[port] if exists { return p.proc } - proc.updateMap() + proc.updateMap(transport) - p, exists = proc.portProcMap[port] + p, exists = procMap[port] if exists { return p.proc } @@ -143,7 +167,7 @@ func (proc *ProcessesWatcher) findProc(port uint16) *process { return nil } -func (proc *ProcessesWatcher) updateMap() { +func (proc *ProcessesWatcher) updateMap(transport applayer.Transport) { if logp.HasSelector("procsdetailed") { start := time.Now() defer func() { @@ -151,7 +175,7 @@ func (proc *ProcessesWatcher) updateMap() { }() } - ports, err := proc.impl.GetLocalPortToPIDMapping() + ports, err := proc.impl.GetLocalPortToPIDMapping(transport) if err != nil { logp.Err("unable to list local ports: %v", err) } @@ -159,12 +183,12 @@ func (proc *ProcessesWatcher) updateMap() { proc.expireProcessCache() for port, pid := range ports { - proc.updateMappingEntry(port, pid) + proc.updateMappingEntry(transport, port, pid) } } -func (proc *ProcessesWatcher) updateMappingEntry(port uint16, pid int) { - prev, ok := proc.portProcMap[port] +func (proc *ProcessesWatcher) updateMappingEntry(transport applayer.Transport, port uint16, pid int) { + prev, ok := proc.portProcMap[transport][port] if ok && prev.pid == pid { // This port->pid mapping already exists return @@ -179,10 +203,10 @@ func (proc *ProcessesWatcher) updateMappingEntry(port uint16, pid int) { // We never expire entries from this map. Since there are 65k possible // ports, the size of the dict can be max 1.5 MB, which we consider // reasonable. - proc.portProcMap[port] = portProcMapping{port: port, pid: pid, proc: p} + proc.portProcMap[transport][port] = portProcMapping{port: port, pid: pid, proc: p} - logp.Debug("procsdetailed", "updateMappingEntry(): port=%d pid=%d process='%s' name=%s", - port, pid, p.commandLine, p.name) + logp.Debug("procsdetailed", "updateMappingEntry(): port=%d/%s pid=%d process='%s' name=%s", + port, transport, pid, p.commandLine, p.name) } func (proc *ProcessesWatcher) isLocalIP(ip net.IP) bool { diff --git a/packetbeat/procs/procs_linux.go b/packetbeat/procs/procs_linux.go index eff29718d24..907e9f42bca 100644 --- a/packetbeat/procs/procs_linux.go +++ b/packetbeat/procs/procs_linux.go @@ -32,6 +32,7 @@ import ( "strings" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/packetbeat/protos/applayer" "github.com/elastic/gosigar" ) @@ -43,22 +44,33 @@ type socketInfo struct { inode uint64 } +var procFiles = map[applayer.Transport]struct { + ipv4, ipv6 string +}{ + applayer.TransportUDP: {"/proc/net/udp", "/proc/net/udp6"}, + applayer.TransportTCP: {"/proc/net/tcp", "/proc/net/tcp6"}, +} + // GetLocalPortToPIDMapping returns the list of local port numbers and the PID // that owns them. -func (proc *ProcessesWatcher) GetLocalPortToPIDMapping() (ports map[uint16]int, err error) { +func (proc *ProcessesWatcher) GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[uint16]int, err error) { + sourceFiles, ok := procFiles[transport] + if !ok { + return nil, fmt.Errorf("unsupported transport protocol id: %d", transport) + } var pids gosigar.ProcList if err = pids.Get(); err != nil { return nil, err } logp.Debug("procs", "getLocalPortsToPIDs()") - ipv4socks, err := socketsFromProc("/proc/net/tcp", false) + ipv4socks, err := socketsFromProc(sourceFiles.ipv4, false) if err != nil { - logp.Err("Parse_Proc_Net_Tcp: %s", err) + logp.Err("GetLocalPortToPIDMapping: parsing '%s': %s", sourceFiles.ipv4, err) return nil, err } - ipv6socks, err := socketsFromProc("/proc/net/tcp6", true) + ipv6socks, err := socketsFromProc(sourceFiles.ipv6, true) if err != nil { - logp.Err("Parse_Proc_Net_Tcp ipv6: %s", err) + logp.Err("GetLocalPortToPIDMapping: parsing '%s': %s", sourceFiles.ipv6, err) return nil, err } socksMap := map[uint64]*socketInfo{} @@ -126,11 +138,11 @@ func socketsFromProc(filename string, ipv6 bool) ([]*socketInfo, error) { return nil, err } defer file.Close() - return parseProcNetTCP(file, ipv6) + return parseProcNetProto(file, ipv6) } -// Parses the /proc/net/tcp file -func parseProcNetTCP(input io.Reader, ipv6 bool) ([]*socketInfo, error) { +// Parses the /proc/net/(tcp|udp)6? file +func parseProcNetProto(input io.Reader, ipv6 bool) ([]*socketInfo, error) { buf := bufio.NewReader(input) sockets := []*socketInfo{} @@ -139,7 +151,7 @@ func parseProcNetTCP(input io.Reader, ipv6 bool) ([]*socketInfo, error) { for err != io.EOF { line, err = buf.ReadBytes('\n') if err != nil && err != io.EOF { - logp.Err("Error reading proc net tcp file: %s", err) + logp.Err("Error reading proc net file: %s", err) return nil, err } words := bytes.Fields(line) diff --git a/packetbeat/procs/procs_other.go b/packetbeat/procs/procs_other.go index 98698fe46f3..b91a132adbe 100644 --- a/packetbeat/procs/procs_other.go +++ b/packetbeat/procs/procs_other.go @@ -19,8 +19,10 @@ package procs +import "github.com/elastic/beats/packetbeat/protos/applayer" + // GetLocalPortToPIDMapping returns the list of local port numbers and the PID // that owns them. -func (proc *ProcessesWatcher) GetLocalPortToPIDMapping() (ports map[uint16]int, err error) { +func (proc *ProcessesWatcher) GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[uint16]int, err error) { return nil, nil } diff --git a/packetbeat/procs/procs_test.go b/packetbeat/procs/procs_test.go index d8fa7e3b304..5278730480e 100644 --- a/packetbeat/procs/procs_test.go +++ b/packetbeat/procs/procs_test.go @@ -26,13 +26,15 @@ import ( "github.com/stretchr/testify/assert" + "github.com/elastic/beats/packetbeat/protos/applayer" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" ) type testingImpl struct { localIPs []net.IP - portToPID map[uint16]int + portToPID map[applayer.Transport]map[uint16]int pidToCmdline map[int]string } @@ -40,25 +42,29 @@ type runningProcess struct { cmdline string pid int ports []uint16 + proto applayer.Transport } func newTestingImpl(localIPs []net.IP, processes []runningProcess) *testingImpl { impl := &testingImpl{ - localIPs: localIPs, - portToPID: make(map[uint16]int), + localIPs: localIPs, + portToPID: map[applayer.Transport]map[uint16]int{ + applayer.TransportTCP: make(map[uint16]int), + applayer.TransportUDP: make(map[uint16]int), + }, pidToCmdline: make(map[int]string), } for _, proc := range processes { for _, port := range proc.ports { - impl.portToPID[port] = proc.pid + impl.portToPID[proc.proto][port] = proc.pid } impl.pidToCmdline[proc.pid] = proc.cmdline } return impl } -func (impl *testingImpl) GetLocalPortToPIDMapping() (ports map[uint16]int, err error) { - return impl.portToPID, nil +func (impl *testingImpl) GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[uint16]int, err error) { + return impl.portToPID[transport], nil } func (impl *testingImpl) GetProcessCommandLine(pid int) string { @@ -92,16 +98,25 @@ func TestFindProcessTuple(t *testing.T) { cmdline: "curl -o /dev/null http://example.net/", pid: 101, ports: []uint16{65535}, + proto: applayer.TransportTCP, }, { cmdline: "/usr/X11/bin/webbrowser", pid: 102, ports: []uint16{3201, 3202, 3203}, + proto: applayer.TransportTCP, }, { cmdline: "nc -v -l -p 80", pid: 105, ports: []uint16{80}, + proto: applayer.TransportTCP, + }, + { + cmdline: "bind", + pid: 333, + ports: []uint16{53}, + proto: applayer.TransportUDP, }, }) procs := ProcessesWatcher{} @@ -112,10 +127,12 @@ func TestFindProcessTuple(t *testing.T) { name string srcIP, dstIP, src, dst, srcCmd, dstCmd string srcPort, dstPort int + proto applayer.Transport preAction func() }{ { name: "Unrelated local HTTP client", + proto: applayer.TransportTCP, srcIP: "127.0.0.1", srcPort: 12345, dstIP: "1.2.3.4", dstPort: 80, src: "", srcCmd: "", @@ -123,6 +140,7 @@ func TestFindProcessTuple(t *testing.T) { }, { name: "Web browser (IPv6)", + proto: applayer.TransportTCP, srcIP: "7777::0:33", srcPort: 3201, dstIP: "1234:1234::AAAA", dstPort: 443, src: "", srcCmd: "/usr/X11/bin/webbrowser", @@ -130,13 +148,23 @@ func TestFindProcessTuple(t *testing.T) { }, { name: "Curl request", + proto: applayer.TransportTCP, srcIP: "192.168.1.1", srcPort: 65535, dstIP: "1.1.1.1", dstPort: 80, src: "Curl", srcCmd: "curl -o /dev/null http://example.net/", dst: "", dstCmd: "", }, + { + name: "Unrelated UDP using same port as TCP", + proto: applayer.TransportUDP, + srcIP: "192.168.1.1", srcPort: 65535, + dstIP: "1.1.1.1", dstPort: 80, + src: "", srcCmd: "", + dst: "", dstCmd: "", + }, { name: "Local web browser to netcat server", + proto: applayer.TransportTCP, srcIP: "127.0.0.1", srcPort: 3202, dstIP: "127.0.0.1", dstPort: 80, src: "", srcCmd: "/usr/X11/bin/webbrowser", @@ -144,6 +172,7 @@ func TestFindProcessTuple(t *testing.T) { }, { name: "External to netcat server", + proto: applayer.TransportTCP, srcIP: "192.168.1.2", srcPort: 3203, dstIP: "192.168.1.1", dstPort: 80, src: "", srcCmd: "", @@ -154,13 +183,22 @@ func TestFindProcessTuple(t *testing.T) { preAction: func() { // add a new running process impl.pidToCmdline[555] = "/usr/bin/nmap -sT -P443 10.0.0.0/8" - impl.portToPID[55555] = 555 + impl.portToPID[applayer.TransportTCP][55555] = 555 }, + proto: applayer.TransportTCP, srcIP: "7777::33", srcPort: 55555, dstIP: "10.1.2.3", dstPort: 443, src: "NMap", srcCmd: "/usr/bin/nmap -sT -P443 10.0.0.0/8", dst: "", dstCmd: "", }, + { + name: "DNS request (UDP)", + proto: applayer.TransportUDP, + srcIP: "1234:5678::55", srcPort: 533, + dstIP: "7777::33", dstPort: 53, + src: "", srcCmd: "", + dst: "", dstCmd: "bind", + }, } { msg := fmt.Sprintf("test case #%d: %s", idx+1, testCase.name) @@ -175,7 +213,7 @@ func TestFindProcessTuple(t *testing.T) { DstPort: uint16(testCase.dstPort), }, } - result := procs.FindProcessesTuple(&input) + result := procs.FindProcessesTuple(&input, testCase.proto) // nil result is not valid assert.NotNil(t, result, msg) diff --git a/packetbeat/procs/procs_windows.go b/packetbeat/procs/procs_windows.go index 16e6257aaeb..0970b52f51c 100644 --- a/packetbeat/procs/procs_windows.go +++ b/packetbeat/procs/procs_windows.go @@ -27,6 +27,8 @@ import ( "unsafe" "golang.org/x/sys/windows" + + "github.com/elastic/beats/packetbeat/protos/applayer" ) type extractor interface { @@ -41,20 +43,33 @@ type extractorFactory func(fn callbackFn) extractor type tcpRowOwnerPIDExtractor callbackFn type tcp6RowOwnerPIDExtractor callbackFn +type udpRowOwnerPIDExtractor callbackFn +type udp6RowOwnerPIDExtractor callbackFn -var tables = []struct { +var tablesByTransport = map[applayer.Transport][]struct { family uint32 function GetExtendedTableFn class uint32 extractor extractorFactory }{ - {windows.AF_INET, _GetExtendedTcpTable, TCP_TABLE_OWNER_PID_ALL, extractTCPRowOwnerPID}, - {windows.AF_INET6, _GetExtendedTcpTable, TCP_TABLE_OWNER_PID_ALL, extractTCP6RowOwnerPID}, + applayer.TransportTCP: { + {windows.AF_INET, _GetExtendedTcpTable, TCP_TABLE_OWNER_PID_ALL, extractTCPRowOwnerPID}, + {windows.AF_INET6, _GetExtendedTcpTable, TCP_TABLE_OWNER_PID_ALL, extractTCP6RowOwnerPID}, + }, + applayer.TransportUDP: { + {windows.AF_INET, _GetExtendedUdpTable, UDP_TABLE_OWNER_PID, extractUDPRowOwnerPID}, + {windows.AF_INET6, _GetExtendedUdpTable, UDP_TABLE_OWNER_PID, extractUDP6RowOwnerPID}, + }, } // GetLocalPortToPIDMapping returns the list of local port numbers and the PID // that owns them. -func (proc *ProcessesWatcher) GetLocalPortToPIDMapping() (ports map[uint16]int, err error) { +func (proc *ProcessesWatcher) GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[uint16]int, err error) { + tables, ok := tablesByTransport[transport] + if !ok { + return nil, fmt.Errorf("unsupported transport protocol id: %d", transport) + } + storeResults := func(localPort uint16, pid int) { ports[localPort] = pid } @@ -121,6 +136,14 @@ func extractTCP6RowOwnerPID(fn callbackFn) extractor { return tcp6RowOwnerPIDExtractor(fn) } +func extractUDPRowOwnerPID(fn callbackFn) extractor { + return udpRowOwnerPIDExtractor(fn) +} + +func extractUDP6RowOwnerPID(fn callbackFn) extractor { + return udp6RowOwnerPIDExtractor(fn) +} + // Extract will parse a row of Size() bytes pointed to by ptr func (e tcpRowOwnerPIDExtractor) Extract(ptr unsafe.Pointer) { row := (*TCPRowOwnerPID)(ptr) @@ -142,3 +165,25 @@ func (e tcp6RowOwnerPIDExtractor) Extract(ptr unsafe.Pointer) { func (tcp6RowOwnerPIDExtractor) Size() int { return int(unsafe.Sizeof(TCP6RowOwnerPID{})) } + +// Extract will parse a row of Size() bytes pointed to by ptr +func (e udpRowOwnerPIDExtractor) Extract(ptr unsafe.Pointer) { + row := (*UDPRowOwnerPID)(ptr) + e(uint32FieldToPort(row.localPort), int(row.owningPID)) +} + +// Size returns the size of a table row +func (udpRowOwnerPIDExtractor) Size() int { + return int(unsafe.Sizeof(UDPRowOwnerPID{})) +} + +// Extract will parse a row of Size() bytes pointed to by ptr +func (e udp6RowOwnerPIDExtractor) Extract(ptr unsafe.Pointer) { + row := (*UDP6RowOwnerPID)(ptr) + e(uint32FieldToPort(row.localPort), int(row.owningPID)) +} + +// Size returns the size of a table row +func (udp6RowOwnerPIDExtractor) Size() int { + return int(unsafe.Sizeof(UDP6RowOwnerPID{})) +} diff --git a/packetbeat/procs/syscall_windows.go b/packetbeat/procs/syscall_windows.go index 16cc4ee165a..f3f1aa00baf 100644 --- a/packetbeat/procs/syscall_windows.go +++ b/packetbeat/procs/syscall_windows.go @@ -24,6 +24,7 @@ import ( ) const ( + UDP_TABLE_OWNER_PID = 1 TCP_TABLE_OWNER_PID_ALL = 5 sizeOfDWORD = 4 @@ -51,6 +52,19 @@ type TCP6RowOwnerPID struct { owningPID uint32 } +type UDPRowOwnerPID struct { + localAddr uint32 + localPort uint32 + owningPID uint32 +} + +type UDP6RowOwnerPID struct { + localAddr [16]byte + localScopeID uint32 + localPort uint32 + owningPID uint32 +} + // GetExtendedTableFn is the prototype for GetExtendedTcpTable and GetExtendedUdpTable type GetExtendedTableFn func(pTcpTable uintptr, pdwSize *uint32, bOrder bool, ulAf uint32, tableClass uint32, reserved uint32) (code syscall.Errno, err error) @@ -59,3 +73,4 @@ type GetExtendedTableFn func(pTcpTable uintptr, pdwSize *uint32, bOrder bool, ul // Windows API calls //sys _GetExtendedTcpTable(pTcpTable uintptr, pdwSize *uint32, bOrder bool, ulAf uint32, tableClass uint32, reserved uint32) (code syscall.Errno, err error) = iphlpapi.GetExtendedTcpTable +//sys _GetExtendedUdpTable(pTcpTable uintptr, pdwSize *uint32, bOrder bool, ulAf uint32, tableClass uint32, reserved uint32) (code syscall.Errno, err error) = iphlpapi.GetExtendedUdpTable diff --git a/packetbeat/procs/zsyscall_windows.go b/packetbeat/procs/zsyscall_windows.go index 94e1a299f02..b313698211e 100644 --- a/packetbeat/procs/zsyscall_windows.go +++ b/packetbeat/procs/zsyscall_windows.go @@ -57,6 +57,7 @@ var ( modiphlpapi = windows.NewLazySystemDLL("iphlpapi.dll") procGetExtendedTcpTable = modiphlpapi.NewProc("GetExtendedTcpTable") + procGetExtendedUdpTable = modiphlpapi.NewProc("GetExtendedUdpTable") ) func _GetExtendedTcpTable(pTcpTable uintptr, pdwSize *uint32, bOrder bool, ulAf uint32, tableClass uint32, reserved uint32) (code syscall.Errno, err error) { @@ -77,3 +78,22 @@ func _GetExtendedTcpTable(pTcpTable uintptr, pdwSize *uint32, bOrder bool, ulAf } return } + +func _GetExtendedUdpTable(pTcpTable uintptr, pdwSize *uint32, bOrder bool, ulAf uint32, tableClass uint32, reserved uint32) (code syscall.Errno, err error) { + var _p0 uint32 + if bOrder { + _p0 = 1 + } else { + _p0 = 0 + } + r0, _, e1 := syscall.Syscall6(procGetExtendedUdpTable.Addr(), 6, uintptr(pTcpTable), uintptr(unsafe.Pointer(pdwSize)), uintptr(_p0), uintptr(ulAf), uintptr(tableClass), uintptr(reserved)) + code = syscall.Errno(r0) + if code == 0 { + if e1 != 0 { + err = errnoErr(e1) + } else { + err = syscall.EINVAL + } + } + return +} diff --git a/packetbeat/protos/amqp/amqp_parser.go b/packetbeat/protos/amqp/amqp_parser.go index 8c2d78e110e..2e59641bd90 100644 --- a/packetbeat/protos/amqp/amqp_parser.go +++ b/packetbeat/protos/amqp/amqp_parser.go @@ -336,7 +336,7 @@ func (amqp *amqpPlugin) handleAmqp(m *amqpMessage, tcptuple *common.TCPTuple, di debugf("A message is ready to be handled") m.tcpTuple = *tcptuple m.direction = dir - m.cmdlineTuple = procs.ProcWatcher.FindProcessesTuple(tcptuple.IPPort()) + m.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcptuple.IPPort()) if m.method == "basic.publish" { amqp.handlePublishing(m) diff --git a/packetbeat/protos/cassandra/trans.go b/packetbeat/protos/cassandra/trans.go index a1ed3c2aa90..49eb7be7caf 100644 --- a/packetbeat/protos/cassandra/trans.go +++ b/packetbeat/protos/cassandra/trans.go @@ -59,7 +59,7 @@ func (trans *transactions) onMessage( var err error msg.Tuple = *tuple msg.Transport = applayer.TransportTCP - msg.CmdlineTuple = procs.ProcWatcher.FindProcessesTuple(&msg.Tuple) + msg.CmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(&msg.Tuple) if msg.IsRequest { if isDebug { diff --git a/packetbeat/protos/dns/dns_tcp.go b/packetbeat/protos/dns/dns_tcp.go index 445dc8091ca..bce95d6c766 100644 --- a/packetbeat/protos/dns/dns_tcp.go +++ b/packetbeat/protos/dns/dns_tcp.go @@ -150,7 +150,7 @@ func (dns *dnsPlugin) handleDNS(conn *dnsConnectionData, tcpTuple *common.TCPTup message := conn.data[dir].message dnsTuple := dnsTupleFromIPPort(&message.tuple, transportTCP, decodedData.Id) - message.cmdlineTuple = procs.ProcWatcher.FindProcessesTuple(tcpTuple.IPPort()) + message.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcpTuple.IPPort()) message.data = decodedData message.length += decodeOffset diff --git a/packetbeat/protos/dns/dns_udp.go b/packetbeat/protos/dns/dns_udp.go index f89087ae560..d563a97837b 100644 --- a/packetbeat/protos/dns/dns_udp.go +++ b/packetbeat/protos/dns/dns_udp.go @@ -47,7 +47,7 @@ func (dns *dnsPlugin) ParseUDP(pkt *protos.Packet) { dnsMsg := &dnsMessage{ ts: pkt.Ts, tuple: pkt.Tuple, - cmdlineTuple: procs.ProcWatcher.FindProcessesTuple(&pkt.Tuple), + cmdlineTuple: procs.ProcWatcher.FindProcessesTupleUDP(&pkt.Tuple), data: dnsPkt, length: packetSize, } diff --git a/packetbeat/protos/http/http.go b/packetbeat/protos/http/http.go index caea7e1229e..7415fea3988 100644 --- a/packetbeat/protos/http/http.go +++ b/packetbeat/protos/http/http.go @@ -415,7 +415,7 @@ func (http *httpPlugin) handleHTTP( m.tcpTuple = *tcptuple m.direction = dir - m.cmdlineTuple = procs.ProcWatcher.FindProcessesTuple(tcptuple.IPPort()) + m.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcptuple.IPPort()) http.hideHeaders(m) if m.isRequest { diff --git a/packetbeat/protos/memcache/plugin_tcp.go b/packetbeat/protos/memcache/plugin_tcp.go index 4b9e2226ef0..ff9f15f266b 100644 --- a/packetbeat/protos/memcache/plugin_tcp.go +++ b/packetbeat/protos/memcache/plugin_tcp.go @@ -191,7 +191,7 @@ func (mc *memcache) onTCPMessage( ) error { msg.Tuple = *tuple msg.Transport = applayer.TransportTCP - msg.CmdlineTuple = procs.ProcWatcher.FindProcessesTuple(tuple) + msg.CmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tuple) if msg.IsRequest { return mc.onTCPRequest(conn, tuple, dir, msg) diff --git a/packetbeat/protos/memcache/plugin_udp.go b/packetbeat/protos/memcache/plugin_udp.go index 2287f317491..e0041ccf83f 100644 --- a/packetbeat/protos/memcache/plugin_udp.go +++ b/packetbeat/protos/memcache/plugin_udp.go @@ -184,7 +184,7 @@ func (mc *memcache) onUDPMessage( } msg.Tuple = *tuple msg.Transport = applayer.TransportUDP - msg.CmdlineTuple = procs.ProcWatcher.FindProcessesTuple(tuple) + msg.CmdlineTuple = procs.ProcWatcher.FindProcessesTupleUDP(tuple) done := false var err error diff --git a/packetbeat/protos/mongodb/mongodb.go b/packetbeat/protos/mongodb/mongodb.go index 549feeb2016..5b7eb9d3dea 100644 --- a/packetbeat/protos/mongodb/mongodb.go +++ b/packetbeat/protos/mongodb/mongodb.go @@ -217,7 +217,7 @@ func (mongodb *mongodbPlugin) handleMongodb( m.tcpTuple = *tcptuple m.direction = dir - m.cmdlineTuple = procs.ProcWatcher.FindProcessesTuple(tcptuple.IPPort()) + m.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcptuple.IPPort()) if m.isResponse { debugf("MongoDB response message") diff --git a/packetbeat/protos/mysql/mysql.go b/packetbeat/protos/mysql/mysql.go index 096254aac5a..dacbdb5c769 100644 --- a/packetbeat/protos/mysql/mysql.go +++ b/packetbeat/protos/mysql/mysql.go @@ -582,7 +582,7 @@ func handleMysql(mysql *mysqlPlugin, m *mysqlMessage, tcptuple *common.TCPTuple, m.tcpTuple = *tcptuple m.direction = dir - m.cmdlineTuple = procs.ProcWatcher.FindProcessesTuple(tcptuple.IPPort()) + m.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcptuple.IPPort()) m.raw = rawMsg if m.isRequest { diff --git a/packetbeat/protos/pgsql/pgsql.go b/packetbeat/protos/pgsql/pgsql.go index c9dfa5d5809..3159bea89ca 100644 --- a/packetbeat/protos/pgsql/pgsql.go +++ b/packetbeat/protos/pgsql/pgsql.go @@ -361,7 +361,7 @@ var handlePgsql = func(pgsql *pgsqlPlugin, m *pgsqlMessage, tcptuple *common.TCP m.tcpTuple = *tcptuple m.direction = dir - m.cmdlineTuple = procs.ProcWatcher.FindProcessesTuple(tcptuple.IPPort()) + m.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcptuple.IPPort()) if m.isRequest { pgsql.receivedPgsqlRequest(m) diff --git a/packetbeat/protos/redis/redis.go b/packetbeat/protos/redis/redis.go index 5de7c39f752..4e51dbb2e52 100644 --- a/packetbeat/protos/redis/redis.go +++ b/packetbeat/protos/redis/redis.go @@ -241,7 +241,7 @@ func (redis *redisPlugin) handleRedis( ) { m.tcpTuple = *tcptuple m.direction = dir - m.cmdlineTuple = procs.ProcWatcher.FindProcessesTuple(tcptuple.IPPort()) + m.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcptuple.IPPort()) if m.isRequest { conn.requests.append(m) // wait for response diff --git a/packetbeat/protos/thrift/thrift.go b/packetbeat/protos/thrift/thrift.go index cb4e7bca7cd..25088cc87ef 100644 --- a/packetbeat/protos/thrift/thrift.go +++ b/packetbeat/protos/thrift/thrift.go @@ -895,7 +895,7 @@ func (thrift *thriftPlugin) messageComplete(tcptuple *common.TCPTuple, dir uint8 // all ok, go to next level stream.message.tcpTuple = *tcptuple stream.message.direction = dir - stream.message.cmdlineTuple = procs.ProcWatcher.FindProcessesTuple(tcptuple.IPPort()) + stream.message.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcptuple.IPPort()) if stream.message.frameSize == 0 { stream.message.frameSize = uint32(stream.parseOffset - stream.message.start) } diff --git a/packetbeat/protos/tls/tls.go b/packetbeat/protos/tls/tls.go index 33f0494e355..5e0ea77452f 100644 --- a/packetbeat/protos/tls/tls.go +++ b/packetbeat/protos/tls/tls.go @@ -161,7 +161,7 @@ func (plugin *tlsPlugin) doParse( st := conn.streams[dir] if st == nil { st = newStream(tcptuple) - st.cmdlineTuple = procs.ProcWatcher.FindProcessesTuple(tcptuple.IPPort()) + st.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcptuple.IPPort()) conn.streams[dir] = st }