-
Notifications
You must be signed in to change notification settings - Fork 58
/
Copy path_py_bfes.py
513 lines (380 loc) · 18.7 KB
/
_py_bfes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
# Copyright 2020, 2021 PaGMO development team
#
# This file is part of the pygmo library.
#
# This Source Code Form is subject to the terms of the Mozilla
# Public License v. 2.0. If a copy of the MPL was not distributed
# with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
from threading import Lock as _Lock
def _mp_ipy_bfe_func(ser_prob_dv):
    """Evaluate a single decision vector on a worker.

    This is the task function invoked by the individual processes/nodes
    used by the mp/ipy batch fitness evaluators.

    Args:
        ser_prob_dv: an indexable pair whose first element is a pickled
          problem and whose second element is a pickled decision vector

    Returns:
        :class:`bytes`: the pickled result of calling ``fitness()`` on the
        deserialized problem with the deserialized decision vector
    """
    from pickle import dumps, loads

    # Both halves of the payload travel in serialized form:
    # restore the problem first, then the decision vector.
    problem = loads(ser_prob_dv[0])
    decision_vector = loads(ser_prob_dv[1])

    # The fitness goes back to the caller serialized as well.
    return dumps(problem.fitness(decision_vector))
class mp_bfe(object):
    """Multiprocessing batch fitness evaluator.

    .. versionadded:: 2.13

    This user-defined batch fitness evaluator (UDBFE) will dispatch
    the fitness evaluation in batch mode of a set of decision vectors
    to a process pool created and managed via the facilities of the
    standard Python :mod:`multiprocessing` module.

    The evaluations of the decision vectors are dispatched to the processes
    of a global :class:`pool <multiprocessing.pool.Pool>` shared between
    different instances of :class:`~pygmo.mp_bfe`. The pool is created
    either implicitly by the construction of the first :class:`~pygmo.mp_bfe`
    object or explicitly via the :func:`~pygmo.mp_bfe.init_pool()`
    static method. The default number of processes in the pool is equal to
    the number of logical CPUs on the current machine. The pool's size can
    be queried via :func:`~pygmo.mp_bfe.get_pool_size()`, and changed via
    :func:`~pygmo.mp_bfe.resize_pool()`. The pool can be stopped via
    :func:`~pygmo.mp_bfe.shutdown_pool()`.

    .. note::

       Due to certain implementation details of CPython, it is not possible to
       initialise, resize or shutdown the pool from a thread different from the
       main one. Normally this is not a problem, but, for instance, if the first
       :class:`~pygmo.mp_bfe` instance is created in a thread different from the
       main one, an error will be raised. In such a situation, the user should
       ensure to call :func:`~pygmo.mp_bfe.init_pool()` from the main thread
       before spawning the secondary thread.

    .. warning::

       Due to internal limitations of CPython, sending an interrupt signal (e.g.,
       by pressing ``Ctrl+C`` in an interactive Python session) while an
       :class:`~pygmo.mp_bfe` is running might end up sending an interrupt signal
       also to the external process(es). This can lead to unpredictable runtime
       behaviour (e.g., the session may hang). Although pygmo tries hard to limit
       as much as possible the chances of this occurrence, it cannot eliminate
       them completely. Users are thus advised to tread carefully with interrupt
       signals (especially in interactive sessions) when using
       :class:`~pygmo.mp_bfe`.

    .. warning::

       Due to an `upstream bug <https://bugs.python.org/issue38501>`_, when using
       Python 3.8 the multiprocessing machinery may lead to a hangup when exiting
       a Python session. As a workaround until the bug is resolved, users are
       advised to explicitly call :func:`~pygmo.mp_bfe.shutdown_pool()` before
       exiting a Python session.

    """

    # Static variables for the pool: they are shared by all the
    # instances, and every access goes through _pool_lock.
    _pool_lock = _Lock()
    _pool = None
    _pool_size = None

    def __init__(self, chunksize=None):
        """
        Args:

           chunksize(:class:`int` or :data:`None`): if not :data:`None`, this positive integral represents
             the approximate number of decision vectors that are processed by each task
             submitted to the process pool by the call operator

        Raises:

           TypeError: if *chunksize* is neither :data:`None` nor a value of an integral type
           ValueError: if *chunksize* is not strictly positive
           unspecified: any exception thrown by :func:`~pygmo.mp_bfe.init_pool()`

        """
        # Validate the argument before touching the pool, so that
        # a bad chunksize never triggers the creation of processes.
        if chunksize is not None:
            if not isinstance(chunksize, int):
                raise TypeError(
                    "The 'chunksize' argument must be None or an int, but it is of type '{}' instead".format(
                        type(chunksize)
                    )
                )
            if chunksize <= 0:
                raise ValueError(
                    "The 'chunksize' parameter must be a positive integer, but its value is {} instead".format(
                        chunksize
                    )
                )

        # Init the process pool, if necessary.
        mp_bfe.init_pool()

        # Save the chunk size parameter.
        self._chunksize = chunksize

    def __call__(self, prob, dvs):
        """Call operator.

        This method will evaluate in batch mode the fitnesses of the input decision vectors
        *dvs* using the fitness function from the optimisation problem *prob*. The fitness
        evaluations are delegated to the processes of the pool backing
        :class:`~pygmo.mp_bfe`. The input array *dvs* is not modified.

        See the documentation of :class:`pygmo.bfe` for an explanation of the expected
        formats of *dvs* and of the return value.

        Args:

           prob(:class:`~pygmo.problem`): the input problem
           dvs(:class:`numpy.ndarray`): the input decision vectors, represented as a
             flattened 1D array

        Returns:

           :class:`numpy.ndarray`: the fitness vectors corresponding to *dvs*, represented as a
           flattened 1D array

        Raises:

           unspecified: any exception thrown by the evaluations, by the (de)serialization
             of the input arguments or of the return value, or by the public interface of the
             process pool

        """
        import pickle

        import numpy as np

        # Fetch the dimension and the fitness
        # dimension of the problem.
        ndim = prob.get_nx()
        nf = prob.get_nf()

        # Compute the total number of decision
        # vectors represented by dvs.
        ndvs = len(dvs) // ndim

        # View dvs as ndvs decision vectors of dimension ndim each.
        # NOTE: reshape() is used instead of assigning to dvs.shape,
        # so that the array owned by the caller is left untouched.
        dvs_2d = np.asarray(dvs).reshape(ndvs, ndim)

        # Pre-serialize the problem: it is the same for every task.
        pprob = pickle.dumps(prob)

        # Build the list of arguments to pass
        # to the processes in the pool.
        async_args = [(pprob, pickle.dumps(dv)) for dv in dvs_2d]

        # Only the submission happens under the lock; the wait
        # for the results below is done after releasing it.
        with mp_bfe._pool_lock:
            # Make sure the pool exists.
            mp_bfe._init_pool_impl(None)

            # Run the objfun evaluations in async mode.
            if self._chunksize is None:
                ret = mp_bfe._pool.map_async(_mp_ipy_bfe_func, async_args)
            else:
                ret = mp_bfe._pool.map_async(
                    _mp_ipy_bfe_func, async_args, chunksize=self._chunksize
                )

        # Build the vector of fitness vectors as a 2D numpy array.
        fvs = np.array([pickle.loads(fv) for fv in ret.get()])

        # Flatten it to 1D. This raises if the total number of
        # fitness values is not ndvs * nf.
        return fvs.reshape(ndvs * nf)

    def get_name(self):
        """Name of this evaluator.

        Returns:

           :class:`str`: ``"Multiprocessing batch fitness evaluator"``

        """
        return "Multiprocessing batch fitness evaluator"

    def get_extra_info(self):
        """Extra info for this evaluator.

        If the process pool was previously shut down via :func:`~pygmo.mp_bfe.shutdown_pool()`,
        invoking this function will trigger the creation of a new pool.

        Returns:

           :class:`str`: a string containing information about the number of processes in the pool

        Raises:

           unspecified: any exception thrown by :func:`~pygmo.mp_bfe.get_pool_size()`

        """
        return "\tNumber of processes in the pool: {}".format(mp_bfe.get_pool_size())

    @staticmethod
    def _init_pool_impl(processes):
        # Implementation method for initing
        # the pool. This will *not* do any locking:
        # the caller is expected to hold _pool_lock.
        if mp_bfe._pool is None:
            # The helper is needed only when a pool
            # actually has to be created.
            from ._mp_utils import _make_pool

            mp_bfe._pool, mp_bfe._pool_size = _make_pool(processes)

    @staticmethod
    def init_pool(processes=None):
        """Initialise the process pool.

        This method will initialise the process pool backing :class:`~pygmo.mp_bfe`, if the pool
        has not been initialised yet or if the pool was previously shut down via
        :func:`~pygmo.mp_bfe.shutdown_pool()`. Otherwise, this method will have no effects.

        Args:

           processes(:data:`None` or an :class:`int`): the size of the pool (if :data:`None`, the size of the pool will be
             equal to the number of logical CPUs on the system)

        Raises:

           ValueError: if the pool does not exist yet and the function is being called from a thread different
             from the main one, or if *processes* is a non-positive value
           TypeError: if *processes* is not :data:`None` and not an :class:`int`

        """
        with mp_bfe._pool_lock:
            mp_bfe._init_pool_impl(processes)

    @staticmethod
    def get_pool_size():
        """Get the size of the process pool.

        If the process pool was previously shut down via :func:`~pygmo.mp_bfe.shutdown_pool()`, invoking this
        function will trigger the creation of a new pool.

        Returns:

           :class:`int`: the current size of the pool

        Raises:

           unspecified: any exception thrown by :func:`~pygmo.mp_bfe.init_pool()`

        """
        with mp_bfe._pool_lock:
            mp_bfe._init_pool_impl(None)
            return mp_bfe._pool_size

    @staticmethod
    def resize_pool(processes):
        """Resize pool.

        This method will resize the process pool backing :class:`~pygmo.mp_bfe`.

        If the process pool was previously shut down via :func:`~pygmo.mp_bfe.shutdown_pool()`, invoking this
        function will trigger the creation of a new pool.

        Args:

           processes(:class:`int`): the desired number of processes in the pool

        Raises:

           TypeError: if the *processes* argument is not an :class:`int`
           ValueError: if the *processes* argument is not strictly positive
           unspecified: any exception thrown by :func:`~pygmo.mp_bfe.init_pool()`

        """
        if not isinstance(processes, int):
            raise TypeError("The 'processes' argument must be an int")
        if processes <= 0:
            raise ValueError("The 'processes' argument must be strictly positive")

        with mp_bfe._pool_lock:
            # NOTE: this will either init a new pool
            # with the requested number of processes,
            # or do nothing if the pool exists already.
            mp_bfe._init_pool_impl(processes)

            if processes == mp_bfe._pool_size:
                # Don't do anything if we are not changing
                # the size of the pool.
                return

            from ._mp_utils import _make_pool

            # Create the new pool *before* stopping the current one:
            # if the creation fails, the current pool stays in place.
            new_pool, new_size = _make_pool(processes)

            # Stop the current pool.
            mp_bfe._pool.close()
            mp_bfe._pool.join()

            # Assign the new pool.
            mp_bfe._pool = new_pool
            mp_bfe._pool_size = new_size

    @staticmethod
    def shutdown_pool():
        """Shutdown pool.

        This method will shut down the process pool backing :class:`~pygmo.mp_bfe`, after
        all pending tasks in the pool have completed. If no pool exists, this method
        has no effects.

        After the process pool has been shut down, a new process pool can be created via
        an explicit call to :func:`~pygmo.mp_bfe.init_pool()`, or implicitly by one of the
        other methods of the public API of :class:`~pygmo.mp_bfe` which make sure that the
        pool exists before proceeding (the constructor, the call operator,
        :func:`~pygmo.mp_bfe.get_pool_size()` and :func:`~pygmo.mp_bfe.resize_pool()`).

        """
        with mp_bfe._pool_lock:
            if mp_bfe._pool is not None:
                mp_bfe._pool.close()
                mp_bfe._pool.join()
                mp_bfe._pool = None
                mp_bfe._pool_size = None
