diff --git a/adaptive/runner.py b/adaptive/runner.py index ed9274d00..8a567375e 100644 --- a/adaptive/runner.py +++ b/adaptive/runner.py @@ -1,10 +1,11 @@ # -*- coding: utf-8 -*- import asyncio +import concurrent.futures as concurrent import functools import inspect -import concurrent.futures as concurrent -import warnings +import os import time +import warnings from .notebook_integration import live_plot, live_info, in_ipynb @@ -27,6 +28,23 @@ pass +if os.name == 'nt': + if with_distributed: + _default_executor = distributed.Client + _default_executor_kwargs = {'address': distributed.LocalCluster()} + else: + warnings.warn('The default executor on Windows (`distributed.Client`) ' + 'for adaptive.Runner cannot be used because `distributed` is not ' + 'installed. Consider installing `distributed`. ' + 'Specify `executor` when using the `Runner`. Note: a ' + '`concurrent.futures.ProcessPoolExecutor` does not work on ' + 'Windows.') +else: + _default_executor = concurrent.ProcessPoolExecutor + _default_executor_kwargs = {} + + + class BaseRunner: """Base class for runners that use concurrent.futures.Executors. @@ -86,9 +104,12 @@ class BlockingRunner(BaseRunner): The end condition for the calculation. This function must take the learner as its sole argument, and return True when we should stop requesting more points. - executor : concurrent.futures.Executor, or ipyparallel.Client, optional + executor : concurrent.futures.Executor, distributed.Client, + or ipyparallel.Client, optional The executor in which to evaluate the function to be learned. - If not provided, a new ProcessPoolExecutor is used. + If not provided, a new `ProcessPoolExecutor` is used on Unix systems + while on Windows a `distributed.Client` is used if `distributed` is + installed. ntasks : int, optional The number of concurrent function evaluations. Defaults to the number of cores available in 'executor'. @@ -183,9 +204,12 @@ class AsyncRunner(BaseRunner): the learner as its sole argument, and return True when we should stop requesting more points. If not provided, the runner will run forever, or until 'self.task.cancel()' is called. - executor : concurrent.futures.Executor, or ipyparallel.Client, optional + executor : concurrent.futures.Executor, distributed.Client, + or ipyparallel.Client, optional The executor in which to evaluate the function to be learned. - If not provided, a new ProcessPoolExecutor is used. + If not provided, a new `ProcessPoolExecutor` is used on Unix systems + while on Windows a `distributed.Client` is used if `distributed` is + installed. ntasks : int, optional The number of concurrent function evaluations. Defaults to the number of cores available in 'executor'. @@ -440,8 +464,9 @@ def shutdown(self, wait=True): def _ensure_executor(executor): if executor is None: - return concurrent.ProcessPoolExecutor() - elif isinstance(executor, concurrent.Executor): + executor = _default_executor(**_default_executor_kwargs) + + if isinstance(executor, concurrent.Executor): return executor elif with_ipyparallel and isinstance(executor, ipyparallel.Client): return executor.executor()