Skip to content

Commit

Permalink
Tcp dump (#919)
Browse files Browse the repository at this point in the history
* Added tcpdump reporter

* Error should only trigger in else condition [skip ci]

* f-string needed for variable [skip ci]

* Space added [skip ci]

* Added MDNS

* TCPDump provider now starts with containers

* TCPDump is now filtering out only specific first lines
  • Loading branch information
ArneTR authored Dec 9, 2024
1 parent 5b8b3c6 commit c085073
Show file tree
Hide file tree
Showing 6 changed files with 229 additions and 11 deletions.
7 changes: 5 additions & 2 deletions config.yml.example
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,11 @@ measurement:
# Hardware_Availability_Year: 2011
######### vhost_ratio is the virtualization degree of the machine. For Bare Metal this is 1. For 1 out of 4 VMs this would be 0.25 etc.
# VHost_Ratio: 1

#--- END
#
###### DEBUG
# network.connections.tcpdump.system.provider.NetworkConnectionsTcpdumpSystemProvider:
# split_ports: True
#--- END


sci:
Expand Down
3 changes: 3 additions & 0 deletions metric_providers/network/connections/tcpdump/system/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Information

See https://docs.green-coding.io/docs/measuring/metric-providers/network-connections-tcpdump-system/ for details.
179 changes: 179 additions & 0 deletions metric_providers/network/connections/tcpdump/system/provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
import os
import re
from collections import defaultdict
import ipaddress
#import netifaces

from metric_providers.base import BaseMetricProvider
from lib.db import DB

class NetworkConnectionsTcpdumpSystemProvider(BaseMetricProvider):
    """Captures system-wide network traffic via tcpdump and appends an
    aggregated, human-readable summary to the run's logs.

    Unlike the sampling providers this one has no resolution/unit: it is a
    debug helper whose output goes into the `runs.logs` column, not into
    the measurements tables.
    """

    def __init__(self, *, split_ports=True, skip_check=False):
        super().__init__(
            metric_name='network_connections_tcpdump_system',
            metrics={},
            resolution=None,
            unit=None,
            current_dir=os.path.dirname(os.path.abspath(__file__)),
            metric_provider_executable='tcpdump.sh',
            skip_check=skip_check
        )
        # When True, statistics are broken down per "port/protocol";
        # otherwise only per protocol (TCP/UDP).
        self.split_ports = split_ports


    def read_metrics(self, run_id, containers=None):
        """Parse the captured tcpdump output file and append a summary to the
        run's logs.

        Returns the number of distinct IP addresses seen (0 when the capture
        was empty).
        """
        with open(self._filename, 'r', encoding='utf-8') as file:
            lines = file.readlines()

        stats = parse_tcpdump(lines, split_ports=self.split_ports)

        if rows := len(stats):
            DB().query("""
                UPDATE runs
                SET logs= COALESCE(logs, '') || %s -- append
                WHERE id = %s
                """, params=(generate_stats_string(stats), run_id))
            return rows

        return 0

    def get_stderr(self):
        """Return the provider's stderr with tcpdump's informational banner
        lines removed, so that only real errors remain.

        tcpdump writes up to two non-error lines to stderr, e.g.:
            tcpdump: data link type PKTAP
            tcpdump: listening on eno2, link-type EN10MB (Ethernet), snapshot length 262144 bytes
        """
        stderr = super().get_stderr()

        if not stderr:
            return stderr

        # Strip each known banner line if it is the current first line.
        # The newline offset must be re-computed after every strip — reusing
        # the offset from the original string would slice the already
        # shortened stderr at the wrong position. Also `str.find` returns -1
        # (truthy!) when no newline exists, so compare explicitly.
        for marker in ('tcpdump: data link type', 'tcpdump: listening on'):
            line_token = stderr.find("\n")
            if line_token != -1 and marker in stderr[:line_token]:
                stderr = stderr[line_token + 1:]

        return stderr

def get_primary_interface():
    """Return the name of the network interface that holds the default IPv4 route.

    Raises RuntimeError when no default IPv4 gateway is configured.

    NOTE(review): relies on the third-party `netifaces` package, whose import
    is commented out at the top of this file — calling this function currently
    raises a NameError. Confirm `netifaces` is installed and re-enable the
    import before using the `filter_host` code path.
    """
    gateways = netifaces.gateways()
    if 'default' in gateways and netifaces.AF_INET in gateways['default']:
        # gateways['default'][AF_INET] is a (gateway_ip, interface_name) tuple
        return gateways['default'][netifaces.AF_INET][1]

    raise RuntimeError('Could not get primary network interface')

def get_ip_addresses(interface):
    """Return the IPv4 address and the first global IPv6 address bound to *interface*.

    Raises RuntimeError when neither an IPv4 nor an IPv6 address could be
    determined.

    NOTE(review): relies on the third-party `netifaces` package, whose import
    is commented out at the top of this file — calling this function currently
    raises a NameError. Confirm the dependency before relying on it.
    """
    addresses = []

    try:
        addrs = netifaces.ifaddresses(interface)

        if netifaces.AF_INET in addrs:
            # First configured IPv4 address only
            addresses.append(addrs[netifaces.AF_INET][0]['addr'])

        if netifaces.AF_INET6 in addrs:
            # Take the first IPv6 address that is neither link-local
            # (fe80::/10) nor a unique-local address (fd00::/8)
            for addr in addrs[netifaces.AF_INET6]:
                if not addr['addr'].startswith('fe80:') and not addr['addr'].startswith('fd00:'):
                    addresses.append(addr['addr'])
                    break
    except RuntimeError as e:
        # netifaces raises RuntimeError for unknown interfaces; treat as
        # "no addresses found" and fall through to the check below
        print(f"Error getting IP addresses: {e}")

    if not addresses:
        raise RuntimeError('Could not determine either IPv4 or IPv6 address')

    return addresses

def parse_tcpdump(lines, split_ports=False):
    """Aggregate raw `tcpdump -tt --micro -n -v` output lines into per-IP stats.

    Returns a dict keyed by IP address. Each value holds:
      - 'ports': mapping of "<port>/<protocol>" (when split_ports is True) or
        just "<protocol>" to {'packets': int, 'bytes': int}
      - 'total_bytes': running byte total for that IP

    Every packet is attributed to BOTH its source and its destination IP, so
    summing 'total_bytes' over all IPs double-counts the wire traffic.

    Raises RuntimeError when a matched packet line carries no parseable length.
    """
    stats = defaultdict(lambda: {'ports': defaultdict(lambda: {'packets': 0, 'bytes': 0}), 'total_bytes': 0})
    # Matches the "src > dst:" endpoint pair tcpdump prints for each packet
    ip_pattern = r'(\S+) > (\S+):'
    #tcp_pattern = r'Flags \[(.+?)\]'

    for line in lines:
        ip_match = re.search(ip_pattern, line)
        #tcp_match = re.search(tcp_pattern, line)

        if ip_match:
            src, dst = ip_match.groups()
            src_ip, src_port = parse_ip_port(src)
            dst_ip, dst_port = parse_ip_port(dst)

            # Only count lines where both endpoints parsed as valid IP:port
            if src_ip and dst_ip:
                protocol = "UDP" if "UDP" in line else "TCP"

                if protocol == "UDP":
                    # For UDP, use the reported length
                    length_pattern = r'length:? (\d+)'
                    length_match = re.search(length_pattern, line)
                    if not length_match or not length_match.group(1):
                        raise RuntimeError(f"Could not find UDP packet length for line: {line}")
                    packet_length = int(length_match.group(1))

                else:
                    # For TCP, estimate packet length (this is a simplification)
                    length_pattern = r'length (\d+)'
                    length_match = re.search(length_pattern, line)

                    if not length_match or not length_match.group(1):
                        # DNS (port 53) / MDNS (port 5353) lines have no
                        # "length N" field but end in a "(<bytes>)" size instead
                        if '.53 ' in line or '.53:' in line or '.5353 ' in line or '.5353:' in line: # try DNS / MDNS match
                            dns_packet_length = re.match(r'.*\((\d+)\)$', line)
                            if not dns_packet_length:
                                raise RuntimeError(f"Could not find TCP packet length for line: {line}")
                            packet_length = int(dns_packet_length[1])
                        else:
                            raise RuntimeError(f"No packet length was detected for line {line}")
                    else:
                        packet_length = 40 + int(length_match.group(1)) # Assuming 40 bytes for IP + TCP headers

                # Update source IP stats
                if split_ports:
                    stats[src_ip]['ports'][f"{src_port}/{protocol}"]['packets'] += 1
                    stats[src_ip]['ports'][f"{src_port}/{protocol}"]['bytes'] += packet_length
                else:
                    stats[src_ip]['ports'][f"{protocol}"]['packets'] += 1 # alternative without splitting by port
                    stats[src_ip]['ports'][f"{protocol}"]['bytes'] += packet_length # alternative without splitting by port

                stats[src_ip]['total_bytes'] += packet_length

                # Update destination IP stats
                if split_ports:
                    stats[dst_ip]['ports'][f"{dst_port}/{protocol}"]['packets'] += 1
                    stats[dst_ip]['ports'][f"{dst_port}/{protocol}"]['bytes'] += packet_length
                else:
                    stats[dst_ip]['ports'][f"{protocol}"]['packets'] += 1 # alternative without splitting by port
                    stats[dst_ip]['ports'][f"{protocol}"]['bytes'] += packet_length # alternative without splitting by port

                stats[dst_ip]['total_bytes'] += packet_length

    return stats

def parse_ip_port(address):
    """Split a tcpdump endpoint token such as '192.168.1.1.443' or '[::1].80'
    into an (ip, port) tuple.

    Returns (None, None) when the token does not contain a valid IP address
    followed by a numeric port.
    """
    try:
        # tcpdump separates the port from the address with a final dot,
        # regardless of address family
        host_part, port_part = address.rsplit('.', 1)
        if ']' in address:
            # IPv6 endpoints are wrapped in brackets; unwrap before validating
            host_part = host_part.strip('[]')

        ipaddress.ip_address(host_part)  # raises ValueError for non-IP garbage
        return host_part, int(port_part)
    except ValueError:
        # Covers: no dot present (unpacking), invalid IP, non-numeric port
        return None, None

def generate_stats_string(stats, filter_host=False):
    """Render the stats dict produced by parse_tcpdump into a readable report.

    When filter_host is True, entries for the host machine's own IP addresses
    are omitted. The host address lookup is performed ONLY in that case: it
    depends on the `netifaces` package (whose import is currently commented
    out at the top of this file), so resolving it unconditionally would make
    even the default filter_host=False call fail with a NameError and would
    waste work besides.
    """
    ip_addresses = []
    if filter_host:
        # Resolve the host's own addresses only when we actually filter on them
        primary_interface = get_primary_interface()
        ip_addresses = get_ip_addresses(primary_interface)

    buffer = []
    for ip, data in stats.items():
        if filter_host and ip in ip_addresses:
            continue

        buffer.append(f"IP: {ip} (as sender or receiver. aggregated)")
        buffer.append(f" Total transmitted data: {data['total_bytes']} bytes")
        buffer.append(' Ports:')
        for port, port_data in data['ports'].items():
            buffer.append(f"  {port}: {port_data['packets']} packets, {port_data['bytes']} bytes")
        buffer.append('\n')

    return '\n'.join(buffer)
25 changes: 25 additions & 0 deletions metric_providers/network/connections/tcpdump/system/tcpdump.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#! /bin/bash
set -euo pipefail

# Wrapper that starts a system-wide tcpdump capture.
# -c : check mode — only verify that tcpdump can be started (permissions,
#      interface availability) and exit; do not run a capture.

check_system=false
while getopts "c" o; do
    case "$o" in
        c)
            check_system=true
            ;;
    esac