class ipyparallel_bfe(object):
    """Ipyparallel batch fitness evaluator.

    .. versionadded:: 2.13

    This user-defined batch fitness evaluator (UDBFE) will dispatch
    the fitness evaluation in batch mode of a set of decision vectors
    to an ipyparallel cluster. The communication with the cluster is managed
    via an :class:`ipyparallel.LoadBalancedView` instance which is
    created either implicitly when the first fitness evaluation is run, or
    explicitly via the :func:`~pygmo.ipyparallel_bfe.init_view()` method. The
    :class:`~ipyparallel.LoadBalancedView` instance is a global object shared
    among all the ipyparallel batch fitness evaluators.

    .. seealso::

       https://ipyparallel.readthedocs.io/en/latest/

    """

    # Static variables for the view: they are shared by all the
    # instances, and every access goes through _view_lock.
    _view_lock = _Lock()
    _view = None

    @staticmethod
    def init_view(
        client_args=None, client_kwargs=None, view_args=None, view_kwargs=None
    ):
        """Init the ipyparallel view.

        This method will initialise the :class:`ipyparallel.LoadBalancedView`
        which is used by all ipyparallel evaluators to submit the evaluation tasks
        to an ipyparallel cluster. If the :class:`ipyparallel.LoadBalancedView`
        has already been created, this method will perform no action.

        The input arguments *client_args* and *client_kwargs* are forwarded
        as positional and keyword arguments to the construction of an
        :class:`ipyparallel.Client` instance. From the constructed client,
        an :class:`ipyparallel.LoadBalancedView` instance is then created
        via the :func:`ipyparallel.Client.load_balanced_view()` method, to
        which the positional and keyword arguments *view_args* and
        *view_kwargs* are passed.

        Note that usually it is not necessary to explicitly invoke this
        method: an :class:`ipyparallel.LoadBalancedView` is automatically
        constructed with default settings the first time a batch evaluation task
        is submitted to an ipyparallel evaluator. This method should be used
        only if it is necessary to pass custom arguments to the construction
        of the :class:`ipyparallel.Client` or :class:`ipyparallel.LoadBalancedView`
        objects.

        Args:

            client_args(:class:`list` or :data:`None`): the positional arguments used for the
              construction of the client (:data:`None` means no positional arguments)
            client_kwargs(:class:`dict` or :data:`None`): the keyword arguments used for the
              construction of the client (:data:`None` means no keyword arguments)
            view_args(:class:`list` or :data:`None`): the positional arguments used for the
              construction of the view (:data:`None` means no positional arguments)
            view_kwargs(:class:`dict` or :data:`None`): the keyword arguments used for the
              construction of the view (:data:`None` means no keyword arguments)

        Raises:

           unspecified: any exception thrown by the constructor of :class:`ipyparallel.Client`
             or by the :func:`ipyparallel.Client.load_balanced_view()` method

        """
        # NOTE: None is used as default in place of [] / {} so that
        # no mutable object is shared between different invocations.
        client_args = [] if client_args is None else client_args
        client_kwargs = {} if client_kwargs is None else client_kwargs
        view_args = [] if view_args is None else view_args
        view_kwargs = {} if view_kwargs is None else view_kwargs

        with ipyparallel_bfe._view_lock:
            if ipyparallel_bfe._view is None:
                # The helper is needed only when a view
                # actually has to be created.
                from ._ipyparallel_utils import _make_ipyparallel_view

                # Create the new view.
                ipyparallel_bfe._view = _make_ipyparallel_view(
                    client_args, client_kwargs, view_args, view_kwargs
                )

    @staticmethod
    def shutdown_view():
        """Destroy the ipyparallel view.

        This method will destroy the :class:`ipyparallel.LoadBalancedView`
        currently being used by the ipyparallel evaluators for submitting
        evaluation tasks to an ipyparallel cluster. The view can be re-inited
        implicitly by submitting a new evaluation task, or by invoking
        the :func:`~pygmo.ipyparallel_bfe.init_view()` method.

        If no view exists, this method has no effects.

        """
        import gc

        with ipyparallel_bfe._view_lock:
            if ipyparallel_bfe._view is None:
                return

            # Drop the global reference first, then the local one,
            # and finally run a collection pass.
            old_view = ipyparallel_bfe._view
            ipyparallel_bfe._view = None
            del old_view
            gc.collect()

    def __call__(self, prob, dvs):
        """Call operator.

        This method will evaluate in batch mode the fitnesses of the input decision vectors
        *dvs* using the fitness function from the optimisation problem *prob*. The fitness
        evaluations are delegated to the nodes of the ipyparallel cluster backing
        :class:`~pygmo.ipyparallel_bfe`. The input array *dvs* is not modified.

        See the documentation of :class:`pygmo.bfe` for an explanation of the expected
        formats of *dvs* and of the return value.

        Args:

           prob(:class:`~pygmo.problem`): the input problem
           dvs(:class:`numpy.ndarray`): the input decision vectors, represented as a
             flattened 1D array

        Returns:

           :class:`numpy.ndarray`: the fitness vectors corresponding to *dvs*, represented as a
           flattened 1D array

        Raises:

           unspecified: any exception thrown by the evaluations, by the (de)serialization
             of the input arguments or of the return value, or by the public interface of
             :class:`ipyparallel.LoadBalancedView`.

        """
        import pickle

        import numpy as np

        # Fetch the dimension and the fitness
        # dimension of the problem.
        ndim = prob.get_nx()
        nf = prob.get_nf()

        # Compute the total number of decision
        # vectors represented by dvs.
        ndvs = len(dvs) // ndim

        # View dvs as ndvs decision vectors of dimension ndim each.
        # NOTE: reshape() is used instead of assigning to dvs.shape,
        # so that the array owned by the caller is left untouched.
        dvs_2d = np.asarray(dvs).reshape(ndvs, ndim)

        # Pre-serialize the problem: it is the same for every task.
        pprob = pickle.dumps(prob)

        # Build the list of arguments to pass
        # to the cluster nodes.
        async_args = [(pprob, pickle.dumps(dv)) for dv in dvs_2d]

        # Only the submission happens under the lock; the wait
        # for the results below is done after releasing it.
        with ipyparallel_bfe._view_lock:
            if ipyparallel_bfe._view is None:
                from ._ipyparallel_utils import _make_ipyparallel_view

                # No view yet: create one with default settings.
                ipyparallel_bfe._view = _make_ipyparallel_view([], {}, [], {})
            ret = ipyparallel_bfe._view.map_async(_mp_ipy_bfe_func, async_args)

        # Build the vector of fitness vectors as a 2D numpy array.
        fvs = np.array([pickle.loads(fv) for fv in ret.get()])

        # Flatten it to 1D. This raises if the total number of
        # fitness values is not ndvs * nf.
        return fvs.reshape(ndvs * nf)

    def get_name(self):
        """Name of the evaluator.

        Returns:

           :class:`str`: ``"Ipyparallel batch fitness evaluator"``

        """
        return "Ipyparallel batch fitness evaluator"

    def get_extra_info(self):
        """Extra info for this evaluator.

        Unlike the call operator, this method never creates a view.

        Returns:

           :class:`str`: a string with extra information about the status of the evaluator:
           either a notice that no view exists yet, or one ``(key, value)`` line for each
           entry reported by the ``queue_status()`` method of the view

        """
        from copy import deepcopy

        with ipyparallel_bfe._view_lock:
            if ipyparallel_bfe._view is None:
                return "\tNo cluster view has been created yet"

            # Work on a private copy of the status report.
            d = deepcopy(ipyparallel_bfe._view.queue_status())

        return "\tQueue status:\n\t\n\t" + "\n\t".join(
            "(" + str(k) + ", " + str(v) + ")" for k, v in d.items()
        )