Skip to content

Commit

Permalink
[harness-simulation] keep capture files in sync at runtime (#8138)
Browse files Browse the repository at this point in the history
This commit keeps capture files in sync at runtime. Therefore, Harness
can obtain the addresses directly from the capture files in manual DUT
mode without having to enter the addresses manually.
  • Loading branch information
jcdong98 authored Feb 15, 2023
1 parent 9ff2eef commit 242c7cc
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import select
import socket
import struct
import threading
import time
import win32api
import winreg as wr
Expand Down Expand Up @@ -259,8 +260,19 @@ def startSniffer(self, channelToCapture, captureFileLocation, includeEthernet=Fa
if response.status != sniffer_pb2.OK:
raise RuntimeError('startSniffer error: %s' % sniffer_pb2.Status.Name(response.status))

self._thread = threading.Thread(target=self._file_sync_main_loop)
self._thread.setDaemon(True)
self._thread.start()

self.is_active = True

@watched
def _file_sync_main_loop(self):
with open(self._local_pcapng_location, 'wb') as f:
for response in self._stub.TransferPcapng(sniffer_pb2.TransferPcapngRequest()):
f.write(response.content)
f.flush()

@watched
def stopSniffer(self):
if not self.is_active:
Expand All @@ -270,8 +282,7 @@ def stopSniffer(self):
if response.status != sniffer_pb2.OK:
raise RuntimeError('stopSniffer error: %s' % sniffer_pb2.Status.Name(response.status))

with open(self._local_pcapng_location, 'wb') as f:
f.write(response.pcap_content)
self._thread.join()

self.is_active = False

Expand Down
11 changes: 10 additions & 1 deletion tools/harness-simulation/posix/sniffer_sim/proto/sniffer.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ service Sniffer {
// Start the sniffer
rpc Start(StartRequest) returns (StartResponse) {}

// Transfer the capture file
rpc TransferPcapng(TransferPcapngRequest) returns (stream TransferPcapngResponse) {}

// Let the sniffer sniff these nodes only
rpc FilterNodes(FilterNodesRequest) returns (FilterNodesResponse) {}

Expand Down Expand Up @@ -41,6 +44,13 @@ message StartResponse {
Status status = 1;
}

message TransferPcapngRequest {
}

message TransferPcapngResponse {
bytes content = 1;
}

message FilterNodesRequest {
repeated int32 nodeids = 1;
}
Expand All @@ -54,5 +64,4 @@ message StopRequest {

message StopResponse {
Status status = 1;
bytes pcap_content = 2;
}
67 changes: 51 additions & 16 deletions tools/harness-simulation/posix/sniffer_sim/sniffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import argparse
from concurrent import futures
import enum
import fcntl
import grpc
import logging
import os
Expand All @@ -38,6 +39,7 @@
import subprocess
import tempfile
import threading
import time

import pcap_codec
from proto import sniffer_pb2
Expand Down Expand Up @@ -66,13 +68,14 @@ def _reset(self):
self._transport = None
self._thread = None
self._thread_alive.clear()
self._pcapng_filename = None
self._file_sync_done.clear()
self._tshark_proc = None

def __init__(self, max_nodes_num):
self._max_nodes_num = max_nodes_num
self._thread_alive = threading.Event()
self._mutex = threading.Lock() # for self._denied_nodeids
self._file_sync_done = threading.Event()
self._nodeids_mutex = threading.Lock() # for `self._denied_nodeids`
self._reset()

def Start(self, request, context):
Expand All @@ -94,11 +97,11 @@ def Start(self, request, context):
if request.includeEthernet:
self._state |= CaptureState.ETHERNET
cmd += ['-i', 'docker0']
self._pcapng_filename = os.path.join(tempdir, 'sim.pcapng')
cmd += ['-w', self._pcapng_filename, '-q', 'not ip and not tcp and not arp and not ether proto 0x8899']
cmd += ['-w', '-', '-q', 'not ip and not tcp and not arp and not ether proto 0x8899']

self.logger.debug('Running command: %s', ' '.join(cmd))
self._tshark_proc = subprocess.Popen(cmd)
self._tshark_proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
self._set_nonblocking(self._tshark_proc.stdout.fileno())

# Construct pcap codec after initiating tshark to avoid blocking
self._pcap = pcap_codec.PcapCodec(request.channel, fifo_name)
Expand All @@ -112,7 +115,7 @@ def Start(self, request, context):

# Start the sniffer main loop thread
self._thread = threading.Thread(target=self._sniffer_main_loop)
self._thread.daemon = True
self._thread.setDaemon(True)
self._transport.open()
self._thread_alive.set()
self._thread.start()
Expand All @@ -128,13 +131,44 @@ def _sniffer_main_loop(self):
except socket.timeout:
continue

with self._mutex:
with self._nodeids_mutex:
denied_nodeids = self._denied_nodeids

# Equivalent to RF enclosure
if nodeid not in denied_nodeids:
self._pcap.append(data)

def TransferPcapng(self, request, context):
""" Transfer the capture file. """

# Validate the state
if self._state == CaptureState.NONE:
return sniffer_pb2.FilterNodesResponse(status=sniffer_pb2.OPERATION_ERROR)

# Synchronize the capture file
while True:
content = self._tshark_proc.stdout.read()
if content is None:
# Currently no captured packets
time.sleep(self.TIMEOUT)
elif content == b'':
# Reach EOF when tshark terminates
break
else:
# Forward the captured packets
yield sniffer_pb2.TransferPcapngResponse(content=content)

self._file_sync_done.set()

def _set_nonblocking(self, fd):
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
if flags < 0:
raise RuntimeError('fcntl(F_GETFL) failed')

flags |= os.O_NONBLOCK
if fcntl.fcntl(fd, fcntl.F_SETFL, flags) < 0:
raise RuntimeError('fcntl(F_SETFL) failed')

def FilterNodes(self, request, context):
""" Only sniffer the specified nodes. """

Expand All @@ -150,7 +184,7 @@ def FilterNodes(self, request, context):
if not 1 <= nodeid <= self._max_nodes_num:
return sniffer_pb2.FilterNodesResponse(status=sniffer_pb2.VALUE_ERROR)

with self._mutex:
with self._nodeids_mutex:
self._denied_nodeids = denied_nodeids

return sniffer_pb2.FilterNodesResponse(status=sniffer_pb2.OK)
Expand All @@ -161,28 +195,29 @@ def Stop(self, request, context):
self.logger.debug('call Stop')

# Validate and change the state
if self._state == CaptureState.NONE:
return sniffer_pb2.StopResponse(status=sniffer_pb2.OPERATION_ERROR, pcap_content=b'')
if not (self._state & CaptureState.THREAD):
return sniffer_pb2.StopResponse(status=sniffer_pb2.OPERATION_ERROR)
self._state = CaptureState.NONE

self._thread_alive.clear()
self._thread.join(timeout=1)
self._thread.join()
self._transport.close()
self._pcap.close()

self._tshark_proc.terminate()
self._file_sync_done.wait()
# `self._tshark_proc` becomes None after the next statement
self._tshark_proc.wait()

with open(self._pcapng_filename, 'rb') as f:
pcap_content = f.read()

self._reset()

return sniffer_pb2.StopResponse(status=sniffer_pb2.OK, pcap_content=pcap_content)
return sniffer_pb2.StopResponse(status=sniffer_pb2.OK)


def serve(address_port, max_nodes_num):
server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
# One worker is used for `Start`, `FilterNodes` and `Stop`
# The other worker is used for `TransferPcapng`, which will be kept running by the client in a background thread
server = grpc.server(futures.ThreadPoolExecutor(max_workers=2))
sniffer_pb2_grpc.add_SnifferServicer_to_server(SnifferServicer(max_nodes_num), server)
# add_secure_port requires a web domain
server.add_insecure_port(address_port)
Expand Down

0 comments on commit 242c7cc

Please sign in to comment.