Skip to content

Commit

Permalink
Streamlining additional outputs process and updating methods to work …
Browse files Browse the repository at this point in the history
…correctly with pipeline
  • Loading branch information
lhubbardONS committed Dec 6, 2024
1 parent 6a1938b commit 0b00d56
Show file tree
Hide file tree
Showing 15 changed files with 165 additions and 192 deletions.
8 changes: 6 additions & 2 deletions mbs_results/main.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from mbs_results.estimation.estimate import estimate
from mbs_results.imputation.impute import impute
from mbs_results.outlier_detection.detect_outlier import detect_outlier
from mbs_results.outputs.produce_additional_outputs import produce_additional_outputs
from mbs_results.outputs.produce_additional_outputs import (
get_additional_outputs_df,
produce_additional_outputs,
)
from mbs_results.staging.stage_dataframe import stage_dataframe
from mbs_results.utilities.inputs import load_config
from mbs_results.utilities.validation_checks import (
Expand Down Expand Up @@ -32,7 +35,8 @@ def run_mbs_main():
outlier_output = detect_outlier(estimation_output, config)
validate_outlier_detection(outlier_output, config)

produce_additional_outputs(config)
additional_outputs_df = get_additional_outputs_df(estimation_output, outlier_output)
produce_additional_outputs(config, additional_outputs_df)


if __name__ == "__main__":
Expand Down
11 changes: 9 additions & 2 deletions mbs_results/outputs/get_additional_outputs.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
def get_additional_outputs(config: dict, function_mapper: dict) -> None:
import pandas as pd


def get_additional_outputs(
config: dict, function_mapper: dict, additional_outputs_df: pd.DataFrame
) -> dict:
"""
Runs a set of functions as defined in additional_outputs from the config,
the function names must exist in function_mapper which also has the relevant
Expand Down Expand Up @@ -76,7 +81,9 @@ def get_additional_outputs(config: dict, function_mapper: dict) -> None:

if function in function_mapper:

additional_outputs[function] = function_mapper[function](**config)
additional_outputs[function] = function_mapper[function](
additional_outputs_df=additional_outputs_df, **config
)

else:
raise ValueError(
Expand Down
59 changes: 58 additions & 1 deletion mbs_results/outputs/produce_additional_outputs.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from importlib import metadata

import pandas as pd

from mbs_results.outputs.get_additional_outputs import get_additional_outputs
from mbs_results.outputs.selective_editing_contributer_output import (
get_selective_editing_contributer_output,
Expand All @@ -13,14 +15,68 @@
)


def produce_additional_outputs(config: dict):
def get_additional_outputs_df(
estimation_output: pd.DataFrame, outlier_output: pd.DataFrame
):
"""
Creating dataframe that contains all variables needed for producing additional
outputs.
Parameters
----------
estimation_output : pd.DataFrame
Dataframe output from the estimation stage of the pipeline
outlier_output : pd.DataFrame
Dataframe output from the outliering stage of the pipeline
Returns
-------
pd.DataFrame
"""

additional_outputs_df = estimation_output[
[
"reference",
"period",
"design_weight",
"frosic2007",
"formtype",
"questioncode",
"frotover",
"calibration_factor",
"adjustedresponse",
"status",
"response",
"froempment",
"cell_no",
"referencename",
"imputation_flags_adjustedresponse",
"f_link_adjustedresponse",
"b_link_adjustedresponse",
"construction_link",
]
]

additional_outputs_df = additional_outputs_df.merge(
outlier_output[["reference", "period", "questioncode", "outlier_weight"]],
how="left",
on=["reference", "period", "questioncode"],
)

return additional_outputs_df


def produce_additional_outputs(config: dict, additional_outputs_df: pd.DataFrame):
"""
Function to write additional outputs
Parameters
----------
config : Dict
main pipeline configuration
additional_outputs_df : pd.DataFrame
Dataframe to feed in as arguments for additional outputs
Returns
-------
Expand All @@ -37,6 +93,7 @@ def produce_additional_outputs(config: dict):
"turnover_output": create_turnover_output,
"weighted_adj_val_time_series": get_weighted_adj_val_time_series,
},
additional_outputs_df,
)

# Stop function if no additional_outputs are listed in config.
Expand Down
36 changes: 13 additions & 23 deletions mbs_results/outputs/selective_editing_contributer_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@