done


if $check_system; then
    # Try to capture exactly one packet. Since no traffic might be flowing we
    # also cap the wait at 3 seconds; `timeout` exits with 124 in that case,
    # which is fine. tcpdump itself exits 1 on permission/interface errors.
    # The `|| exit_code=$?` capture is required: under `set -e` a plain
    # failing command substitution would abort the script before the status
    # could ever be inspected, making the error message below unreachable.
    exit_code=0
    first_line=$(timeout 3 tcpdump -tt --micro -n -v -c 1) || exit_code=$?
    if [ "$exit_code" -eq 1 ]; then
        echo "tcpdump could not be started. Missing sudo permissions?"
        exit 1
    fi
    exit 0
fi

# Normal mode: stream the capture until the provider is stopped by the runner
tcpdump -tt --micro -n -v
24 changes: 16 additions & 8 deletions runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1041,12 +1041,19 @@ def start_metric_providers(self, allow_container=True, allow_other=True):

print(TerminalColors.HEADER, '\nStarting metric providers', TerminalColors.ENDC)

# Here we start all container related providers
# This includes tcpdump, which is only for debugging of the containers itself
# If debugging of the tool itself is wanted tcpdump should be started adjacent to the tool and not inline
for metric_provider in self.__metric_providers:
if metric_provider._metric_name.endswith('_container') and not allow_container:
if (metric_provider._metric_name.endswith('_container') or metric_provider._metric_name == 'network_connections_tcpdump_system' ) and not allow_container:
continue
if not metric_provider._metric_name.endswith('_container') and not allow_other:

