Feature/pre analysis hook (#663)
* Basic pre-analysis hook

* tidy

* Fix loading

* Fix portfolio file removal

* Clean up tmp pre-analysis-hook exposure files on failed run

* Update oasislmf package pin
sambles authored Jul 1, 2022
1 parent e2ca1e8 commit ba0cec4
Showing 5 changed files with 129 additions and 33 deletions.
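For context, the feature added here lets a model provider supply a Python module that adjusts the OED exposure files before oasis files are generated. Below is a minimal sketch of such a hook module, assuming the ExposurePreAnalysis class convention documented by the OasisLMF package (constructor parameter names follow its documented example; the BuildingTIV scaling and the 'tiv_factor' setting are purely illustrative):

# Illustrative hook module (a sketch, not part of this commit). The worker points
# 'exposure_pre_analysis_module' at a file like this and OasisLMF calls run().
import pandas as pd

class ExposurePreAnalysis:
    def __init__(self, raw_oed_location_csv=None, oed_location_csv=None,
                 exposure_pre_analysis_setting=None, **kwargs):
        self.raw_oed_location_csv = raw_oed_location_csv  # portfolio file as uploaded
        self.oed_location_csv = oed_location_csv          # where to write the modified file
        self.settings = exposure_pre_analysis_setting or {}

    def run(self):
        # Example transform: scale BuildingTIV by a factor taken from the hook settings.
        loc = pd.read_csv(self.raw_oed_location_csv)
        loc['BuildingTIV'] = loc['BuildingTIV'] * self.settings.get('tiv_factor', 1.0)
        loc.to_csv(self.oed_location_csv, index=False)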
8 changes: 4 additions & 4 deletions conf.ini
@@ -10,7 +10,7 @@ LOCK_RETRY_COUNTDOWN_IN_SECS=10
MEDIA_ROOT = /shared-fs/
LOG_FILENAME = oasis_api_worker.log
STORAGE_TYPE = shared-fs
-KEEP_RUN_DIR = False
+KEEP_LOCAL_DATA = False
AWS_LOG_LEVEL = WARNING

[server]
@@ -58,9 +58,9 @@ AWS_SHARED_BUCKET=True
AWS_LOCATION=worker
MODEL_SETTINGS_FILE = /home/worker/model/meta-data/model_settings.json
#OASISLMF_CONFIG = /home/worker/model/oasislmf.json
-KEEP_RUN_DIR = False
-KEEP_CHUNK_DATA = False
-INPUT_GENERATION_CHUNK_SIZE = 5000
+#KEEP_LOCAL_DATA = False
+#KEEP_REMOTE_DATA = False
+#INPUT_GENERATION_CHUNK_SIZE = 5000


# --- Example settings --- #
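The rename splits cleanup control in two: KEEP_LOCAL_DATA (formerly KEEP_RUN_DIR) preserves the worker's local run directory, while KEEP_REMOTE_DATA (formerly KEEP_CHUNK_DATA) preserves chunk data in shared/remote storage. A standalone sketch of how these flags are read, using plain stdlib configparser (the actual worker uses its own settings wrapper, but the getboolean call shape matches the diff below):

# Sketch: reading the renamed cleanup flags from conf.ini with configparser.
from configparser import ConfigParser

settings = ConfigParser()
settings.read('conf.ini')

# Both default to False, i.e. clean up unless explicitly told to keep data.
keep_local = settings.getboolean('worker', 'KEEP_LOCAL_DATA', fallback=False)
keep_remote = settings.getboolean('worker', 'KEEP_REMOTE_DATA', fallback=False)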
4 changes: 2 additions & 2 deletions requirements-worker.txt
@@ -5,8 +5,8 @@
# pip-compile requirements-worker.in
#

-# Pin Oasislmf for elt & summarycalc output fix - https://github.com/OasisLMF/OasisLMF/pull/1060
-git+https://github.com/OasisLMF/OasisLMF.git@49cfa3cece068a2de5027827a0e32274fb61fe9f#egg=oasislmf[extra]
+# Pin Oasislmf to 'develop' for pre-analysis-hook fix
+git+https://github.com/OasisLMF/OasisLMF.git@9256ab0be4b0cef06ea895094f06fbeeae2b22db#egg=oasislmf[extra]

amqp==5.1.0
# via kombu
138 changes: 112 additions & 26 deletions src/model_execution_worker/distributed_tasks.py
@@ -16,7 +16,7 @@
import pandas as pd
from billiard.exceptions import WorkerLostError
from celery import Celery, signature
-from celery.signals import worker_ready, before_task_publish, task_revoked
+from celery.signals import worker_ready, before_task_publish, task_revoked, task_failure
from oasislmf import __version__ as mdk_version
from oasislmf.manager import OasisManager
from oasislmf.model_preparation.lookup import OasisLookupFactory
@@ -262,7 +262,8 @@ def register_worker(sender, **k):
    # Optional ENV
    logging.info("MODEL_SETTINGS_FILE: {}".format(settings.get('worker', 'MODEL_SETTINGS_FILE', fallback='None')))
    logging.info("DISABLE_WORKER_REG: {}".format(settings.getboolean('worker', 'DISABLE_WORKER_REG', fallback='False')))
-    logging.info("KEEP_RUN_DIR: {}".format(settings.get('worker', 'KEEP_RUN_DIR', fallback='False')))
+    logging.info("KEEP_LOCAL_DATA: {}".format(settings.get('worker', 'KEEP_LOCAL_DATA', fallback='False')))
+    logging.info("KEEP_REMOTE_DATA: {}".format(settings.get('worker', 'KEEP_REMOTE_DATA', fallback='False')))
    logging.info("BASE_RUN_DIR: {}".format(settings.get('worker', 'BASE_RUN_DIR', fallback='None')))
    logging.info("OASISLMF_CONFIG: {}".format(settings.get('worker', 'oasislmf_config', fallback='None')))
    logging.info("TASK_LOG_DIR: {}".format(settings.get('worker', 'TASK_LOG_DIR', fallback='/var/log/oasis/tasks')))
@@ -393,17 +394,33 @@ def maybe_prepare_complex_data_files(complex_data_files, user_data_dir):
        except OSError:
            logging.info(f'Failed to remove {user_data_dir}.lock')

-def maybe_fetch_file(datafile, filepath):
+def maybe_fetch_file(datafile, filepath, subdir=''):
    with filelock.FileLock(f'{filepath}.lock'):
        if not Path(filepath).exists():
            logging.info(f'file: {datafile}')
            logging.info(f'filepath: {filepath}')
-            filestore.get(datafile, filepath)
+            filestore.get(datafile, filepath, subdir)
    try:
        os.remove(f'{filepath}.lock')
    except OSError:
        logging.info(f'Failed to remove {filepath}.lock')


+def get_file_ref(kwargs, params, arg_name):
+    """ Either fetch file ref from Kwargs or override from pre-analysis hook
+    """
+    file_from_server = kwargs.get(arg_name)
+    file_from_hook = params.get(f'pre_{arg_name}')
+    if not file_from_server:
+        logging.info(f'{arg_name}: (Not loaded)')
+        return None
+    elif file_from_hook:
+        logging.info(f'{arg_name}: {file_from_hook} (pre-analysis-hook)')
+        return file_from_hook
+    logging.info(f'{arg_name}: {file_from_server} (portfolio)')
+    return file_from_server
+
+
def log_task_entry(slug, request_id, analysis_id):
    if slug:
        logging.info('\n')
@@ -447,38 +464,40 @@ def _prepare_directories(params, analysis_id, run_data_uuid, kwargs):
    params.setdefault('keys_fp', os.path.join(params['root_run_dir'], 'keys.csv'))
    params.setdefault('keys_errors_fp', os.path.join(params['root_run_dir'], 'keys-errors.csv'))

-    # Fetch keyword args
-    loc_file = kwargs.get('loc_file')
-    acc_file = kwargs.get('acc_file')
-    info_file = kwargs.get('info_file')
-    scope_file = kwargs.get('scope_file')
+    # user settings and data
    settings_file = kwargs.get('analysis_settings_file')
    complex_data_files = kwargs.get('complex_data_files')

+    # Load OED file references (filenames or object keys)
+    loc_file = get_file_ref(kwargs, params, 'loc_file')
+    acc_file = get_file_ref(kwargs, params, 'acc_file')
+    info_file = get_file_ref(kwargs, params, 'info_file')
+    scope_file = get_file_ref(kwargs, params, 'scope_file')
+
    # Prepare 'generate-oasis-files' input files
    if loc_file:
        loc_extention = "".join(pathlib.Path(loc_file).suffixes)
+        loc_subdir = params.get('storage_subdir','') if params.get('pre_loc_file') else ''
        params['oed_location_csv'] = os.path.join(params['root_run_dir'], f'location{loc_extention}')
-        maybe_fetch_file(loc_file, params['oed_location_csv'])
-
+        maybe_fetch_file(loc_file, params['oed_location_csv'], loc_subdir)
    if acc_file:
        acc_extention = "".join(pathlib.Path(acc_file).suffixes)
+        acc_subdir = params.get('storage_subdir','') if params.get('pre_acc_file') else ''
        params['oed_accounts_csv'] = os.path.join(params['root_run_dir'], f'account{acc_extention}')
-        maybe_fetch_file(acc_file, params['oed_accounts_csv'])
-
+        maybe_fetch_file(acc_file, params['oed_accounts_csv'], acc_subdir)
    if info_file:
        info_extention = "".join(pathlib.Path(info_file).suffixes)
+        info_subdir = params.get('storage_subdir','') if params.get('pre_info_file') else ''
        params['oed_info_csv'] = os.path.join(params['root_run_dir'], f'reinsinfo{info_extention}')
-        maybe_fetch_file(info_file, params['oed_info_csv'])
-
+        maybe_fetch_file(info_file, params['oed_info_csv'], info_subdir)
    if scope_file:
        scope_extention = "".join(pathlib.Path(scope_file).suffixes)
+        scope_subdir = params.get('storage_subdir','') if params.get('pre_scope_file') else ''
        params['oed_scope_csv'] = os.path.join(params['root_run_dir'], f'reinsscope{scope_extention}')
-        maybe_fetch_file(scope_file, params['oed_scope_csv'])
+        maybe_fetch_file(scope_file, params['oed_scope_csv'], scope_subdir)

    if settings_file:
        maybe_fetch_file(settings_file, params['lookup_complex_config_json'])

    if complex_data_files:
        maybe_prepare_complex_data_files(complex_data_files, params['user_data_dir'])
    else:
@@ -496,7 +515,7 @@ def run(self, params, *args, run_data_uuid=None, analysis_id=None, **kwargs):
        else:
            _prepare_directories(params, analysis_id, run_data_uuid, kwargs)

-        #log_params(params, kwargs)
+        log_params(params, kwargs)
        return fn(self, params, *args, analysis_id=analysis_id, run_data_uuid=run_data_uuid, **kwargs)

    return run
@@ -534,7 +553,9 @@ def prepare_input_generation_params(
        'lookup_config_json',
        'model_version_csv',
        'lookup_module_path',
-        'model_settings_json'
+        'model_settings_json',
+        'exposure_pre_analysis_module',
+        'exposure_pre_analysis_setting_json',
    ]
    for path_val in lookup_path_vars:
        if lookup_params.get(path_val, False):
@@ -545,12 +566,48 @@
            )
            lookup_params[path_val] = abs_path_val

-    params = OasisManager()._params_generate_files(**lookup_params)
+    gen_files_params = OasisManager()._params_generate_files(**lookup_params)
+    pre_hook_params = OasisManager()._params_exposure_pre_analysis(**lookup_params)
+    params = {**gen_files_params, **pre_hook_params}
+
    params['log_location'] = filestore.put(kwargs.get('log_filename'))
    params['verbose'] = debug_worker
    return params


+@app.task(bind=True, name='pre_analysis_hook', **celery_conf.worker_task_kwargs)
+@keys_generation_task
+def pre_analysis_hook(self,
+                      params,
+                      run_data_uuid=None,
+                      analysis_id=None,
+                      initiator_id=None,
+                      slug=None,
+                      **kwargs
+                      ):
+    if params.get('exposure_pre_analysis_module'):
+        with TemporaryDir() as hook_target_dir:
+            params['oasis_files_dir'] = hook_target_dir
+            pre_hook_output = OasisManager().exposure_pre_analysis(**params)
+            files_modified = pre_hook_output.get('modified', {})
+
+            # store updated files
+            params['pre_loc_file'] = filestore.put(files_modified.get('oed_location_csv'), subdir=params['storage_subdir'])
+            params['pre_acc_file'] = filestore.put(files_modified.get('oed_accounts_csv'), subdir=params['storage_subdir'])
+            params['pre_info_file'] = filestore.put(files_modified.get('oed_info_csv'), subdir=params['storage_subdir'])
+            params['pre_scope_file'] = filestore.put(files_modified.get('oed_scope_csv'), subdir=params['storage_subdir'])
+
+            # remove any pre-loaded files (only affects this worker)
+            oed_files = {v for k,v in params.items() if k.startswith('oed_') and isinstance(v, str)}
+            for filepath in oed_files:
+                if Path(filepath).exists():
+                    os.remove(filepath)
+    else:
+        logging.info('pre_analysis_hook: SKIPPING, param "exposure_pre_analysis_module" not set')
+    params['log_location'] = filestore.put(kwargs.get('log_filename'))
+    return params


@app.task(bind=True, name='prepare_keys_file_chunk', **celery_conf.worker_task_kwargs)
@keys_generation_task
def prepare_keys_file_chunk(
@@ -635,7 +692,6 @@ def load_dataframes(paths):
        try:
            df = pd.read_csv(p)
            yield df
-        #except OasisException:
        except Exception:
            logging.info('Failed to load chunk: {}'.format(p))
            pass
@@ -712,12 +768,25 @@ def write_input_files(self, params, run_data_uuid=None, analysis_id=None, initia
@app.task(bind=True, name='cleanup_input_generation', **celery_conf.worker_task_kwargs)
@keys_generation_task
def cleanup_input_generation(self, params, analysis_id=None, initiator_id=None, run_data_uuid=None, slug=None, **kwargs):
-    if not settings.getboolean('worker', 'KEEP_RUN_DIR', fallback=False):
+
+    # check for pre-analysis files and remove
+
+    if not settings.getboolean('worker', 'KEEP_LOCAL_DATA', fallback=False):
        # Delete local copy of run data
        shutil.rmtree(params['target_dir'], ignore_errors=True)
-    if not settings.getboolean('worker', 'KEEP_CHUNK_DATA', fallback=False):
+    if not settings.getboolean('worker', 'KEEP_REMOTE_DATA', fallback=False):
        # Delete remote copy of run data
        filestore.delete_dir(params['storage_subdir'])
+        # Delete pre-analysis files
+        if params.get('pre_loc_file'):
+            filestore.delete_file(params.get('pre_loc_file'))
+        if params.get('pre_acc_file'):
+            filestore.delete_file(params.get('pre_acc_file'))
+        if params.get('pre_info_file'):
+            filestore.delete_file(params.get('pre_info_file'))
+        if params.get('pre_scope_file'):
+            filestore.delete_file(params.get('pre_scope_file'))

    params['log_location'] = filestore.put(kwargs.get('log_filename'))
    return params

@@ -827,7 +896,7 @@ def run(self, params, *args, run_data_uuid=None, analysis_id=None, **kwargs):
        else:
            _prepare_directories(params, analysis_id, run_data_uuid, kwargs)

-        #log_params(params, kwargs)
+        log_params(params, kwargs)
        return fn(self, params, *args, analysis_id=analysis_id, **kwargs)

    return run
@@ -951,10 +1020,10 @@ def generate_losses_output(self, params, analysis_id=None, slug=None, **kwargs):
@app.task(bind=True, name='cleanup_losses_generation', **celery_conf.worker_task_kwargs)
@loss_generation_task
def cleanup_losses_generation(self, params, analysis_id=None, slug=None, **kwargs):
-    if not settings.getboolean('worker', 'KEEP_RUN_DIR', fallback=False):
+    if not settings.getboolean('worker', 'KEEP_LOCAL_DATA', fallback=False):
        # Delete local copy of run data
        shutil.rmtree(params['root_run_dir'], ignore_errors=True)
-    if not settings.getboolean('worker', 'KEEP_CHUNK_DATA', fallback=False):
+    if not settings.getboolean('worker', 'KEEP_REMOTE_DATA', fallback=False):
        # Delete remote copy of run data
        filestore.delete_dir(params['storage_subdir'])
    params['log_location'] = filestore.put(kwargs.get('log_filename'))
@@ -997,6 +1066,23 @@ def prepare_complex_model_file_inputs(complex_model_files, run_directory):
        else:
            os.symlink(from_path, to_path)

+@task_failure.connect
+def handle_task_failure(*args, sender=None, task_id=None, **kwargs):
+    logging.info("Task error handler")
+    task_args = kwargs.get('args')[0]
+
+    keep_local_data = settings.getboolean('worker', 'KEEP_LOCAL_DATA', fallback=False)
+    dir_local_data = task_args.get('root_run_dir')
+    keep_remote_data = settings.getboolean('worker', 'KEEP_REMOTE_DATA', fallback=False)
+    dir_remote_data = task_args.get('storage_subdir')
+
+    if not keep_local_data:
+        logging.info(f"deleting local data, {dir_local_data}")
+        shutil.rmtree(dir_local_data, ignore_errors=True)
+    if not keep_remote_data:
+        logging.info(f"deleting remote data, {dir_remote_data}")
+        filestore.delete_dir(dir_remote_data)
+


@before_task_publish.connect
def mark_task_as_queued_receiver(*args, headers=None, body=None, **kwargs):
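The new task_failure receiver above relies on Celery delivering the failed task's positional arguments to the handler via the 'args' keyword; since every task in this module takes the shared params dict as its first positional argument, the handler can recover the run directories from it. A stripped-down sketch of that wiring (the print stands in for the rmtree/filestore cleanup):

# Sketch: Celery's task_failure signal passes the failed task's positional
# args in kwargs['args']; for these worker tasks, args[0] is the params dict.
from celery.signals import task_failure

@task_failure.connect
def cleanup_on_failure(sender=None, task_id=None, exception=None, **kwargs):
    params = (kwargs.get('args') or [{}])[0]
    print('would remove:', params.get('root_run_dir'), params.get('storage_subdir'))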
10 changes: 10 additions & 0 deletions src/server/oasisapi/analyses/task_controller.py
@@ -251,6 +251,16 @@ def get_inputs_generation_tasks(cls, analysis: 'Analysis', initiator: User, run_
                queue,
                TaskParams(**base_kwargs),
            ),
+            cls.get_subtask_statuses_and_signature(
+                'pre_analysis_hook',
+                analysis,
+                initiator,
+                run_data_uuid,
+                'Pre analysis hook',
+                'pre-analysis-hook',
+                queue,
+                TaskParams(**files_kwargs),
+            ),
            cls.get_subchord_statuses_and_signature(
                'prepare_keys_file_chunk',
                analysis,
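The controller change above slots the pre_analysis_hook subtask between parameter preparation and the keys-file chunk chord. Roughly, the resulting Celery workflow orders like the sketch below (the chunk count and the collector task name are illustrative; the real chain is assembled from the get_sub*_statuses_and_signature helpers shown in the diff):

# Sketch of the input-generation workflow order after this change.
from celery import chain, chord, signature

num_chunks = 4  # illustrative; actual chunking is driven by INPUT_GENERATION_CHUNK_SIZE
workflow = chain(
    signature('prepare_input_generation_params'),
    signature('pre_analysis_hook'),                # new step added by this PR
    chord(
        [signature('prepare_keys_file_chunk', kwargs={'chunk_idx': i})
         for i in range(num_chunks)],
        signature('collect_keys'),                 # illustrative collector task name
    ),
)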
2 changes: 1 addition & 1 deletion src/server/oasisapi/analyses/tasks.py
@@ -634,7 +634,7 @@ def handle_task_failure(
        logger.exception(str(e))

    # cleanup the temporary run files
-    if not worker_settings.getboolean('worker', 'KEEP_RUN_DIR', fallback=False) and run_data_uuid:
+    if not worker_settings.getboolean('worker', 'KEEP_LOCAL_DATA', fallback=False) and run_data_uuid:
        rmtree(
            os.path.join(worker_settings.get('worker', 'run_data_dir', fallback='/data'), f'analysis-{analysis_id}-{run_data_uuid}'),
            ignore_errors=True
