Skip to content

Commit

Permalink
issue #102: add type hints
Browse files Browse the repository at this point in the history
Signed-off-by: Matteo Cafasso <[email protected]>
  • Loading branch information
noxdafox committed Sep 10, 2022
1 parent 18edfd2 commit 9f8d8d5
Show file tree
Hide file tree
Showing 7 changed files with 215 additions and 194 deletions.
4 changes: 2 additions & 2 deletions pebble/asynchronous/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
from functools import wraps
from concurrent.futures import TimeoutError

from pebble.common import ProcessExpired, ProcessFuture
from pebble.common import ProcessExpired
from pebble.common import process_execute, send_result
from pebble.common import launch_process, stop_process, SLEEP_UNIT
from pebble.common import process_execute, launch_thread, send_result


def process(*args, **kwargs):
Expand Down
9 changes: 5 additions & 4 deletions pebble/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import threading

from functools import wraps
from typing import Any, Callable


_synchronized_lock = threading.Lock()
Expand All @@ -44,16 +45,16 @@ def wrap(function):
return wrap


def decorate_synchronized(function, lock):
def decorate_synchronized(function: Callable, lock: threading.Lock) -> Callable:
@wraps(function)
def wrapper(*args, **kwargs):
def wrapper(*args, **kwargs) -> Any:
with lock:
return function(*args, **kwargs)

return wrapper


def sighandler(signals):
def sighandler(signals: list) -> Callable:
"""Sets the decorated function as signal handler of given *signals*.
*signals* can be either a single signal or a list/tuple
Expand All @@ -72,7 +73,7 @@ def wrapper(*args, **kwargs):
return wrap


def set_signal_handlers(signals, function):
def set_signal_handlers(signals: list, function: Callable):
if isinstance(signals, (list, tuple)):
for signum in signals:
signal.signal(signum, function)
Expand Down
37 changes: 17 additions & 20 deletions pebble/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@

from time import time
from types import MethodType

from typing import Callable, Optional

_waitforthreads_lock = threading.Lock()


def waitforqueues(queues, timeout=None):
def waitforqueues(queues: list, timeout: float = None) -> filter:
"""Waits for one or more *Queue* to be ready or until *timeout* expires.
*queues* is a list containing one or more *Queue.Queue* objects.
Expand All @@ -44,7 +44,7 @@ def waitforqueues(queues, timeout=None):
return filter(lambda q: not q.empty(), queues)


def prepare_queues(queues, lock):
def prepare_queues(queues: list, lock: threading.Condition):
"""Replaces queue._put() method in order to notify the waiting Condition."""
for queue in queues:
queue._pebble_lock = lock
Expand All @@ -53,13 +53,15 @@ def prepare_queues(queues, lock):
queue._put = MethodType(new_method, queue)


def wait_queues(queues, lock, timeout):
def wait_queues(queues: list,
lock: threading.Condition,
timeout: Optional[float]):
with lock:
if not any(map(lambda q: not q.empty(), queues)):
lock.wait(timeout)


def reset_queues(queues):
def reset_queues(queues: list):
"""Resets original queue._put() method."""
for queue in queues:
with queue.mutex:
Expand All @@ -68,7 +70,7 @@ def reset_queues(queues):
delattr(queue, '_pebble_lock')


def waitforthreads(threads, timeout=None):
def waitforthreads(threads: list, timeout: float = None) -> filter:
"""Waits for one or more *Thread* to exit or until *timeout* expires.
.. note::
Expand Down Expand Up @@ -99,21 +101,19 @@ def new_function(*args):
return filter(lambda t: not t.is_alive(), threads)


def prepare_threads(new_function):
def prepare_threads(new_function: Callable) -> Callable:
"""Replaces threading._get_ident() function in order to notify
the waiting Condition."""
with _waitforthreads_lock:
if hasattr(threading, 'get_ident'):
old_function = threading.get_ident
threading.get_ident = new_function
else:
old_function = threading._get_ident
threading._get_ident = new_function
old_function = threading.get_ident
threading.get_ident = new_function

return old_function


def wait_threads(threads, lock, timeout):
def wait_threads(threads: list,
lock: threading.Condition,
timeout: Optional[float]):
timestamp = time()

with lock:
Expand All @@ -126,13 +126,10 @@ def wait_threads(threads, lock, timeout):
return


def reset_threads(old_function):
"""Resets original threading._get_ident() function."""
def reset_threads(old_function: Callable):
"""Resets original threading.get_ident() function."""
with _waitforthreads_lock:
if hasattr(threading, 'get_ident'):
threading.get_ident = old_function
else:
threading._get_ident = old_function
threading.get_ident = old_function


def new_method(self, *args):
Expand Down
92 changes: 62 additions & 30 deletions pebble/pool/base_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,21 @@
import time
import logging

from queue import Queue
from threading import RLock
from collections import namedtuple
from typing import Callable, Optional
from itertools import chain, count, islice
from concurrent.futures import TimeoutError
try:
from queue import Queue
except ImportError:
from Queue import Queue
from concurrent.futures import Future, TimeoutError

from pebble.common import PebbleFuture, ProcessFuture, SLEEP_UNIT


class BasePool(object):
def __init__(self, max_workers, max_tasks, initializer, initargs):
class BasePool:
def __init__(self, max_workers: int,
max_tasks: int,
initializer: Optional[Callable],
initargs: list):
self._context = PoolContext(
max_workers, max_tasks, initializer, initargs)
self._loops = ()
Expand All @@ -44,7 +45,7 @@ def __exit__(self, *args):
self.join()

@property
def active(self):
def active(self) -> bool:
self._update_pool_state()

return self._context.state in (CLOSED, RUNNING)
Expand All @@ -60,7 +61,7 @@ def stop(self):
"""Stops the pool without performing any pending task."""
self._context.state = STOPPED

def join(self, timeout=None):
def join(self, timeout: float = None):
"""Joins the pool waiting until all workers exited.
If *timeout* is set, it block until all workers are done
Expand All @@ -76,7 +77,7 @@ def join(self, timeout=None):
self._context.task_queue.put(None)
self._stop_pool()