if not metric_provider._metric_name.endswith('_container') and metric_provider._metric_name != 'network_connections_tcpdump_system' and not allow_other:
continue

if metric_provider.has_started():
raise RuntimeError(f"Metric provider {metric_provider.__class__.__name__} was already started!")

message = f"Booting {metric_provider.__class__.__name__}"
metric_provider.start_profiling(self.__containers)
if self._verbose_provider_boot:
Expand All @@ -1058,9 +1065,10 @@ def start_metric_providers(self, allow_container=True, allow_other=True):
self.custom_sleep(2)

for metric_provider in self.__metric_providers:
if metric_provider._metric_name.endswith('_container') and not allow_container:
if (metric_provider._metric_name.endswith('_container') or metric_provider._metric_name == 'network_connections_tcpdump_system' ) and not allow_container:
continue
if not metric_provider._metric_name.endswith('_container') and not allow_other:

if not metric_provider._metric_name.endswith('_container') and metric_provider._metric_name != 'network_connections_tcpdump_system' and not allow_other:
continue

stderr_read = metric_provider.get_stderr()
Expand Down Expand Up @@ -1444,7 +1452,7 @@ def save_stdout_logs(self):
if logs_as_str:
DB().query("""
UPDATE runs
SET logs=%s
SET logs = COALESCE(logs, '') || %s -- append
WHERE id = %s
""", params=(logs_as_str, self._run_id))

Expand Down Expand Up @@ -1617,21 +1625,21 @@ def run(self):
raise exc
finally:
try:
self.read_and_cleanup_processes()
self.stop_metric_providers()
except BaseException as exc:
self.add_to_log(exc.__class__.__name__, str(exc))
self.set_run_failed()
raise exc
finally:
try:
self.save_notes_runner()
self.read_and_cleanup_processes()
except BaseException as exc:
self.add_to_log(exc.__class__.__name__, str(exc))
self.set_run_failed()
raise exc
finally:
try:
self.stop_metric_providers()
self.save_notes_runner()
except BaseException as exc:
self.add_to_log(exc.__class__.__name__, str(exc))
self.set_run_failed()
Expand Down
2 changes: 1 addition & 1 deletion tests/test_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,9 @@ def run_until(self, step):
self.__runner.update_start_and_end_times()
self.__runner.store_phases()
self.__runner.read_container_logs()
self.__runner.stop_metric_providers()
self.__runner.read_and_cleanup_processes()
self.__runner.save_notes_runner()
self.__runner.stop_metric_providers()
self.__runner.save_stdout_logs()

if self.__runner._dev_no_phase_stats is False:
Expand Down

0 comments on commit c085073

Please sign in to comment.