def get_selective_editing_contributer_output(
input_filepath: str,
domain_filepath: str,
additional_outputs_df: pd.DataFrame,
sic_domain_mapping_path: str,
threshold_filepath: str,
sic_input: str,
sic_mapping: str,
period_selected: int,
**config
) -> pd.DataFrame:
Expand All @@ -19,17 +17,12 @@ def get_selective_editing_contributer_output(
Parameters
----------
input_filepath : str
Filepath to csv file containing reference, imp_class, period and
SIC columns.
domain_filepath : str
additional_outputs_df : pd.DataFrame
Dataframe containing reference, design_weight, formtype, period and SIC columns.
sic_domain_mapping_path : str
Filepath to csv file containing SIC and domain columns.
threshold_filepath : str
Filepath to csv file containing form type, domain and threshold columns.
sic_input : str
Name of column in input_filepath csv file containing SIC variable.
sic_mapping : str
Name of column in domain_filepath csv file containing SIC variable.
period_selected : int
period to include in outputs
**config: Dict
Expand All @@ -46,32 +39,29 @@ def get_selective_editing_contributer_output(
>> input_filepath=input_filepath,
>> domain_filepath=domain_filepath,
>> threshold_filepath=threshold_filepath,
>> sic_input="sic_5_digit",
>> sic_mapping="sic_5_digit",
>> period_selected=202201
>> )
"""

input_data = pd.read_csv(
input_filepath,
usecols=["period", "reference", "design_weight", sic_input, "form_type"],
)
input_data = additional_outputs_df[
["period", "reference", "design_weight", "frosic2007", "formtype"]
]

domain_data = pd.read_csv(domain_filepath)
domain_data = pd.read_csv(sic_domain_mapping_path).astype(str)

threshold_mapping = pd.read_csv(threshold_filepath)
threshold_mapping = pd.read_csv(threshold_filepath).astype(str)

selective_editing_contributer_output = merge_domain(
input_data, domain_data, sic_input, sic_mapping
input_data, domain_data, "frosic2007", "sic_5_digit"
)

selective_editing_contributer_output = pd.merge(
selective_editing_contributer_output,
threshold_mapping,
left_on=["form_type", "domain"],
left_on=["formtype", "domain"],
right_on=["form", "domain"],
how="left",
).drop(columns=["form", "form_type"])
).drop(columns=["form", "formtype"])

selective_editing_contributer_output = selective_editing_contributer_output.rename(
columns={"reference": "ruref", "domain": "domain_group"}
Expand Down
84 changes: 22 additions & 62 deletions mbs_results/outputs/selective_editing_question_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,7 @@


def create_selective_editing_question_output(
df: pd.DataFrame,
reference: str,
period: str,
domain: str,
question_no: str,
sic: str,
aux: str,
a_weight: str,
o_weight: str,
g_weight: str,
adjusted_value: str,
additional_outputs_df: pd.DataFrame,
sic_domain_mapping_path: str,
period_selected: int,
**config,
Expand All @@ -26,29 +16,9 @@ def create_selective_editing_question_output(
Parameters
----------
df : pd.DataFrame
Reference dataframe with domain, a_weights, o_weights, and g_weights
reference : str
name of column in dataframe containing reference variable
period : str
name of column in dataframe containing period variable
domain : str
name of column name containing domain variable in sic_domain_mapping file.
question_no : str
name of column in dataframe containing question number variable
sic : str
name of column in dataframe containing sic variable
aux : str
name of column in dataframe containing auxiliary value variable
a_weight : str
Column name containing the design weight.
o_weight : str
column name containing the outlier weight.
g_weight : str
column name containing the g weight.
adjusted_value : str
name of column in dataframe containing adjusted_value variable combined
with imputed_values as outputted from Ratio of Means script
additional_outputs_df : pd.DataFrame
Reference dataframe with sic, a_weights, o_weights, g_weights,
adjustedresponse, imputation_flags and frotover
sic_domain_mapping_path : str
path to the sic domain mapping file
period_selected : int
Expand All @@ -65,58 +35,48 @@ def create_selective_editing_question_output(
Examples
--------
>> output = create_selective_editing_question_output(
>> df=wins_output,
>> reference="reference",
>> period="period",
>> domain="domain",
>> question_no="question_no",
>> sic="sic_5_digit",
>> aux="frotover",
>> a_weight="design_weight",
>> o_weight="outlier_weight",
>> g_weight="calibration_factor",
>> adjusted_value="adjusted_value",
>> additional_outputs_df=wins_output,
>> sic_domain_mapping_path="mapping_files/sic_domain_mapping.csv",
>> period_selected=202201,
>> )
"""
sic_domain_mapping = pd.read_csv(sic_domain_mapping_path).astype(int)
sic_domain_mapping = pd.read_csv(sic_domain_mapping_path).astype(str)

df_with_domain = merge_domain(
input_df=df,
input_df=additional_outputs_df,
domain_mapping=sic_domain_mapping,
sic_input=sic,
sic_input="frosic2007",
sic_mapping="sic_5_digit",
)

standardising_factor = create_standardising_factor(
dataframe=df_with_domain,
reference=reference,
period=period,
domain=domain,
question_no=question_no,
predicted_value=adjusted_value,
imputation_marker="imputation_flags_adjusted_value",
a_weight=a_weight,
o_weight=o_weight,
g_weight=g_weight,
auxiliary_value=aux,
reference="reference",
period="period",
domain="domain",
question_no="questioncode",
predicted_value="adjustedresponse",
imputation_marker="imputation_flags_adjustedresponse",
a_weight="design_weight",
o_weight="outlier_weight",
g_weight="calibration_factor",
auxiliary_value="frotover",
period_selected=period_selected,
)

# Survey code is requested on this output, 009 is MBS code
standardising_factor["survey_code"] = "009"

standardising_factor["imputation_flags_adjusted_value"] = standardising_factor[
"imputation_flags_adjusted_value"
standardising_factor["imputation_flags_adjustedresponse"] = standardising_factor[
"imputation_flags_adjustedresponse"
].str.upper()
standardising_factor = standardising_factor.rename(
columns={
"reference": "ruref",
"domain": "domain_group",
aux: "auxiliary_value",
"frotover": "auxiliary_value",
"imputation_flags_adjusted_value": "imputation_marker",
question_no: "question_code",
"questioncode": "question_code",
}
)

Expand Down
Loading

0 comments on commit 0b00d56

Please sign in to comment.