def _wait_queue_depletion(self, timeout):
def _wait_queue_depletion(self, timeout: Optional[float]):
tick = time.time()

while self.active:
Expand Down Expand Up @@ -110,8 +111,11 @@ def _stop_pool(self):
raise NotImplementedError("Not implemented")


class PoolContext(object):
def __init__(self, max_workers, max_tasks, initializer, initargs):
class PoolContext:
def __init__(self, max_workers: int,
max_tasks: int,
initializer: Callable,
initargs: list):
self._state = CREATED
self.state_mutex = RLock()

Expand All @@ -121,31 +125,34 @@ def __init__(self, max_workers, max_tasks, initializer, initargs):
self.worker_parameters = Worker(max_tasks, initializer, initargs)

@property
def state(self):
def state(self) -> int:
return self._state

@state.setter
def state(self, state):
def state(self, state: int):
with self.state_mutex:
if self.alive:
self._state = state

@property
def alive(self):
def alive(self) -> bool:
return self.state not in (ERROR, STOPPED)


class Task:
def __init__(self, identifier, future, timeout, payload):
def __init__(self, identifier: int,
future: Future,
timeout: Optional[float],
payload: 'TaskPayload'):
self.id = identifier
self.future = future
self.timeout = timeout
self.payload = payload
self.timestamp = 0
self.timestamp = 0.0
self.worker_id = 0

@property
def started(self):
def started(self) -> bool:
return bool(self.timestamp > 0)

def set_running_or_notify_cancel(self):
Expand All @@ -163,39 +170,47 @@ def set_running_or_notify_cancel(self):


class MapFuture(PebbleFuture):
def __init__(self, futures):
super(MapFuture, self).__init__()
def __init__(self, futures: list):
super().__init__()
self._futures = futures

def cancel(self):
@property
def futures(self) -> list:
return self._futures

def cancel(self) -> bool:
"""Cancel the future.
Returns True if any of the elements of the iterables is cancelled.
False otherwise.
"""
super(MapFuture, self).cancel()
super().cancel()

return any(tuple(f.cancel() for f in self._futures))


class ProcessMapFuture(ProcessFuture):
def __init__(self, futures):
super(ProcessMapFuture, self).__init__()
def __init__(self, futures: list):
super().__init__()
self._futures = futures

def cancel(self):
@property
def futures(self) -> list:
return self._futures

def cancel(self) -> bool:
"""Cancel the future.
Returns True if any of the elements of the iterables is cancelled.
False otherwise.
"""
super(ProcessMapFuture, self).cancel()
super().cancel()

return any(tuple(f.cancel() for f in self._futures))


class MapResults:
def __init__(self, futures, timeout=None):
def __init__(self, futures: list, timeout: float = None):
self._results = chain.from_iterable(
chunk_result(f, timeout) for f in futures)

Expand All @@ -213,7 +228,24 @@ def next(self):
__next__ = next


def iter_chunks(chunksize, *iterables):
def map_results(map_future: MapFuture, timeout: Optional[float]) -> MapFuture:
futures = map_future.futures
if not futures:
map_future.set_result(MapResults(futures))
return map_future

def done_map(_):
if not map_future.done():
map_future.set_result(MapResults(futures, timeout=timeout))

for future in futures:
future.add_done_callback(done_map)
setattr(future, 'map_future', map_future)

return map_future


def iter_chunks(chunksize: int, *iterables):
"""Iterates over zipped iterables in chunks."""
iterables = iter(zip(*iterables))

Expand All @@ -226,15 +258,15 @@ def iter_chunks(chunksize, *iterables):
yield chunk


def chunk_result(future, timeout):
def chunk_result(future: ProcessFuture, timeout: Optional[float]):
"""Returns the results of a processed chunk."""
try:
return future.result(timeout=timeout)
except Exception as error:
return (error, )


def run_initializer(initializer, initargs):
def run_initializer(initializer: Callable, initargs: list):
"""Runs the Pool initializer dealing with errors."""
try:
initializer(*initargs)
Expand Down
Loading

0 comments on commit 9f8d8d5

Please sign in to comment.