
Match Cluster's use of regular methods for scaling
Instead of using coroutines for `scale_up` and `scale_down`, use regular
methods in their place and move the coroutines into internal methods.
This breaks the API of dask-drmaa; however, it is technically more correct
given the Cluster API's current expectations.
jakirkham committed May 24, 2018
1 parent 279c3e5 commit 81955bd
Showing 1 changed file with 8 additions and 2 deletions.
10 changes: 8 additions & 2 deletions dask_drmaa/core.py
@@ -290,12 +290,15 @@ def stop_workers(self, worker_ids, sync=False):
         get_session().synchronize(worker_ids, dispose=True)
 
     @gen.coroutine
-    def scale_up(self, n, **kwargs):
+    def _scale_up(self, n, **kwargs):
         yield [self.start_workers(**kwargs)
                for _ in range(n - len(self.workers))]
 
+    def scale_up(self, n, **kwargs):
+        self.scheduler.loop.add_callback(self._scale_up, n, **kwargs)
+
     @gen.coroutine
-    def scale_down(self, worker_ids):
+    def _scale_down(self, worker_ids):
         worker_ids = list(set(worker_ids))
 
         for wid in worker_ids:
@@ -312,6 +315,9 @@ def scale_down(self, worker_ids):
 
         logger.info("Scaling down workers %s", worker_ids)
 
+    def scale_down(self, worker_ids):
+        self.scheduler.loop.add_callback(self._scale_down, worker_ids)
+
     def close(self):
         logger.info("Closing DRMAA cluster")
         self.stop_workers(self.workers, sync=True)
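The diff follows a common pattern for matching distributed's Cluster interface: keep the asynchronous work in a private coroutine and expose a plain method that only schedules that coroutine on the scheduler's Tornado IOLoop via `add_callback`. Below is a minimal, self-contained sketch of that pattern; the `MiniCluster` class and its fake worker bookkeeping are illustrative assumptions, not dask-drmaa code.

from tornado import gen
from tornado.ioloop import IOLoop


class MiniCluster(object):
    """Toy stand-in for a Cluster-like object (illustration only)."""

    def __init__(self):
        self.loop = IOLoop.current()
        self.workers = []

    @gen.coroutine
    def _scale_up(self, n):
        # The asynchronous work stays in a private coroutine.
        while len(self.workers) < n:
            self.workers.append("worker-%d" % len(self.workers))
            yield gen.moment  # yield control back to the IOLoop each step

    def scale_up(self, n):
        # The public method is synchronous: it schedules the coroutine on
        # the event loop and returns immediately (its return value is None).
        self.loop.add_callback(self._scale_up, n)


if __name__ == "__main__":
    cluster = MiniCluster()
    cluster.scale_up(3)  # non-blocking; workers appear once the loop runs
    IOLoop.current().run_sync(lambda: gen.sleep(0.1))
    print(cluster.workers)  # ['worker-0', 'worker-1', 'worker-2']

The trade-off noted in the commit message is visible here: callers that previously awaited the result (for example, `yield cluster.scale_up(n)` inside a coroutine) now get back `None` immediately, since the real work happens later on the event loop.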
