From 5f2bcea4e82f75de039449e0407bb4a09fe871a5 Mon Sep 17 00:00:00 2001
From: Jordan Day
Date: Thu, 31 Oct 2024 16:18:20 +0000
Subject: [PATCH] Refactor wrapper functions to run main

---
 mbs_results/estimation/estimate.py | 5 +-
 mbs_results/imputation/impute.py | 2 +-
 mbs_results/main.py | 15 +++--
 .../outlier_detection/detect_outlier.py | 21 ++++---
 mbs_results/staging/stage_dataframe.py | 58 +++++++++----------
 mbs_results/utilities/constrains.py | 2 -
 mbs_results/utilities/validation_checks.py | 24 ++++++--
 7 files changed, 76 insertions(+), 51 deletions(-)

diff --git a/mbs_results/estimation/estimate.py b/mbs_results/estimation/estimate.py
index 5d5a4563..0e24281f 100644
--- a/mbs_results/estimation/estimate.py
+++ b/mbs_results/estimation/estimate.py
@@ -21,12 +21,13 @@ def estimate(df: pd.DataFrame, config):
     _type_
         _description_
     """
+    warnings.warn("Estimate is the slowest stage")
     warnings.warn(
-        "temp fix is applied to convert calibration group map into" + "df in config"
+        "temp fix is applied to convert calibration group map into df in config"
     )
     config["calibration_group_map"] = pd.read_csv(config["calibration_group_map"])
     estimate_df = apply_estimation(**config)
-    estimate_df = estimate_df.drop(columns=["cell_no", "frotover"])
+    estimate_df = estimate_df.drop(columns=["cell_no", "frotover", "frosic2007"])
     post_estimate = pd.merge(df, estimate_df, how="left", on=["period", "reference"])
 
     return post_estimate
diff --git a/mbs_results/imputation/impute.py b/mbs_results/imputation/impute.py
index 770235fd..4ff93f53 100644
--- a/mbs_results/imputation/impute.py
+++ b/mbs_results/imputation/impute.py
@@ -62,7 +62,7 @@ def impute(dataframe: pd.DataFrame, config: dict) -> pd.DataFrame:
         post_impute,
         "period",
         "reference",
-        "adjusted_value",
+        "adjustedresponse",
         question_no,
         "form_type_spp",
     )
diff --git a/mbs_results/main.py b/mbs_results/main.py
index bb5b4176..69e114aa 100755
--- a/mbs_results/main.py
+++ b/mbs_results/main.py
@@ -12,23 +12,28 @@
     validate_staging,
 )
 
-if __name__ == "__main__":
+
+def run_mbs_main():
     config = load_config()
     validate_config(config)
     staged_data = stage_dataframe(config)
-    validate_staging(staged_data)
+    validate_staging(staged_data, config)
 
     # imputation: RoM wrapper -> Rename wrapper to apply_imputation
     imputation_output = impute(staged_data, config)
-    validate_imputation(imputation_output)
+    validate_imputation(imputation_output, config)
 
     # Estimation Wrapper
     estimation_output = estimate(imputation_output, config)
-    validate_estimation(estimation_output)
+    validate_estimation(estimation_output, config)
 
     # Outlier Wrapper
     outlier_output = detect_outlier(estimation_output, config)
-    validate_outlier_detection(outlier_output)
+    validate_outlier_detection(outlier_output, config)
 
     produce_outputs(outlier_output, "output_path/")
+
+
+if __name__ == "__main__":
+    run_mbs_main()
diff --git a/mbs_results/outlier_detection/detect_outlier.py b/mbs_results/outlier_detection/detect_outlier.py
index 1c320c55..8c45409e 100644
--- a/mbs_results/outlier_detection/detect_outlier.py
+++ b/mbs_results/outlier_detection/detect_outlier.py
@@ -8,6 +8,8 @@ def join_l_values(df, l_values_path, classification_values_path):
     """Read l values, classifications and drop duplicates and period"""
 
     l_values = pd.read_csv(l_values_path)
+    l_values["classification"] = l_values["classification"].astype(str)
+    l_values["question_no"] = l_values["question_no"].astype("int64")
 
     # l_values = l_values.drop_duplicates(['question_no','classification'])
 
@@ -15,8 +17,13 @@ def join_l_values(df, l_values_path, classification_values_path):
 
     # Merge on classification SIC map (merge on SIC to get classification on df)
     classification_values = pd.read_csv(classification_values_path)
+    classification_values["sic_5_digit"] = classification_values["sic_5_digit"].astype(
+        str
+    )
+    classification_values["classification"] = classification_values[
+        "classification"
+    ].astype(str)
 
-    print(list(classification_values))
     df = pd.merge(
         df,
         classification_values,
@@ -30,7 +37,7 @@ def join_l_values(df, l_values_path, classification_values_path):
         df,
         l_values,
         how="left",
-        left_on=["question_no", "classification"],
+        left_on=["questioncode", "classification"],
         right_on=["question_no", "classification"],
     )
 
@@ -51,17 +58,17 @@ def detect_outlier(df, config):
         "sampled",
         "design_weight",
         "calibration_factor",
-        "adjusted_value",
+        "adjustedresponse",
         "l_value",
     )
 
     post_win = calculate_derived_outlier_weights(
-        df,
+        post_win,
         "period",
         "reference",
-        "adjusted_value",
-        "question_no",
-        "spp_form_id",
+        "adjustedresponse",
+        "questioncode",
+        "form_type_spp",
         "outlier_weight",
         "new_target_variable",
     )
diff --git a/mbs_results/staging/stage_dataframe.py b/mbs_results/staging/stage_dataframe.py
index 49b44fa4..2e18a601 100644
--- a/mbs_results/staging/stage_dataframe.py
+++ b/mbs_results/staging/stage_dataframe.py
@@ -1,9 +1,10 @@
+import glob
 import warnings
 
 import pandas as pd
 
 from mbs_results.staging.create_missing_questions import create_missing_questions
-from mbs_results.staging.data_cleaning import enforce_datatypes  # run_live_or_frozen
+from mbs_results.staging.data_cleaning import enforce_datatypes, run_live_or_frozen
 from mbs_results.staging.dfs_from_spp import dfs_from_spp
 from mbs_results.utilities.utils import read_colon_separated_file
 
@@ -41,6 +42,17 @@ def create_mapper() -> dict:
     return mapper
 
 
+def read_and_combine_colon_sep_files(folder_path, column_names, config):
+    df = pd.concat(
+        [
+            read_colon_separated_file(f, column_names, period=config["period"])
+            for f in glob.glob(folder_path)
+        ],
+        ignore_index=True,
+    )
+    return df
+
+
 def stage_dataframe(config: dict) -> pd.DataFrame:
     """
     Wrapper function to stage and pre-process the dataframe, ready to be passed onto the
