Improve logging and remove deadlock #1012

Merged 4 commits on Nov 23, 2020
61 changes: 36 additions & 25 deletions autosklearn/automl.py
@@ -43,7 +43,6 @@
from autosklearn.util.stopwatch import StopWatch
from autosklearn.util.logging_ import (
get_logger,
is_port_in_use,
LogRecordSocketReceiver,
setup_logger,
)
@@ -247,7 +246,11 @@ def _create_dask_client(self):
# file was deleted, so the client could not close
# the worker properly
local_directory=tempfile.gettempdir(),
)
# Memory is handled by the pynisher, not by the dask worker/nanny
memory_limit=0,
),
# Heartbeat every 10s
heartbeat_interval=10000,
)

def _close_dask_client(self):
@@ -269,26 +272,35 @@ def _get_logger(self, name):
# Set up the configuration for the logger.
# This is going to be honored by the server,
# which is created below.
setup_logger(os.path.join(self._backend.temporary_directory,
'%s.log' % str(logger_name)),
self.logging_config,
)

# The desired port might be used, so check this
while is_port_in_use(self._logger_port):
self._logger_port += 1
setup_logger(
output_file=os.path.join(
self._backend.temporary_directory, '%s.log' % str(logger_name)
),
logging_config=self.logging_config,
output_dir=self._backend.temporary_directory,
)

# As Auto-sklearn works with distributed processes,
# we implement a logger server that can receive TCP
# pickled messages. They are unpickled and processed locally
# under the above logging configuration setting.
# We need to specify the logger_name so that received records
# are treated under the logger_name ROOT logger setting
self.stop_logging_server = multiprocessing.Event()
self.logger_tcpserver = LogRecordSocketReceiver(logname=logger_name,
port=self._logger_port,
event=self.stop_logging_server)
self.logging_server = multiprocessing.Process(
context = multiprocessing.get_context('fork')
self.stop_logging_server = context.Event()

while True:
# Loop until we find a valid port
self._logger_port = np.random.randint(10000, 65535)
try:
self.logger_tcpserver = LogRecordSocketReceiver(logname=logger_name,
port=self._logger_port,
event=self.stop_logging_server)
break
except OSError:
continue

self.logging_server = context.Process(
target=self.logger_tcpserver.serve_until_stopped)
self.logging_server.daemon = False
self.logging_server.start()
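The loop above binds to a randomly chosen port and retries on `OSError`, replacing the old `is_port_in_use` check, which was inherently racy: another process could grab the port between the check and the bind. Below is a minimal standalone sketch of the same pattern using only the standard library; `LogRecordHandler` is an illustrative stand-in, not auto-sklearn's actual `LogRecordSocketReceiver`.

```python
import random
import socketserver


class LogRecordHandler(socketserver.StreamRequestHandler):
    """Placeholder handler; a real receiver would unpickle logging records here."""

    def handle(self):
        data = self.rfile.readline()
        print("received:", data)


def start_receiver_on_free_port(low=10000, high=65535):
    # Keep drawing random ports until one binds; actually binding is the only
    # race-free way to know a port is available.
    while True:
        port = random.randint(low, high)
        try:
            server = socketserver.ThreadingTCPServer(("localhost", port), LogRecordHandler)
            return server, port
        except OSError:
            continue


if __name__ == "__main__":
    server, port = start_receiver_on_free_port()
    print("logging receiver bound to port", port)
    server.server_close()
```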
@@ -354,7 +366,6 @@ def _do_dummy_prediction(self, datamanager, num_run):
autosklearn_seed=self._seed,
resampling_strategy=self._resampling_strategy,
initial_num_run=num_run,
logger=self._logger,
stats=stats,
metric=self._metric,
memory_limit=memory_limit,
@@ -409,6 +420,9 @@ def fit(
only_return_configuration_space: Optional[bool] = False,
load_models: bool = True,
):
self._backend.save_start_time(self._seed)
self._stopwatch = StopWatch()

# Make sure that input is valid
# Performs Ordinal one hot encoding to the target
# both for train and test data
@@ -434,6 +448,12 @@
raise ValueError('Metric must be instance of '
'autosklearn.metrics.Scorer.')

if dataset_name is None:
dataset_name = hash_array_or_matrix(X)
# By default try to use the TCP logging port or get a new port
self._logger_port = logging.handlers.DEFAULT_TCP_LOGGING_PORT
self._logger = self._get_logger(dataset_name)

# If no dask client was provided, we create one, so that we can
# start an ensemble process in parallel to the SMBO optimization
if (
@@ -444,18 +464,9 @@
else:
self._is_dask_client_internally_created = False

if dataset_name is None:
dataset_name = hash_array_or_matrix(X)

self._backend.save_start_time(self._seed)
self._stopwatch = StopWatch()
self._dataset_name = dataset_name
self._stopwatch.start_task(self._dataset_name)

# By default try to use the TCP logging port or get a new port
self._logger_port = logging.handlers.DEFAULT_TCP_LOGGING_PORT
self._logger = self._get_logger(dataset_name)

if feat_type is not None and len(feat_type) != X.shape[1]:
raise ValueError('Array feat_type does not have same number of '
'variables as X has features. %d vs %d.' %
30 changes: 13 additions & 17 deletions autosklearn/ensemble_builder.py
@@ -4,10 +4,12 @@
import math
import numbers
import logging.handlers
import multiprocessing
import os
import pickle
import re
import shutil
import sys
import time
import traceback
from typing import List, Optional, Tuple, Union
@@ -28,7 +30,7 @@
from autosklearn.metrics import calculate_score, Scorer
from autosklearn.ensembles.ensemble_selection import EnsembleSelection
from autosklearn.ensembles.abstract_ensemble import AbstractEnsemble
from autosklearn.util.logging_ import get_named_client_logger
from autosklearn.util.logging_ import get_named_client_logger, get_logger

Y_ENSEMBLE = 0
Y_VALID = 1
@@ -153,7 +155,7 @@ def build_ensemble(self, dask_client: dask.distributed.Client) -> None:
# The second criteria is elapsed time
elapsed_time = time.time() - self.start_time

logger = get_named_client_logger('EnsembleBuilder', port=self.logger_port)
logger = get_logger('EnsembleBuilder')

# First test for termination conditions
if self.time_left_for_ensembles < elapsed_time:
@@ -562,10 +564,17 @@ def run(

if time_left - time_buffer < 1:
break
context = multiprocessing.get_context('forkserver')
# Try to copy as many modules as possible into the new context to reduce startup time
# http://www.bnikolic.co.uk/blog/python/parallelism/2019/11/13/python-forkserver-preload.html
# Do not copy the logging module, as it causes deadlocks!
preload_modules = list(filter(lambda key: 'logging' not in key, sys.modules.keys()))
context.set_forkserver_preload(preload_modules)
safe_ensemble_script = pynisher.enforce_limits(
wall_time_in_s=int(time_left - time_buffer),
mem_in_mb=self.memory_limit,
logger=self.logger
logger=self.logger,
context=context,
)(self.main)
safe_ensemble_script(time_left, iteration, return_predictions)
if safe_ensemble_script.exit_status is pynisher.MemorylimitException:
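The forkserver preload trick above can be reproduced in isolation. Here is a hedged sketch, independent of auto-sklearn and pynisher: preload everything already imported except logging-related modules, then start a worker process from the same context. The `work` function is a stand-in for the real ensemble-building job.

```python
import multiprocessing
import sys


def work(queue):
    # Trivial stand-in for the real job.
    queue.put(sum(range(1000)))


if __name__ == "__main__":
    # forkserver is only available on Unix-like platforms.
    ctx = multiprocessing.get_context("forkserver")

    # Preload the already-imported modules to cut per-process startup time,
    # but leave out anything logging-related, which can deadlock after fork.
    preload = [name for name in sys.modules if "logging" not in name]
    ctx.set_forkserver_preload(preload)

    queue = ctx.Queue()
    proc = ctx.Process(target=work, args=(queue,))
    proc.start()
    print(queue.get())
    proc.join()
```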
@@ -1385,24 +1394,11 @@ def _delete_excess_models(self, selected_keys: List[str]):

"""

# Obtain a list of sorted pred keys
sorted_keys = self._get_list_of_sorted_preds()
sorted_keys = list(map(lambda x: x[0], sorted_keys))

if len(sorted_keys) <= self.max_resident_models:
# Don't waste time if not enough models to delete
return

# The top self.max_resident_models models would be the candidates
# Any other low performance model will be deleted
# The list is in ascending order of score
candidates = sorted_keys[:self.max_resident_models]

# Loop through the files currently in the directory
for pred_path in self.y_ens_files:

# Do not delete candidates
if pred_path in candidates:
if pred_path in selected_keys:
continue

if pred_path in self._has_been_candidate:
4 changes: 4 additions & 0 deletions autosklearn/estimators.py
@@ -177,6 +177,10 @@ def __init__(
dask_client : dask.distributed.Client, optional
User-created dask client, can be used to start a dask cluster and then
attach auto-sklearn to it.

Auto-sklearn can run into a deadlock if the dask client uses threads for
parallelization; it is therefore highly recommended to use dask workers
that each run in a single process (a minimal client sketch follows this hunk).

disable_evaluator_output: bool or list, optional (False)
If True, disable model and prediction output. Cannot be used
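To illustrate the `dask_client` recommendation above, here is a minimal sketch of a user-created dask client whose workers are separate, single-threaded processes. The worker count and the `LocalCluster` parameters are assumptions about one reasonable setup, not something this diff prescribes.

```python
from dask.distributed import Client, LocalCluster

if __name__ == "__main__":
    # One process per worker, one thread per worker: no thread-based
    # parallelism inside a worker, which is what can deadlock auto-sklearn.
    cluster = LocalCluster(
        n_workers=4,
        processes=True,
        threads_per_worker=1,
    )
    client = Client(cluster)

    # The client would then be handed to the estimator, e.g.:
    # automl = AutoSklearnClassifier(dask_client=client, ...)
```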
17 changes: 10 additions & 7 deletions autosklearn/evaluation/__init__.py
@@ -4,6 +4,7 @@
import math
import multiprocessing
from queue import Empty
import sys
import time
import traceback
from typing import Dict, List, Optional, Tuple, Union
@@ -96,7 +97,7 @@ def _encode_exit_status(exit_status):
class ExecuteTaFuncWithQueue(AbstractTAFunc):

def __init__(self, backend, autosklearn_seed, resampling_strategy, metric,
logger, cost_for_crash, abort_on_first_run_crash,
cost_for_crash, abort_on_first_run_crash,
initial_num_run=1, stats=None,
run_obj='quality', par_factor=1, all_scoring_functions=False,
output_y_hat_optimization=True, include=None, exclude=None,
@@ -160,7 +161,6 @@ def __init__(self, backend, autosklearn_seed, resampling_strategy, metric,
self.disable_file_output = disable_file_output
self.init_params = init_params
self.budget_type = budget_type
self.logger = logger

if memory_limit is not None:
memory_limit = int(math.ceil(memory_limit))
@@ -244,7 +244,13 @@ def run(
instance_specific: Optional[str] = None,
) -> Tuple[StatusType, float, float, Dict[str, Union[int, float, str, Dict, List, Tuple]]]:

queue = multiprocessing.Queue()
context = multiprocessing.get_context('forkserver')
# Try to copy as many modules as possible into the new context to reduce startup time
# http://www.bnikolic.co.uk/blog/python/parallelism/2019/11/13/python-forkserver-preload.html
# Do not copy the logging module, as it causes deadlocks!
preload_modules = list(filter(lambda key: 'logging' not in key, sys.modules.keys()))
context.set_forkserver_preload(preload_modules)
queue = context.Queue()

if not (instance_specific is None or instance_specific == '0'):
raise ValueError(instance_specific)
@@ -257,6 +263,7 @@
wall_time_in_s=cutoff,
mem_in_mb=self.memory_limit,
capture_output=True,
context=context,
)

if isinstance(config, int):
@@ -436,8 +443,4 @@ def run(
runtime = float(obj.wall_clock_time)

autosklearn.evaluation.util.empty_queue(queue)
self.logger.debug(
'Finished function evaluation. Status: %s, Cost: %f, Runtime: %f, Additional %s',
status, cost, runtime, additional_run_info,
)
return status, cost, runtime, additional_run_info
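Tying the evaluation-side pieces together: a sketch of running a function under pynisher limits with a forkserver context and a context-bound queue for results, mirroring the hunks above. It assumes the pynisher version used by this PR accepts the `context` keyword, as the diff shows; `train` and the concrete limits are made up for illustration.

```python
import multiprocessing
import sys

import pynisher


def train(queue):
    # Stand-in for a real target-algorithm run; report a fake loss.
    queue.put({"loss": 0.42})


if __name__ == "__main__":
    context = multiprocessing.get_context("forkserver")
    preload = [name for name in sys.modules if "logging" not in name]
    context.set_forkserver_preload(preload)

    queue = context.Queue()

    limited_train = pynisher.enforce_limits(
        wall_time_in_s=60,
        mem_in_mb=1024,
        context=context,
    )(train)

    limited_train(queue)
    if limited_train.exit_status is pynisher.MemorylimitException:
        print("killed for exceeding the memory limit")
    else:
        print(queue.get())
```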
5 changes: 3 additions & 2 deletions autosklearn/metalearning/input/aslib_simple.py
@@ -57,8 +57,9 @@ def _find_files(self):
for expected_file in optional:
full_path = os.path.join(self.dir_, expected_file)
if not os.path.isfile(full_path):
self.logger.warning(
"Not found: %s (maybe you want to add it)" % (full_path))
# self.logger.warning(
# "Not found: %s (maybe you want to add it)" % (full_path))
pass
else:
self.found_files.append(full_path)

2 changes: 1 addition & 1 deletion autosklearn/metalearning/mismbo.py
@@ -19,7 +19,7 @@ def suggest_via_metalearning(

task = TASK_TYPES_TO_STRING[task]

logger.warning(task)
logger.info(task)

start = time.time()
ml = MetaLearningOptimizer(
1 change: 0 additions & 1 deletion autosklearn/smbo.py
@@ -430,7 +430,6 @@ def run_smbo(self):
autosklearn_seed=seed,
resampling_strategy=self.resampling_strategy,
initial_num_run=num_run,
logger=self.logger,
include=include,
exclude=exclude,
metric=self.metric,
11 changes: 10 additions & 1 deletion autosklearn/util/logging.yaml
@@ -18,6 +18,12 @@ handlers:
formatter: simple
filename: autosklearn.log

distributed_logfile:
class: logging.FileHandler
level: DEBUG
formatter: simple
filename: distributed.log

root:
level: DEBUG
handlers: [console, file_handler]
@@ -26,7 +32,6 @@ loggers:
autosklearn.metalearning:
level: DEBUG
handlers: [file_handler]
propagate: no

autosklearn.util.backend:
level: DEBUG
@@ -48,3 +53,7 @@
EnsembleBuilder:
level: DEBUG
propagate: no

distributed:
level: DEBUG
handlers: [distributed_logfile]
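For reference, a configuration like the one above is typically loaded with `logging.config.dictConfig`. A minimal sketch, assuming the full file carries the `version: 1` key that `dictConfig` requires (only an excerpt appears in this diff):

```python
import logging
import logging.config

import yaml

with open("autosklearn/util/logging.yaml") as fh:
    config = yaml.safe_load(fh)

logging.config.dictConfig(config)

# Records emitted by the 'distributed' logger now land in distributed.log,
# while the root logger keeps writing to the console and autosklearn.log.
logging.getLogger("distributed").debug("dask worker heartbeat received")
```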