Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: increase coverage and add test gh workflow #8

Merged
merged 12 commits into from
Dec 27, 2023
36 changes: 36 additions & 0 deletions .github/workflows/test-suite.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
name: Test Suite

on:
push:
branches:
- "main"
pull_request:
branches:
- "main"

# Allows you to run this workflow manually from the Actions tab
workflow_dispatch:

jobs:
tests:
name: "Python ${{ matrix.python-version }} ${{ matrix.os }}"
runs-on: "${{ matrix.os }}"
strategy:
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
os: [windows-latest, ubuntu-latest, macos-latest]
steps:
- uses: "actions/checkout@v4"
- uses: "actions/setup-python@v5"
with:
python-version: "${{ matrix.python-version }}"
- name: "Install dependencies"
run: |
python -m pip install --upgrade pip
python -m pip install nox
python --version
pip --version
nox --version
- name: "Run tests"
run: |
nox -s test_for_ci
144 changes: 87 additions & 57 deletions flexexecutor.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import asyncio
import atexit
import itertools
from concurrent.futures import ProcessPoolExecutor, _base
from concurrent.futures import thread as _thread
from concurrent.futures import Future, ProcessPoolExecutor, _base
from concurrent.futures.thread import BrokenThreadPool, _WorkItem
from concurrent.futures.thread import ThreadPoolExecutor as _ThreadPoolExecutor
from inspect import iscoroutinefunction
from queue import Empty
from threading import Event, Lock, Thread
Expand All @@ -12,6 +13,8 @@
__all__ = (
"__version__",
"AsyncPoolExecutor",
"BrokenThreadPool",
"Future",
"ProcessPoolExecutor",
"ThreadPoolExecutor",
)
Expand Down Expand Up @@ -44,17 +47,16 @@ def _worker(executor_ref, work_queue, initializer, initargs, idle_timeout):
except BaseException:
_base.LOGGER.critical("Exception in initializer:", exc_info=True)
executor = executor_ref()
if executor is not None:
if executor is not None: # pragma: no cover
executor._initializer_failed()
return

idle_tick = monotonic()
idle_tick = -1.0
try:
while True:
if idle_timeout >= 0 and monotonic() - idle_tick > idle_timeout:
executor = executor_ref()
if executor is not None:
executor._idle_semaphore.acquire(timeout=0)
if idle_tick == -1.0:
idle_tick = monotonic()
elif idle_timeout >= 0 and monotonic() - idle_tick > idle_timeout:
break
try:
work_item = work_queue.get(block=True, timeout=0.1)
Expand All @@ -70,18 +72,20 @@ def _worker(executor_ref, work_queue, initializer, initargs, idle_timeout):
del executor
idle_tick = monotonic()
continue
executor = executor_ref()
if _thread._shutdown or executor is None or executor._shutdown:
if executor is not None:
executor._shutdown = True
break # pragma: no cover
finally:
executor = executor_ref()
if executor is None:
work_queue.put(None)
else:
executor._idle_semaphore.acquire(timeout=0)
if _shutdown or executor._shutdown:
executor._shutdown = True
work_queue.put(None)
return
del executor
except BaseException:
_base.LOGGER.critical("Exception in worker", exc_info=True)
del executor