@@ -55,11 +67,8 @@ def stage_dataframe(config: dict) -> pd.DataFrame:
     -------
     _type_
         Combined dataframe containing response and contributor data. Missing questions
-        have been created, data types enforced***. NI cell number have been converted
+        have been created, data types enforced. NI cell numbers have been converted
         to UK.
-
-    *** current functionality broken but with refactoring this will be implemented
-    Mapping function needs to be defined but is used in other functions
     """
     print("Staging started")
 
@@ -71,7 +80,8 @@ def stage_dataframe(config: dict) -> pd.DataFrame:
         config["platform"],
         config["bucket"],
     )
-    # TODO filter responses and contributors df to columns in config
+
+    # Filter columns and set data types
     contributors = contributors[config["contributors_keep_cols"].keys()]
     contributors = enforce_datatypes(
         contributors, keep_columns=config["contributors_keep_cols"], **config
@@ -82,8 +92,8 @@ def stage_dataframe(config: dict) -> pd.DataFrame:
         responses, keep_columns=config["responses_keep_cols"], **config
     )
 
-    finalsel = read_colon_separated_file(
-        config["sample_path"], config["sample_column_names"], period=config[period]
+    finalsel = read_and_combine_colon_sep_files(
+        config["sample_path"], config["sample_column_names"], config
     )
 
     finalsel = finalsel[config["finalsel_keep_cols"].keys()]
@@ -97,9 +107,8 @@ def stage_dataframe(config: dict) -> pd.DataFrame:
         right=finalsel,
         on=[period, reference],
         suffixes=["_spp", "_finalsel"],
+        how="left",
     )
-    warnings.warn("Duplicate columns are created in this join, need to fix this")
-    # TODO map on SPP form type
 
     # contributors = create_form_type_spp_column(contributors, config)
     mapper = create_mapper()  # Needs to be defined
@@ -115,27 +124,16 @@ def stage_dataframe(config: dict) -> pd.DataFrame:
     )
 
     df = responses_with_missing.drop(columns=config["form_id"]).merge(
-        contributors, on=[reference, period], suffixes=["_res", "_con"]
+        contributors, on=[reference, period], suffixes=["_res", "_con"], how="left"
+    )
+    warnings.warn("run_live_or_frozen applied; fix error marker column in config")
+    df = run_live_or_frozen(
+        df,
+        config["target"],
+        error_marker=config["errormarker"],
+        state=config["state"],
+        error_values=[201],
     )
-    print()
-    # Add run live or frozen
-    # df = run_live_or_frozen(df, ...)
print("Staging Completed") return df - - -if __name__ == "__main__": - from mbs_results.imputation.impute import impute - from mbs_results.utilities.inputs import load_config - - config = load_config() - df = stage_dataframe(config) - # print(df[["formtype","form_type_spp"]]) - impute(df, config) - filter_col_spp = [col for col in df if col.endswith("_res")] - filter_col_finalsel = [col for col in df if col.endswith("_con")] - print(df.head()) - for i in filter_col_spp: - col_start = i.split("_")[0] - print(col_start, df[i].equals(df[col_start + "_con"])) diff --git a/mbs_results/utilities/constrains.py b/mbs_results/utilities/constrains.py index e63c9ba5..1d13e4c2 100644 --- a/mbs_results/utilities/constrains.py +++ b/mbs_results/utilities/constrains.py @@ -154,8 +154,6 @@ def constrain( "cell_no", "frotover", "froempment", - "form_type", - "response_type", ], verify_integrity=False, ) diff --git a/mbs_results/utilities/validation_checks.py b/mbs_results/utilities/validation_checks.py index ecf20582..9bb77697 100644 --- a/mbs_results/utilities/validation_checks.py +++ b/mbs_results/utilities/validation_checks.py @@ -1,4 +1,5 @@ import warnings +from importlib import metadata import pandas as pd @@ -229,17 +230,32 @@ def validate_manual_constructions(df, manual_constructions): ) -def validate_staging(df: pd.DataFrame): +def validate_staging(df: pd.DataFrame, config: dict): warnings.warn("A placeholder function for validating dataframe post staging") -def validate_imputation(df: pd.DataFrame): +def validate_imputation(df: pd.DataFrame, config: dict): warnings.warn("A placeholder function for validating dataframe post imputation") + output_path = config["output_path"] + file_version_mbs = metadata.metadata("monthly-business-survey-results")["version"] + snapshot_name = config["mbs_file_name"].split(".")[0] + imputation_filename = f"imputation_output_v{file_version_mbs}_{snapshot_name}.csv" + df.to_csv(output_path + imputation_filename) -def validate_estimation(df: pd.DataFrame): +def validate_estimation(df: pd.DataFrame, config: dict): warnings.warn("A placeholder function for validating dataframe post estimation") + output_path = config["output_path"] + file_version_mbs = metadata.metadata("monthly-business-survey-results")["version"] + snapshot_name = config["mbs_file_name"].split(".")[0] + estimate_filename = f"estimation_output_v{file_version_mbs}_{snapshot_name}.csv" + df.to_csv(output_path + estimate_filename) -def validate_outlier_detection(df: pd.DataFrame): +def validate_outlier_detection(df: pd.DataFrame, config: dict): warnings.warn("A placeholder function for validating dataframe post outliering") + output_path = config["output_path"] + file_version_mbs = metadata.metadata("monthly-business-survey-results")["version"] + snapshot_name = config["mbs_file_name"].split(".")[0] + outlier_filename = f"outlier_output_v{file_version_mbs}_{snapshot_name}.csv" + df.to_csv(output_path + outlier_filename)