Skip to content

Commit

Permalink
Multiproc to async and target macOS
Browse files Browse the repository at this point in the history
  • Loading branch information
aBozowski committed Oct 26, 2023
1 parent 44a674f commit 36f3ceb
Show file tree
Hide file tree
Showing 19 changed files with 97 additions and 124 deletions.
10 changes: 1 addition & 9 deletions src/tools/interop/idt/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,7 @@ usage: idt capture [-h] [--platform {Android}] [--ecosystem {DemoExtEcosystem...
```
> **IMPORTANT:** Note the following runtime expectations of ecosystems:
> `analyze_capture` will be run as a target of `multiprocessing.Process`, \
> meaning the ecosystem object will be copied into a forked process at this time.
> `analyze_capture` must be async aware and not interact with stdin
The platform loader functions the same as `capture/ecosystem`.
Expand Down Expand Up @@ -335,13 +334,6 @@ For discovery:
- When needed, execute builds in a folder called `BUILD` within the source
tree.
- `idt_clean_all` deletes all `BUILD` dirs and `BUILD` is in `.gitignore`.
- Although many things are marked as co routines, almost all real concurrency
in the current implementation comes from multiprocessing.
- A general direction should be decided for the project in the next
iteration.
- Multiprocessing allows for easier implementation where ecosystems are
less likely to block each other
- Async allows for better shared states and flexibility
## Troubleshooting
Expand Down
4 changes: 2 additions & 2 deletions src/tools/interop/idt/capture/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@ async def stop_capture(self) -> None:
raise NotImplementedError

@abstractmethod
def analyze_capture(self) -> None:
async def analyze_capture(self) -> None:
"""
Parse the capture and create + display helpful analysis artifacts that are unique to the ecosystem
IMPORTANT: This function will be run as a separate process to allow real time analysis!!!
Must be async aware and not interact with stdin
"""
raise NotImplementedError

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
# limitations under the License.
#

import asyncio
import json
import os
import time
from typing import Dict

import log
Expand All @@ -26,7 +26,7 @@
from capture.utils.artifact import create_standard_log_name

from . import config
from .analysis import PlayServicesAnalysis
from .play_services_analysis import PlayServicesAnalysis
from .command_map import dumpsys, getprop
from .prober import PlayServicesProber

Expand All @@ -39,6 +39,7 @@ class PlayServices(EcosystemCapture):
"""

def __init__(self, platform: Android, artifact_dir: str) -> None:
self.logcat_fd = None
self.logger = logger
self.artifact_dir = artifact_dir

Expand Down Expand Up @@ -103,13 +104,14 @@ async def start_capture(self) -> None:

async def stop_capture(self) -> None:
self.analysis.show_analysis()
if self.logcat_fd is not None:
self.logcat_fd.close()

def analyze_capture(self):
fd = open(self.platform.streams["LogcatStreamer"].logcat_artifact, "r")
async def analyze_capture(self):
self.logcat_fd = open(self.platform.streams["LogcatStreamer"].logcat_artifact, "r")
while True:
self.analysis.do_analysis(fd.readlines())
time.sleep(4)
fd.close()
self.analysis.do_analysis(self.logcat_fd.readlines())
await asyncio.sleep(0)

async def probe_capture(self) -> None:
if config.enable_foyer_probers:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,7 @@ def _log_proc_sigma(self, line: str) -> None:
self.sigma_logs += line

def show_analysis(self) -> None:
# TODO: Because the realtime proc is forked, have to do this again for now...
self.real_time = False
with open(self.platform.streams["LogcatStreamer"].logcat_artifact, mode='r') as logcat_file:
for line in logcat_file:
self.process_line(line)
analysis_file = open(self.analysis_file_name, mode="w+")
print_and_write(add_border('Matter commissioner logs'), analysis_file)
print_and_write(self.matter_commissioner_logs, analysis_file)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
class PlayServicesProber:

def __init__(self, platform):
# TODO: Platform independent traceroute
self.platform = platform
self.artifact_dir = self.platform.artifact_dir
self.logger = logger
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,26 @@
# limitations under the License.
#

import asyncio
import os

from capture.base import EcosystemCapture, UnsupportedCapturePlatformException
from capture.platform.android.android import Android
from capture.utils.artifact import create_standard_log_name
from log import print_and_write

from log import get_logger

logger = get_logger(__file__)


class PlayServicesUser(EcosystemCapture):
"""
Implementation of capture and analysis for Play Services 3P
"""

def __init__(self, platform: Android, artifact_dir: str) -> None:

self.logger = logger
self.artifact_dir = artifact_dir
self.analysis_file = os.path.join(
self.artifact_dir, create_standard_log_name(
Expand All @@ -40,43 +45,42 @@ def __init__(self, platform: Android, artifact_dir: str) -> None:
'only platform=android is supported for '
'ecosystem=PlayServicesUser')
self.platform = platform
self.logcat_fd = None
self.output = ""

async def start_capture(self) -> None:
pass

async def stop_capture(self) -> None:
if self.logcat_fd is not None:
self.logcat_fd.close()
self.show_analysis()

@staticmethod
def proc_line(line, analysis_file) -> None:
def proc_line(self, line) -> None:
if "CommissioningServiceBin: Binding to service" in line:
print_and_write(
f"3P commissioner initiated Play Services commissioning\n{line}",
analysis_file)
s = f"3P commissioner initiated Play Services commissioning\n{line}"
logger.info(s)
self.output += f"{s}\n"
elif "CommissioningServiceBin: Sending commissioning request to bound service" in line:
print_and_write(
f"Play Services commissioning complete; passing back to 3P\n{line}",
analysis_file)
s = f"Play Services commissioning complete; passing back to 3P\n{line}"
logger.info(s)
self.output += f"{s}\n"
elif "CommissioningServiceBin: Received commissioning complete from bound service" in line:
print_and_write(
f"3P commissioning complete!\n{line}",
analysis_file)
s = f"3P commissioning complete!\n{line}"
logger.info(s)
self.output += f"{s}\n"

def analyze_capture(self) -> None:
async def analyze_capture(self) -> None:
""""Show the start and end times of commissioning boundaries"""
fd = open(self.platform.streams["LogcatStreamer"].logcat_artifact, "r")
self.logcat_fd = open(self.platform.streams["LogcatStreamer"].logcat_artifact, "r")
while True:
for line in fd.readlines():
self.proc_line(line, open("/dev/null", mode="w"))
fd.close()
for line in self.logcat_fd.readlines():
self.proc_line(line)
await asyncio.sleep(0)

def show_analysis(self) -> None:
# TODO: Make more elegant
analysis_file = open(self.analysis_file, mode='w+')
with open(self.platform.streams["LogcatStreamer"].logcat_artifact, mode='r') as logcat_file:
for line in logcat_file:
self.proc_line(line, analysis_file)
analysis_file.close()
with open(self.analysis_file, "w") as analysis_file:
print_and_write(self.output, analysis_file)

async def probe_capture(self) -> None:
pass
24 changes: 13 additions & 11 deletions src/tools/interop/idt/capture/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@

import asyncio
import copy
import multiprocessing
import os
import sys
import traceback
import typing
from multiprocessing import Process

import capture
import log
Expand All @@ -33,7 +32,6 @@
_PLATFORM_MAP: typing.Dict[str, PlatformLogStreamer] = {}
_ECOSYSTEM_MAP: typing.Dict[str, PlatformLogStreamer] = {}
_ERROR_REPORT: typing.Dict[str, list[(str, str, str)]] = {}
_ANALYSIS_MAP: typing.Dict[str, Process] = {}

logger = log.get_logger(__file__)

Expand Down Expand Up @@ -138,21 +136,25 @@ async def start():

@staticmethod
async def stop():
for ecosystem_name, ecosystem in _ANALYSIS_MAP.items():
border_print(f"Stopping analysis proc for {ecosystem_name}")
ecosystem.kill()
for platform_name, platform, in _PLATFORM_MAP.items():
border_print(f"Stopping streaming for platform {platform_name}")
await platform.stop_streaming()
await EcosystemController.handle_capture("stop")

@staticmethod
def analyze():
async def run_analyzers():
border_print("Starting real time analysis, press enter to stop!", important=True)
tasks = []
for ecosystem_name, ecosystem in _ECOSYSTEM_MAP.items():
border_print(f"Starting analyze subproc for {ecosystem_name}")
proc = multiprocessing.Process(target=ecosystem.analyze_capture)
_ANALYSIS_MAP[ecosystem_name] = proc
proc.start()
logger.info(f"Creating analysis task for {ecosystem_name}")
tasks.append(asyncio.create_task(ecosystem.analyze_capture()))
logger.info("Done creating analysis tasks")
await asyncio.get_event_loop().run_in_executor(
None, sys.stdin.readline)
border_print("Cancelling analysis tasks")
for task in tasks:
task.cancel()
logger.info("Done cancelling analysis tasks")

@staticmethod
async def probe():
Expand Down
2 changes: 1 addition & 1 deletion src/tools/interop/idt/capture/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def is_package(potential_package: str) -> bool:
return os.path.exists(init_path)

def verify_coroutines(self, subclass) -> bool:
# ABC does not verify coroutines on subclass instantiation, it merely checks the presence of instance methods
# ABC does not verify coroutines on subclass instantiation, it merely checks the presence of methods
for item in dir(self.search_type):
item_attr = getattr(self.search_type, item)
if inspect.iscoroutinefunction(item_attr):
Expand Down
1 change: 1 addition & 0 deletions src/tools/interop/idt/capture/pcap/pcap.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,6 @@ def start_pcap(self) -> None:
self.logger.info(f"Pcap output path {self.output_path}")

def stop_pcap(self) -> None:
# TODO: Broken on macOS
self.logger.info("Stopping pcap proc")
self.pcap_proc.stop_command(soft=True)
1 change: 1 addition & 0 deletions src/tools/interop/idt/capture/platform/android/android.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ def _device_id_user_input(self) -> None:
self._log_adb_devices()
temp_device_id = input('').strip()
self._check_connect_wireless_adb(temp_device_id)
self.get_adb_devices()
if self._only_one_device_connected():
self._set_device_if_only_one_connected()
elif temp_device_id not in self.adb_devices:
Expand Down
3 changes: 2 additions & 1 deletion src/tools/interop/idt/capture/platform/android/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@
#

enable_build_push_tcpdump = True
enable_bug_report = True
# TODO: Re-enable before un-draft
enable_bug_report = False
hci_log_level = "full"
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@
# limitations under the License.
#

import multiprocessing
import time
from typing import TYPE_CHECKING

import log
from capture.utils.artifact import create_standard_log_name, get_observer_proc
from capture.utils.artifact import create_standard_log_name

from ..base import AndroidStream

Expand All @@ -38,29 +36,15 @@ def __init__(self, platform: "Android"):
self.logcat_artifact = create_standard_log_name("logcat", "txt", parent=platform.artifact_dir)
self.logcat_command = f"logcat -T 1 >> {self.logcat_artifact}"
self.logcat_proc = platform.get_adb_background_command(self.logcat_command)
self.observer_proc = get_observer_proc(self.logcat_artifact)
self.runner_proc = multiprocessing.Process(target=self.restart_logcat_as_needed)
self.was_ever_running = False

def restart_logcat_as_needed(self) -> None:
while True:
if not self.logcat_proc.command_is_running():
self.logcat_proc = self.platform.get_adb_background_command(self.logcat_command)
self.logcat_proc.start_command()
if self.was_ever_running:
self.logger.critical("Had to start logcat again!!!")
else:
self.was_ever_running = True
time.sleep(10)
async def start_observer(self):
# TODO: async restart logcat as needed
# TODO: async warn if file not growing as needed
pass

async def start(self):
if not self.runner_proc.is_alive():
self.runner_proc.start()
if not self.observer_proc.is_alive():
self.observer_proc.start()
self.logcat_proc.start_command()

async def stop(self):
self.runner_proc.kill()
self.observer_proc.kill()
self.runner_proc.join()
self.observer_proc.join()
self.logcat_proc.stop_command()
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ async def start(self):
self.pcap_pull = True

async def stop(self):
# TODO: Broken on macOS
self.logger.info("Stopping android pcap proc")
self.pcap_proc.stop_command()
await self.pull_packet_capture()
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def check_screen(self) -> bool:
async def prepare_screen_recording(self) -> None:
screen_on = self.check_screen()
while not screen_on:
await asyncio.sleep(4)
await asyncio.sleep(3)
screen_on = self.check_screen()
if not screen_on:
self.logger.error("Please turn the screen on so screen recording can start or check connection!")
Expand All @@ -72,19 +72,14 @@ def update_commands(self) -> None:
manifest.write(screen_pull_command)
self.file_counter += 1

def run_recorder(self) -> None:
while True:
asyncio.run(self.prepare_screen_recording())
self.update_commands()
self.logger.info(f"New screen recording file started {self.screen_phone_out_path} {self.screen_artifact}")
self.platform.run_adb_command(self.screen_command)

async def start(self):
await self.prepare_screen_recording()
if self.check_screen() and not self.screen_pull:
self.screen_pull = True
self.screen_proc = multiprocessing.Process(target=self.run_recorder)
self.screen_proc.start()
# TODO: Make screen recording run on loop, replace removed multiproc solution with async
self.update_commands()
self.logger.info(f"New screen recording file started {self.screen_phone_out_path} {self.screen_artifact}")
self.screen_proc = self.platform.get_adb_background_command(self.screen_command)
self.screen_proc.start_command()

async def pull_screen_recording(self) -> None:
if self.screen_pull:
Expand All @@ -96,8 +91,7 @@ async def pull_screen_recording(self) -> None:
self.screen_pull = False

async def stop(self):
# TODO: Broken on macOS
self.logger.info("Stopping screen proc")
if self.screen_proc is not None:
self.screen_proc.kill()
self.screen_proc.join()
self.screen_proc.stop_command()
await self.pull_screen_recording()
Loading

0 comments on commit 36f3ceb

Please sign in to comment.