Skip to content

Commit

Permalink
changed idle worker error to a warning. cleaned up functional_test_da…
Browse files Browse the repository at this point in the history
…sk.py. added progress_bar.py. changed variable names in bayes.py
  • Loading branch information
Peter Habelitz committed May 22, 2019
1 parent 293a973 commit ad123a7
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 67 deletions.
52 changes: 30 additions & 22 deletions phs/bayes.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,17 @@ def compute_bayesian_suggestion(at_index,

kernel = gp.kernels.Matern(nu=2.5)
alpha = 1e-6
model = gp.GaussianProcessRegressor(
kernel=kernel, alpha=alpha, n_restarts_optimizer=25, normalize_y=True)
model = gp.GaussianProcessRegressor(kernel=kernel,
alpha=alpha,
n_restarts_optimizer=25,
normalize_y=True)
model.fit(xp, yp)
next_sample = sample_next_hyperparameter(
expected_improvement, model, yp, greater_is_better=False, bounds=bounds, n_restarts=100)
next_sample = get_next_sample(acquisition_function=expected_improvement,
gaussian_process=model,
evaluated_loss=yp,
maximize=False,
bounds=bounds,
restarts=100)

bayesian_replacement_dict = {}
for i, col in enumerate(bayesian_col_name_list):
Expand All @@ -119,24 +125,24 @@ def compute_bayesian_suggestion(at_index,
return bayesian_replacement_dict


def sample_next_hyperparameter(acquisition_func,
def get_next_sample(acquisition_function,
gaussian_process,
evaluated_loss,
greater_is_better,
maximize,
bounds=(0, 10),
n_restarts=25):
restarts=25):

best_x = None
best_acquisition_value = None
n_params = bounds.shape[0]
number_parameters = bounds.shape[0]

for starting_point in np.random.uniform(bounds[:, 0], bounds[:, 1], size=(n_restarts, n_params)):
for starting_point in np.random.uniform(bounds[:, 0], bounds[:, 1], size=(restarts, number_parameters)):

minimize_obj = minimize(fun=acquisition_func,
minimize_obj = minimize(fun=acquisition_function,
x0=starting_point.reshape(1, -1),
bounds=bounds,
method='L-BFGS-B',
args=(gaussian_process, evaluated_loss, greater_is_better, n_params))
args=(gaussian_process, evaluated_loss, maximize, number_parameters))

if best_acquisition_value is None:
best_acquisition_value = minimize_obj.fun
Expand All @@ -146,25 +152,27 @@ def sample_next_hyperparameter(acquisition_func,
return best_x


def expected_improvement(x, gaussian_process, evaluated_loss, greater_is_better=False, n_params=1):
def expected_improvement(x, gaussian_process, evaluated_loss, maximize=False, number_parameters=1):

x_to_predict = x.reshape(-1, n_params)
x_to_predict = x.reshape(-1, number_parameters)

mu, sigma = gaussian_process.predict(x_to_predict, return_std=True)
mean, std_dev = gaussian_process.predict(x_to_predict, return_std=True)

if greater_is_better:
if maximize:
loss_optimum = np.max(evaluated_loss)
else:
loss_optimum = np.min(evaluated_loss)

scaling_factor = (-1) ** (not greater_is_better)
if maximize:
factor = 1
else:
factor = -1

# In case sigma equals zero
with np.errstate(divide='ignore'):
with np.errstate(divide='ignore'): # overcome std_dev = 0
xi = 0.0
Z = scaling_factor * (mu - loss_optimum - xi) / sigma
expected_improvement = scaling_factor * \
(mu - loss_optimum - xi) * norm.cdf(Z) + sigma * norm.pdf(Z)
expected_improvement[sigma == 0.0] == 0.0
Z = factor * (mean - loss_optimum - xi) / std_dev
expected_improvement = factor * \
(mean - loss_optimum - xi) * norm.cdf(Z) + std_dev * norm.pdf(Z)
expected_improvement[std_dev == 0.0] == 0.0

return -1 * expected_improvement
51 changes: 21 additions & 30 deletions phs/compute_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@
import glob
import ntpath
import shutil
import warnings

import phs.proxy
import phs.bayes
import phs.utils
import phs.utils_parameter_io
import phs.global_names
import phs.progress_bar


class ComputeDefinition:
Expand All @@ -36,6 +38,8 @@ def __init__(self,
'provide_worker_path', True, config_dict)
self.redirect_stdout = phs.utils.set_default_value_to_optional_key(
'redirect_stdout', True, config_dict)
self.ignore_warnings = phs.utils.set_default_value_to_optional_key(
'ignore_warnings', True, config_dict)
self.bayesian_wait_for_all = phs.utils.set_default_value_to_optional_key(
'bayesian_wait_for_all', False, config_dict)
self.monitor_root_dir = phs.utils.set_default_value_to_optional_key(
Expand All @@ -46,8 +50,13 @@ def __init__(self,
'monitor_func_name_with_args', {}, config_dict)

for key, value in config_dict.items():
if key == 'local_processes_num_workers' and config_dict['parallelization'] != 'local_processes':
continue
print('\t{0:40}{1:}'.format(key, value))

if self.ignore_warnings:
warnings.simplefilter("ignore")

self.zero_fill = 6
self.std_out_name = 'stdout.txt'
self.result_col_name = 'result'
Expand Down Expand Up @@ -114,30 +123,6 @@ def __init__(self,
break
self.number_of_parameter_sets_until_first_bayesian_task = count_row

if self.parallelization == 'local_processes':
if self.number_of_parameter_sets_until_first_bayesian_task < self.local_processes_num_workers:
phs.utils.format_stderr()
raise ValueError('\n\nThere are ' + str(self.number_of_parameter_sets_until_first_bayesian_task) +
' non bayesian parameter sets until the first one with an bayesian task appears but ' +
str(self.local_processes_num_workers) + ' processes. As a result ' + str(self.local_processes_num_workers -
self.number_of_parameter_sets_until_first_bayesian_task) + ' processes would immediately start a bayesian tasks '
'without any available result.\nPlease increase number of initial non bayesian evaluations or lower number of processes.')

if self.parallelization == 'dask':
from dask.distributed import Client
DASK_MASTER_IP = os.environ['DASK_MASTER_IP']
DASK_MASTER_PORT = os.environ['DASK_MASTER_PORT']
with Client(DASK_MASTER_IP + ':' + DASK_MASTER_PORT, timeout='10s') as client:
number_dask_workers = len(client.scheduler_info()['workers'])
if self.number_of_parameter_sets_until_first_bayesian_task < number_dask_workers:
phs.utils.format_stderr()
raise ValueError('\n\nThere are ' + str(self.number_of_parameter_sets_until_first_bayesian_task) +
' non bayesian parameter sets until the first one with an bayesian task appears but ' +
str(number_dask_workers) + ' workers. As a result ' + str(number_dask_workers -
self.number_of_parameter_sets_until_first_bayesian_task) + ' workers would immediately start a bayesian task '
'without any available result.\nPlease increase number of initial non bayesian evaluations or lower number of workers.')




self.get_experiment_state()
Expand Down Expand Up @@ -189,7 +174,7 @@ def get_experiment_state(self):
self.remaining_parameter_index_list.sort()

print('\t{0:40}{1:}'.format('state', self.exp_state))
print('\t{0:40}{1:}'.format('number of all parameter sets', len(all_indices)))
print('\t{0:40}{1:}'.format('total number of parameter sets', len(all_indices)))
print('\t{0:40}{1:}'.format('number of computed parameter sets', len(computed_indices)))

def _ensure_clean_state(self):
Expand Down Expand Up @@ -283,6 +268,10 @@ def start_execution(self):
from concurrent.futures import wait, as_completed
with PoolExecutor(max_workers=self.local_processes_num_workers) as executor:
self.pp.pprint(executor.__dict__)

if self.number_of_parameter_sets_until_first_bayesian_task < self.local_processes_num_workers:
phs.utils.idle_workers_warning(self.number_of_parameter_sets_until_first_bayesian_task, self.local_processes_num_workers)

self.start_execution_kernel(executor, wait, as_completed)
self.as_completed_functions(as_completed)

Expand All @@ -302,6 +291,11 @@ def start_execution(self):
with Client(DASK_MASTER_IP + ':' + DASK_MASTER_PORT, timeout='10s') as client:
client.restart()
self.pp.pprint(client.scheduler_info())

number_dask_workers = len(client.scheduler_info()['workers'])
if self.number_of_parameter_sets_until_first_bayesian_task < number_dask_workers:
phs.utils.idle_workers_warning(self.number_of_parameter_sets_until_first_bayesian_task, number_dask_workers)

client.upload_file(os.path.abspath(phs.bayes.__file__)) # probably not necessary
client.upload_file(os.path.abspath(phs.proxy.__file__)) # probably not necessary
client.upload_file(os.path.abspath(phs.utils.__file__)) # probably not necessary
Expand Down Expand Up @@ -407,14 +401,11 @@ def as_completed_functions(self, as_completed):
"""
print('', end='\n')

sub_future_len = len(self.sub_future)
progress_bar_len = 40
for count_finished, f in enumerate(as_completed(self.sub_future), 1):
finished_portion = count_finished/sub_future_len
progress_bar_signs = int(round(progress_bar_len * finished_portion))
print('[{0}] | {1:>3}% | {2:>6} of {3:>6}' .format(
'='*progress_bar_signs + ' '*(progress_bar_len-progress_bar_signs),
int(round(finished_portion*100)), count_finished, len(self.sub_future)), end='\r')
phs.progress_bar.progress_bar(progress=count_finished, total=len(self.sub_future))
self.append_additional_information(f)
self.monitor_functions()
print('\n')
Expand Down
20 changes: 5 additions & 15 deletions phs/functional_test_dask.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,24 @@
import examples.func_def.basic_test_functions
# import carme
from dask.distributed import Client, as_completed
import os
import pprint
pp = pprint.PrettyPrinter(indent=4)
# carme.resetKernel()
# carme.addCarmePythonPath('/home/HabelitzP/parallel_hyperparameter_search/examples')
# carme.resetKernel()
# import CarmeModules.HyperParameterSearch.examples.test_functions as test_functions
# repository_root_dir = '/home/HabelitzP' #(example:'/home/NAME')
# import sys
# sys.path.append(repository_root_dir + '/parallel_hyperparameter_search/examples')

'''def task(hyperpar):
exec(hyperpar, globals(), globals())
result = x * y
return result'''

# common function call with parameter correctly formated as a dict with 'hyperpar' key and string value
print('\nlocal function call and evaluation with parameter correctly formated as a dict with \'hyperpar\' key and string value.')
par = {'hyperpar': 'x=2\ny=3'}
print(examples.func_def.basic_test_functions.test_griewank(par))


print('\nfunction submission and evaluation on distributed workers with parameter correctly formated as a dict with \'hyperpar\' key and string value:\n')
DASK_MASTER_IP = os.environ['DASK_MASTER_IP']
DASK_MASTER_PORT = os.environ['DASK_MASTER_PORT']
sub_future = []
parameter_string_list = ['x=1\ny=2', 'x=2\ny=4', 'x=3\ny=6']
with Client(DASK_MASTER_IP + ':' + DASK_MASTER_PORT, timeout='20s') as client:
client.restart()
pp.pprint(client.scheduler_info())
client.upload_file(
'/home/HabelitzP/parallel_hyperparameter_search/examples/func_def/basic_test_functions.py')
client.upload_file(os.path.abspath(examples.func_def.basic_test_functions.__file__))

# submission of function calls and appending of futures to a list
# arguments originate from a list of strings and put into a dict one by one
Expand Down
63 changes: 63 additions & 0 deletions phs/progress_bar.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import os
import time
import numpy as np

RED = "\033[1;31m"
BLUE = "\033[1;34m"
CYAN = "\033[1;36m"
GREEN = "\033[0;32m"
YELLOW = "\033[0;33m"
RESET = "\033[0;0m"
BOLD = "\033[;1m"
REVERSE = "\033[;7m"

def progress_bar(
progress,
total,
start_time = None,
bar_width = 20,
prog_sign = '=',
left_border='[',
right_border=']',
sep='|',
prog_color=YELLOW,
total_color=GREEN,
time_color=BLUE):

if progress != total:
end = '\r'
else:
end = ''

if start_time is not None:
elapsed_time =time.strftime("%H:%M:%S", time.gmtime(time.time() - start_time))
else:
elapsed_time = ''
finished_portion = progress/total
percent = int(finished_portion * 100)
prog = int(round(bar_width * finished_portion)) * prog_sign
print('{total_color}{left_border}{prog_color}{prog:<{bar_width}}{total_color}{right_border}{reset} {sep} {prog_color}{percent:>3}%{reset} {sep} {prog_color}{progress:>5}{reset} of {total_color}{total:>6}{reset} {sep} {time_color}{elapsed_time:>10}{reset}'
.format(
total_color=total_color,
left_border=left_border,
prog_color = prog_color,
prog=prog,
bar_width=bar_width,
reset=RESET,
right_border=right_border,
sep=sep,
percent=percent,
progress=progress,
total=total,
time_color=time_color,
elapsed_time=elapsed_time),
end=end)




if __name__ == '__main__':
total=100
for count in range(1, total+1):
time.sleep(0.05)
progress_bar(progress=count, total=total)
21 changes: 21 additions & 0 deletions phs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,27 @@ def print_subsection(header):
sys.stdout.write(RESET)


def idle_workers_warning(number_of_parameter_sets_until_first_bayesian_task, number_workers):
"""
Print a orange section header formated with ':' filling up the complete terminal width.
If terminal width cannot be If the terminal size cannot be successfully queried,
either because the system doesn’t support querying, or because we are not connected to a terminal,
the default value is (80, 24) which is the default size used by many terminal emulators.
"""
terminal_cols, _ = shutil.get_terminal_size()
sys.stdout.write(ORANGE)
print('{::^{width}}' .format('', width=terminal_cols))
print('{::^{width}}' .format(' ' + 'Warning' + ' ', width=terminal_cols))
print('{::^{width}}' .format('', width=terminal_cols))
print('\tThere are ' + str(number_of_parameter_sets_until_first_bayesian_task) +
' non bayesian parameter sets until the first with a bayesian task appears but ' +
str(number_workers) + ' workers/processes were assigned.\n\tAs a result ' + str(number_workers -
number_of_parameter_sets_until_first_bayesian_task) + ' workers/processes would immediately start a bayesian task '
'but instead have to wait because no results are available yet.\n\tThis will cause these workers to idle.'
'\n\tIncrease number of initial non bayesian evaluations or decrease the number of workers for more efficiency.')
sys.stdout.write(RESET)


def format_stderr():
sys.stderr.write(RED)

Expand Down

0 comments on commit ad123a7

Please sign in to comment.