class ThreadPoolExecutor(_thread.ThreadPoolExecutor):
class ThreadPoolExecutor(_ThreadPoolExecutor):
def __init__(
self,
max_workers=None,
Expand All @@ -90,12 +94,39 @@ def __init__(
initargs=(),
idle_timeout=60.0,
):
if max_workers is None:
max_workers = 1024
super().__init__(max_workers, thread_name_prefix, initializer, initargs)
if idle_timeout is None or idle_timeout < 0:
self._idle_timeout = -1
else:
self._idle_timeout = max(0.1, idle_timeout)

def submit(self, fn, /, *args, **kwargs):
if iscoroutinefunction(fn):
raise TypeError("fn must not be a coroutine function")

with self._shutdown_lock, _global_shutdown_lock:
if self._broken:
raise BrokenThreadPool(self._broken)

if self._shutdown:
raise RuntimeError("cannot schedule new futures after shutdown")
if _shutdown:
# coverage didn't realize that _shutdown is set, add no cover here
raise RuntimeError( # pragma: no cover
"cannot schedule new futures after interpreter shutdown"
)

f = Future() # type: ignore
w = _WorkItem(f, fn, args, kwargs)

self._work_queue.put(w)
self._adjust_thread_count()
return f

submit.__doc__ = _base.Executor.submit.__doc__

def _adjust_thread_count(self):
if self._idle_semaphore.acquire(timeout=0):
return
Expand Down Expand Up @@ -125,9 +156,10 @@ def weakref_cb(_, q=self._work_queue):
_threads_queues[t] = self._work_queue


class _AsyncWorkItem(_thread._WorkItem):
class _AsyncWorkItem(_WorkItem):
async def run(self):
if not self.future.set_running_or_notify_cancel():
print("cancelled")
return

try:
Expand Down Expand Up @@ -156,7 +188,7 @@ async def _async_worker(
initializer(*initargs)
except BaseException:
_base.LOGGER.critical("Exception in initializer:", exc_info=True)
if executor is not None:
if executor is not None: # pragma: no cover
executor._running.set()
executor._initializer_failed()
return
Expand All @@ -169,43 +201,40 @@ async def _async_worker(
try:
while True:
if idle_timeout >= 0 and monotonic() - idle_tick > idle_timeout:
executor = executor_ref()
if executor is not None:
executor._idle_semaphore.acquire(timeout=0)
break
try:
work_item = work_queue.get(block=True, timeout=0.1)
except Empty:
pass
if work_item is not None:
task = loop.create_task(work_item.run())
curr_tasks.add(task)
await asleep(0) # ugly but working
del work_item

finished_tasks = [t for t in curr_tasks if t.done()]
for t in finished_tasks:
curr_tasks.remove(t)
if curr_tasks:
idle_tick = monotonic()
continue

executor = executor_ref()
if _thread._shutdown or executor is None or executor._shutdown:
if executor is not None:
executor._shutdown = True
work_queue.put(None)

for t in curr_tasks:
await t
return
del executor
except BaseException:
_base.LOGGER.critical("Exception in worker", exc_info=True)
if len(curr_tasks) < max_workers:
try:
work_item = work_queue.get(block=True, timeout=0.1)
if work_item is not None:
task = loop.create_task(work_item.run())
curr_tasks.add(task)
await asleep(0)
del work_item
else:
break
except Empty:
pass
await asleep(0)
finished_tasks = [t for t in curr_tasks if t.done()]
for t in finished_tasks:
curr_tasks.remove(t)
if curr_tasks:
idle_tick = monotonic()
finally:
while curr_tasks:
await asleep(0)
finished_tasks = [t for t in curr_tasks if t.done()]
for t in finished_tasks:
curr_tasks.remove(t)
executor = executor_ref()
if executor is not None:
if executor is None:
work_queue.put(None)
else:
executor._running.clear()
if _shutdown or executor._shutdown:
executor._shutdown = True
work_queue.put(None)
del executor


class AsyncWorker(Thread):
Expand Down Expand Up @@ -241,7 +270,7 @@ def run(self):
)


class AsyncPoolExecutor(_thread.ThreadPoolExecutor):
class AsyncPoolExecutor(ThreadPoolExecutor):
_counter = itertools.count().__next__

def __init__(
Expand Down Expand Up @@ -269,16 +298,17 @@ def submit(self, fn, /, *args, **kwargs):
raise TypeError("fn must be a coroutine function")
with self._shutdown_lock, _global_shutdown_lock:
if self._broken:
raise _thread.BrokenThreadPool(self._broken)
raise BrokenThreadPool(self._broken)

if self._shutdown:
raise RuntimeError("cannot schedule new futures after shutdown")
if _thread._shutdown:
raise RuntimeError(
if _shutdown:
# coverage didn't realize that _shutdown is set, add no cover here
raise RuntimeError( # pragma: no cover
"cannot schedule new futures after interpreter shutdown"
)

f = _base.Future() # type: ignore
f = Future() # type: ignore
w = _AsyncWorkItem(f, fn, args, kwargs)

self._work_queue.put(w)
Expand Down
51 changes: 26 additions & 25 deletions noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

PYTHON_BASE_VERSION = "3.8"
AUTOFLAKE_VERSION = "2.2.1"
ISORT_VERSION = "5.13.2"
RUFF_VERSION = "0.1.9"
MYPY_VERSION = "1.8.0"
BUILD_VERSION = "1.0.3"
Expand Down Expand Up @@ -59,17 +58,10 @@ def clean(session: Session):

@nox.session(python="3.8", reuse_venv=True)
@nox.parametrize("autoflake", [AUTOFLAKE_VERSION])
@nox.parametrize("isort", [ISORT_VERSION])
@nox.parametrize("ruff", [RUFF_VERSION])
def format(
session: Session,
autoflake: str,
isort: str,
ruff: str,
):
def format(session: Session, autoflake: str, ruff: str):
session.install(
f"autoflake~={autoflake}",
f"isort~={isort}",
f"ruff~={ruff}",
)
try:
Expand All @@ -82,25 +74,16 @@ def format(
)
session.run("autoflake", "--version")
session.run("autoflake", SOURCE_PATH, NOXFILE_PATH, TEST_DIR)
session.run("isort", "--vn")
session.run("isort", SOURCE_PATH, NOXFILE_PATH, TEST_DIR)
session.run("ruff", "--version")
session.run("ruff", "format", SOURCE_PATH, NOXFILE_PATH, TEST_DIR)


@nox.session(python="3.8", reuse_venv=True)
@nox.parametrize("autoflake", [AUTOFLAKE_VERSION])
@nox.parametrize("isort", [ISORT_VERSION])
@nox.parametrize("ruff", [RUFF_VERSION])
def format_check(
session: Session,
autoflake: str,
isort: str,
ruff: str,
):
def format_check(session: Session, autoflake: str, ruff: str):
session.install(
f"autoflake~={autoflake}",
f"isort~={isort}",
f"ruff~={ruff}",
)
try:
Expand All @@ -113,8 +96,6 @@ def format_check(
)
session.run("autoflake", "--version")
session.run("autoflake", "--check-diff", SOURCE_PATH, NOXFILE_PATH, TEST_DIR)
session.run("isort", "--vn")
session.run("isort", "--check", "--diff", SOURCE_PATH, NOXFILE_PATH, TEST_DIR)
session.run("ruff", "--version")
session.run(
"ruff",
Expand All @@ -141,7 +122,7 @@ def mypy(session: Session, mypy: str):


@nox.session(python=False)
def test_under_current_env(session: Session):
def test(session: Session):
session.run(
"pytest",
"--cov",
Expand All @@ -158,9 +139,29 @@ def test_under_current_env(session: Session):
)


@nox.session(python=["3.6", "3.8", "3.10", "3.11", "3.12"])
@nox.session(reuse_venv=True)
def test_for_ci(session: Session):
session.install(
"coverage[toml]",
"pytest",
"pytest-asyncio",
"pytest-cov",
"pytest-mock",
"pytest-timeout",
)
test(session)


@nox.session(python=["3.6", "3.8", "3.10", "3.11", "3.12"], reuse_venv=True)
def test_all(session: Session):
session.install("pytest")
session.install(
"coverage[toml]",
"pytest",
"pytest-asyncio",
"pytest-cov",
"pytest-mock",
"pytest-timeout",
)
session.run(
"pytest",
"--cov",
Expand All @@ -177,7 +178,7 @@ def test_all(session: Session):
)


@nox.session(python="3.8")
@nox.session(python="3.8", reuse_venv=True)
@nox.parametrize("build", [BUILD_VERSION])
@nox.parametrize("twine", [TWINE_VERSION])
def publish(
Expand Down
Loading
Loading