Fixes for merging of distributed input files #758

Merged
57 changes: 37 additions & 20 deletions src/model_execution_worker/distributed_tasks.py
@@ -8,15 +8,16 @@
import shutil
import subprocess
import tempfile
from natsort import natsorted
from contextlib import contextmanager
from datetime import datetime

import fasteners
import filelock
import pandas as pd
from celery import Celery, signature
from celery.signals import worker_ready, before_task_publish, task_revoked, task_failure
from celery.signals import (before_task_publish, task_failure, task_revoked,
worker_ready)
from natsort import natsorted
from oasislmf import __version__ as mdk_version
from oasislmf.manager import OasisManager
from oasislmf.model_preparation.lookup import OasisLookupFactory
@@ -25,14 +25,12 @@
from oasislmf.utils.status import OASIS_TASK_STATUS
from pathlib2 import Path

from ..common.data import STORED_FILENAME, ORIGINAL_FILENAME
from ..common.data import ORIGINAL_FILENAME, STORED_FILENAME
from ..conf import celeryconf as celery_conf
from ..conf.iniconf import settings

from .storage_manager import BaseStorageConnector
from .backends.aws_storage import AwsObjectStore
from .backends.azure_storage import AzureObjectStore

from .storage_manager import BaseStorageConnector

'''
Celery task wrapper for Oasis ktools calculation.
@@ -214,8 +213,8 @@ def load_location_data(loc_filepath):
return get_location_df(loc_filepath)
except ImportError:
# oasislmf == 1.27.x or greater
from ods_tools.oed.exposure import OedExposure
from oasislmf.utils.data import prepare_location_df
from ods_tools.oed.exposure import OedExposure

exposure = OedExposure(location=pathlib.Path(os.path.abspath(loc_filepath)))
exposure.location.dataframe = prepare_location_df(exposure.location.dataframe)
@@ -699,17 +698,36 @@ def collect_keys(
storage_subdir = chunk_params['storage_subdir']
del chunk_params['chunk_keys']

def merge_dataframes(paths, output_file, file_type, unique=True):
def merge_dataframes(paths, output_file, file_type):
pd_read_func = getattr(pd, f"read_{file_type}")
df_chunks = [pd_read_func(p) for p in paths]

if not paths:
logging.warning("merge_dataframes was called with an empty path list.")
return
df_chunks = []
for path in paths:
try:
df_chunks.append(pd_read_func(path))
except pd.errors.EmptyDataError:
# Ignore empty files.
logging.info(f"File {path} is empty. --skipped--")

if not df_chunks:
logging.warning(f"All files were empty: {paths}. --skipped--")
return
# TODO: add an option to select the merge strategy
df = pd.concat(df_chunks)

if unique:
df.drop_duplicates(inplace=True, ignore_index=True)
# CSV files will have a default index which must not be used to filter duplicates.
if file_type == 'parquet':
df = df[~df.index.duplicated(keep='first')]
pd_write_func = getattr(df, f"to_{file_type}")
pd_write_func(output_file, index=True)
# Only write index for parquet files to avoid useless extra column for csv files.
pd_write_func(output_file, index=file_type == 'parquet')

def take_first(paths, output_file):
first_path = paths[0]
logging.info(f"Using {first_path} and ignoring others.")
shutil.copy2(first_path, output_file)
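
As a side note, here is a minimal self-contained sketch (not part of this diff; the DataFrames and file names are hypothetical, and writing parquet assumes pyarrow or fastparquet is installed) of the duplicate-handling split that merge_dataframes above implements: parquet chunks keep a meaningful index, so rows sharing an index value are dropped as duplicates and the index is written out, while CSV chunks only carry a default RangeIndex, so no index-based filtering is done and no index column is written.

import pandas as pd

chunk_a = pd.DataFrame({"event_id": [1, 2]}, index=[10, 11])
chunk_b = pd.DataFrame({"event_id": [2, 3]}, index=[11, 12])
merged = pd.concat([chunk_a, chunk_b])

# Parquet-style merge: repeated index values mark duplicate rows,
# so keep only the first occurrence and preserve the index on disk.
parquet_merged = merged[~merged.index.duplicated(keep='first')]
parquet_merged.to_parquet("merged.parquet", index=True)

# CSV-style merge: the default RangeIndex is not used for filtering
# duplicates and is not written to the output file.
merged.to_csv("merged.csv", index=False)
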

# Collect files and tar here from chunk_params['target_dir']
with TemporaryDir() as chunks_dir:
@@ -719,22 +737,21 @@ def merge_dataframes(paths, output_file, file_type, unique=True):
extract_to = os.path.join(chunks_dir, os.path.basename(tar).split('.')[0])
filestore.extract(tar, extract_to, storage_subdir)

file_paths = glob.glob(chunks_dir + '/lookup-[0-9]*/*') # paths for every file to merge (inputs for merge)
file_names = set([os.path.basename(f) for f in file_paths]) # unqiue filenames (output merged results)
file_paths = glob.glob(chunks_dir + '/lookup-[0-9]*/*') # paths for every file to merge (inputs for merge)
file_names = set([os.path.basename(f) for f in file_paths]) # unique filenames (output merged results)

with TemporaryDir() as merge_dir:
for file in file_names:
logging.info(f'Merging into file: "{file}"')

file_type = Path(file).suffix[1:]
file_name = Path(file).stem
file_chunks = [f for f in file_paths if f.endswith(file)]
file_merged = os.path.join(merge_dir, file)
file_chunks = natsorted(file_chunks)
# file_unique_rows = True if file == 'ensemble_mapping.csv' else False

if file_type == 'csv':
merge_dataframes(file_chunks, file_merged, file_type)
elif file_type == 'parquet':
if file_name in ['events', 'occurrence', 'ensemble_mapping']:
take_first(file_chunks, file_merged)
elif file_type in ['csv', 'parquet']:
merge_dataframes(file_chunks, file_merged, file_type)
else:
logging.info(f'No merge method for file: "{file}" --skipped--')
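
For reference, a standalone sketch of the per-file strategy selection that the branch above implements (the helper below is hypothetical and not part of this diff): settings files that are identical in every chunk (events, occurrence, ensemble_mapping) are copied from the first chunk, tabular csv/parquet files are concatenated across chunks, and anything else is skipped. Note that file_chunks is natural-sorted beforehand so that, for example, lookup-10 does not sort ahead of lookup-2.

from pathlib import Path

def choose_strategy(file_name: str) -> str:
    # Hypothetical helper mirroring the branch logic in collect_keys.
    stem, ext = Path(file_name).stem, Path(file_name).suffix[1:]
    if stem in ('events', 'occurrence', 'ensemble_mapping'):
        return 'take_first'        # identical in every chunk, copy one
    if ext in ('csv', 'parquet'):
        return 'merge_dataframes'  # concatenate rows from all chunks
    return 'skip'                  # no merge method, log and skip

assert choose_strategy('occurrence.parquet') == 'take_first'
assert choose_strategy('keys.csv') == 'merge_dataframes'
assert choose_strategy('notes.txt') == 'skip'
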