From 750cd0cf033d7a8264b988bed2802950c48ba200 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 24 May 2018 02:02:47 -0400 Subject: [PATCH] Match Cluster's use of regular methods for scaling Instead of having coroutines used for `scale_up` and `scale_down`, use regular methods in their place. Move the coroutines into internal spec. This breaks the API of dask-drmaa. However is technically more correct given the Cluster API's current expectations. --- dask_drmaa/core.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/dask_drmaa/core.py b/dask_drmaa/core.py index 18fe3d0..5a1ae2d 100644 --- a/dask_drmaa/core.py +++ b/dask_drmaa/core.py @@ -285,17 +285,20 @@ def stop_workers(self, worker_ids, sync=False): retired = yield self.scheduler.retire_workers(workers=worker_ips, close_workers=True) logger.info("Retired workers %s", retired) - yield self.scale_down(worker_ids) + yield self._scale_down(worker_ids) if sync: get_session().synchronize(worker_ids, dispose=True) @gen.coroutine - def scale_up(self, n, **kwargs): + def _scale_up(self, n, **kwargs): yield [self.start_workers(**kwargs) for _ in range(n - len(self.workers))] + def scale_up(self, n, **kwargs): + self.scheduler.loop.add_callback(self._scale_up, n, **kwargs) + @gen.coroutine - def scale_down(self, worker_ids): + def _scale_down(self, worker_ids): worker_ids = list(set(worker_ids)) for wid in worker_ids: @@ -312,6 +315,9 @@ def scale_down(self, worker_ids): logger.info("Scaling down workers %s", worker_ids) + def scale_down(self, worker_ids): + self.scheduler.loop.add_callback(self._scale_down, worker_ids) + def close(self): logger.info("Closing DRMAA cluster") self.stop_workers(self.workers, sync=True)