Skip to content

Commit

Permalink
master and worker check
Browse files Browse the repository at this point in the history
  • Loading branch information
horta committed Dec 6, 2024
1 parent 880311f commit 1b33e7c
Show file tree
Hide file tree
Showing 9 changed files with 1,958 additions and 24 deletions.
28 changes: 28 additions & 0 deletions .github/workflows/test-h3daemon.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
name: Test h3daemon

on:
push:
branches:
- "**"
paths:
- h3daemon/**
- .github/workflows/test-h3daemon.yml

defaults:
run:
working-directory: h3daemon

jobs:
test:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest, macos-latest]

steps:
- uses: actions/checkout@v4

- name: Run tests
run: |
pipx run poetry install
pipx run poetry run pytest -n 30
40 changes: 20 additions & 20 deletions h3daemon/h3daemon/daemon.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import os
import platform
import traceback
from contextlib import contextmanager, suppress

import psutil
from deciphon_schema import HMMFile
from pidlockfile import PIDLockFile

from h3daemon.debug import debug_exception, debug_message
from h3daemon.ensure_pressed import ensure_pressed
from h3daemon.errors import ChildNotFoundError, PIDNotFoundError
from h3daemon.healthy import assert_peers_healthy
Expand All @@ -31,25 +30,13 @@ def shutdown(x: psutil.Process, force: bool):
shutdown(x, True)


def debug_exception(exception: Exception):
if os.environ.get("H3DAEMON_DEBUG", 0):
with open("h3daemon_debug.txt", "a") as f:
f.write(f"{type(exception)}\n")
f.write(f"{exception.args}\n")
f.write(traceback.format_exc())


def debug_msg(msg: str):
if os.environ.get("H3DAEMON_DEBUG", 0):
with open("h3daemon_debug.txt", "a") as f:
f.write(f"{msg}\n")


class Daemon:
def __init__(self, master: Master, worker: Worker, process: psutil.Process | None):
self._master = master
self._worker = worker
self._process = process
if process is not None:
debug_message(f"Daemon.__init__ PID: {process.pid}")

@classmethod
def spawn(cls, hmmfile: HMMFile, cport: int = 0, wport: int = 0):
Expand All @@ -59,6 +46,8 @@ def spawn(cls, hmmfile: HMMFile, cport: int = 0, wport: int = 0):
try:
cport = find_free_port() if cport == 0 else cport
wport = find_free_port() if wport == 0 else wport
debug_message(f"Daemon.spawn cport: {cport}")
debug_message(f"Daemon.spawn wport: {wport}")

cmd = Master.cmd(cport, wport, str(hmmfile.path))
master = Master(psutil.Popen(cmd))
Expand All @@ -70,6 +59,7 @@ def spawn(cls, hmmfile: HMMFile, cport: int = 0, wport: int = 0):

if platform.system() != "Darwin":
assert_peers_healthy(master, worker)
debug_message("Daemon.spawn is ready")
except Exception as exception:
debug_exception(exception)
if worker:
Expand All @@ -90,16 +80,22 @@ def possess(cls, pidfile: PIDLockFile):
process = psutil.Process(pid)
children = process.children()

if len(children) > 0:
master = Master(children[0])
masters = [x for x in children if Master.myself(x)]
workers = [x for x in children if Worker.myself(x)]

if len(masters) > 0:
assert len(masters) == 1
master = Master(masters[0])
else:
raise ChildNotFoundError("Master not found.")

if len(children) > 1:
worker = Worker(children[1])
if len(workers) > 0:
assert len(workers) == 1
worker = Worker(workers[0])
else:
raise ChildNotFoundError("Worker not found.")

debug_message("Daemon.possess finished")
return cls(master, worker, process)

def shutdown(self, force=False):
Expand All @@ -112,6 +108,8 @@ def shutdown(self, force=False):
if self._process is not None:
shutdown(self._process, force)

debug_message("Daemon.shutdown finished")

@polling
def wait_for_readiness(self):
assert self.healthy
Expand All @@ -130,6 +128,8 @@ def port(self) -> int:
self.wait_for_readiness()
master_ports = set(self._master.local_listening_ports())
worker_ports = list(set(self._worker.remote_established_ports()))
debug_message(f"Daemon.port master_ports: {master_ports}")
debug_message(f"Daemon.port worker_ports: {worker_ports}")
if len(worker_ports) != 1:
raise RuntimeError(
f"Expected one remote port ({worker_ports}). Worker might have died."
Expand Down
35 changes: 35 additions & 0 deletions h3daemon/h3daemon/debug.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import os
import traceback

__all__ = ["debug_exception", "debug_message"]

_should_debug: bool | None = None
_debug_file: str | None = None


def should_debug():
global _should_debug
if _should_debug is None:
_should_debug = bool(os.environ.get("H3DAEMON_DEBUG", 0))
return _should_debug


def debug_file():
global _debug_file
if _debug_file is None:
_debug_file = os.environ.get("H3DAEMON_DEBUG_FILE", "h3daemon_debug.txt")
return _debug_file


def debug_exception(exception: Exception):
if should_debug():
with open(debug_file(), "a") as f:
f.write(f"{type(exception)}\n")
f.write(f"{exception.args}\n")
f.write(traceback.format_exc())


def debug_message(msg: str):
if should_debug():
with open(debug_file(), "a") as f:
f.write(f"{msg}\n")
14 changes: 12 additions & 2 deletions h3daemon/h3daemon/master.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,21 @@
import hmmer
import psutil

from h3daemon.debug import debug_message
from h3daemon.polling import Polling
from h3daemon.tcp import tcp_connections

__all__ = ["Master"]


class Master:
def __init__(self, proc: psutil.Process):
self._proc = proc
def __init__(self, process: psutil.Process):
self._proc = process
debug_message(f"Worker.__init__ PID: {process.pid}")

@staticmethod
def myself(process: psutil.Process):
return "--master" in process.cmdline()

@property
def process(self):
Expand All @@ -27,6 +33,7 @@ def healthy(self):
if not self._proc.is_running():
return False
lports = self.local_listening_ports()
debug_message(f"Master.healthy lports: {lports}")
return len(lports) > 1

def wait_for_readiness(self):
Expand All @@ -36,15 +43,18 @@ def wait_for_readiness(self):

def local_listening_ports(self):
connections = tcp_connections(self._proc)
debug_message(f"Master.local_listening_ports connections: {connections}")
connections = [x for x in connections if x.status == "LISTEN"]
return [x.laddr.port for x in connections if x.laddr.ip == "0.0.0.0"]

def local_established_ports(self):
connections = tcp_connections(self._proc)
debug_message(f"Master.local_established_ports connections: {connections}")
connections = [x for x in connections if x.status == "ESTABLISHED"]
return [x.laddr.port for x in connections if x.laddr.ip == "127.0.0.1"]

def remote_established_ports(self):
connections = tcp_connections(self._proc)
debug_message(f"Master.remote_established_ports connections: {connections}")
connections = [x for x in connections if x.status == "ESTABLISHED"]
return [x.raddr.port for x in connections if x.raddr.ip == "127.0.0.1"]
14 changes: 12 additions & 2 deletions h3daemon/h3daemon/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,21 @@
import hmmer
import psutil

from h3daemon.debug import debug_message
from h3daemon.polling import Polling
from h3daemon.tcp import tcp_connections

__all__ = ["Worker"]


class Worker:
def __init__(self, proc: psutil.Process):
self._proc = proc
def __init__(self, process: psutil.Process):
self._proc = process
debug_message(f"Worker.__init__ PID: {process.pid}")

@staticmethod
def myself(process: psutil.Process):
return "--worker" in process.cmdline()

@property
def process(self):
Expand All @@ -28,6 +34,8 @@ def healthy(self):
try:
lports = self.local_established_ports()
rports = self.remote_established_ports()
debug_message(f"Worker.healthy lports: {lports}")
debug_message(f"Worker.healthy rports: {rports}")
except psutil.ZombieProcess:
# psutil bug: https://github.com/giampaolo/psutil/issues/2116
return True
Expand All @@ -40,10 +48,12 @@ def wait_for_readiness(self):

def local_established_ports(self):
connections = tcp_connections(self._proc)
debug_message(f"Worker.local_established_ports connections: {connections}")
connections = [x for x in connections if x.status == "ESTABLISHED"]
return [x.laddr.port for x in connections if x.laddr.ip == "127.0.0.1"]

def remote_established_ports(self):
connections = tcp_connections(self._proc)
debug_message(f"Worker.remote_established_ports connections: {connections}")
connections = [x for x in connections if x.status == "ESTABLISHED"]
return [x.raddr.port for x in connections if x.raddr.ip == "127.0.0.1"]
5 changes: 5 additions & 0 deletions h3daemon/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ tenacity = "^9.0.0"
[tool.poetry.extras]
cli = ["typer"]

[tool.poetry.group.dev.dependencies]
pytest-repeat = "^0.9.3"
pytest = "^8.3.4"
pytest-xdist = "^3.6.1"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
Expand Down
8 changes: 8 additions & 0 deletions h3daemon/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from pathlib import Path

import pytest


@pytest.fixture
def files_path() -> Path:
return Path(__file__).parent / "files"
Loading

0 comments on commit 1b33e7c

Please sign in to comment.