diff --git a/src/model_execution_worker/distributed_tasks.py b/src/model_execution_worker/distributed_tasks.py
index 042cbc55c..0291b95f7 100644
--- a/src/model_execution_worker/distributed_tasks.py
+++ b/src/model_execution_worker/distributed_tasks.py
@@ -8,7 +8,6 @@
 import shutil
 import subprocess
 import tempfile
-from natsort import natsorted
 from contextlib import contextmanager
 from datetime import datetime
 
@@ -16,7 +15,9 @@
 import filelock
 import pandas as pd
 from celery import Celery, signature
-from celery.signals import worker_ready, before_task_publish, task_revoked, task_failure
+from celery.signals import (before_task_publish, task_failure, task_revoked,
+                            worker_ready)
+from natsort import natsorted
 from oasislmf import __version__ as mdk_version
 from oasislmf.manager import OasisManager
 from oasislmf.model_preparation.lookup import OasisLookupFactory
@@ -25,14 +26,12 @@
 from oasislmf.utils.status import OASIS_TASK_STATUS
 from pathlib2 import Path
 
-from ..common.data import STORED_FILENAME, ORIGINAL_FILENAME
+from ..common.data import ORIGINAL_FILENAME, STORED_FILENAME
 from ..conf import celeryconf as celery_conf
 from ..conf.iniconf import settings
-
-from .storage_manager import BaseStorageConnector
 from .backends.aws_storage import AwsObjectStore
 from .backends.azure_storage import AzureObjectStore
-
+from .storage_manager import BaseStorageConnector
 
 '''
 Celery task wrapper for Oasis ktools calculation.
@@ -214,8 +213,8 @@ def load_location_data(loc_filepath):
         return get_location_df(loc_filepath)
     except ImportError:
         # oasislmf == 1.27.x or greater
-        from ods_tools.oed.exposure import OedExposure
         from oasislmf.utils.data import prepare_location_df
+        from ods_tools.oed.exposure import OedExposure
 
         exposure = OedExposure(location=pathlib.Path(os.path.abspath(loc_filepath)))
         exposure.location.dataframe = prepare_location_df(exposure.location.dataframe)
@@ -699,17 +698,36 @@ def collect_keys(
     storage_subdir = chunk_params['storage_subdir']
     del chunk_params['chunk_keys']
 
-    def merge_dataframes(paths, output_file, file_type, unique=True):
+    def merge_dataframes(paths, output_file, file_type):
         pd_read_func = getattr(pd, f"read_{file_type}")
-        df_chunks = [pd_read_func(p) for p in paths]
-
+        if not paths:
+            logging.warning("merge_dataframes was called with an empty path list.")
+            return
+        df_chunks = []
+        for path in paths:
+            try:
+                df_chunks.append(pd_read_func(path))
+            except pd.errors.EmptyDataError:
+                # Ignore empty files.
+                logging.info(f"File {path} is empty. --skipped--")
+
+        if not df_chunks:
+            logging.warning(f"All files were empty: {paths}. --skipped--")
+            return
         # add opt for Select merge strat
         df = pd.concat(df_chunks)
-        if unique:
-            df.drop_duplicates(inplace=True, ignore_index=True)
+        # CSV files will have a default index which must not be used to filter duplicates.
+        if file_type == 'parquet':
+            df = df[~df.index.duplicated(keep='first')]
 
         pd_write_func = getattr(df, f"to_{file_type}")
-        pd_write_func(output_file, index=True)
+        # Only write index for parquet files to avoid useless extra column for csv files.
+        pd_write_func(output_file, index=file_type == 'parquet')
+
+    def take_first(paths, output_file):
+        first_path = paths[0]
+        logging.info(f"Using {first_path} and ignoring others.")
+        shutil.copy2(first_path, output_file)
 
     # Collect files and tar here from chunk_params['target_dir']
     with TemporaryDir() as chunks_dir:
@@ -719,22 +737,22 @@ def merge_dataframes(paths, output_file, file_type, unique=True):
             extract_to = os.path.join(chunks_dir, os.path.basename(tar).split('.')[0])
             filestore.extract(tar, extract_to, storage_subdir)
 
-        file_paths = glob.glob(chunks_dir + '/lookup-[0-9]*/*')        # paths for every file to merge (inputs for merge)
-        file_names = set([os.path.basename(f) for f in file_paths])    # unqiue filenames (output merged results)
+        file_paths = glob.glob(chunks_dir + '/lookup-[0-9]*/*')      # paths for every file to merge (inputs for merge)
+        file_names = set([os.path.basename(f) for f in file_paths])  # unique filenames (output merged results)
 
         with TemporaryDir() as merge_dir:
             for file in file_names:
                 logging.info(f'Merging into file: "{file}"')
                 file_type = Path(file).suffix[1:]
+                file_name = Path(file).stem
                 file_chunks = [f for f in file_paths if f.endswith(file)]
                 file_merged = os.path.join(merge_dir, file)
                 file_chunks = natsorted(file_chunks)
-                # file_unique_rows = True if file == 'ensemble_mapping.csv' else False
 
-                if file_type == 'csv':
-                    merge_dataframes(file_chunks, file_merged, file_type)
-                elif file_type == 'parquet':
+                if file_name in ['events', 'occurrence', 'ensemble_mapping']:
+                    take_first(file_chunks, file_merged)
+                elif file_type in ['csv', 'parquet']:
                     merge_dataframes(file_chunks, file_merged, file_type)
                 else:
                     logging.info(f'No merge method for file: "{file}" --skipped--')
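
Note (not part of the diff): a minimal standalone sketch of the csv/parquet merge rule introduced in collect_keys above, assuming only pandas. Duplicate rows are filtered via the index only for parquet chunks, and the index is only written back for parquet, since csv chunks carry nothing but the default RangeIndex. The chunk file names and sample data below are hypothetical.

# Standalone illustration of the csv/parquet merge rule; file names and data are made up.
import pandas as pd


def merge_chunks(paths, output_file, file_type):
    read = getattr(pd, f"read_{file_type}")   # read_csv or read_parquet
    df = pd.concat([read(p) for p in paths])
    if file_type == 'parquet':
        # Parquet chunks carry a meaningful index, so rows sharing an index value are duplicates.
        df = df[~df.index.duplicated(keep='first')]
    # Write the index back only for parquet; for csv it is just the default RangeIndex.
    getattr(df, f"to_{file_type}")(output_file, index=file_type == 'parquet')


if __name__ == '__main__':
    # Two hypothetical lookup chunks; csv rows are simply concatenated in order.
    pd.DataFrame({'loc_id': [1, 2]}).to_csv('chunk-1.csv', index=False)
    pd.DataFrame({'loc_id': [3, 4]}).to_csv('chunk-2.csv', index=False)
    merge_chunks(['chunk-1.csv', 'chunk-2.csv'], 'merged.csv', 'csv')
    print(pd.read_csv('merged.csv'))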