Refactor wrapper functions to run main
Jday7879 committed Oct 31, 2024
1 parent 2957a35 commit 5f2bcea
Showing 7 changed files with 76 additions and 51 deletions.
5 changes: 3 additions & 2 deletions mbs_results/estimation/estimate.py
@@ -21,12 +21,13 @@ def estimate(df: pd.DataFrame, config):
_type_
_description_
"""
warnings.warn("Estimate is slowest stage")
warnings.warn(
"temp fix is applied to convert calibration group map into" + "df in config"
"temp fix is applied to convert calibration group map into df in config"
)
config["calibration_group_map"] = pd.read_csv(config["calibration_group_map"])
estimate_df = apply_estimation(**config)
estimate_df = estimate_df.drop(columns=["cell_no", "frotover"])
estimate_df = estimate_df.drop(columns=["cell_no", "frotover", "frosic2007"])
post_estimate = pd.merge(df, estimate_df, how="left", on=["period", "reference"])

return post_estimate
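
For orientation, the drop-before-merge exists because the estimation output carries auxiliary columns that already live on the input frame; dropping them first avoids pandas suffixing the duplicates. A minimal sketch with toy frames (the values are made up):

```python
import pandas as pd

# Toy stand-ins for the imputation output and the estimation output.
df = pd.DataFrame({"period": [202401], "reference": [101], "frotover": [50.0]})
estimate_df = pd.DataFrame(
    {"period": [202401], "reference": [101], "frotover": [50.0], "design_weight": [2.0]}
)

# Dropping the shared auxiliary column first keeps the merged frame free of
# duplicated frotover_x / frotover_y columns.
estimate_df = estimate_df.drop(columns=["frotover"])
post_estimate = pd.merge(df, estimate_df, how="left", on=["period", "reference"])
print(post_estimate.columns.tolist())
# ['period', 'reference', 'frotover', 'design_weight']
```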
2 changes: 1 addition & 1 deletion mbs_results/imputation/impute.py
@@ -62,7 +62,7 @@ def impute(dataframe: pd.DataFrame, config: dict) -> pd.DataFrame:
post_impute,
"period",
"reference",
"adjusted_value",
"adjustedresponse",
question_no,
"form_type_spp",
)
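
The `adjusted_value` to `adjustedresponse` change is a pure rename, but because the column name is passed as a string it must be mirrored at every call site. A self-contained illustration of the failure mode:

```python
import pandas as pd

df = pd.DataFrame({"adjustedresponse": [1.0, 2.0]})

# Referencing the stale name raises a KeyError, which is why each wrapper
# call passing "adjusted_value" had to be updated in this commit.
try:
    df["adjusted_value"]
except KeyError:
    print("stale column name; use 'adjustedresponse'")
```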
15 changes: 10 additions & 5 deletions mbs_results/main.py
@@ -12,23 +12,28 @@
validate_staging,
)

if __name__ == "__main__":

def run_mbs_main():
config = load_config()
validate_config(config)

staged_data = stage_dataframe(config)
validate_staging(staged_data)
validate_staging(staged_data, config)

# imputation: RoM wrapper -> Rename wrapper to apply_imputation
imputation_output = impute(staged_data, config)
validate_imputation(imputation_output)
validate_imputation(imputation_output, config)

# Estimation Wrapper
estimation_output = estimate(imputation_output, config)
validate_estimation(estimation_output)
validate_estimation(estimation_output, config)

# Outlier Wrapper
outlier_output = detect_outlier(estimation_output, config)
validate_outlier_detection(outlier_output)
validate_outlier_detection(outlier_output, config)

produce_outputs(outlier_output, "output_path/")


if __name__ == "__main__":
run_mbs_main()
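
Wrapping the pipeline body in `run_mbs_main()` (instead of running it directly under the `__main__` guard) makes it importable, so the same entry point can be driven from another script, a notebook, or a packaging hook. A usage sketch:

```python
# Run the full pipeline programmatically instead of via `python main.py`.
from mbs_results.main import run_mbs_main

run_mbs_main()
```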
21 changes: 14 additions & 7 deletions mbs_results/outlier_detection/detect_outlier.py
@@ -8,15 +8,22 @@ def join_l_values(df, l_values_path, classification_values_path):
"""Read l values, classifications and drop duplicates and period"""

l_values = pd.read_csv(l_values_path)
l_values["classification"] = l_values["classification"].astype(str)
l_values["question_no"] = l_values["question_no"].astype("int64")

# l_values = l_values.drop_duplicates(['question_no','classification'])

# l_values = l_values.drop(columns=["period"])

# Merge on classification SIC map (merge on SIC to get classification on df)
classification_values = pd.read_csv(classification_values_path)
classification_values["sic_5_digit"] = classification_values["sic_5_digit"].astype(
str
)
classification_values["classification"] = classification_values[
"classification"
].astype(str)

print(list(classification_values))
df = pd.merge(
df,
classification_values,
@@ -30,7 +37,7 @@ def join_l_values(df, l_values_path, classification_values_path):
df,
l_values,
how="left",
left_on=["question_no", "classification"],
left_on=["questioncode", "classification"],
right_on=["question_no", "classification"],
)
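
The new `astype` casts on `classification`, `question_no` and `sic_5_digit` are there because pandas refuses to merge on keys of mismatched dtypes. A minimal reproduction with toy data:

```python
import pandas as pd

left = pd.DataFrame({"classification": ["45320"], "questioncode": [40]})
right = pd.DataFrame({"classification": [45320], "l_value": [0.5]})

try:
    pd.merge(left, right, how="left", on="classification")
except ValueError as err:
    # pandas: "You are trying to merge on object and int64 columns ..."
    print(err)

# Casting both sides to str, as join_l_values now does, makes the keys compatible.
right["classification"] = right["classification"].astype(str)
merged = pd.merge(left, right, how="left", on="classification")
```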

@@ -51,17 +58,17 @@ def detect_outlier(df, config):
"sampled",
"design_weight",
"calibration_factor",
"adjusted_value",
"adjustedresponse",
"l_value",
)
)
post_win = calculate_derived_outlier_weights(
df,
post_win,
"period",
"reference",
"adjusted_value",
"question_no",
"spp_form_id",
"adjustedresponse",
"questioncode",
"form_type_spp",
"outlier_weight",
"new_target_variable",
)
58 changes: 28 additions & 30 deletions mbs_results/staging/stage_dataframe.py
@@ -1,9 +1,10 @@
import glob
import warnings

import pandas as pd

from mbs_results.staging.create_missing_questions import create_missing_questions
from mbs_results.staging.data_cleaning import enforce_datatypes # run_live_or_frozen
from mbs_results.staging.data_cleaning import enforce_datatypes, run_live_or_frozen
from mbs_results.staging.dfs_from_spp import dfs_from_spp
from mbs_results.utilities.utils import read_colon_separated_file

@@ -41,6 +42,17 @@ def create_mapper() -> dict:
return mapper


def read_and_combine_colon_sep_files(folder_path, column_names, config):
df = pd.concat(
[
read_colon_separated_file(f, column_names, period=config["period"])
for f in glob.glob(folder_path)
],
ignore_index=True,
)
return df
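
For readers without the package, the new helper is roughly this pattern: glob a folder for sample extracts, parse each colon-separated file, and stack the results. A self-contained sketch, with `pd.read_csv(sep=":")` standing in for `read_colon_separated_file` (which is also assumed to stamp the period onto each frame):

```python
import glob

import pandas as pd


def combine_colon_sep(folder_pattern: str, column_names: list, period) -> pd.DataFrame:
    # Stand-in sketch for read_and_combine_colon_sep_files: parse every match
    # of the glob pattern and concatenate with a fresh index.
    frames = []
    for path in glob.glob(folder_pattern):
        frame = pd.read_csv(path, sep=":", header=None, names=column_names)
        frame["period"] = period  # assumed behaviour of read_colon_separated_file
        frames.append(frame)
    return pd.concat(frames, ignore_index=True)
```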


def stage_dataframe(config: dict) -> pd.DataFrame:
"""
wrapper function to stage and pre-process the dataframe, ready to be passed onto the
@@ -55,11 +67,8 @@ def stage_dataframe(config: dict) -> pd.DataFrame:
-------
_type_
Combined dataframe containing response and contributor data. Missing questions
have been created, data types enforced***. NI cell number have been converted
have been created, and data types enforced. NI cell numbers have been converted
to UK equivalents.
*** current functionality broken but with refactoring this will be implemented
Mapping function needs to be defined but is used in other functions
"""

print("Staging started")
@@ -71,7 +80,8 @@ def stage_dataframe(config: dict) -> pd.DataFrame:
config["platform"],
config["bucket"],
)
# TODO filter responses and contributors df to columns in config

# Filter columns and set data types
contributors = contributors[config["contributors_keep_cols"].keys()]
contributors = enforce_datatypes(
contributors, keep_columns=config["contributors_keep_cols"], **config
@@ -82,8 +92,8 @@ def stage_dataframe(config: dict) -> pd.DataFrame:
responses, keep_columns=config["responses_keep_cols"], **config
)

finalsel = read_colon_separated_file(
config["sample_path"], config["sample_column_names"], period=config[period]
finalsel = read_and_combine_colon_sep_files(
config["sample_path"], config["sample_column_names"], config
)

finalsel = finalsel[config["finalsel_keep_cols"].keys()]
@@ -97,9 +107,8 @@ def stage_dataframe(config: dict) -> pd.DataFrame:
right=finalsel,
on=[period, reference],
suffixes=["_spp", "_finalsel"],
how="left",
)
warnings.warn("Duplicate columns are created in this join, need to fix this")
# TODO map on SPP form type
#
contributors = create_form_type_spp_column(contributors, config)
mapper = create_mapper() # Needs to be defined
@@ -115,27 +124,16 @@ def stage_dataframe(config: dict) -> pd.DataFrame:
)

df = responses_with_missing.drop(columns=config["form_id"]).merge(
contributors, on=[reference, period], suffixes=["_res", "_con"]
contributors, on=[reference, period], suffixes=["_res", "_con"], how="left"
)
warnings.warn("add live or frozen after fixing error marker column in config")
df = run_live_or_frozen(
df,
config["target"],
error_marker=config["errormarker"],
state=config["state"],
error_values=[201],
)
print()
# Add run live or frozen
# df = run_live_or_frozen(df, ...)
print("Staging Completed")

return df
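
The live-or-frozen semantics are not shown in this diff; as a loudly labelled guess, a frozen run typically masks target values whose error marker is flagged (here, error value 201), while a live run keeps everything. A hypothetical stand-in, not the real `run_live_or_frozen`:

```python
import pandas as pd


def run_live_or_frozen_sketch(df, target, error_marker, state, error_values):
    # Hypothetical behaviour only: in a "frozen" run, blank out flagged
    # responses; in a "live" run, pass the frame through untouched.
    out = df.copy()
    if state == "frozen":
        out.loc[out[error_marker].isin(error_values), target] = None
    return out
```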


if __name__ == "__main__":
from mbs_results.imputation.impute import impute
from mbs_results.utilities.inputs import load_config

config = load_config()
df = stage_dataframe(config)
# print(df[["formtype","form_type_spp"]])
impute(df, config)
filter_col_spp = [col for col in df if col.endswith("_res")]
filter_col_finalsel = [col for col in df if col.endswith("_con")]
print(df.head())
for i in filter_col_spp:
col_start = i.split("_")[0]
print(col_start, df[i].equals(df[col_start + "_con"]))
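
One caveat with the debug block above: `i.split("_")[0]` keeps only the text before the first underscore, so multi-word columns such as `form_type_spp_res` will never find their `_con` partner. A sketch of a more robust comparison:

```python
# Strip the whole "_res" suffix instead of splitting on the first underscore,
# so multi-underscore column names resolve to the correct "_con" partner.
for res_col in [c for c in df.columns if c.endswith("_res")]:
    base = res_col[: -len("_res")]
    con_col = base + "_con"
    if con_col in df.columns:
        print(base, df[res_col].equals(df[con_col]))
```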
2 changes: 0 additions & 2 deletions mbs_results/utilities/constrains.py
@@ -154,8 +154,6 @@ def constrain(
"cell_no",
"frotover",
"froempment",
"form_type",
"response_type",
],
verify_integrity=False,
)
24 changes: 20 additions & 4 deletions mbs_results/utilities/validation_checks.py
@@ -1,4 +1,5 @@
import warnings
from importlib import metadata

import pandas as pd

@@ -229,17 +230,32 @@ def validate_manual_constructions(df, manual_constructions):
)


def validate_staging(df: pd.DataFrame):
def validate_staging(df: pd.DataFrame, config: dict):
warnings.warn("A placeholder function for validating dataframe post staging")


def validate_imputation(df: pd.DataFrame):
def validate_imputation(df: pd.DataFrame, config: dict):
warnings.warn("A placeholder function for validating dataframe post imputation")
output_path = config["output_path"]
file_version_mbs = metadata.metadata("monthly-business-survey-results")["version"]
snapshot_name = config["mbs_file_name"].split(".")[0]
imputation_filename = f"imputation_output_v{file_version_mbs}_{snapshot_name}.csv"
df.to_csv(output_path + imputation_filename)


def validate_estimation(df: pd.DataFrame):
def validate_estimation(df: pd.DataFrame, config: dict):
warnings.warn("A placeholder function for validating dataframe post estimation")
output_path = config["output_path"]
file_version_mbs = metadata.metadata("monthly-business-survey-results")["version"]
snapshot_name = config["mbs_file_name"].split(".")[0]
estimate_filename = f"estimation_output_v{file_version_mbs}_{snapshot_name}.csv"
df.to_csv(output_path + estimate_filename)


def validate_outlier_detection(df: pd.DataFrame):
def validate_outlier_detection(df: pd.DataFrame, config: dict):
warnings.warn("A placeholder function for validating dataframe post outliering")
output_path = config["output_path"]
file_version_mbs = metadata.metadata("monthly-business-survey-results")["version"]
snapshot_name = config["mbs_file_name"].split(".")[0]
outlier_filename = f"outlier_output_v{file_version_mbs}_{snapshot_name}.csv"
df.to_csv(output_path + outlier_filename)
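
The three validators now share identical filename-building logic; a hypothetical helper (the name is an assumption, not part of this commit) could centralise it, and `importlib.metadata.version` is a shorter equivalent to reading the metadata mapping:

```python
from importlib import metadata


def versioned_output_filename(stage: str, config: dict) -> str:
    # Hypothetical helper: shared filename logic for the imputation,
    # estimation and outlier outputs written above.
    file_version_mbs = metadata.version("monthly-business-survey-results")
    snapshot_name = config["mbs_file_name"].split(".")[0]
    return f"{stage}_output_v{file_version_mbs}_{snapshot_name}.csv"
```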
